gpscat.py.in 6.3 KB


  1. #!@PYSHEBANG@
  2. #
  3. # @GENERATED@
  4. # This file is Copyright 2010 by the GPSD project
  5. # SPDX-License-Identifier: BSD-2-clause
  6. # This code runs compatibly under Python 2 and 3.x for x >= 2.
  7. # Preserve this property!
  8. # Codacy D203 and D211 conflict, I choose D203
  9. # Codacy D212 and D213 conflict, I choose D212
  10. """Display GPS output. Hexify it if necessary."""
  11. from __future__ import absolute_import, print_function, division
  12. import argparse
  13. import os
  14. import select
  15. import socket
  16. import string
  17. import sys
  18. import termios
  19. # pylint wants local modules last
  20. try:
  21. import gps
  22. import gps.packet as sniffer
  23. from gps.misc import BINARY_ENCODING
  24. except ImportError as e:
  25. sys.stderr.write(
  26. "gpscat: can't load Python gps libraries -- check PYTHONPATH.\n")
  27. sys.stderr.write("%s\n" % e)
  28. sys.exit(1)
  29. gps_version = '@VERSION@'
  30. if gps.__version__ != gps_version:
  31. sys.stderr.write("gpscat: ERROR: need gps module version %s, got %s\n" %
  32. (gps_version, gps.__version__))
  33. sys.exit(1)
  34. # The spec says 82, but some receivers (TN-200, GSW 2.3.2) output 86 characters
  35. # the Skyrtaq S2525F8 emits 100 chars
  36. NMEA_MAX = 102
  37. # Lowest debug level at which packet getter begins to emit messages, minus one
  38. BASELEVEL = sniffer.LOG_IO
  39. highhalf_latch = True
  40. PRINTABLE = set(bytearray(string.printable, encoding=BINARY_ENCODING))
  41. def hexdump(st):
  42. """Convert string to hex string."""
  43. dmp = ""
  44. for ch in bytearray(st): # bytearray gets array of ints in Python 2 and 3
  45. if ch in PRINTABLE:
  46. dmp += chr(ch)
  47. else:
  48. dmp += "\\x%02x" % ch
  49. return dmp
  50. def reporter(errlevel, mesg):
  51. """Report errors, depending on log level."""
  52. if errlevel <= options.debug:
  53. sys.stdout.write(mesg)
  54. if __name__ == '__main__':
  55. usage = '%(prog)s [OPTIONS]'
  56. epilog = ('BSD terms apply: see the file COPYING in the distribution root'
  57. ' for details.')
  58. parser = argparse.ArgumentParser(usage=usage, epilog=epilog)
  59. parser.add_argument(
  60. '-?',
  61. action="help",
  62. help='show this help message and exit'
  63. )
  64. parser.add_argument(
  65. '-D',
  66. '--debug',
  67. dest='debug',
  68. default=0,
  69. type=int,
  70. help='Set level of debug. Must be integer. [Default %(default)s]',
  71. )
  72. parser.add_argument(
  73. '-p', '--packetizer',
  74. action='store_true',
  75. dest='packetizer',
  76. default=False,
  77. help="Set packetizer mode",
  78. )
  79. parser.add_argument(
  80. '-s', '--speed',
  81. dest='speed_str',
  82. default=None,
  83. help=('Set port speed. '
  84. ' Optionally parity (N, O, E) and stop bits (1, 2)'),
  85. )
  86. parser.add_argument(
  87. '-t', '--typeflag',
  88. action='store_true',
  89. dest='typeflag',
  90. default=False,
  91. help="Select packetizer mode with type and length",
  92. )
  93. parser.add_argument(
  94. '-V', '--version',
  95. action='version',
  96. version="%(prog)s: Version " + gps_version + "\n",
  97. help='Output version to stderr, then exit',
  98. )
  99. parser.add_argument(
  100. 'target',
  101. help='File or Serial port to read from',
  102. )
  103. options = parser.parse_args()
  104. options.speed = None
  105. options.parity = None
  106. options.stopbits = None
  107. if options.speed_str:
  108. if options.speed_str[-2] in ('N', 'E', 'O'):
  109. options.parity = options.speed_str[-2]
  110. options.stopbits = int(options.speed_str[-1])
  111. options.speed_str = options.speed_str[:-2]
  112. options.speed = int(options.speed_str)
  113. # adjust debug level
  114. options.debug += BASELEVEL
  115. if options.packetizer or options.typeflag:
  116. options.rawmode = False
  117. else:
  118. options.rawmode = True
  119. try:
  120. if "rfcomm" in options.target: # Bluetooth special case
  121. s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,
  122. socket.BTPROTO_RFCOMM)
  123. s.connect((options.target, 1))
  124. tty = s.fileno()
  125. else: # Ordinary device
  126. tty = os.open(options.target, os.O_RDWR)
  127. if options.speed is not None:
  128. (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = \
  129. termios.tcgetattr(tty)
  130. try:
  131. ispeed = ospeed = eval("termios.B%d" % options.speed)
  132. except AttributeError:
  133. sys.stderr.write("gpscat: unknown baud rate %d\n" %
  134. options.speed)
  135. raise SystemExit(1)
  136. if options.stopbits:
  137. cflag &= ~termios.CSIZE
  138. cflag |= (termios.CS8, termios.CS7)[options.stopbits - 1]
  139. if options.parity:
  140. if options.parity == 'N':
  141. iflag &= ~termios.PARENB
  142. iflag &= ~termios.INPCK
  143. elif options.parity == 'O':
  144. iflag |= termios.INPCK
  145. cflag |= termios.PARENB
  146. cflag |= termios.PARODD
  147. elif options.parity == 'E':
  148. iflag |= termios.INPCK
  149. cflag |= termios.PARENB
  150. cflag &= ~termios.PARODD
  151. termios.tcsetattr(tty, termios.TCSANOW,
  152. [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
  153. if not options.rawmode:
  154. getter = sniffer.new()
  155. sniffer.register_report(reporter)
  156. seqno = 0
  157. while True:
  158. rlist, wlist, xlist = select.select([tty], [], [])
  159. if rlist == [tty]:
  160. if options.rawmode:
  161. buf = os.read(tty, NMEA_MAX)
  162. if not buf:
  163. break
  164. sys.stdout.write(hexdump(buf))
  165. else:
  166. (length, ptype, packet, counter) = getter.get(tty)
  167. seqno += 1
  168. if length == 0:
  169. break
  170. if options.typeflag:
  171. sys.stdout.write('%d (%d@%d): %s\n' % (ptype, length,
  172. counter - length, hexdump(packet)))
  173. else:
  174. sys.stdout.write(hexdump(packet) + "\n")
  175. except KeyboardInterrupt:
  176. if options.rawmode:
  177. sys.stdout.write("\n")
  178. raise SystemExit(0)
  179. # Local variables:
  180. # mode: python
  181. # end:
  182. # vim: set expandtab shiftwidth=4