#!@PYSHEBANG@
# @GENERATED@
"""gpsfake -- test harness for gpsd.

Simulates one or more GPSes, playing back logfiles.
Most of the logic for this now lives in gps.fake,
factored out so we can write other test programs with it.
"""
#
#
# This file is Copyright 2010 by the GPSD project
# SPDX-License-Identifier: BSD-2-clause

# This code runs compatibly under Python 2 and 3.x for x >= 2.
# Preserve this property!

# Codacy D203 and D211 conflict, I choose D203
# Codacy D212 and D213 conflict, I choose D212
from __future__ import absolute_import, print_function, division

import argparse
import os
import platform
import pty
import shutil
import socket
import sys
import time

try:
    # distutils was removed from the standard library in Python 3.12;
    # it is only needed as a fallback where shutil.which() is missing.
    from distutils import spawn
except ImportError:
    spawn = None
# pylint wants local modules last
try:
    import gps
    import gps.fake as gpsfake  # The "as" pacifies pychecker
    import gps.misc             # for polybyte() polystr()
except ImportError as err:
    # Without the gps package nothing below can work; explain and bail out.
    sys.stderr.write(
        "gpsfake: can't load Python gps libraries -- check PYTHONPATH.\n")
    sys.stderr.write("%s\n" % err)
    sys.exit(1)

# This script and the gps module it loaded must come from the same release.
gps_version = '@VERSION@'
if gps_version != gps.__version__:
    sys.stderr.write("gpsfake: ERROR: need gps module version %s, got %s\n" %
                     (gps_version, gps.__version__))
    sys.exit(1)

# Python 2 spells the line-reading builtin raw_input(); Python 3 input().
try:
    my_input = raw_input
except NameError:
    my_input = input

# Get version of stdout for bytes data (NOP in Python 2)
bytesout = gps.get_bytes_stream(sys.stdout)
class Baton(object):
    """Progress indicator that reports to stderr.

    Creating a Baton writes the prompt, twirl() animates a spinner while
    the stream is a terminal, and end() reports the elapsed time together
    with a closing message.
    """

    # Only every SPINNER_INTERVAL-th twirl redraws the spinner, which
    # speeds up test runs.  Keep the value relatively prime to the
    # number of spinner glyphs, otherwise the animation shows beat
    # artifacts.
    SPINNER_INTERVAL = 11

    def __init__(self, prompt, endmsg=None):
        """Write the prompt to stderr and start the clock.

        prompt -- text shown before the progress indication
        endmsg -- default closing message used by end()
        """
        out = sys.stderr
        self.stream = out
        out.write(prompt + "...")
        if os.isatty(out.fileno()):
            out.write(" \b")
        out.flush()
        self.count = 0
        self.endmsg = endmsg
        self.time = time.time()

    def twirl(self, ch=None):
        """Advance the baton by one step.

        A truthy ch is written verbatim; otherwise a spinner glyph is
        drawn on every SPINNER_INTERVAL-th call.  Nothing is drawn unless
        the stream is a terminal, but the step counter always advances.
        """
        out = self.stream
        if out is None:
            return
        if os.isatty(out.fileno()):
            if ch:
                out.write(ch)
                out.flush()
            elif not self.count % Baton.SPINNER_INTERVAL:
                glyph = "-/|\\"[self.count % 4]
                out.write(glyph)
                out.write("\b")
                out.flush()
        self.count += 1

    def end(self, mesg=None):
        """Write the elapsed time and closing message.

        mesg -- closing message; the endmsg given at construction is
                used when this is None
        """
        closing = self.endmsg if mesg is None else mesg
        if not self.stream:
            return
        elapsed = time.time() - self.time
        self.stream.write("...(%2.2f sec) %s.\n" % (elapsed, closing))
def hexdump(s):
    """Convert a string or a bytes-like object to lowercase hex digits.

    s -- text (str) or binary data (bytes / bytearray)

    Returns a str holding two hex digits per input element.  Binary
    input is handled explicitly because iterating bytes yields ints on
    Python 3, which ord() rejects.
    """
    if isinstance(s, (bytes, bytearray)):
        # bytearray() yields ints under both Python 2 and Python 3.
        return "".join("%02x" % octet for octet in bytearray(s))
    return "".join("%02x" % ord(c) for c in s)
