packet.py.in 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # packet.py - recognize GPS packet types
  2. # @GENERATED@
  3. #
  4. # This file is Copyright 2019 by the GPSD project
  5. # SPDX-License-Identifier: BSD-2-Clause
  6. #
  7. # This code runs compatibly under Python 2 and 3.x for x >= 2.
  8. # Preserve this property!
  9. #
  10. # -*- coding: utf-8 -*-
  11. """Python binding of the libgpsd module for recognizing GPS packets.
  12. The new() function returns a new packet-lexer instance. Lexer instances
  13. have two methods:
  14. get() takes a file descriptor argument and returns a tuple consisting of
  15. the integer packet type and string packet value. On end of file it returns
  16. (-1, '').
  17. reset() resets the packet-lexer to its initial state.
  18. The module also has a register_report() function that accepts a callback
  19. for debug message reporting. The callback will get two arguments, the error
  20. level of the message and the message itself.
  21. """
  22. from __future__ import absolute_import, print_function
  23. import ctypes
  24. import os
  25. import gps.misc
  26. # Packet types and Logging levels extracted from gpsd.h
  27. @PYPACKETH@
  28. def importado():
  29. """Load the packet library, from the directory where we reside."""
  30. packet_name = '@GPSPACKET@'
  31. packet_dir = os.path.dirname(__file__)
  32. packet_path = os.path.join(packet_dir, packet_name)
  33. return ctypes.CDLL(packet_path)
  34. _loaded = None
  35. _packet = importado()
  36. _lexer_size = ctypes.c_size_t.in_dll(_packet, "fvi_size_lexer")
  37. LEXER_SIZE = _lexer_size.value
  38. _buffer_size = ctypes.c_size_t.in_dll(_packet, "fvi_size_buffer").value
  39. REPORTER = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p)
  40. class GpsdErrOutT(ctypes.Structure):
  41. '''Used in gps.packet:register_report() to set logging callback.'''
  42. # pylint: disable-msg=R0903
  43. _fields_ = [('debug', ctypes.c_int),
  44. ('report', REPORTER),
  45. ('label', ctypes.c_char_p)]
  46. class lexer_t(ctypes.Structure):
  47. '''Used in gps.packet:lexer.get() to pass in data and pull
  48. out length, packet type, packet, and another datum.'''
  49. # pylint: disable-msg=R0903
  50. _fields_ = [
  51. ('packet_type', ctypes.c_int),
  52. ('state', ctypes.c_uint),
  53. ('length', ctypes.c_size_t),
  54. ('inbuffer', ctypes.c_ubyte * _buffer_size),
  55. ('inbuflen', ctypes.c_size_t),
  56. ('inbufptr', ctypes.c_char_p),
  57. ('outbuffer', ctypes.c_ubyte * _buffer_size),
  58. ('outbuflen', ctypes.c_size_t),
  59. ('char_counter', ctypes.c_ulong),
  60. ('retry_counter', ctypes.c_ulong),
  61. ('counter', ctypes.c_uint),
  62. ('errout', GpsdErrOutT),
  63. ]
  64. def new():
  65. """new() -> new packet-self object"""
  66. return Lexer()
  67. def register_report(reporter):
  68. """register_report(callback)
  69. callback must be a callable object expecting a string as parameter."""
  70. global _loaded
  71. if callable(reporter):
  72. _loaded.errout.report = REPORTER(reporter)
  73. class Lexer():
  74. """GPS packet lexer object
  75. Fetch a single packet from file descriptor
  76. """
  77. pointer = None
  78. def __init__(self):
  79. global _loaded
  80. _packet.ffi_Lexer_init.restype = ctypes.POINTER(lexer_t)
  81. self.pointer = _packet.ffi_Lexer_init()
  82. _loaded = self.pointer.contents
  83. def get(self, file_handle):
  84. """Get a packet from a file descriptor."""
  85. global _loaded
  86. _packet.packet_get.restype = ctypes.c_int
  87. _packet.packet_get.argtypes = [ctypes.c_int, ctypes.POINTER(lexer_t)]
  88. length = _packet.packet_get(file_handle, self.pointer)
  89. _loaded = self.pointer.contents
  90. packet = ''
  91. for octet in range(_loaded.outbuflen):
  92. packet += chr(_loaded.outbuffer[octet])
  93. return [length,
  94. _loaded.packet_type,
  95. gps.misc.polybytes(packet),
  96. _loaded.char_counter]
  97. def reset(self):
  98. """Reset the packet self to ground state."""
  99. _packet.ffi_Lexer_init.restype = None
  100. _packet.ffi_Lexer_init.argtypes = [ctypes.POINTER(lexer_t)]
  101. _packet.ffi_Lexer_init(self.pointer)
  102. # vim: set expandtab shiftwidth=4