123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- #!@PYSHEBANG@
- # -*- coding: UTF-8
- # @GENERATED@
- # This file is Copyright 2018 by the GPSD project
- # SPDX-License-Identifier: BSD-2-clause
- #
- # This code runs compatibly under Python 2 and 3.x for x >= 2.
- # Preserve this property!
- #
- # ENVIRONMENT:
- # Options in the UBXOPTS environment variable will be parsed before
- # the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED'
- #
- # To see what constellations are enabled:
- # ubxtool -p CFG-GNSS -f /dev/ttyXX
- #
- # To disable GLONASS and enable GALILEO:
- # ubxtool -d GLONASS -f /dev/ttyXX
- # ubxtool -e GALILEO -f /dev/ttyXX
- #
- # To read GPS messages a log file:
- # ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log
- #
- # References:
- # [1] IS-GPS-200K
- # Codacy D203 and D211 conflict, I choose D203
- # Codacy D212 and D213 conflict, I choose D212
- """ubxtool -- u-blox configurator and packet decoder.
- usage: ubxtool [OPTIONS] [host[:port[:device]]]
- """
- from __future__ import absolute_import, print_function, division
- import argparse # to parse CLI options
- from functools import reduce # pylint: disable=redefined-builtin
- import operator # for or_
- import os # for os.environ
- import re # for regular expressions
- import struct # for pack()
- import sys
- PROG_NAME = 'ubxtool'
- try:
- import gps
- import gps.ubx
- except ImportError:
- # PEP8 says local imports last
- sys.stderr.write("%s: failed to import gps, check PYTHONPATH\n" %
- PROG_NAME)
- sys.exit(2)
- gps_version = '@VERSION@'
- if gps.__version__ != gps_version:
- sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" %
- (PROG_NAME, gps_version, gps.__version__))
- sys.exit(1)
- # Some old versions of Python fail to accept a bytearray as an input to
- # struct.unpack_from, though it can be worked around by wrapping it with
- # buffer(). Since the fix is only needed in rare cases, this monkey-patches
- # struct.unpack_from() when needed, and otherwise changes nothing. If
- # struct.unpack() were used, it would need similar treatment, as would
- # methods from struct.Struct if that were used.
- try:
- struct.unpack_from('B', bytearray(1))
- except TypeError:
- unpack_from_orig = struct.unpack_from
- def unpack_from_fixed(fmt, buf, offset=0):
- """Unpack_from_fixed."""
- # buffer() is a Python 2 thing.
- return unpack_from_orig(fmt, buffer(buf), offset=offset)
- struct.unpack_from = unpack_from_fixed
- # opts['protver'], protocol version for sent commands
- # u-blox 5, firmware 4 to 6 is protver 10 to 12
- # u-blox 6, firmware 6 to 7 is protver 12 to 13
- # u-blox 6, firmware 1 is protver 14
- # u-blox 7, firmware 1 is protver 14
- # u-blox 8, is protver 15 to 23
- # u-blox 9, firmware 1 is protver 27
- # u-blox F9T, firmware 2 is protver 29
- # u-blox F9N, firmware 4 is protver 32
- # instantiate the GPS class
- gps_model = gps.ubx.ubx()
- if 'UBXOPTS' in os.environ:
- # grab the UBXOPTS environment variable to be handled as prepended options
- options = os.environ['UBXOPTS'].split(' ') + sys.argv[1:]
- else:
- options = sys.argv[1:]
- # "m:")
- usage = '%(prog)s [OPTIONS] [host[:port[:device]]]'
- epilog = ('BSD terms apply: see the file COPYING in the distribution '
- 'root for details.')
- parser = argparse.ArgumentParser(add_help=False, epilog=epilog, usage=usage)
- parser.add_argument(
- '-?', '-h', '--help',
- action="store_true",
- dest='help',
- default=False,
- help='Show this help message and exit. Use -v 2, or -v 3, for extra help'
- )
- parser.add_argument(
- '-c', '--command',
- dest='command',
- default=None,
- metavar='CMD',
- help='Send raw command CMD (cls,id...) to receiver'
- )
- parser.add_argument(
- '-d', '--disable',
- action='append',
- dest='disable',
- default=None,
- metavar='ABLE',
- help='Disable ABLE in the receiver. May be used multiple times. ',
- )
- parser.add_argument(
- '-e', '--enable',
- action='append',
- dest='enable',
- default=None,
- metavar='ABLE',
- help='Enable ABLE in the receiver. May be used multiple times. ',
- )
- parser.add_argument(
- '--device',
- dest='gpsd_device',
- default=None,
- metavar='DEVICE',
- help='The gpsd device to connect to. [Default %(default)s]',
- )
- parser.add_argument(
- '-f', '--file',
- dest='input_file_name',
- default=None,
- metavar='FILE',
- help='Read from FILE instead of a gpsd instance.',
- )
- parser.add_argument(
- '-g', '--getitem',
- action='append',
- dest='get_item',
- default=None,
- metavar='ITEM,LAYER,POSITION',
- help=('Get ITEM from LAYER and POSITION. LAYER and POSITION are '
- 'optional. May be used multiple times.'),
- )
- parser.add_argument(
- '--host',
- dest='gpsd_host',
- default=None,
- metavar='HOST',
- help='The gpsd host to connect to.',
- )
- parser.add_argument(
- '-i', '-portid',
- dest='port',
- default=None,
- metavar='PORTID',
- help=('Specifies receiver PORTID (interface) for port-related commands. '
- '[Default %(default)s]'),
- )
- parser.add_argument(
- '--port',
- dest='gpsd_port',
- default=gps.GPSD_PORT,
- metavar='PORT',
- type=int,
- help='The gpsd port to connect to. [Default %(default)s]',
- )
- parser.add_argument(
- '-p', '--preset',
- action='append',
- dest='poll',
- metavar='PRESET',
- help='Poll the receiver for PRESET. May be used multiple times.',
- )
- parser.add_argument(
- '-P', '--protver',
- dest='protver',
- default=10.0,
- type=float,
- help='Protocol version for sending commands. [Default %(default)s]',
- )
- parser.add_argument(
- '-r', '--readonly',
- action='store_true',
- dest='read_only',
- default=False,
- help='Read only. Do not send anything to the GPS.',
- )
- parser.add_argument(
- '-R', '--rawfile',
- dest='raw_file',
- default=None,
- metavar='FILE',
- help='Save raw data from receiver in FILE\n',
- )
- parser.add_argument(
- '-s', '--inspeed',
- choices=gps_model.speeds,
- dest='input_speed',
- default=9600,
- metavar='SPEED',
- type=int,
- help='Set local serial port speed to SPEED bps.',
- )
- parser.add_argument(
- '-S', '--setspeed',
- # FIXME: speeds s/b in io class
- choices=gps_model.speeds,
- dest='set_speed',
- metavar='SPEED',
- type=int,
- help='Configure receiver speed to SETSPEED.',
- )
- parser.add_argument(
- '-t', '--timestamp',
- action='count',
- dest='timestamp',
- default=0,
- help='Timestamp messages with seconds since UNIX epoch. Use -tt for UTC',
- )
- parser.add_argument(
- '-v', '--verbosity',
- dest='verbosity',
- default=0,
- metavar='VERB',
- type=int,
- help='Set verbosity level to V, 0 to 5. [Default %(default)s]',
- )
- parser.add_argument(
- '-V', '--version',
- action='version',
- version="%(prog)s: Version " + gps_version + "\n",
- help='Output version to stderr, then exit',
- )
- parser.add_argument(
- '-w', '--wait',
- dest='input_wait',
- default=2.0,
- metavar='WAIT',
- type=float,
- help='Wait for WAIT seconds before exiting.. [Default %(default)s]',
- )
- parser.add_argument(
- '-x', '--delitem',
- action='append',
- dest='del_item',
- default=None,
- metavar='ITEM',
- help=('Delete ITEM from receiver BBR and FLASH layers. '
- 'May be used multiple times. '),
- )
- parser.add_argument(
- '-z', '--setitem',
- action='append',
- dest='set_item',
- default=None,
- metavar='ITEM,VAL[,LAYER]',
- help=('Set ITEM in receiver to VAL in LAYER. LAYER is optional. '
- 'May be used multiple times. '),
- )
- parser.add_argument(
- 'target',
- nargs='?',
- help='[host[:port[:device]]]',
- )
- # turn the stupid Namespace into a nice dictionary
- opts = vars(parser.parse_args(options))
- gps_model.port = opts['port']
- gps_model.protver = opts['protver']
- gps_model.read_only = opts['read_only']
- gps_model.timestamp = opts['timestamp']
- gps_model.verbosity = opts['verbosity']
- if opts['port']:
- # FIXME: better error message
- valnum = gps_model.port_id_map.get(opts['port'].upper())
- opts['port'] = valnum if valnum is not None else int(opts['port'])
- if opts['help']:
- parser.print_help()
- if gps.VERB_DECODE <= opts['verbosity']:
- print('\nABLE for -d/--disable and -e/--enable can be one of:')
- for item in sorted(gps_model.able_commands.keys()):
- print(" %-13s %s" %
- (item, gps_model.able_commands[item]["help"]))
- print('\nPRESET for -p can be one of:')
- for item in sorted(gps_model.commands.keys()):
- print(" %-13s %s" % (item, gps_model.commands[item]["help"]))
- print('\n')
- if gps.VERB_DECODE < opts['verbosity']:
- print('\nITEM for -g/--getitem, -x/--delitem and -z/--setitem '
- 'can be one of:')
- for item in sorted(gps_model.cfgs):
- print(" %s\n"
- " %s" % (item[0], item[5]))
- print('\n')
- print('Options can be placed in the UBXOPTS environment variable.\n'
- 'UBXOPTS is processed before the CLI options.')
- sys.exit(0)
- if opts['input_file_name']:
- # input file given
- if opts['target']:
- sys.stderr.write('%s: ERROR: both input file and target given.\n' %
- (PROG_NAME,))
- sys.exit(0)
- elif opts['target']:
- # TODO: move to module gps as a function
- # host[:port[:device]]
- # or maybe ::device
- # or maybe \[ipv6\][:port[:device]]
- if '[' == opts['target'][0]:
- # hex, or hex+zoneindex IPv6 address
- # could be like [fe80::1ff:fe23:4567:890a%eth2]
- match = re.match(r'''\[([^]]+)\](.*)''', opts['target'])
- opts['gpsd_host'] = match.group(1)
- parts = match.group(2).split(':')
- else:
- # maybe IPv4 address, maybe hostname
- parts = opts['target'].split(':')
- if parts[0]:
- opts['gpsd_host'] = parts[0]
- if 1 < len(parts):
- if parts[1]:
- opts['gpsd_port'] = parts[1]
- if 2 < len(parts) and parts[2]:
- opts['gpsd_device'] = parts[2]
- else:
- # else, use defaults: localhost:2947:
- if not opts['gpsd_host']:
- opts['gpsd_host'] = 'localhost'
- if gps.VERB_PROG <= opts['verbosity']:
- # dump versions and all options
- print('%s: Version %s\n' % (PROG_NAME, gps_version))
- print('Options:')
- for option in sorted(opts):
- print(" %s: %s" % (option, opts[option]))
- # done parsing arguments from environment and CLI
- try:
- # raw log file requested?
- raw = None
- if opts['raw_file']:
- try:
- raw = open(opts['raw_file'], 'wb')
- except IOError:
- sys.stderr.write('%s: failed to open raw file %s\n' %
- (PROG_NAME, opts['raw_file']))
- sys.exit(1)
- # Check if there is a conflict
- # create the I/O instance
- io_handle = gps.gps_io(
- input_file_name=opts['input_file_name'],
- read_only=opts['read_only'],
- gpsd_host=opts['gpsd_host'],
- gpsd_port=opts['gpsd_port'],
- gpsd_device=opts['gpsd_device'],
- input_speed=opts['input_speed'],
- verbosity_level=opts['verbosity'],
- write_requested=(opts['disable'] or opts['enable'] or opts['poll']))
- gps_model.io_handle = io_handle
- sys.stdout.flush()
- if opts['disable']:
- for disable in opts['disable']:
- if gps.VERB_QUIET < opts['verbosity']:
- print('%s: disable %s\n' % (PROG_NAME, disable))
- args = disable.split(',')
- disable = args[0].upper()
- if disable in gps_model.able_commands:
- command = gps_model.able_commands[disable]
- command["command"](gps_model, 0, disable[1:])
- else:
- sys.stderr.write('%s: disable %s not found\n' %
- (PROG_NAME, disable))
- sys.exit(1)
- if opts['enable']:
- for enable in opts['enable']:
- if gps.VERB_QUIET < opts['verbosity']:
- print('%s: enable %s\n' % (PROG_NAME, enable))
- args = enable.split(',')
- enable = args[0].upper()
- if enable in gps_model.able_commands:
- command = gps_model.able_commands[enable]
- command["command"](gps_model, 1, args[1:])
- else:
- sys.stderr.write('%s: enable %s not found\n' %
- (PROG_NAME, enable))
- sys.exit(1)
- if opts['poll']:
- for poll in opts['poll']:
- if gps.VERB_QUIET < opts['verbosity']:
- print('%s: poll %s\n' % (PROG_NAME, poll))
- args = poll.split(',')
- poll = args[0].upper()
- if poll in gps_model.commands:
- command = gps_model.commands[poll]
- if (('minVer' in command and
- opts['protver'] < command['minVer'])):
- print('%s: WARNING poll %s requires protVer >= %s '
- 'you have %s\n' %
- (PROG_NAME, poll, command['minVer'],
- opts['protver']))
- if (('maxVer' in command and
- opts['protver'] > command['maxVer'])):
- print('%s: WARNING poll %s requires protVer <= %s '
- 'you have %s\n' %
- (PROG_NAME, poll, command['maxVer'],
- opts['protver']))
- if 'opt' in command:
- command["command"](gps_model, command["opt"])
- elif 'args' in command:
- # pass on args, except arg[0]
- command["command"](gps_model, args[1:])
- else:
- command["command"](gps_model)
- else:
- sys.stderr.write('%s: poll %s not found\n' %
- (PROG_NAME, poll))
- sys.exit(1)
- elif opts['set_speed'] is not None:
- gps_model.send_set_speed(opts['set_speed'])
- elif opts['command'] is not None:
- cmd_list = opts['command'].split(',')
- try:
- cmd_data = [int(v, 16) for v in cmd_list]
- except ValueError:
- badarg = True
- else:
- data_or = reduce(operator.or_, cmd_data)
- badarg = data_or != data_or & 0xFF
- if badarg or len(cmd_list) < 2:
- sys.stderr.write('%s: Argument format (hex bytes) is'
- ' class,id[,payload...]\n' % PROG_NAME)
- sys.exit(1)
- payload = bytearray(cmd_data[2:])
- if gps.VERB_QUIET < opts['verbosity']:
- print('%s: command %s\n' % (PROG_NAME, opts['command']))
- gps_model.gps_send(cmd_data[0], cmd_data[1], payload)
- elif opts['del_item']:
- keys = []
- for name in opts['del_item']:
- item = gps_model.cfg_by_name(name)
- if item:
- keys.append(item[1])
- else:
- sys.stderr.write('%s: ERROR: item %s unknown\n' %
- (PROG_NAME, opts['del_item']))
- sys.exit(1)
- gps_model.send_cfg_valdel(keys)
- elif opts['get_item']:
- keys = []
- layer = None
- position = 0
- end = None
- for item in opts['get_item']:
- parts = item.split(',')
- item = gps_model.cfg_by_name(parts[0].upper())
- if item:
- keys.append(item[1])
- else:
- sys.stderr.write('%s: ERROR: item %s unknown\n' %
- (PROG_NAME, parts[0]))
- sys.exit(1)
- try:
- if 1 < len(parts):
- layer = int(parts[1])
- if 2 < len(parts):
- position = int(parts[2])
- if 3 < len(parts):
- end = int(parts[3])
- except ValueError:
- sys.stderr.write('%s: ERROR: -g %s, non numeric parameter\n' %
- (PROG_NAME, parts[0]))
- sys.exit(1)
- print("layer %s position %d end %s " % (layer, position, end))
- if end:
- for p in range(position, end, 64):
- gps_model.send_cfg_valget(keys, layer, p)
- else:
- gps_model.send_cfg_valget(keys, layer, position)
- elif opts['set_item']:
- nvs = []
- for nv in opts['set_item']:
- parts = nv.split(',')
- if 1 >= len(parts):
- sys.stderr.write('%s: ERROR: item %s missing value to set\n' %
- (PROG_NAME, parts[0]))
- sys.exit(1)
- item = gps_model.cfg_by_name(parts[0])
- if item:
- nvs.append(nv)
- else:
- sys.stderr.write('%s: ERROR: item %s unknown\n' %
- (PROG_NAME, parts[0]))
- sys.exit(1)
- gps_model.send_cfg_valset(nvs)
- exit_code = io_handle.read(gps_model.decode_msg, # decode function
- opts['input_wait']) # raw input
- if ((gps.VERB_RAW <= opts['verbosity']) and io_handle.out):
- # dump raw left overs
- print("Left over data:")
- print(io_handle.out)
- sys.stdout.flush()
- io_handle.ser.close()
- except KeyboardInterrupt:
- print('')
- exit_code = 1
- sys.exit(exit_code)
- # vim: set expandtab shiftwidth=4
|