def check_xterm():
    """Check whether xterm and DISPLAY are available.

    Returns a true value only when an executable xterm is found on PATH
    and the DISPLAY environment variable is set to a non-empty value;
    otherwise a false value (None, False or an empty string).
    """
    # Prefer shutil.which() (Python 3.3+): distutils, the historic home
    # of find_executable(), is no longer in the stdlib as of Python 3.12.
    find = getattr(shutil, 'which', None)
    if find is None:
        find = spawn.find_executable
    xterm = find('xterm')
    return xterm and os.access(xterm, os.X_OK) and os.environ.get('DISPLAY')
def fakehook(linenumber, fakegps):
    """Per-sentence callback handed to the test session as its predicate.

    linenumber -- running count of the sentence about to be processed
    fakegps    -- object whose testload supplies sentences, name, legend
                  and the textual flag

    Reads the module-level options (and baton when progress is enabled).
    Returns False to stop once a single-shot run has wrapped around,
    True otherwise.  Exits with status 1 if the test load is empty.
    """
    load = fakegps.testload
    if not load.sentences:
        sys.stderr.write("fakegps: no sentences in test load.\n")
        raise SystemExit(1)

    # Position within the current pass over the log.
    index = linenumber % len(load.sentences)
    if index == 0:
        # Start of a pass over the log.
        if options.singleshot and linenumber > 0:
            return False
        if options.progress:
            baton.twirl('*\b')
        elif not (options.singleshot or options.quiet):
            sys.stderr.write("gpsfake: log cycle of %s begins.\n"
                             % load.name)

    time.sleep(options.cycle)

    if options.linedump and load.legend:
        ml = load.sentences[index].strip()
        if not load.textual:
            ml = hexdump(ml)
        announce = (load.legend % (index + 1)) + gps.polystr(ml)
        if options.promptme:
            # Single-stepping: wait for the user before going on.
            my_input(announce + "? ")
        else:
            print(announce)

    if options.progress:
        baton.twirl()
    return True
