|
- #!/usr/bin/env python
- """
- Usage: leapsecond.py [-v] { [-h] | [-f filename] | [-g filename]
- | [-H filename] | [-I isodate] | [-O unixdate]
- | [-i rfcdate] | [-o unixdate] | [-n MMMYYYY] }
- Options:
- -I take a date in ISO8601 format and convert to Unix-UTC time
- -O take a date in Unix-UTC time and convert to ISO8601.
- -i take a date in RFC822 format and convert to Unix-UTC time
- -o take a date in Unix-UTC time and convert to RFC822.
- -f fetch leap-second offset data and save to local cache file
- -H make leapsecond include
- -h print this help
- -v be verbose
- -g generate a plot of leap-second dates over time. The command you
- probably want is something like one of the following (depending on
- whether your gnuplot install does or does not support X11):
- leapsecond.py -g leapseconds.cache | gnuplot --persist
- leapsecond.py -g leapseconds.cache | gnuplot -e 'set terminal svg' - \\
- | display
- -n compute Unix gmt time for an IERS leap-second event given as a
- three-letter English Gregorian month abbreviation followed by a
- 4-digit year.
- Public urls and local cache file used:
- http://hpiers.obspm.fr/iers/bul/bulc/bulletinc.dat
- http://hpiers.obspm.fr/iers/bul/bulc/UTC-TAI.history
- ftp://maia.usno.navy.mil/ser7/tai-utc.dat
- leapseconds.cache
- This file is Copyright (c) 2013 by the GPSD project
- SPDX-License-Identifier: BSD-2-clause
- """
- # This code runs compatibly under Python 2 and 3.x for x >= 2.
- # Preserve this property!
- from __future__ import absolute_import, print_function, division
- import calendar
- import math
- import os
- import random
- import re
- import signal
- import sys
- import time
- try:
- import urllib.request as urlrequest # Python 3
- except ImportError:
- import urllib as urlrequest # Python 2
- # Set a socket timeout for slow servers
- import socket
- socket.setdefaulttimeout(30)
- del socket
- # *** Duplicate some code from gps.misc to avoid a dependency ***
- # Determine a single class for testing "stringness"
# Pick the one class that covers every text-string type on this interpreter.
try:
    STR_CLASS = basestring  # Python 2: common parent of 'str' and 'unicode'
except NameError:
    STR_CLASS = str  # Python 3: 'str' is the only text type

# Polymorphic str/bytes handling
BINARY_ENCODING = 'latin-1'

if bytes is not str:  # Python 3: bytes and str are distinct types

    def polystr(o):
        "Return o as a native str, decoding bytes with BINARY_ENCODING."
        if isinstance(o, str):
            return o
        if not isinstance(o, bytes):
            raise ValueError
        return str(o, BINARY_ENCODING)

else:  # Python 2: bytes is str, so no conversion is needed
    polystr = str
def isotime(s):
    """Convert timestamps in ISO8601 format to and from Unix time including
    optional fractional seconds.

    An int yields "YYYY-MM-DDTHH:MM:SS" (UTC).  A float yields the same
    with a decimal fraction appended (at most nine digits, trailing zeros
    trimmed, at least one digit).  A string in that format, with an
    optional trailing "Z" and optional fraction, yields a float Unix time.
    Any other argument type raises TypeError.
    """
    if isinstance(s, int):
        return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(s))
    if isinstance(s, float):
        # Split with floor() so the fraction is always in [0, 1), even
        # for times before the epoch.
        whole = int(math.floor(s))
        # Fixed-point formatting avoids the exponent notation repr() uses
        # for small fractions.
        digits = ("%.9f" % (s - whole)).rstrip("0")
        if not digits.startswith("0."):
            # The fraction rounded up to 1.0; clamp rather than carry.
            digits = "0.999999999"
        stamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(whole))
        return stamp + "." + (digits[2:] or "0")
    if isinstance(s, STR_CLASS):
        if s.endswith("Z"):
            s = s[:-1]
        if "." in s:
            (date, msec) = s.split(".")
        else:
            date = s
            msec = "0"
        # Note: no leap-second correction!
        return calendar.timegm(time.strptime(date, "%Y-%m-%dT%H:%M:%S")) \
            + float("0." + msec)
    # else:
    raise TypeError
