123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- # packet.py - recognize GPS packet types
- # @GENERATED@
- #
- # This file is Copyright 2019 by the GPSD project
- # SPDX-License-Identifier: BSD-2-Clause
- #
- # This code runs compatibly under Python 2 and 3.x for x >= 2.
- # Preserve this property!
- #
- # -*- coding: utf-8 -*-
- """Python binding of the libgpsd module for recognizing GPS packets.
- The new() function returns a new packet-lexer instance. Lexer instances
- have two methods:
- get() takes a file descriptor argument and returns a tuple consisting of
- the integer packet type and string packet value. On end of file it returns
- (-1, '').
- reset() resets the packet-lexer to its initial state.
- The module also has a register_report() function that accepts a callback
- for debug message reporting. The callback will get two arguments, the error
- level of the message and the message itself.
- """
- from __future__ import absolute_import, print_function
- import ctypes
- import os
- import gps.misc
- # Packet types and Logging levels extracted from gpsd.h
- @PYPACKETH@
- def importado():
- """Load the packet library, from the directory where we reside."""
- packet_name = '@GPSPACKET@'
- packet_dir = os.path.dirname(__file__)
- packet_path = os.path.join(packet_dir, packet_name)
- return ctypes.CDLL(packet_path)
- _loaded = None
- _packet = importado()
- _lexer_size = ctypes.c_size_t.in_dll(_packet, "fvi_size_lexer")
- LEXER_SIZE = _lexer_size.value
- _buffer_size = ctypes.c_size_t.in_dll(_packet, "fvi_size_buffer").value
- REPORTER = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_char_p)
- class GpsdErrOutT(ctypes.Structure):
- '''Used in gps.packet:register_report() to set logging callback.'''
- # pylint: disable-msg=R0903
- _fields_ = [('debug', ctypes.c_int),
- ('report', REPORTER),
- ('label', ctypes.c_char_p)]
- class lexer_t(ctypes.Structure):
- '''Used in gps.packet:lexer.get() to pass in data and pull
- out length, packet type, packet, and another datum.'''
- # pylint: disable-msg=R0903
- _fields_ = [
- ('packet_type', ctypes.c_int),
- ('state', ctypes.c_uint),
- ('length', ctypes.c_size_t),
- ('inbuffer', ctypes.c_ubyte * _buffer_size),
- ('inbuflen', ctypes.c_size_t),
- ('inbufptr', ctypes.c_char_p),
- ('outbuffer', ctypes.c_ubyte * _buffer_size),
- ('outbuflen', ctypes.c_size_t),
- ('char_counter', ctypes.c_ulong),
- ('retry_counter', ctypes.c_ulong),
- ('counter', ctypes.c_uint),
- ('errout', GpsdErrOutT),
- ]
- def new():
- """new() -> new packet-self object"""
- return Lexer()
- def register_report(reporter):
- """register_report(callback)
- callback must be a callable object expecting a string as parameter."""
- global _loaded
- if callable(reporter):
- _loaded.errout.report = REPORTER(reporter)
- class Lexer():
- """GPS packet lexer object
- Fetch a single packet from file descriptor
- """
- pointer = None
- def __init__(self):
- global _loaded
- _packet.ffi_Lexer_init.restype = ctypes.POINTER(lexer_t)
- self.pointer = _packet.ffi_Lexer_init()
- _loaded = self.pointer.contents
- def get(self, file_handle):
- """Get a packet from a file descriptor."""
- global _loaded
- _packet.packet_get.restype = ctypes.c_int
- _packet.packet_get.argtypes = [ctypes.c_int, ctypes.POINTER(lexer_t)]
- length = _packet.packet_get(file_handle, self.pointer)
- _loaded = self.pointer.contents
- packet = ''
- for octet in range(_loaded.outbuflen):
- packet += chr(_loaded.outbuffer[octet])
- return [length,
- _loaded.packet_type,
- gps.misc.polybytes(packet),
- _loaded.char_counter]
- def reset(self):
- """Reset the packet self to ground state."""
- _packet.ffi_Lexer_init.restype = None
- _packet.ffi_Lexer_init.argtypes = [ctypes.POINTER(lexer_t)]
- _packet.ffi_Lexer_init(self.pointer)
- # vim: set expandtab shiftwidth=4
|