ubxtool.py.in 17 KB


  1. #!@PYSHEBANG@
  2. # -*- coding: UTF-8
  3. # @GENERATED@
  4. # This file is Copyright 2018 by the GPSD project
  5. # SPDX-License-Identifier: BSD-2-clause
  6. #
  7. # This code runs compatibly under Python 2 and 3.x for x >= 2.
  8. # Preserve this property!
  9. #
  10. # ENVIRONMENT:
  11. # Options in the UBXOPTS environment variable will be parsed before
  12. # the CLI options. A handy place to put your '-f /dev/ttyXX -s SPEED'
  13. #
  14. # To see what constellations are enabled:
  15. # ubxtool -p CFG-GNSS -f /dev/ttyXX
  16. #
  17. # To disable GLONASS and enable GALILEO:
  18. # ubxtool -d GLONASS -f /dev/ttyXX
  19. # ubxtool -e GALILEO -f /dev/ttyXX
  20. #
  21. # To read GPS messages from a log file:
  22. # ubxtool -v 2 -f test/daemon/ublox-neo-m8n.log
  23. #
  24. # References:
  25. # [1] IS-GPS-200K
  26. # Codacy D203 and D211 conflict, I choose D203
  27. # Codacy D212 and D213 conflict, I choose D212
  28. """ubxtool -- u-blox configurator and packet decoder.
  29. usage: ubxtool [OPTIONS] [host[:port[:device]]]
  30. """
  31. from __future__ import absolute_import, print_function, division
  32. import argparse # to parse CLI options
  33. from functools import reduce # pylint: disable=redefined-builtin
  34. import operator # for or_
  35. import os # for os.environ
  36. import re # for regular expressions
  37. import struct # for pack()
  38. import sys
  39. PROG_NAME = 'ubxtool'
  40. try:
  41. import gps
  42. import gps.ubx
  43. except ImportError:
  44. # PEP8 says local imports last
  45. sys.stderr.write("%s: failed to import gps, check PYTHONPATH\n" %
  46. PROG_NAME)
  47. sys.exit(2)
  48. gps_version = '@VERSION@'
  49. if gps.__version__ != gps_version:
  50. sys.stderr.write("%s: ERROR: need gps module version %s, got %s\n" %
  51. (PROG_NAME, gps_version, gps.__version__))
  52. sys.exit(1)
  53. # Some old versions of Python fail to accept a bytearray as an input to
  54. # struct.unpack_from, though it can be worked around by wrapping it with
  55. # buffer(). Since the fix is only needed in rare cases, this monkey-patches
  56. # struct.unpack_from() when needed, and otherwise changes nothing. If
  57. # struct.unpack() were used, it would need similar treatment, as would
  58. # methods from struct.Struct if that were used.
  59. try:
  60. struct.unpack_from('B', bytearray(1))
  61. except TypeError:
  62. unpack_from_orig = struct.unpack_from
  63. def unpack_from_fixed(fmt, buf, offset=0):
  64. """Unpack_from_fixed."""
  65. return unpack_from_orig(fmt, buffer(buf), offset=offset)
  66. struct.unpack_from = unpack_from_fixed
  67. # opts['protver'], protocol version for sent commands
  68. # u-blox 5, firmware 4 to 6 is protver 10 to 12
  69. # u-blox 6, firmware 6 to 7 is protver 12 to 13
  70. # u-blox 6, firmware 1 is protver 14
  71. # u-blox 7, firmware 1 is protver 14
  72. # u-blox 8, is protver 15 to 23
  73. # u-blox 9, firmware 1 is protver 27
  74. # u-blox F9T, firmware 2 is protver 29
  75. # u-blox F9N, firmware 4 is protver 32
  76. # instantiate the GPS class
  77. gps_model = gps.ubx.ubx()
  78. if 'UBXOPTS' in os.environ:
  79. # grab the UBXOPTS environment variable to be handled as prepended options
  80. options = os.environ['UBXOPTS'].split(' ') + sys.argv[1:]
  81. else:
  82. options = sys.argv[1:]
  83. # "m:")
  84. usage = '%(prog)s [OPTIONS] [host[:port[:device]]]'
  85. epilog = ('BSD terms apply: see the file COPYING in the distribution '
  86. 'root for details.')
  87. parser = argparse.ArgumentParser(add_help=False, epilog=epilog, usage=usage)
  88. parser.add_argument(
  89. '-?', '-h', '--help',
  90. action="store_true",
  91. dest='help',
  92. default=False,
  93. help='Show this help message and exit. Use -v 2, or -v 3, for extra help'
  94. )
  95. parser.add_argument(
  96. '-c', '--command',
  97. dest='command',
  98. default=None,
  99. metavar='CMD',
  100. help='Send raw command CMD (cls,id...) to receiver'
  101. )
  102. parser.add_argument(
  103. '-d', '--disable',
  104. action='append',
  105. dest='disable',
  106. default=None,
  107. metavar='ABLE',
  108. help='Disable ABLE in the receiver. May be used multiple times. ',
  109. )
  110. parser.add_argument(
  111. '-e', '--enable',
  112. action='append',
  113. dest='enable',
  114. default=None,
  115. metavar='ABLE',
  116. help='Enable ABLE in the receiver. May be used multiple times. ',
  117. )
  118. parser.add_argument(
  119. '--device',
  120. dest='gpsd_device',
  121. default=None,
  122. metavar='DEVICE',
  123. help='The gpsd device to connect to. [Default %(default)s)]',
  124. )
  125. parser.add_argument(
  126. '-f', '--file',
  127. dest='input_file_name',
  128. default=None,
  129. metavar='FILE',
  130. help='Read from FILE instead of a gpsd instance.',
  131. )
  132. parser.add_argument(
  133. '-g', '--getitem',
  134. action='append',
  135. dest='get_item',
  136. default=None,
  137. metavar='ITEM,LAYER,POSITION',
  138. help=('Get ITEM from LAYER and POSITION. LAYER and POSITION are '
  139. 'optional. May be used multiple times.'),
  140. )
  141. parser.add_argument(
  142. '--host',
  143. dest='gpsd_host',
  144. default=None,
  145. metavar='HOST',
  146. help='The gpsd host to connect to.',
  147. )
  148. parser.add_argument(
  149. '-i', '-portid',
  150. dest='port',
  151. default=None,
  152. metavar='PORTID',
  153. help=('Specifies receiver PORTID (interface) for port-related commands. '
  154. '[Default %(default)s)]'),
  155. )
  156. parser.add_argument(
  157. '--port',
  158. dest='gpsd_port',
  159. default=gps.GPSD_PORT,
  160. metavar='PORT',
  161. type=int,
  162. help='The gpsd port to connect to. [Default %(default)s)]',
  163. )
  164. parser.add_argument(
  165. '-p', '--preset',
  166. action='append',
  167. dest='poll',
  168. metavar='PRESET',
  169. help='Poll the receiver for PRESET. May be used multiple times.',
  170. )
  171. parser.add_argument(
  172. '-P', '--protver',
  173. dest='protver',
  174. default=10.0,
  175. type=float,
  176. help='Protocol version for sending commands. [Default %(default)s)]',
  177. )
  178. parser.add_argument(
  179. '-r', '--readonly',
  180. action='store_true',
  181. dest='read_only',
  182. default=False,
  183. help='Read only. Do not send anything to the GPS.',
  184. )
  185. parser.add_argument(
  186. '-R', '--rawfile',
  187. dest='raw_file',
  188. default=None,
  189. metavar='FILE',
  190. help='Save raw data from receiver in FILE\n',
  191. )
  192. parser.add_argument(
  193. '-s', '--inspeed',
  194. # FIXME: speeds s/b in io class
  195. choices=gps_model.speeds,
  196. dest='input_speed',
  197. default=9600,
  198. metavar='SPEED',
  199. type=int,
  200. help='Set local serial port speed to SPEED bps.',
  201. )
  202. parser.add_argument(
  203. '-S', '--setspeed',
  204. # FIXME: speeds s/b in io class
  205. choices=gps_model.speeds,
  206. dest='set_speed',
  207. metavar='SPEED',
  208. type=int,
  209. help='Configure receiver speed to SETSPEED.',
  210. )
  211. parser.add_argument(
  212. '-t', '--timestamp',
  213. action='count',
  214. dest='timestamp',
  215. default=0,
  216. help='Timestamp messages with seconds since UNIX epoch. Use -tt for UTC',
  217. )
  218. parser.add_argument(
  219. '-v', '--verbosity',
  220. dest='verbosity',
  221. default=0,
  222. metavar='VERB',
  223. type=int,
  224. help='Set verbosity level to V, 0 to 5. [Default %(default)s)]',
  225. )
  226. parser.add_argument(
  227. '-V', '--version',
  228. action='version',
  229. version="%(prog)s: Version " + gps_version + "\n",
  230. help='Output version to stderr, then exit',
  231. )
  232. parser.add_argument(
  233. '-w', '--wait',
  234. dest='input_wait',
  235. default=2.0,
  236. metavar='WAIT',
  237. type=float,
  238. help='Wait for WAIT seconds before exiting.. [Default %(default)s)]',
  239. )
  240. parser.add_argument(
  241. '-x', '--delitem',
  242. action='append',
  243. dest='del_item',
  244. default=None,
  245. metavar='ITEM',
  246. help=('Delete ITEM from receiver BBR and FLASH layers. '
  247. 'May be used multiple times. '),
  248. )
  249. parser.add_argument(
  250. '-z', '--setitem',
  251. action='append',
  252. dest='set_item',
  253. default=None,
  254. metavar='ITEM,VAL[,LAYER]',
  255. help=('Set ITEM in receiver to VAL in LAYER. LAYER is optional. '
  256. 'May be used multiple times. '),
  257. )
  258. parser.add_argument(
  259. 'target',
  260. nargs='?',
  261. help='[host[:port[:device]]]',
  262. )
  263. # turn the stupid Namespace into a nice dictionary
  264. opts = vars(parser.parse_args(options))
  265. gps_model.port = opts['port']
  266. gps_model.protver = opts['protver']
  267. gps_model.read_only = opts['read_only']
  268. gps_model.timestamp = opts['timestamp']
  269. gps_model.verbosity = opts['verbosity']
  270. if opts['port']:
  271. # FIXME: better error message
  272. valnum = gps_model.port_id_map.get(val.upper())
  273. opts['port'] = valnum if valnum is not None else int(opt['port'])
  274. if opts['help']:
  275. parser.print_help()
  276. if gps.VERB_DECODE <= opts['verbosity']:
  277. print('\nABLE for -d/--disable and -e/--enable can be one of:')
  278. for item in sorted(gps_model.able_commands.keys()):
  279. print(" %-13s %s" %
  280. (item, gps_model.able_commands[item]["help"]))
  281. print('\nPRESET for -p can be one of:')
  282. for item in sorted(gps_model.commands.keys()):
  283. print(" %-13s %s" % (item, gps_model.commands[item]["help"]))
  284. print('\n')
  285. if gps.VERB_DECODE < opts['verbosity']:
  286. print('\nITEM for -g/--getitem, -x/--delitem and -z/--setitem '
  287. 'can be one of:')
  288. for item in sorted(gps_model.cfgs):
  289. print(" %s\n"
  290. " %s" % (item[0], item[5]))
  291. print('\n')
  292. print('Options can be placed in the UBXOPTS environment variable.\n'
  293. 'UBXOPTS is processed before the CLI options.')
  294. sys.exit(0)
  295. if opts['input_file_name']:
  296. # input file given
  297. if opts['target']:
  298. sys.stderr.write('%s: ERROR: both input file and target given.\n' %
  299. (PROG_NAME,))
  300. sys.exit(0)
  301. elif opts['target']:
  302. # TODO: move to module gps as a function
  303. # host[:port[:device]]
  304. # or maybe ::device
  305. # or maybe \[ipv6\][:port[:device]]
  306. if '[' == opts['target'][0]:
  307. # hex, or hex+zoneindex IPv6 address
  308. # could be like [fe80::1ff:fe23:4567:890a%eth2]
  309. match = re.match(r'''\[([^]]+)\](.*)''', opts['target'])
  310. opts['gpsd_host'] = match.group(1)
  311. parts = match.group(2).split(':')
  312. else:
  313. # maybe IPv4 address, maybe hostname
  314. parts = opts['target'].split(':')
  315. if parts[0]:
  316. opts['gpsd_host'] = parts[0]
  317. if 1 < len(parts):
  318. if parts[1]:
  319. opts['gpsd_port'] = parts[1]
  320. if 2 < len(parts) and parts[2]:
  321. opts['gpsd_device'] = parts[2]
  322. else:
  323. # else, use defaults: localhost:2947:
  324. if not opts['gpsd_host']:
  325. opts['gpsd_host'] = 'localhost'
  326. if gps.VERB_PROG <= opts['verbosity']:
  327. # dump versions and all options
  328. print('%s: Version %s\n' % (PROG_NAME, gps_version))
  329. print('Options:')
  330. for option in sorted(opts):
  331. print(" %s: %s" % (option, opts[option]))
  332. # done parsing arguments from environment and CLI
  333. try:
  334. # raw log file requested?
  335. raw = None
  336. if opts['raw_file']:
  337. try:
  338. raw = open(opts['raw_file'], 'wb')
  339. except IOError:
  340. sys.stderr.write('%s: failed to open raw file %s\n' %
  341. (PROG_NAME, opts['raw_file']))
  342. sys.exit(1)
  343. # Check if there is a conflict
  344. # create the I/O instance
  345. io_handle = gps.gps_io(
  346. opts['input_file_name'],
  347. opts['read_only'],
  348. opts['gpsd_host'],
  349. opts['gpsd_port'],
  350. opts['gpsd_device'],
  351. write_requested=(opts['disable'] or opts['enable'] or opts['poll'])
  352. )
  353. gps_model.io_handle = io_handle
  354. sys.stdout.flush()
  355. if opts['disable']:
  356. for disable in opts['disable']:
  357. if gps.VERB_QUIET < opts['verbosity']:
  358. print('%s: disable %s\n' % (PROG_NAME, disable))
  359. args = disable.split(',')
  360. disable = args[0].upper()
  361. if disable in gps_model.able_commands:
  362. command = gps_model.able_commands[disable]
  363. command["command"](gps_model, 0, disable[1:])
  364. else:
  365. sys.stderr.write('%s: disable %s not found\n' %
  366. (PROG_NAME, disable))
  367. sys.exit(1)
  368. if opts['enable']:
  369. for enable in opts['enable']:
  370. if gps.VERB_QUIET < opts['verbosity']:
  371. print('%s: enable %s\n' % (PROG_NAME, enable))
  372. args = enable.split(',')
  373. enable = args[0].upper()
  374. if enable in gps_model.able_commands:
  375. command = gps_model.able_commands[enable]
  376. command["command"](gps_model, 1, args[1:])
  377. else:
  378. sys.stderr.write('%s: enable %s not found\n' %
  379. (PROG_NAME, enable))
  380. sys.exit(1)
  381. if opts['poll']:
  382. for poll in opts['poll']:
  383. if gps.VERB_QUIET < opts['verbosity']:
  384. print('%s: poll %s\n' % (PROG_NAME, poll))
  385. args = poll.split(',')
  386. poll = args[0].upper()
  387. if poll in gps_model.commands:
  388. command = gps_model.commands[poll]
  389. if (('minVer' in command and
  390. opts['protver'] < command['minVer'])):
  391. print('%s: WARNING poll %s requires protVer >= %s '
  392. 'you have %s\n' %
  393. (PROG_NAME, poll, command['minVer'],
  394. opts['protver']))
  395. if (('maxVer' in command and
  396. opts['protver'] > command['maxVer'])):
  397. print('%s: WARNING poll %s requires protVer <= %s '
  398. 'you have %s\n' %
  399. (PROG_NAME, poll, command['maxVer'],
  400. opts['protver']))
  401. if 'opt' in command:
  402. command["command"](gps_model, command["opt"])
  403. elif 'args' in command:
  404. # pass on args, except arg[0]
  405. command["command"](gps_model, args[1:])
  406. else:
  407. command["command"](gps_model)
  408. else:
  409. sys.stderr.write('%s: poll %s not found\n' %
  410. (PROG_NAME, poll))
  411. sys.exit(1)
  412. elif opts['set_speed'] is not None:
  413. gps_model.send_set_speed(opts['set_speed'])
  414. elif opts['command'] is not None:
  415. cmd_list = opts['command'].split(',')
  416. try:
  417. cmd_data = [int(v, 16) for v in cmd_list]
  418. except ValueError:
  419. badarg = True
  420. else:
  421. data_or = reduce(operator.or_, cmd_data)
  422. badarg = data_or != data_or & 0xFF
  423. if badarg or len(cmd_list) < 2:
  424. sys.stderr.write('%s: Argument format (hex bytes) is'
  425. ' class,id[,payload...]\n' % PROG_NAME)
  426. sys.exit(1)
  427. payload = bytearray(cmd_data[2:])
  428. if gps.VERB_QUIET < opts['verbosity']:
  429. print('%s: command %s\n' % (PROG_NAME, opts['command']))
  430. gps_model.gps_send(cmd_data[0], cmd_data[1], payload)
  431. elif opts['del_item']:
  432. keys = []
  433. for name in opts['del_item']:
  434. item = gps_model.cfg_by_name(name)
  435. if item:
  436. keys.append(item[1])
  437. else:
  438. sys.stderr.write('%s: ERROR: item %s unknown\n' %
  439. (PROG_NAME, opts['del_item']))
  440. sys.exit(1)
  441. gps_model.send_cfg_valdel(keys)
  442. elif opts['get_item']:
  443. keys = []
  444. layer = None
  445. position = 0
  446. end = None
  447. for item in opts['get_item']:
  448. parts = item.split(',')
  449. item = gps_model.cfg_by_name(parts[0].upper())
  450. if item:
  451. keys.append(item[1])
  452. else:
  453. sys.stderr.write('%s: ERROR: item %s unknown\n' %
  454. (PROG_NAME, parts[0]))
  455. sys.exit(1)
  456. try:
  457. if 1 < len(parts):
  458. layer = int(parts[1])
  459. if 2 < len(parts):
  460. position = int(parts[2])
  461. if 3 < len(parts):
  462. end = int(parts[3])
  463. except ValueError:
  464. sys.stderr.write('%s: ERROR: -g %s, non numeric parameter\n' %
  465. (PROG_NAME, parts[0]))
  466. sys.exit(1)
  467. print("layer %s position %d end %s " % (layer, position, end))
  468. if end:
  469. for p in range(position, end, 64):
  470. gps_model.send_cfg_valget(keys, layer, p)
  471. else:
  472. gps_model.send_cfg_valget(keys, layer, position)
  473. elif opts['set_item']:
  474. nvs = []
  475. for nv in opts['set_item']:
  476. parts = nv.split(',')
  477. if 1 >= len(parts):
  478. sys.stderr.write('%s: ERROR: item %s missing value to set\n' %
  479. (PROG_NAME, parts[0]))
  480. sys.exit(1)
  481. item = gps_model.cfg_by_name(parts[0])
  482. if item:
  483. nvs.append(nv)
  484. else:
  485. sys.stderr.write('%s: ERROR: item %s unknown\n' %
  486. (PROG_NAME, parts[0]))
  487. sys.exit(1)
  488. gps_model.send_cfg_valset(nvs)
  489. exit_code = io_handle.read(gps_model.decode_msg, # decode function
  490. opts['input_wait']) # raw input
  491. if ((gps.VERB_RAW <= opts['verbosity']) and io_handle.out):
  492. # dump raw left overs
  493. print("Left over data:")
  494. print(io_handle.out)
  495. sys.stdout.flush()
  496. io_handle.ser.close()
  497. except KeyboardInterrupt:
  498. print('')
  499. exit_code = 1
  500. sys.exit(exit_code)
  501. # vim: set expandtab shiftwidth=4