- # *** End of duplicated code ***
# Nonzero enables diagnostic output on stderr.
verbose = 0

# Upstream data sources, one tuple per source:
#   (URL of the current-offset file,
#    regexp whose first group captures the offset,
#    sign applied to the captured value,
#    amount subtracted from the signed value,
#    URL of the leap-second history file)
# Kept as a list because it is shuffled in place before each use.
__locations = [
    (   # U.S. Navy's offset-history file
        "ftp://maia.usno.navy.mil/ser7/tai-utc.dat",
        r" TAI-UTC= +([0-9-]+)[^\n]*\n$",
        1,
        19,  # Magic TAI-GPS offset -> (leapseconds 1980)
        "ftp://maia.usno.navy.mil/ser7/tai-utc.dat",
    ),
    (   # International Earth Rotation Service Bulletin C
        "http://hpiers.obspm.fr/iers/bul/bulc/bulletinc.dat",
        r" UTC-TAI = ([0-9-]+)",
        -1,
        19,  # Magic TAI-GPS offset -> (leapseconds 1980)
        "http://hpiers.obspm.fr/iers/bul/bulc/UTC-TAI.history",
    ),
]
# Start of GPS time, 6 Jan 1980 00:00:00, as a Unix timestamp
GPS_EPOCH = 315964800
# Seconds per GPS week
SECS_PER_WEEK = 7 * 24 * 60 * 60
# 10-bit week rollover: the week counter wraps after this many weeks
ROLLOVER = 1 << 10
def gps_week(t):
    "Return the GPS week number of Unix time t, modulo the week rollover."
    weeks_since_epoch = (t - GPS_EPOCH) // SECS_PER_WEEK
    return weeks_since_epoch % ROLLOVER
def gps_rollovers(t):
    "Return how many week-counter rollovers precede Unix time t."
    weeks_since_epoch = (t - GPS_EPOCH) // SECS_PER_WEEK
    return weeks_since_epoch // ROLLOVER
def retrieve():
    """Retrieve current leap-second from Web sources.

    The sources are tried in random order.  Each source name is opened as
    a local file if such a path exists, otherwise fetched as a URL.
    Returns the first offset found, as the captured integer times the
    source's sign minus the source's fixed offset, or None if no source
    yields a match.
    """
    random.shuffle(__locations)  # To spread the load
    for (url, regexp, sign, offset, _) in __locations:
        try:
            if os.path.exists(url):
                ifp = open(url)
            else:
                ifp = urlrequest.urlopen(url)
            # Close the handle even if the read itself fails.
            try:
                txt = polystr(ifp.read())
            finally:
                ifp.close()
        except IOError:
            if verbose:
                sys.stderr.write("IOError: %s\n" % url)
            continue
        if verbose:
            sys.stderr.write("%s\n" % txt)
        m = re.search(regexp, txt)
        if m:
            return int(m.group(1)) * sign - offset
    return None
def last_insertion_time():
    """Give last potential insertion time for a leap second.

    Returns the Unix time of midnight UTC on Jul 1 of the current year if
    that moment has already passed, otherwise midnight UTC on Jan 1.
    """
    year = time.gmtime().tm_year
    # The two candidate insertion points this year, as UTC Unix times.
    jan = int(calendar.timegm((year, 1, 1, 0, 0, 0)))
    jul = int(calendar.timegm((year, 7, 1, 0, 0, 0)))
    return jul if time.time() > jul else jan
