gpsfake 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. #!/usr/bin/env python
  2. #
  3. '''
  4. gpsfake -- test harness for gpsd
  5. Simulates one or more GPSes, playing back logfiles.
  6. Most of the logic for this now lives in gps.fake,
  7. factored out so we can write other test programs with it.
  8. '''
  9. #
  10. # This file is Copyright (c) 2010 by the GPSD project
  11. # SPDX-License-Identifier: BSD-2-clause
  12. # This code runs compatibly under Python 2 and 3.x for x >= 2.
  13. # Preserve this property!
  14. from __future__ import absolute_import, print_function, division
  15. import getopt
  16. import os
  17. import platform
  18. import pty
  19. import socket
  20. import sys
  21. import time
  22. # pylint wants local modules last
  23. try:
  24. import gps
  25. import gps.fake as gpsfake # The "as" pacifies pychecker
  26. except ImportError as e:
  27. sys.stderr.write(
  28. "gpsfake: can't load Python gps libraries -- check PYTHONPATH.\n")
  29. sys.stderr.write("%s\n" % e)
  30. sys.exit(1)
  31. gps_version = '3.19.1~dev'
  32. if gps.__version__ != gps_version:
  33. sys.stderr.write("gpsfake: ERROR: need gps module version %s, got %s\n" %
  34. (gps_version, gps.__version__))
  35. sys.exit(1)
  36. try:
  37. my_input = raw_input
  38. except NameError:
  39. my_input = input
  40. # Get version of stdout for bytes data (NOP in Python 2)
  41. bytesout = gps.get_bytes_stream(sys.stdout)
  42. class Baton(object):
  43. "Ship progress indications to stderr."
  44. # By setting this > 1 we reduce the frequency of the twirl
  45. # and speed up test runs. Should be relatively prime to the
  46. # nunber of baton states, otherwise it will cause beat artifacts
  47. # in the twirling.
  48. SPINNER_INTERVAL = 11
  49. def __init__(self, prompt, endmsg=None):
  50. self.stream = sys.stderr
  51. self.stream.write(prompt + "...")
  52. if os.isatty(self.stream.fileno()):
  53. self.stream.write(" \b")
  54. self.stream.flush()
  55. self.count = 0
  56. self.endmsg = endmsg
  57. self.time = time.time()
  58. def twirl(self, ch=None):
  59. "Twirl the baton"
  60. if self.stream is None:
  61. return
  62. if os.isatty(self.stream.fileno()):
  63. if ch:
  64. self.stream.write(ch)
  65. self.stream.flush()
  66. elif self.count % Baton.SPINNER_INTERVAL == 0:
  67. self.stream.write("-/|\\"[self.count % 4])
  68. self.stream.write("\b")
  69. self.stream.flush()
  70. self.count = self.count + 1
  71. def end(self, mesg=None):
  72. "Write end message"
  73. if mesg is None:
  74. mesg = self.endmsg
  75. if self.stream:
  76. self.stream.write("...(%2.2f sec) %s.\n"
  77. % (time.time() - self.time, mesg))
  78. def hexdump(s):
  79. "Convert string to hex"
  80. rep = ""
  81. for c in s:
  82. rep += "%02x" % ord(c)
  83. return rep
  84. def fakehook(linenumber, fakegps):
  85. "Do the real work"
  86. if not fakegps.testload.sentences:
  87. sys.stderr.write("fakegps: no sentences in test load.\n")
  88. raise SystemExit(1)
  89. if linenumber % len(fakegps.testload.sentences) == 0:
  90. if singleshot and linenumber > 0:
  91. return False
  92. if progress:
  93. baton.twirl('*\b')
  94. elif not singleshot:
  95. if not quiet:
  96. sys.stderr.write("gpsfake: log cycle of %s begins.\n"
  97. % fakegps.testload.name)
  98. time.sleep(cycle)
  99. if linedump and fakegps.testload.legend:
  100. ml = fakegps.testload.sentences[
  101. linenumber % len(fakegps.testload.sentences)].strip()
  102. if not fakegps.testload.textual:
  103. ml = hexdump(ml)
  104. announce = fakegps.testload.legend \
  105. % (linenumber % len(fakegps.testload.sentences) + 1) + ml
  106. if promptme:
  107. my_input(announce + "? ")
  108. else:
  109. print(announce)
  110. if progress:
  111. baton.twirl()
  112. return True
  113. if __name__ == '__main__':
  114. def usage():
  115. "Print usage and exit"
  116. sys.stderr.write("""usage: gpsfake [OPTIONS] logfile...
  117. [-1] logfile is interpreted once only rather than repeatedly
  118. [-b] enable a twirling-baton progress indicator
  119. [-c cycle] sets the delay between sentences in seconds
  120. [-D debug] passes a -D option to the daemon
  121. [-g] run the gpsd instance within gpsfake under control of gdb
  122. [-G] run the gpsd instance within gpsfake under control of lldb
  123. [-h] print a usage message and exit
  124. [-i] single-stepping through logfile
  125. [-l] dump a line or packet number just before each sentence
  126. [-m monitor] specifies a monitor program under which the daemon is run
  127. [-n] start the daemon reading the GPS without waiting for a client
  128. [-o options] specifies options to pass to the daemon
  129. [-p] sets watcher mode and dump to stdout
  130. [-P port] sets the daemon's listening port
  131. [-q] act in a quiet manner
  132. [-r initcmd] specifies an initialization command to use in pipe mode
  133. [-S] insert realistic delays in the test input
  134. [-s speed] sets the baud rate for the slave tty
  135. [-t] force TCP
  136. [-T] print some system information and exit
  137. [-v] verbose
  138. [-V] Version
  139. [-W] specify timeout (default 60s), 0 means none
  140. [-x] dump packets as gpsfake gathers them
  141. """)
  142. raise SystemExit(0)
  143. try:
  144. (options, arguments) = getopt.getopt(
  145. sys.argv[1:],
  146. "1bc:D:gGhilm:no:pP:qr:s:StTuvxVW:"
  147. )
  148. except getopt.GetoptError as msg:
  149. print("gpsfake: " + str(msg))
  150. raise SystemExit(1)
  151. port = None
  152. progress = False
  153. cycle = 0.0
  154. monitor = ""
  155. speed = 4800
  156. linedump = False
  157. predump = False
  158. pipe = False
  159. singleshot = False
  160. promptme = False
  161. client_init = '?WATCH={"json":true,"nmea":true}'
  162. doptions = ""
  163. tcp = False
  164. udp = False
  165. verbose = 0
  166. slow = False
  167. quiet = False
  168. timeout = None # Really means default
  169. for (switch, val) in options:
  170. if switch == '-1':
  171. singleshot = True
  172. elif switch == '-b':
  173. progress = True
  174. elif switch == '-c':
  175. cycle = float(val)
  176. elif switch == '-D':
  177. doptions += " -D " + val
  178. elif switch == '-g':
  179. monitor = "xterm -e gdb -tui --args "
  180. timeout = 0
  181. elif switch == '-G':
  182. monitor = "xterm -e lldb -- "
  183. timeout = 0
  184. elif switch == '-h':
  185. usage()
  186. elif switch == '-i':
  187. linedump = promptme = True
  188. elif switch == '-l':
  189. linedump = True
  190. elif switch == '-m':
  191. monitor = val + " "
  192. elif switch == '-n':
  193. doptions += " -n"
  194. elif switch == '-o':
  195. doptions = val
  196. elif switch == '-p':
  197. pipe = True
  198. elif switch == '-P':
  199. port = int(val)
  200. elif switch == '-q':
  201. quiet = True
  202. elif switch == '-r':
  203. client_init = val
  204. elif switch == '-s':
  205. speed = int(val)
  206. elif switch == '-S':
  207. slow = True
  208. elif switch == '-t':
  209. tcp = True
  210. elif switch == '-T':
  211. sys.stdout.write("sys %s platform %s: WRITE_PAD = %.5f\n"
  212. % (sys.platform, platform.platform(),
  213. gpsfake.GetDelay(slow)))
  214. raise SystemExit(0)
  215. elif switch == '-u':
  216. udp = True
  217. elif switch == '-v':
  218. verbose += 1
  219. elif switch == '-V':
  220. sys.stderr.write("gpsfake: Version %s\n" % gps_version)
  221. sys.exit(0)
  222. elif switch == '-W':
  223. try:
  224. timeout = int(val)
  225. except:
  226. sys.stderr.write("gpsfake: bad timeout value.\n")
  227. raise SystemExit(1)
  228. elif switch == '-x':
  229. predump = True
  230. try:
  231. pty.openpty()
  232. except (AttributeError, OSError):
  233. sys.stderr.write("gpsfake: ptys not available, falling back to UDP.\n")
  234. udp = True
  235. if not arguments:
  236. sys.stderr.write("gpsfake: requires at least one logfile argument.\n")
  237. raise SystemExit(1)
  238. if progress:
  239. baton = Baton("Processing %s" % ",".join(arguments), "done")
  240. elif not quiet:
  241. sys.stderr.write("Processing %s\n" % ",".join(arguments))
  242. # Don't allocate a private port when cycling logs for client testing.
  243. if port is None and not pipe:
  244. port = int(gps.GPSD_PORT)
  245. test = gpsfake.TestSession(prefix=monitor, port=port, options=doptions,
  246. tcp=tcp, udp=udp, verbose=verbose,
  247. predump=predump, slow=slow, timeout=timeout)
  248. if pipe:
  249. test.reporter = bytesout.write
  250. if verbose:
  251. progress = False
  252. test.progress = sys.stderr.write
  253. test.spawn()
  254. try:
  255. for logfile in arguments:
  256. try:
  257. test.gps_add(logfile, speed=speed, pred=fakehook,
  258. oneshot=singleshot)
  259. except gpsfake.TestLoadError as e:
  260. sys.stderr.write("gpsfake: " + e.msg + "\n")
  261. raise SystemExit(1)
  262. except gpsfake.PacketError as e:
  263. sys.stderr.write("gpsfake: " + e.msg + "\n")
  264. raise SystemExit(1)
  265. except gpsfake.DaemonError as e:
  266. sys.stderr.write("gpsfake: " + e.msg + "\n")
  267. raise SystemExit(1)
  268. except IOError as e:
  269. if e.filename is None:
  270. sys.stderr.write("gpsfake: unknown internal I/O error %s\n"
  271. % e)
  272. else:
  273. sys.stderr.write("gpsfake: no such file as %s or "
  274. "file unreadable\n" % e.filename)
  275. raise SystemExit(1)
  276. except OSError:
  277. sys.stderr.write("gpsfake: can't open pty.\n")
  278. raise SystemExit(1)
  279. try:
  280. if pipe:
  281. test.client_add(client_init + "\n")
  282. # Give daemon time to get ready for the feeds.
  283. # Without a delay here there's a window for test
  284. # sentences to arrive before the watch takes effect.
  285. # This needs to increase if leading sentences in
  286. # test loads aren't being processed.
  287. # Until the ISYNC driver was introduced, 1 sec was
  288. # sufficient here. The extra 0.4s allows for the
  289. # additional two 200ms delays introduced by the
  290. # calls to gpsd_set_speed() in isync_detect()
  291. time.sleep(1.4)
  292. test.run()
  293. except socket.error as msg:
  294. sys.stderr.write("gpsfake: socket error %s.\n" % msg)
  295. raise SystemExit(1)
  296. except gps.client.json_error as e:
  297. sys.stderr.write("gpsfake: JSON error on line %s is %s.\n"
  298. % (repr(e.data), e.explanation))
  299. raise SystemExit(1)
  300. except KeyboardInterrupt:
  301. sys.stderr.write("gpsfake: aborted\n")
  302. raise SystemExit(1)
  303. finally:
  304. test.cleanup()
  305. if progress:
  306. baton.end()
  307. # The following sets edit modes for GNU EMACS
  308. # Local Variables:
  309. # mode:python
  310. # End: