123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- #!/usr/bin/env python3
- #
- # This file is Copyright 2010 by the GPSD project
- # SPDX-License-Identifier: BSD-2-clause
- """
- cycle_analyzer - perform cycle analysis on GPS log files
- This tool analyzes one or more NMEA or JSON files to determine the
- cycle sequence of sentences. JSON files must be reports from a gpsd
- driver which is without the CYCLE_END_RELIABLE capability and
- therefore ships every sentence containing a fix; otherwise the results
- will be meaningless because only the end-of-cycle sentences will show
- in the JSON.
- If a filename argument ends with '.log', and the sentence type in it
- is not recognizable, this tool adds '.chk' to the name and tries again
- assuming the latter is a JSON dump. Thus, invoking it again *.log in
- a directory full of check files will do the right thing.
- One purpose of this tool is to determine the end-of-cycle sentence
- that a binary-protocol device emits, so the result can be patched into
- the driver as a CYCLE_END_RELIABLE capability. To get this, apply the
- tool to the JSON output from the driver using the -j switch. It will
- ignore everything but tag and timestamp fields, and will also ignore
- any NMEA in the file.
- Another purpose is to sanity-check the assumptions of the NMEA
- end-of-cycle detector. For this purpose, run without -j; if a device
- has a regular reporting cycle with a constant end-of-cycle sentence,
- this tool will confirm that. Otherwise, it will perform various
- checks attempting to find an end-of-cycle marker and report on what it
- finds.
- When cycle_analyzer reports a split- or variable-cycle device, some arguments
- to the -d switch can dump various analysis stages so you can get a better
- idea what is going on. These are:
- sequence - the entire sequence of dump tag/timestamp pairs from the log
- events - show how those reduce to event sequences
- bursts - show how sentences are grouped into bursts
- trim - show the burst list after the end bursts have been removed
- In an event sequence, a '<' is a nornal start of cycle where the
- timestamp increments. A '>' is where the timestamp actually
- *decreases* between a a sentence and the one that follows. The NMEA
- cycle-end detector will ignore the '>' event; it sometimes occurs when
- GPZDA starts a cycle, but has no effect on where the actual end of
- fix reporting is.
- If you see a message saying 'cycle-enders ... also occur in mid-cycle', the
- device will confuse the NMEA cycle detector, leading to more reports per cycle
- than the ideal.
- """
- # This code runs compatibly under Python 2 and 3.x for x >= 2.
- # Preserve this property!
- from __future__ import absolute_import, print_function, division
- import getopt
- import gps
- import json
- import os
- import sys
- verbose = 0
- suppress_regular = False
- parse_json = False
- class analyze_error(BaseException):
- def __init__(self, filename, msg):
- self.filename = filename
- self.msg = msg
- def __repr__(self):
- return '%s: %s' % (self.filename, self.msg)
- class event(object):
- def __init__(self, tag, time=0):
- self.tag = tag
- self.time = time
- def __str__(self):
- if self.time == 0:
- return self.tag
- else:
- return self.tag + ":" + self.time
- __repr__ = __str__
- def tags(lst):
- return [x.tag for x in lst]
- def extract_from_nmea(filename, lineno, line):
- "Extend sequence of tag/timestamp tuples from an NMEA sentence"
- hhmmss = {
- "RMC": 1,
- "GLL": 5,
- "GGA": 1,
- "GBS": 1,
- "PASHR": {"POS": 4},
- }
- fields = line.split(",")
- tag = fields[0]
- if tag.startswith("$GP") or tag.startswith("$IN"):
- tag = tag[3:]
- elif tag[0] == "$":
- tag = tag[1:]
- field = hhmmss.get(tag)
- if isinstance(field, dict):
- field = field.get(fields[1])
- if field:
- timestamp = fields[field]
- return [event(tag, timestamp)]
- else:
- return []
- def extract_from_json(filename, lineno, line):
- "Extend sequence of tag/timestamp tuples from a JSON dump of a sentence"
- if not line.startswith("{"):
- return []
- try:
- sentence = json.loads(line)
- if "time" not in sentence:
- return []
- return [event(gps.polystr(sentence["class"]), "%.2f" %
- gps.isotime(sentence["time"]))]
- except ValueError as e:
- print(line.rstrip(), file=sys.stderr)
- print(repr(e), file=sys.stderr)
- return []
- def extract_timestamped_sentences(fp, json_parse=parse_json):
- "Do the basic work of extracting tags and timestamps"
- sequence = []
- lineno = 0
- while True:
- line = gps.polystr(fp.readline())
- if not line:
- break
- lineno += 1
- if line.startswith("#"):
- continue
- if line[0] not in ("$", "!", "{"):
- raise analyze_error(fp.name, "unknown sentence type.")
- if not json_parse and line.startswith("$"):
- sequence += extract_from_nmea(fp.name, lineno, line)
- elif json_parse and line.startswith("{"):
- sequence += extract_from_json(fp.name, lineno, line)
- return sequence
- def analyze(sequence, name):
- "Analyze the cycle sequence of a device from its output logs."
- # First, extract tags and timestamps
- regular = False
- if not sequence:
- return
- if "sequence" in stages:
- print("Raw tag/timestamp sequence")
- for e in sequence:
- print(e)
- # Then, do cycle detection
- events = []
- out_of_order = False
- for i in range(len(sequence)):
- this = sequence[i]
- if this.time == "" or float(this.time) == 0:
- continue
- events.append(this)
- if i < len(sequence)-1:
- next = sequence[i+1]
- if float(this.time) < float(next.time):
- events.append(event("<"))
- if float(this.time) > float(next.time):
- events.append(event(">"))
- out_of_order = True
- if out_of_order and verbose:
- sys.stderr.write("%s: has some timestamps out of order.\n" % name)
- if "events" in stages:
- print("Event list:")
- for e in events:
- print(e)
- # Now group events into bursts
- bursts = []
- current = []
- for e in events + [event('<')]:
- if e.tag == '<':
- bursts.append(tuple(current))
- current = []
- else:
- current.append(e)
- if "bursts" in stages:
- print("Burst list:")
- for burst in bursts:
- print(burst)
- # We need 4 cycles because the first and last might be incomplete.
- if tags(events).count("<") < 4:
- sys.stderr.write("%s: has fewer than 4 cycles.\n" % name)
- return
- # First try at detecting a regular cycle
- unequal = False
- for i in range(len(bursts)-1):
- if tags(bursts[i]) != tags(bursts[i+1]):
- unequal = True
- break
- if not unequal:
- # All bursts looked the same
- regular = True
- else:
- # Trim off first and last bursts, which are likely incomplete.
- bursts = bursts[1:-1]
- if "trim" in stages:
- "After trimming:"
- for burst in bursts:
- print(burst)
- # Now the actual clique analysis
- unequal = False
- for i in range(len(bursts)-1):
- if tags(bursts[i]) != tags(bursts[i+1]):
- unequal = True
- break
- if not unequal:
- regular = True
- # Should know now if cycle is regular
- if regular:
- if not suppress_regular:
- print("%s: has a regular cycle %s." %
- (name, " ".join(tags(bursts[0]))))
- else:
- # If it was not the case that all cycles matched, then we need
- # a minimum of 6 cycles because the first and last might be
- # incomplete, and we need at least 4 cycles in the middle to
- # have two full ones on split-cycle devices like old Garmins.
- if tags(events).count("<") < 6:
- sys.stderr.write("%s: variable-cycle log has has fewer "
- "than 6 cycles.\n" % name)
- return
- if verbose > 0:
- print("%s: has a split or variable cycle." % name)
- cycle_enders = []
- for burst in bursts:
- if burst[-1].tag not in cycle_enders:
- cycle_enders.append(burst[-1].tag)
- if len(cycle_enders) == 1:
- if not suppress_regular:
- print("%s: has a fixed end-of-cycle sentence %s." %
- (name, cycle_enders[0]))
- else:
- print("%s: has multiple cycle-enders %s." %
- (name, " ".join(cycle_enders)))
- # Sanity check
- pathological = []
- for ender in cycle_enders:
- for burst in bursts:
- if ((ender in tags(burst) and
- not ender == burst[-1].tag and
- ender not in pathological)):
- pathological.append(ender)
- if pathological:
- print("%s: cycle-enders %s also occur in mid-cycle!" %
- (name, " ".join(pathological)))
- if __name__ == "__main__":
- stages = ""
- try:
- (options, arguments) = getopt.getopt(sys.argv[1:], "d:jsv")
- for (switch, val) in options:
- if (switch == '-d'): # Debug
- stages = val
- elif (switch == '-j'): # Interpret JSON, not NMEA
- parse_json = True
- elif (switch == '-v'): # Verbose
- verbose += 1
- elif (switch == '-s'): # Suppress logs with no problems
- suppress_regular = True
- except getopt.GetoptError as msg:
- print("cycle_analyzer: " + str(msg))
- raise SystemExit(1)
- try:
- if arguments:
- for filename in arguments:
- fp = open(filename, 'rb')
- try:
- sequence = extract_timestamped_sentences(fp)
- analyze(sequence, filename)
- except analyze_error as e:
- if filename.endswith(".log") and os.path.exists(filename +
- ".chk"):
- fp2 = open(filename+".chk", "rb")
- try:
- sequence = extract_timestamped_sentences(
- fp2,
- json_parse=True)
- analyze(sequence, filename+".chk")
- finally:
- fp2.close()
- else:
- print(repr(e), file=sys.stderr)
- fp.close()
- else:
- sequence = extract_timestamped_sentences(sys.stdin)
- analyze(sequence, "standard input")
- except analyze_error as e:
- print(repr(e), file=sys.stderr)
- raise SystemExit(1)
- # vim: set expandtab shiftwidth=4
|