def save_leapseconds(outfile):
    """Fetch the leap-second history data and make a leap-second list since
    Unix epoch GMT (1970-01-01T00:00:00).

    The history URLs are tried in random order; the first one that can be
    opened is parsed and its records written to outfile, one per line as
    "<unix time>\\t# <ISO date>".  The output file is only (re)written
    after the whole download has been read, so a failed or interrupted
    transfer leaves any existing cache untouched.  If no URL can be
    opened, a message is written to stderr and outfile is not touched.
    """
    random.shuffle(__locations)  # To spread the load
    for (_, _, _, _, url) in __locations:
        try:
            fetchobj = urlrequest.urlopen(url)
        except IOError:
            sys.stderr.write("Fetch from %s failed.\n" % url)
            continue
        # This code assumes that after 1980, leap-second increments are
        # always integrally one second and every increment is listed here
        records = []
        skip = True
        try:
            for line in fetchobj:
                line = polystr(line)
                if verbose:
                    sys.stderr.write("%s\n" % line[:-1])
                # Everything before the first 1980 entry is ignored.
                if line.startswith(" 1980"):
                    skip = False
                if skip:
                    continue
                fields = line.strip().split()
                if len(fields) < 2:
                    continue
                md = leapbound(fields[0], fields[1])
                if verbose:
                    sys.stderr.write("# %s\n" % md)
                records.append(repr(iso_to_unix(md)) + "\t# " + str(md)
                               + "\n")
        finally:
            fetchobj.close()
        with open(outfile, "w") as fp:
            fp.writelines(records)
        return
    sys.stderr.write("%s not updated.\n" % outfile)
def fetch_leapsecs(filename):
    """Get a list of leap seconds from the local cache of the USNO history.

    Each line contributes its first whitespace-separated field, converted
    to float.  Anything after a '#' is treated as a comment; blank and
    comment-only lines are skipped.
    """
    leapsecs = []
    with open(str(filename)) as fp:
        for line in fp:
            fields = line.split("#", 1)[0].split()
            if fields:
                leapsecs.append(float(fields[0]))
    return leapsecs
def make_leapsecond_include(infile):
    """Get the current leap second count and century from the local cache
    usable as C preprocessor #define

    Reads the leap-second cache file infile and returns the text of a C
    header defining BUILD_CENTURY, BUILD_WEEK, BUILD_LEAPSECONDS and
    BUILD_ROLLOVERS for the current time.
    """
    leapjumps = fetch_leapsecs(infile)
    now = int(time.time())
    fields = {
        "century": time.strftime("%Y", time.gmtime(now))[:2] + "00",
        "week": gps_week(now),
        "rollovers": gps_rollovers(now),
        # Align to the start of the GPS week (counted from GPS_EPOCH) so
        # the date agrees with BUILD_WEEK; Unix weeks start on a
        # different weekday.
        "isodate": isotime(now - (now - GPS_EPOCH) % SECS_PER_WEEK),
        # Number of cached leap seconds that have already happened.
        "leapsecs": sum(1 for leapjump in leapjumps if leapjump < now),
    }
    # Trailing remarks use C comments: a '#' after a #define value would
    # become part of the macro's expansion.
    return """\
/*
 * Constants used for GPS time detection and rollover correction.
 *
 * Correct for week beginning %(isodate)s
 *
 * Autogenerated, do not edit. All changes will be undone.
 */
#define BUILD_CENTURY\t%(century)s
#define BUILD_WEEK\t%(week)d /* Assumes 10-bit week counter */
#define BUILD_LEAPSECONDS\t%(leapsecs)d
#define BUILD_ROLLOVERS\t%(rollovers)d /* Assumes 10-bit week counter */
/* Autogenerated, do not edit. All changes will be undone. */
""" % fields
def conditional_leapsecond_fetch(outfile, timeout):
    """Conditionally fetch leapsecond data,
    w. timeout in case of evil firewalls.

    Returns True if outfile is already fresh or the fetch completed, and
    False if the alarm handler could not be installed or the fetch raised
    IOError (including the timeout, in seconds, expiring).  Progress
    messages are written to stdout.
    """
    if not os.path.exists(outfile):
        stale = True
    else:
        # If there can't have been a leapsecond insertion since the
        # last time the cache was updated, we don't need to refresh.
        # This test cuts way down on the frequency with which we fetch.
        stale = last_insertion_time() > os.path.getmtime(outfile)
    if not stale:
        return True

    def handler(_signum, _frame):
        "Turn the alarm into an IOError so it aborts the fetch."
        raise IOError

    try:
        previous = signal.signal(signal.SIGALRM, handler)
    except ValueError:
        # Parallel builds trigger this - signal only works in main thread
        sys.stdout.write("Signal set failed; ")
        return False
    signal.alarm(timeout)
    sys.stdout.write("Attempting leap-second fetch...")
    try:
        save_leapseconds(outfile)
    except IOError:
        sys.stdout.write("failed; ")
        return False
    finally:
        # Always disarm the alarm and put the old handler back, so a
        # failed fetch cannot leave a pending SIGALRM behind.
        signal.alarm(0)
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)
    sys.stdout.write("succeeded.\n")
    return True