# Script entry point: parse the command line, start a gpsd test session,
# feed it the given logfiles and clean up afterwards.  The names options
# and baton are deliberately module-level because fakehook() reads them.
if __name__ == '__main__':
    description = 'Fake gpsd from a log file.'
    usage = '%(prog)s [OPTIONS] logfile...'
    epilog = ('BSD terms apply: see the file COPYING in the distribution root'
              ' for details.')

    parser = argparse.ArgumentParser(
        description=description,
        epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        usage=usage)
    parser.add_argument(
        '-?',
        action="help",
        help='show this help message and exit'
    )
    parser.add_argument(
        '-1',
        '--singleshot',
        dest='singleshot',
        default=False,
        action="store_true",
        help=('Logfile is interpreted once only rather than repeatedly. '
              '[Default %(default)s]')
    )
    parser.add_argument(
        '-b',
        '--baton',
        dest='progress',
        default=False,
        action="store_true",
        help=('Enable a twirling-baton progress indicator. '
              '[Default %(default)s]')
    )
    parser.add_argument(
        '-c',
        '--cycle',
        default=0.0,
        dest='cycle',
        metavar='CYCLE',
        type=float,
        help=('Sets the delay between sentences in seconds. '
              '[Default %(default)s]'),
    )
    parser.add_argument(
        '-D',
        '--debug',
        action='append',
        dest='debug',
        metavar='OPT',
        help='Pass a -D option OPT to the daemon.',
    )
    parser.add_argument(
        '-g',
        '--gdb',
        dest='gdb',
        default=False,
        action="store_true",
        help='Run the gpsd instance within gpsfake under control of gdb.',
    )
    parser.add_argument(
        '-G',
        '--lldb',
        dest='lldb',
        default=False,
        action="store_true",
        help='Run the gpsd instance within gpsfake under control of lldb.',
    )
    parser.add_argument(
        '-i',
        '--promptme',
        dest='promptme',
        default=False,
        action="store_true",
        help=('Single-stepping through logfile. [Default %(default)s]'),
    )
    parser.add_argument(
        '-l',
        '--linedump',
        dest='linedump',
        default=False,
        action="store_true",
        help=('Dump a line or packet number just before each sentence. '
              '[Default %(default)s]'),
    )
    parser.add_argument(
        '-m',
        '--monitor',
        default=None,
        dest='mon',
        metavar='PROG',
        help='Specifies a monitor program under which the daemon is run.',
    )
    parser.add_argument(
        '-n',
        '--nowait',
        dest='nowait',
        default=False,
        action="store_true",
        help=('Start the daemon reading from gpsfake without '
              'waiting for a client.'),
    )
    parser.add_argument(
        '-o',
        '--options',
        dest='options',
        metavar='="OPT"',
        help=('Pass options ="OPT" to the daemon.\n'
              'The equal sign and Quotes required.'),
    )
    parser.add_argument(
        '-p',
        '--pipe',
        dest='pipe',
        default=False,
        action="store_true",
        help='Sets watcher mode and dump to stdout. [Default %(default)s]',
    )
    parser.add_argument(
        '-P',
        '--port',
        default=None,
        dest='port',
        metavar='PORT',
        type=int,
        help="Sets the daemon's listening port to PORT [Default %(default)s]",
    )
    parser.add_argument(
        '-q',
        '--quiet',
        dest='quiet',
        default=False,
        action="store_true",
        help='Act in a quiet manner. [Default %(default)s]',
    )
    parser.add_argument(
        '-r',
        '--clientinit',
        default='?WATCH={"json":true,"nmea":true}',
        dest='client_init',
        metavar='STR',
        help=('Specifies an initialization command to use in pipe mode. '
              '[Default %(default)s]'),
    )
    parser.add_argument(
        '-s',
        '--speed',
        default=4800,
        dest='speed',
        metavar='SPEED',
        type=int,
        help='Sets the baud rate for the slave tty. [Default %(default)s]',
    )
    parser.add_argument(
        '-S',
        '--slow',
        dest='slow',
        default=False,
        action="store_true",
        help=('Insert realistic delays in the test input. '
              '[Default %(default)s]'),
    )
    parser.add_argument(
        '-t',
        '--tcp',
        dest='tcp',
        default=False,
        action="store_true",
        help='Force TCP. [Default %(default)s]',
    )
    parser.add_argument(
        '-T',
        '--sysinfo',
        dest='sysinfo',
        default=False,
        action="store_true",
        help='Print some system information and exit.',
    )
    parser.add_argument(
        '-u',
        '--udp',
        dest='udp',
        default=False,
        action="store_true",
        help='Force UDP. [Default %(default)s]',
    )
    parser.add_argument(
        '-v',
        '--verbose',
        dest='verbose',
        default=0,
        action='count',
        help='Verbose. Repeat for more verbosity. [Default %(default)s]',
    )
    parser.add_argument(
        '-W',
        '--timeout',
        default=60,
        dest='timeout',
        metavar='SEC',
        type=int,
        help='Specify timeout. [Default %(default)s]',
    )
    parser.add_argument(
        '-V', '--version',
        action='version',
        version="%(prog)s: Version " + gps_version + "\n",
        help='Output version to stderr, then exit'
    )
    parser.add_argument(
        '-x',
        '--predump',
        dest='predump',
        default=False,
        action="store_true",
        help='Dump packets as gpsfake gathers them. [Default %(default)s]',
    )
    parser.add_argument(
        'arguments',
        metavar='logfile',
        nargs='*',
        help='Logfile(s) to read and feed to gpsd. At least one required.'
    )
    options = parser.parse_args()

    if options.sysinfo:
        sys.stdout.write("sys %s platform %s\nWRITE_PAD = %.5f\n"
                         % (sys.platform, platform.platform(),
                            gpsfake.GetDelay(options.slow)))
        raise SystemExit(0)

    if not options.arguments:
        sys.stderr.write("gpsfake: requires at least one logfile argument.\n")
        # This is a usage error, so report failure to the caller.
        raise SystemExit(1)

    # Single-stepping prompts with the dumped line, so it implies -l.
    if options.promptme:
        options.linedump = True

    # debug options to pass to gpsd
    doptions = ''
    if options.debug:
        # -D may be repeated; forward every occurrence, not just the first.
        doptions += "".join("-D %s " % level for level in options.debug)
    if options.nowait:
        doptions += "-n "
    if options.options:
        doptions += options.options

    # Command prefix under which the daemon is launched.
    monitor = ()
    timeout = 0
    if options.gdb:
        if check_xterm():
            monitor = "xterm -e gdb -tui --args "
        else:
            monitor = "gdb --args "
        timeout = 0
    elif options.lldb:
        if check_xterm():
            monitor = "xterm -e lldb -- "
        else:
            monitor = "lldb -- "
        timeout = 0
    elif options.mon:
        monitor = options.mon + " "

    # NOTE(review): -W defaults to 60, so this also replaces the
    # timeout = 0 chosen for the debugger cases above -- confirm intent.
    if options.timeout:
        timeout = options.timeout

    if not options.tcp and not options.udp:
        # Probe for pty support; without it only UDP feeds can work.
        try:
            probe_fds = pty.openpty()
        except (AttributeError, OSError):
            sys.stderr.write('gpsfake: ptys not available, falling back'
                             ' to UDP.\n')
            options.udp = True
        else:
            # The pair was opened only as a probe; don't leak it.
            for probe_fd in probe_fds:
                os.close(probe_fd)

    if options.progress:
        baton = Baton("Processing %s" % ",".join(options.arguments), "done")
    elif not options.quiet:
        sys.stderr.write("Processing %s\n" % ",".join(options.arguments))

    # Don't allocate a private port when cycling logs for client testing.
    if options.port is None and not options.pipe:
        options.port = int(gps.GPSD_PORT)

    test = gpsfake.TestSession(options=doptions,
                               prefix=monitor,
                               predump=options.predump,
                               port=options.port,
                               slow=options.slow,
                               tcp=options.tcp,
                               timeout=timeout,
                               udp=options.udp,
                               verbose=options.verbose)

    if options.pipe:
        test.reporter = bytesout.write
        if options.verbose:
            options.progress = False
            test.progress = sys.stderr.write
    test.spawn()
    try:
        for logfile in options.arguments:
            try:
                test.gps_add(logfile, speed=options.speed, pred=fakehook,
                             oneshot=options.singleshot)
            except gpsfake.TestLoadError as e:
                sys.stderr.write("gpsfake: " + e.msg + "\n")
                raise SystemExit(1)
            except gpsfake.PacketError as e:
                sys.stderr.write("gpsfake: " + e.msg + "\n")
                raise SystemExit(1)
            except gpsfake.DaemonError as e:
                sys.stderr.write("gpsfake: " + e.msg + "\n")
                raise SystemExit(1)
            except IOError as e:
                if e.filename is None:
                    sys.stderr.write("gpsfake: unknown internal I/O error %s\n"
                                     % e)
                else:
                    sys.stderr.write("gpsfake: no such file as %s or "
                                     "file unreadable\n" % e.filename)
                raise SystemExit(1)
            except OSError:
                # Distinct from IOError only under Python 2.
                sys.stderr.write("gpsfake: can't open pty.\n")
                raise SystemExit(1)

        try:
            if options.pipe:
                test.client_add(options.client_init + "\n")
                # Give daemon time to get ready for the feeds.
                # Without a delay here there's a window for test
                # sentences to arrive before the watch takes effect.
                # This needs to increase if leading sentences in
                # test loads aren't being processed.
                # Until the ISYNC driver was introduced, 1 sec was
                # sufficient here. The extra 0.4s allows for the
                # additional two 200ms delays introduced by the
                # calls to gpsd_set_speed() in isync_detect()
                time.sleep(1.4)
            test.run()
        except socket.error as msg:
            sys.stderr.write("gpsfake: socket error %s.\n" % msg)
            raise SystemExit(1)
        except gps.client.json_error as e:
            sys.stderr.write("gpsfake: JSON error on line %s is %s.\n"
                             % (repr(e.data), e.explanation))
            raise SystemExit(1)
        except KeyboardInterrupt:
            sys.stderr.write("gpsfake: aborted\n")
            raise SystemExit(1)
    finally:
        # Always tear the session down, even on the error exits above.
        test.cleanup()

    if options.progress:
        baton.end()
# The following sets edit modes for GNU EMACS
# Local Variables:
# mode:python
# End:
# vim: set expandtab shiftwidth=4