123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- #!/usr/bin/env python
- #
- '''
- gpsfake -- test harness for gpsd
- Simulates one or more GPSes, playing back logfiles.
- Most of the logic for this now lives in gps.fake,
- factored out so we can write other test programs with it.
- '''
- #
- # This file is Copyright (c) 2010 by the GPSD project
- # SPDX-License-Identifier: BSD-2-clause
- # This code runs compatibly under Python 2 and 3.x for x >= 2.
- # Preserve this property!
- from __future__ import absolute_import, print_function, division
- import getopt
- import os
- import platform
- import pty
- import socket
- import sys
- import time
- # pylint wants local modules last
- try:
- import gps
- import gps.fake as gpsfake # The "as" pacifies pychecker
- except ImportError as e:
- sys.stderr.write(
- "gpsfake: can't load Python gps libraries -- check PYTHONPATH.\n")
- sys.stderr.write("%s\n" % e)
- sys.exit(1)
- gps_version = '3.19.1~dev'
- if gps.__version__ != gps_version:
- sys.stderr.write("gpsfake: ERROR: need gps module version %s, got %s\n" %
- (gps_version, gps.__version__))
- sys.exit(1)
- try:
- my_input = raw_input
- except NameError:
- my_input = input
- # Get version of stdout for bytes data (NOP in Python 2)
- bytesout = gps.get_bytes_stream(sys.stdout)
- class Baton(object):
- "Ship progress indications to stderr."
- # By setting this > 1 we reduce the frequency of the twirl
- # and speed up test runs. Should be relatively prime to the
- # nunber of baton states, otherwise it will cause beat artifacts
- # in the twirling.
- SPINNER_INTERVAL = 11
- def __init__(self, prompt, endmsg=None):
- self.stream = sys.stderr
- self.stream.write(prompt + "...")
- if os.isatty(self.stream.fileno()):
- self.stream.write(" \b")
- self.stream.flush()
- self.count = 0
- self.endmsg = endmsg
- self.time = time.time()
- def twirl(self, ch=None):
- "Twirl the baton"
- if self.stream is None:
- return
- if os.isatty(self.stream.fileno()):
- if ch:
- self.stream.write(ch)
- self.stream.flush()
- elif self.count % Baton.SPINNER_INTERVAL == 0:
- self.stream.write("-/|\\"[self.count % 4])
- self.stream.write("\b")
- self.stream.flush()
- self.count = self.count + 1
- def end(self, mesg=None):
- "Write end message"
- if mesg is None:
- mesg = self.endmsg
- if self.stream:
- self.stream.write("...(%2.2f sec) %s.\n"
- % (time.time() - self.time, mesg))
- def hexdump(s):
- "Convert string to hex"
- rep = ""
- for c in s:
- rep += "%02x" % ord(c)
- return rep
- def fakehook(linenumber, fakegps):
- "Do the real work"
- if not fakegps.testload.sentences:
- sys.stderr.write("fakegps: no sentences in test load.\n")
- raise SystemExit(1)
- if linenumber % len(fakegps.testload.sentences) == 0:
- if singleshot and linenumber > 0:
- return False
- if progress:
- baton.twirl('*\b')
- elif not singleshot:
- if not quiet:
- sys.stderr.write("gpsfake: log cycle of %s begins.\n"
- % fakegps.testload.name)
- time.sleep(cycle)
- if linedump and fakegps.testload.legend:
- ml = fakegps.testload.sentences[
- linenumber % len(fakegps.testload.sentences)].strip()
- if not fakegps.testload.textual:
- ml = hexdump(ml)
- announce = fakegps.testload.legend \
- % (linenumber % len(fakegps.testload.sentences) + 1) + ml
- if promptme:
- my_input(announce + "? ")
- else:
- print(announce)
- if progress:
- baton.twirl()
- return True
- if __name__ == '__main__':
- def usage():
- "Print usage and exit"
- sys.stderr.write("""usage: gpsfake [OPTIONS] logfile...
- [-1] logfile is interpreted once only rather than repeatedly
- [-b] enable a twirling-baton progress indicator
- [-c cycle] sets the delay between sentences in seconds
- [-D debug] passes a -D option to the daemon
- [-g] run the gpsd instance within gpsfake under control of gdb
- [-G] run the gpsd instance within gpsfake under control of lldb
- [-h] print a usage message and exit
- [-i] single-stepping through logfile
- [-l] dump a line or packet number just before each sentence
- [-m monitor] specifies a monitor program under which the daemon is run
- [-n] start the daemon reading the GPS without waiting for a client
- [-o options] specifies options to pass to the daemon
- [-p] sets watcher mode and dump to stdout
- [-P port] sets the daemon's listening port
- [-q] act in a quiet manner
- [-r initcmd] specifies an initialization command to use in pipe mode
- [-S] insert realistic delays in the test input
- [-s speed] sets the baud rate for the slave tty
- [-t] force TCP
- [-T] print some system information and exit
- [-v] verbose
- [-V] Version
- [-W] specify timeout (default 60s), 0 means none
- [-x] dump packets as gpsfake gathers them
- """)
- raise SystemExit(0)
- try:
- (options, arguments) = getopt.getopt(
- sys.argv[1:],
- "1bc:D:gGhilm:no:pP:qr:s:StTuvxVW:"
- )
- except getopt.GetoptError as msg:
- print("gpsfake: " + str(msg))
- raise SystemExit(1)
- port = None
- progress = False
- cycle = 0.0
- monitor = ""
- speed = 4800
- linedump = False
- predump = False
- pipe = False
- singleshot = False
- promptme = False
- client_init = '?WATCH={"json":true,"nmea":true}'
- doptions = ""
- tcp = False
- udp = False
- verbose = 0
- slow = False
- quiet = False
- timeout = None # Really means default
- for (switch, val) in options:
- if switch == '-1':
- singleshot = True
- elif switch == '-b':
- progress = True
- elif switch == '-c':
- cycle = float(val)
- elif switch == '-D':
- doptions += " -D " + val
- elif switch == '-g':
- monitor = "xterm -e gdb -tui --args "
- timeout = 0
- elif switch == '-G':
- monitor = "xterm -e lldb -- "
- timeout = 0
- elif switch == '-h':
- usage()
- elif switch == '-i':
- linedump = promptme = True
- elif switch == '-l':
- linedump = True
- elif switch == '-m':
- monitor = val + " "
- elif switch == '-n':
- doptions += " -n"
- elif switch == '-o':
- doptions = val
- elif switch == '-p':
- pipe = True
- elif switch == '-P':
- port = int(val)
- elif switch == '-q':
- quiet = True
- elif switch == '-r':
- client_init = val
- elif switch == '-s':
- speed = int(val)
- elif switch == '-S':
- slow = True
- elif switch == '-t':
- tcp = True
- elif switch == '-T':
- sys.stdout.write("sys %s platform %s: WRITE_PAD = %.5f\n"
- % (sys.platform, platform.platform(),
- gpsfake.GetDelay(slow)))
- raise SystemExit(0)
- elif switch == '-u':
- udp = True
- elif switch == '-v':
- verbose += 1
- elif switch == '-V':
- sys.stderr.write("gpsfake: Version %s\n" % gps_version)
- sys.exit(0)
- elif switch == '-W':
- try:
- timeout = int(val)
- except:
- sys.stderr.write("gpsfake: bad timeout value.\n")
- raise SystemExit(1)
- elif switch == '-x':
- predump = True
- try:
- pty.openpty()
- except (AttributeError, OSError):
- sys.stderr.write("gpsfake: ptys not available, falling back to UDP.\n")
- udp = True
- if not arguments:
- sys.stderr.write("gpsfake: requires at least one logfile argument.\n")
- raise SystemExit(1)
- if progress:
- baton = Baton("Processing %s" % ",".join(arguments), "done")
- elif not quiet:
- sys.stderr.write("Processing %s\n" % ",".join(arguments))
- # Don't allocate a private port when cycling logs for client testing.
- if port is None and not pipe:
- port = int(gps.GPSD_PORT)
- test = gpsfake.TestSession(prefix=monitor, port=port, options=doptions,
- tcp=tcp, udp=udp, verbose=verbose,
- predump=predump, slow=slow, timeout=timeout)
- if pipe:
- test.reporter = bytesout.write
- if verbose:
- progress = False
- test.progress = sys.stderr.write
- test.spawn()
- try:
- for logfile in arguments:
- try:
- test.gps_add(logfile, speed=speed, pred=fakehook,
- oneshot=singleshot)
- except gpsfake.TestLoadError as e:
- sys.stderr.write("gpsfake: " + e.msg + "\n")
- raise SystemExit(1)
- except gpsfake.PacketError as e:
- sys.stderr.write("gpsfake: " + e.msg + "\n")
- raise SystemExit(1)
- except gpsfake.DaemonError as e:
- sys.stderr.write("gpsfake: " + e.msg + "\n")
- raise SystemExit(1)
- except IOError as e:
- if e.filename is None:
- sys.stderr.write("gpsfake: unknown internal I/O error %s\n"
- % e)
- else:
- sys.stderr.write("gpsfake: no such file as %s or "
- "file unreadable\n" % e.filename)
- raise SystemExit(1)
- except OSError:
- sys.stderr.write("gpsfake: can't open pty.\n")
- raise SystemExit(1)
- try:
- if pipe:
- test.client_add(client_init + "\n")
- # Give daemon time to get ready for the feeds.
- # Without a delay here there's a window for test
- # sentences to arrive before the watch takes effect.
- # This needs to increase if leading sentences in
- # test loads aren't being processed.
- # Until the ISYNC driver was introduced, 1 sec was
- # sufficient here. The extra 0.4s allows for the
- # additional two 200ms delays introduced by the
- # calls to gpsd_set_speed() in isync_detect()
- time.sleep(1.4)
- test.run()
- except socket.error as msg:
- sys.stderr.write("gpsfake: socket error %s.\n" % msg)
- raise SystemExit(1)
- except gps.client.json_error as e:
- sys.stderr.write("gpsfake: JSON error on line %s is %s.\n"
- % (repr(e.data), e.explanation))
- raise SystemExit(1)
- except KeyboardInterrupt:
- sys.stderr.write("gpsfake: aborted\n")
- raise SystemExit(1)
- finally:
- test.cleanup()
- if progress:
- baton.end()
- # The following sets edit modes for GNU EMACS
- # Local Variables:
- # mode:python
- # End:
|