def leastsquares(tuples):
    """Generate coefficients for a least-squares fit to the specified data.

    tuples is a sequence of (x, y) pairs.  Returns (b, c, maxerr) where
    y = b * x + c is the fitted line and maxerr is the largest absolute
    residual over the input points.  Raises ZeroDivisionError when the
    fit is undetermined (fewer than two distinct x values).
    """
    n = len(tuples)
    sum_x = sum_y = sum_xx = sum_xy = 0
    for (x, y) in tuples:
        sum_x += x
        sum_y += y
        sum_xx += x * x
        sum_xy += x * y
    # Force float division regardless of the caller's division semantics.
    denom = float(n * sum_xx - sum_x * sum_x)
    c = (sum_xx * sum_y - sum_x * sum_xy) / denom
    b = (n * sum_xy - sum_x * sum_y) / denom
    # y = b * x + c
    maxerr = 0
    for (x, y) in tuples:
        # Points below the line count as much as points above it.
        maxerr = max(maxerr, abs(y - (x * b + c)))
    return (b, c, maxerr)
def iso_to_unix(tv):
    "ISO date (YYYY-MM-DDTHH:MM:SS, read as UTC) to Unix time."
    parsed = time.strptime(tv, "%Y-%m-%dT%H:%M:%S")
    return calendar.timegm(parsed)
def graph_history(filename):
    """Generate a GNUPLOT plot of the leap-second history.

    Reads the leap-second cache file filename and prints a gnuplot script
    with inline data to stdout.  Dates are rendered in UTC so the output
    does not depend on the local timezone.  Exits with status 1 if the
    cache holds fewer than two entries, since no line can be fitted.
    """
    raw = fetch_leapsecs(filename)
    if len(raw) < 2:
        sys.stderr.write("leapsecond.py: need at least two leap seconds "
                         "in %s to plot a history\n" % filename)
        raise SystemExit(1)
    (b, c, e) = leastsquares(list(enumerate(raw)))
    e /= SECS_PER_WEEK  # residual in weeks
    dates = [time.strftime("%Y-%m-%d", time.gmtime(t)) for t in raw]
    # Adding 190 days to scale
    enddate = time.strftime("%Y-%m-%d", time.gmtime(raw[-1] + 16416000))
    lines = [
        '# Least-squares approximation of Unix time from leapsecond is:\n',
        'lsq(x) = %s * x + %s\n' % (b, c),
        '# Maximum residual error is %.2f weeks\n' % e,
        'set autoscale\n',
        'set ylabel "GPS-UTC (s)"\n',
        'set yrange [-1:%d]\n' % (len(dates)),
        'set xlabel "Leap second date"\n',
        'set xtics rotate by 300\n',
        'set timefmt "%Y-%m-%d"\n',
        'set xdata time\n',
        'set format x "%Y-%m-%d"\n',
        'set xrange ["%s":"%s"]\n' % ("1979-09-01", enddate),
        'set key left top box\n',
        'plot "-" using 3:1 title "Leap second inserted" with points ;\n',
    ]
    # Inline data rows: index, Unix time, date.
    for (i, (r, d)) in enumerate(zip(raw, dates)):
        lines.append("%d\t%s\t%s\n" % (i, r, d))
    lines.append('e\n')
    print(''.join(lines))
def rfc822_to_unix(tv):
    "RFC822 date (DD Mon YYYY HH:MM:SS, read as UTC) to Unix time."
    parsed = time.strptime(tv, "%d %b %Y %H:%M:%S")
    return calendar.timegm(parsed)
