aiogps.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2019 Grand Joldes (grandwork2@yahoo.com).
  4. #
  5. # This file is Copyright 2019 by the GPSD project
  6. # SPDX-License-Identifier: BSD-2-clause
  7. # This code run compatibly under Python 3.x for x >= 6.
  8. """aiogps.py -- Asyncio Python interface to GPSD.
  9. This module adds asyncio support to the Python gps interface. It runs on
  10. Python versions >= 3.6 and provides the following benefits:
  11. - easy integration in asyncio applications (all I/O operations done through
  12. non-blocking coroutines, async context manager, async iterator);
  13. - support for cancellation (all operations are cancellable);
  14. - support for timeouts (on both read and connect);
  15. - support for connection keep-alive (using the TCP keep alive mechanism)
  16. - support for automatic re-connection;
  17. - configurable connection parameters;
  18. - configurable exception handling (internally or by application);
  19. - logging support (logger name: 'gps.aiogps').
  20. The use of timeouts, keepalive and automatic reconnection make possible easy
  21. handling of GPSD connections over unreliable networks.
  22. Examples:
  23. import logging
  24. import gps.aiogps
  25. # configuring logging
  26. logging.basicConfig()
  27. logging.root.setLevel(logging.INFO)
  28. # Example of setting up logging level for the aiogps logger
  29. logging.getLogger('gps.aiogps').setLevel(logging.ERROR)
  30. # using default parameters
  31. async with gps.aiogps.aiogps() as gpsd:
  32. async for msg in gpsd:
  33. # Log last message
  34. logging.info(f'Received: {msg}')
  35. # Log updated GPS status
  36. logging.info(f'\nGPS status:\n{gpsd}')
  37. # using custom parameters
  38. try:
  39. async with gps.aiogps.aiogps(
  40. connection_args = {
  41. 'host': '192.168.10.116',
  42. 'port': 2947
  43. },
  44. connection_timeout = 5,
  45. reconnect = 0, # do not try to reconnect, raise exceptions
  46. alive_opts = {
  47. 'rx_timeout': 5
  48. }
  49. ) as gpsd:
  50. async for msg in gpsd:
  51. logging.info(msg)
  52. except asyncio.CancelledError:
  53. return
  54. except asyncio.IncompleteReadError:
  55. logging.info('Connection closed by server')
  56. except asyncio.TimeoutError:
  57. logging.error('Timeout waiting for gpsd to respond')
  58. except Exception as exc:
  59. logging.error(f'Error: {exc}')
  60. """
  61. __all__ = ['aiogps', ]
  62. import logging
  63. import asyncio
  64. import socket
  65. from typing import Optional, Union, Awaitable
  66. from .client import gpsjson, dictwrapper
  67. from .gps import gps, gpsdata, WATCH_ENABLE, PACKET_SET
  68. from .misc import polystr, polybytes
  69. class aiogps(gps): # pylint: disable=R0902
  70. """An asyncio gps client.
  71. Reimplements all gps IO methods using asyncio coros. Adds connection
  72. management, an asyncio context manager and an asyncio iterator.
  73. The class uses a logger named 'gps.aiogps' to record events. The logger is
  74. configured with a NullHandler to disable any message logging until the
  75. application configures another handler.
  76. """
  77. def __init__(self, # pylint: disable=W0231
  78. connection_args: Optional[dict] = None,
  79. connection_timeout: Optional[float] = None,
  80. reconnect: Optional[float] = 2,
  81. alive_opts: Optional[dict] = None) -> None:
  82. """
  83. Arguments:
  84. connection_args: arguments needed for opening a connection.
  85. These will be passed directly to asyncio.open_connection.
  86. If set to None, a connection to the default gps host and port
  87. will be attempded.
  88. connection_timeout: time to wait for a connection to complete
  89. (seconds). Set to None to disable.
  90. reconnect: configures automatic reconnections:
  91. - 0: reconnection is not attempted in case of an error and the
  92. error is raised to the user;
  93. - number > 0: delay until next reconnection attempt (seconds).
  94. alive_opts: options related to detection of disconnections.
  95. Two mechanisms are supported: TCP keepalive (default, may not be
  96. available on all platforms) and Rx timeout, through the
  97. following options:
  98. - rx_timeout: Rx timeout (seconds). Set to None to disable.
  99. - SO_KEEPALIVE: socket keepalive and related parameters:
  100. - TCP_KEEPIDLE
  101. - TCP_KEEPINTVL
  102. - TCP_KEEPCNT
  103. """
  104. # If connection_args are not specified use defaults
  105. self.connection_args = connection_args or {
  106. 'host': self.host,
  107. 'port': self.port
  108. }
  109. self.connection_timeout = connection_timeout
  110. assert reconnect >= 0
  111. self.reconnect = reconnect
  112. # If alive_opts are not specified use defaults
  113. self.alive_opts = alive_opts or {
  114. 'rx_timeout': None,
  115. 'SO_KEEPALIVE': 1,
  116. 'TCP_KEEPIDLE': 2,
  117. 'TCP_KEEPINTVL': 2,
  118. 'TCP_KEEPCNT': 3
  119. }
  120. # Connection access streams
  121. self.reader: Optional[asyncio.StreamReader] = None
  122. self.writer: Optional[asyncio.StreamWriter] = None
  123. # Set up logging
  124. self.logger = logging.getLogger(__name__)
  125. # Set the Null handler - prevents logging message handling unless the
  126. # application sets up a handler.
  127. self.logger.addHandler(logging.NullHandler())
  128. # Init gps parents
  129. gpsdata.__init__(self) # pylint: disable=W0233
  130. gpsjson.__init__(self) # pylint: disable=W0233
  131. # Provide the response in both 'str' and 'bytes' form
  132. self.bresponse = b''
  133. self.response = polystr(self.bresponse)
  134. # Default stream command
  135. self.stream_command = self.generate_stream_command(WATCH_ENABLE)
  136. self.loop = self.connection_args.get('loop', asyncio.get_event_loop())
  137. def __del__(self) -> None:
  138. """ Destructor """
  139. self.close()
  140. async def _open_connection(self) -> None:
  141. """
  142. Opens a connection to the GPSD server and configures the TCP socket.
  143. """
  144. self.logger.info(
  145. f"Connecting to gpsd at {self.connection_args['host']}" +
  146. (f":{self.connection_args['port']}"
  147. if self.connection_args['port'] else ''))
  148. self.reader, self.writer = await asyncio.wait_for(
  149. asyncio.open_connection(**self.connection_args),
  150. self.connection_timeout,
  151. loop=self.loop)
  152. # Set socket options
  153. sock = self.writer.get_extra_info('socket')
  154. if sock is not None:
  155. if 'SO_KEEPALIVE' in self.alive_opts:
  156. sock.setsockopt(socket.SOL_SOCKET,
  157. socket.SO_KEEPALIVE,
  158. self.alive_opts['SO_KEEPALIVE'])
  159. if hasattr(
  160. sock,
  161. 'TCP_KEEPIDLE') and 'TCP_KEEPIDLE' in self.alive_opts:
  162. sock.setsockopt(socket.IPPROTO_TCP,
  163. socket.TCP_KEEPIDLE, # pylint: disable=E1101
  164. self.alive_opts['TCP_KEEPIDLE'])
  165. if hasattr(
  166. sock,
  167. 'TCP_KEEPINTVL') and 'TCP_KEEPINTVL' in self.alive_opts:
  168. sock.setsockopt(socket.IPPROTO_TCP,
  169. socket.TCP_KEEPINTVL, # pylint: disable=E1101
  170. self.alive_opts['TCP_KEEPINTVL'])
  171. if hasattr(
  172. sock,
  173. 'TCP_KEEPCNT') and 'TCP_KEEPCNT' in self.alive_opts:
  174. sock.setsockopt(socket.IPPROTO_TCP,
  175. socket.TCP_KEEPCNT,
  176. self.alive_opts['TCP_KEEPCNT'])
  177. def close(self) -> None:
  178. """ Closes connection to GPSD server """
  179. if self.writer:
  180. try:
  181. self.writer.close()
  182. except Exception: # pylint: disable=W0703
  183. pass
  184. self.writer = None
  185. def waiting(self) -> bool: # pylint: disable=W0221
  186. """ Mask the blocking waiting method from gpscommon """
  187. return True
  188. async def read(self) -> Union[dictwrapper, str]:
  189. """ Reads data from GPSD server """
  190. while True:
  191. await self.connect()
  192. try:
  193. rx_timeout = self.alive_opts.get('rx_timeout', None)
  194. reader = self.reader.readuntil(separator=b'\n')
  195. self.bresponse = await asyncio.wait_for(reader,
  196. rx_timeout,
  197. loop=self.loop)
  198. self.response = polystr(self.bresponse)
  199. if self.response.startswith(
  200. "{") and self.response.endswith("}\r\n"):
  201. self.unpack(self.response)
  202. self._oldstyle_shim()
  203. self.valid |= PACKET_SET
  204. return self.data
  205. return self.response
  206. except asyncio.CancelledError:
  207. self.close()
  208. raise
  209. except Exception as exc: # pylint: disable=W0703
  210. error = 'timeout' if isinstance(
  211. exc, asyncio.TimeoutError) else exc
  212. self.logger.warning(
  213. f'Failed to get message from GPSD: {error}')
  214. self.close()
  215. if self.reconnect:
  216. # Try again later
  217. await asyncio.sleep(self.reconnect)
  218. else:
  219. raise
  220. async def connect(self) -> None: # pylint: disable=W0221
  221. """ Connects to GPSD server and starts streaming data """
  222. while not self.writer:
  223. try:
  224. await self._open_connection()
  225. await self.stream()
  226. self.logger.info('Connected to gpsd')
  227. except asyncio.CancelledError:
  228. self.close()
  229. raise
  230. except Exception as exc: # pylint: disable=W0703
  231. error = 'timeout' if isinstance(
  232. exc, asyncio.TimeoutError) else exc
  233. self.logger.error(f'Failed to connect to GPSD: {error}')
  234. self.close()
  235. if self.reconnect:
  236. # Try again later
  237. await asyncio.sleep(self.reconnect)
  238. else:
  239. raise
  240. async def send(self, commands) -> None:
  241. """ Sends commands """
  242. bcommands = polybytes(commands + "\n")
  243. if self.writer:
  244. self.writer.write(bcommands)
  245. await self.writer.drain()
  246. async def stream(self, flags: Optional[int] = 0,
  247. devpath: Optional[str] = None) -> None:
  248. """ Creates and sends the stream command """
  249. if flags > 0:
  250. # Update the stream command
  251. self.stream_command = self.generate_stream_command(flags, devpath)
  252. if self.stream_command:
  253. self.logger.info(f'Sent stream as: {self.stream_command}')
  254. await self.send(self.stream_command)
  255. else:
  256. raise TypeError(f'Invalid streaming command: {flags}')
  257. async def __aenter__(self) -> 'aiogps':
  258. """ Context manager entry """
  259. return self
  260. async def __aexit__(self, exc_type, exc, traceback) -> None:
  261. """ Context manager exit: close connection """
  262. self.close()
  263. def __aiter__(self) -> 'aiogps':
  264. """ Async iterator interface """
  265. return self
  266. async def __anext__(self) -> Union[dictwrapper, str]:
  267. """ Returns next message from GPSD """
  268. data = await self.read()
  269. return data
  270. def __next__(self) -> Awaitable:
  271. """
  272. Reimplementation of the blocking iterator from gps.
  273. Returns an awaitable which returns the next message from GPSD.
  274. """
  275. return self.read()
  276. # vim: set expandtab shiftwidth=4