123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- #!/usr/bin/env python
- #
- "Display GPS output. Hexify it if necessary."
- # This file is Copyright (c) 2010 by the GPSD project
- # SPDX-License-Identifier: BSD-2-clause
- # This code runs compatibly under Python 2 and 3.x for x >= 2.
- # Preserve this property!
- from __future__ import absolute_import, print_function, division
- import getopt
- import os
- import select
- import socket
- import string
- import sys
- import termios
- # pylint wants local modules last
- try:
- import gps
- import gps.packet as sniffer
- from gps.misc import BINARY_ENCODING
- except ImportError as e:
- sys.stderr.write(
- "gpscat: can't load Python gps libraries -- check PYTHONPATH.\n")
- sys.stderr.write("%s\n" % e)
- sys.exit(1)
# Version of the gps module this script requires; any other version is
# rejected before doing any work.
gps_version = '3.19.1~dev'
if gps_version != gps.__version__:
    sys.stderr.write(
        "gpscat: ERROR: need gps module version %s, got %s\n"
        % (gps_version, gps.__version__))
    sys.exit(1)
# Byte count requested by each raw-mode read.  The NMEA spec says 82
# characters, but some receivers (TN-200, GSW 2.3.2) output 86 characters
# and the Skytraq S2525F8 emits 100 chars.
NMEA_MAX = 102

# Byte values (as ints) that hexdump() passes through without escaping.
PRINTABLE = set(bytearray(string.printable, encoding=BINARY_ENCODING))

# Lowest debug level at which packet getter begins to emit messages,
# minus one.  The -D option value is added to this.
BASELEVEL = sniffer.LOG_IO

# NOTE(review): not referenced anywhere in the visible code -- confirm
# whether anything still depends on it.
highhalf_latch = True
def hexdump(st):
    "Render bytes as text, hex-escaping every byte not in PRINTABLE"
    # Iterating a bytearray yields ints under both Python 2 and 3, so the
    # membership test and the %02x formatting work on either.
    return "".join(
        chr(byte) if byte in PRINTABLE else "\\x%02x" % byte
        for byte in bytearray(st))
# Verbosity threshold consulted by reporter(); the -D option overrides it.
debuglevel = 0


def reporter(errlevel, mesg):
    "Write mesg to stdout unless errlevel is above the debug level"
    if errlevel > debuglevel:
        return
    sys.stdout.write(mesg)
def printusage():
    "Write the one-line usage summary to stderr"
    usage = ("usage: gpscat [-D debuglevel] [-h] [-p] [-s speed] [-t] "
             "[-V] serial-port\n")
    sys.stderr.write(usage)
- if __name__ == '__main__':
- try:
- try:
- (options, arguments) = getopt.getopt(sys.argv[1:], "hps:tD:V")
- except getopt.GetoptError as msg:
- print("gpscat: " + str(msg))
- raise SystemExit(1)
- speed = None
- parity = None
- stopbits = None
- rawmode = True
- typeflag = False
- for (switch, val) in options:
- if switch == '-D':
- debuglevel = BASELEVEL + int(val)
- elif switch == '-h':
- printusage()
- raise SystemExit(0)
- elif switch == '-p':
- rawmode = False
- elif switch == '-s':
- if val[-2] in ('N', 'E', 'O'):
- parity = val[-2]
- stopbits = int(val[-1])
- val = val[:-2]
- speed = int(val)
- elif switch == '-t':
- typeflag = True
- rawmode = False
- elif switch == '-V':
- sys.stderr.write("gpscat: Version %s\n" % gps_version)
- sys.exit(0)
- if len(arguments) != 1:
- printusage()
- raise SystemExit(1)
- if "rfcomm" in arguments[0]: # Bluetooth special case
- s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,
- socket.BTPROTO_RFCOMM)
- s.connect((arguments[0], 1))
- tty = s.fileno()
- else: # Ordinary device
- tty = os.open(arguments[0], os.O_RDWR)
- if speed is not None:
- (iflag, oflag, cflag, lflag, ispeed, ospeed, cc) = \
- termios.tcgetattr(tty)
- try:
- ispeed = ospeed = eval("termios.B%d" % speed)
- except AttributeError:
- sys.stderr.write("gpscat: unknown baud rate %d\n" % speed)
- raise SystemExit(1)
- if stopbits:
- cflag &= ~termios.CSIZE
- cflag |= (termios.CS8, termios.CS7)[stopbits - 1]
- if parity:
- if parity == 'N':
- iflag &= ~termios.PARENB
- iflag &= ~termios.INPCK
- elif parity == 'O':
- iflag |= termios.INPCK
- cflag |= termios.PARENB
- cflag |= termios.PARODD
- elif parity == 'E':
- iflag |= termios.INPCK
- cflag |= termios.PARENB
- cflag &= ~termios.PARODD
- termios.tcsetattr(tty, termios.TCSANOW,
- [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
- if not rawmode:
- getter = sniffer.new()
- sniffer.register_report(reporter)
- seqno = 0
- while True:
- rlist, wlist, xlist = select.select([tty], [], [])
- if rlist == [tty]:
- if rawmode:
- buf = os.read(tty, NMEA_MAX)
- if not buf:
- break
- sys.stdout.write(hexdump(buf))
- else:
- (length, ptype, packet, counter) = getter.get(tty)
- seqno += 1
- if length == 0:
- break
- if typeflag:
- sys.stdout.write(repr(ptype) + " (" + repr(length) +
- "@" + repr(counter - length) + "): " +
- hexdump(packet))
- sys.stdout.write("\n")
- else:
- sys.stdout.write(hexdump(packet) + "\n")
- except KeyboardInterrupt:
- if rawmode:
- sys.stdout.write("\n")
- raise SystemExit(0)
- # Local variables:
- # mode: python
- # end:
|