def unix_to_rfc822(tv):
    "Unix time to RFC822 date (DD Mon YYYY HH:MM:SS) in UTC."
    utc_fields = time.gmtime(tv)
    return time.strftime("%d %b %Y %H:%M:%S", utc_fields)
def printnext(val):
    """Compute Unix time corresponding to a scheduled leap second.

    val must be 'Jun' or 'Dec' (any case) followed by a 4-digit year.
    Prints "<unix time> /* <ISO date> */" to stdout.  On a malformed
    argument, writes a message to stderr and exits with status 1.
    """
    month = val[:3].lower()
    if month not in ("jun", "dec"):
        sys.stderr.write("leapsecond.py: -n argument must begin with "
                         "'Jun' or 'Dec'\n")
        raise SystemExit(1)
    if len(val) != 7:
        sys.stderr.write("leapsecond.py: -n argument must be of "
                         "the form {jun|dec}nnnn.\n")
        raise SystemExit(1)
    try:
        year = int(val[3:])
    except ValueError:
        sys.stderr.write("leapsecond.py: -n argument must end "
                         "with a 4-digit year.\n")
        raise SystemExit(1)
    # Date looks valid
    tv = leapbound(year, month)
    print("%d /* %s */" % (iso_to_unix(tv), tv))
def leapbound(year, month):
    """Return a leap-second date in ISO8601 form.

    year is an int or numeric string; month is a month name of which
    only the first three letters, case-insensitively, are examined.
    Raises ValueError for a month other than JAN, JUN, JUL or DEC.
    """
    # USNO lists JAN and JUL (month following the leap second).
    # IERS lists DEC. and JUN. (month preceding the leap second).
    # Note: It is also possible for leap seconds to occur in end-Mar and
    # end-Sep although none have occurred yet
    key = month.upper()[:3]
    if key == "JAN":
        # A January listing refers to the end of the previous year.
        return "%s-12-31T23:59:60" % (int(year) - 1)
    if key in ("JUN", "JUL"):
        return "%s-06-30T23:59:59" % year
    if key == "DEC":
        return "%s-12-31T23:59:59" % year
    raise ValueError("unsupported leap-second month: %r" % (month,))
- # Main part
def usage():
    "Print the module docstring as help text and exit with status 0."
    print(__doc__)
    sys.exit(0)
if __name__ == '__main__':
    import getopt

    # Command-line driver: -v only sets the verbosity flag; every other
    # option performs its action and exits immediately with status 0.
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:],
                                             "hvf:g:H:i:n:o:I:O:")
    except getopt.GetoptError as err:
        # Report a bad option as a usage error instead of a traceback.
        sys.stderr.write("leapsecond.py: %s\n" % err)
        sys.stderr.write(__doc__)
        raise SystemExit(1)
    for (switch, val) in options:
        if switch == '-h':  # help, get usage only
            usage()
        elif switch == '-v':  # be verbose
            verbose = 1
        elif switch == '-f':  # Fetch USNO data to cache locally
            save_leapseconds(val)
            raise SystemExit(0)
        elif switch == '-g':  # Graph the leap_second history
            graph_history(val)
            raise SystemExit(0)
        elif switch == '-H':  # make leapsecond include
            sys.stdout.write(make_leapsecond_include(val))
            raise SystemExit(0)
        elif switch == '-i':  # Compute Unix time from RFC822 date
            print(rfc822_to_unix(val))
            raise SystemExit(0)
        elif switch == '-n':  # Compute possible next leapsecond
            printnext(val)
            raise SystemExit(0)
        elif switch == '-o':  # Compute RFC822 date from Unix time
            print(unix_to_rfc822(float(val)))
            raise SystemExit(0)
        elif switch == '-I':  # Compute Unix time from ISO8601 date
            print(isotime(val))
            raise SystemExit(0)
        elif switch == '-O':  # Compute ISO8601 date from Unix time
            print(isotime(float(val)))
            raise SystemExit(0)
- # End
|