aiogps.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2019 Grand Joldes (grandwork2@yahoo.com).
  4. #
  5. # This file is Copyright 2019 by the GPSD project
  6. # SPDX-License-Identifier: BSD-2-clause
  7. # This code run compatibly under Python 3.x for x >= 6.
  8. # Codacy D203 and D211 conflict, I choose D203
  9. # Codacy D212 and D213 conflict, I choose D212
  10. """aiogps.py -- Asyncio Python interface to GPSD.
  11. This module adds asyncio support to the Python gps interface. It runs on
  12. Python versions >= 3.6 and provides the following benefits:
  13. - easy integration in asyncio applications (all I/O operations done through
  14. non-blocking coroutines, async context manager, async iterator);
  15. - support for cancellation (all operations are cancellable);
  16. - support for timeouts (on both read and connect);
  17. - support for connection keep-alive (using the TCP keep alive mechanism)
  18. - support for automatic re-connection;
  19. - configurable connection parameters;
  20. - configurable exception handling (internally or by application);
  21. - logging support (logger name: 'gps.aiogps').
  22. The use of timeouts, keepalive and automatic reconnection make possible easy
  23. handling of GPSD connections over unreliable networks.
  24. Examples:
  25. import logging
  26. import gps.aiogps
  27. # configuring logging
  28. logging.basicConfig()
  29. logging.root.setLevel(logging.INFO)
  30. # Example of setting up logging level for the aiogps logger
  31. logging.getLogger('gps.aiogps').setLevel(logging.ERROR)
  32. # using default parameters
  33. async with gps.aiogps.aiogps() as gpsd:
  34. async for msg in gpsd:
  35. # Log last message
  36. logging.info(f'Received: {msg}')
  37. # Log updated GPS status
  38. logging.info(f'\nGPS status:\n{gpsd}')
  39. # using custom parameters
  40. try:
  41. async with gps.aiogps.aiogps(
  42. connection_args = {
  43. 'host': '192.168.10.116',
  44. 'port': 2947
  45. },
  46. connection_timeout = 5,
  47. reconnect = 0, # do not try to reconnect, raise exceptions
  48. alive_opts = {
  49. 'rx_timeout': 5
  50. }
  51. ) as gpsd:
  52. async for msg in gpsd:
  53. logging.info(msg)
  54. except asyncio.CancelledError:
  55. return
  56. except asyncio.IncompleteReadError:
  57. logging.info('Connection closed by server')
  58. except asyncio.TimeoutError:
  59. logging.error('Timeout waiting for gpsd to respond')
  60. except Exception as exc:
  61. logging.error(f'Error: {exc}')
  62. """
  63. __all__ = ['aiogps', ]
  64. import asyncio
  65. import logging
  66. import socket
  67. from typing import Optional, Union, Awaitable
  68. from .client import gpsjson, dictwrapper
  69. from .gps import gps, gpsdata, WATCH_ENABLE, PACKET_SET
  70. from .misc import polystr, polybytes
  71. class aiogps(gps): # pylint: disable=R0902
  72. """An asyncio gps client.
  73. Reimplements all gps IO methods using asyncio coros. Adds connection
  74. management, an asyncio context manager and an asyncio iterator.
  75. The class uses a logger named 'gps.aiogps' to record events. The logger is
  76. configured with a NullHandler to disable any message logging until the
  77. application configures another handler.
  78. """
  79. def __init__(self, # pylint: disable=W0231
  80. connection_args: Optional[dict] = None,
  81. connection_timeout: Optional[float] = None,
  82. reconnect: Optional[float] = 2,
  83. alive_opts: Optional[dict] = None) -> None:
  84. """Arguments:
  85. connection_args: arguments needed for opening a connection.
  86. These will be passed directly to asyncio.open_connection.
  87. If set to None, a connection to the default gps host and port
  88. will be attempded.
  89. connection_timeout: time to wait for a connection to complete
  90. (seconds). Set to None to disable.
  91. reconnect: configures automatic reconnections:
  92. - 0: reconnection is not attempted in case of an error and the
  93. error is raised to the user;
  94. - number > 0: delay until next reconnection attempt (seconds).
  95. alive_opts: options related to detection of disconnections.
  96. Two mechanisms are supported: TCP keepalive (default, may not
  97. be available on all platforms) and Rx timeout, through the
  98. following options:
  99. - rx_timeout: Rx timeout (seconds). Set to None to disable.
  100. - SO_KEEPALIVE: socket keepalive and related parameters:
  101. - TCP_KEEPIDLE
  102. - TCP_KEEPINTVL
  103. - TCP_KEEPCNT
  104. """
  105. # If connection_args are not specified use defaults
  106. self.connection_args = connection_args or {
  107. 'host': self.host,
  108. 'port': self.port
  109. }
  110. self.connection_timeout = connection_timeout
  111. assert reconnect >= 0
  112. self.reconnect = reconnect
  113. # If alive_opts are not specified use defaults
  114. self.alive_opts = alive_opts or {
  115. 'rx_timeout': None,
  116. 'SO_KEEPALIVE': 1,
  117. 'TCP_KEEPIDLE': 2,
  118. 'TCP_KEEPINTVL': 2,
  119. 'TCP_KEEPCNT': 3
  120. }
  121. # Connection access streams
  122. self.reader: Optional[asyncio.StreamReader] = None
  123. self.writer: Optional[asyncio.StreamWriter] = None
  124. # Set up logging
  125. self.logger = logging.getLogger(__name__)
  126. # Set the Null handler - prevents logging message handling unless the
  127. # application sets up a handler.
  128. self.logger.addHandler(logging.NullHandler())
  129. # Init gps parents
  130. gpsdata.__init__(self) # pylint: disable=W0233
  131. gpsjson.__init__(self) # pylint: disable=W0233
  132. # Provide the response in both 'str' and 'bytes' form
  133. self.bresponse = b''
  134. self.response = polystr(self.bresponse)
  135. # Default stream command
  136. self.stream_command = self.generate_stream_command(WATCH_ENABLE)
  137. self.loop = self.connection_args.get('loop', asyncio.get_event_loop())
  138. self.valid = 0
  139. def __del__(self) -> None:
  140. """Destructor."""
  141. self.close()
  142. async def _open_connection(self) -> None:
  143. """Opens connection to GPSD server and configure the TCP socket."""
  144. self.logger.info(
  145. f"Connecting to gpsd at {self.connection_args['host']}" +
  146. (f":{self.connection_args['port']}"
  147. if self.connection_args['port'] else ''))
  148. self.reader, self.writer = await asyncio.wait_for(
  149. asyncio.open_connection(**self.connection_args),
  150. self.connection_timeout,
  151. loop=self.loop)
  152. # Set socket options
  153. sock = self.writer.get_extra_info('socket')
  154. if sock is not None:
  155. if 'SO_KEEPALIVE' in self.alive_opts:
  156. sock.setsockopt(socket.SOL_SOCKET,
  157. socket.SO_KEEPALIVE,
  158. self.alive_opts['SO_KEEPALIVE'])
  159. if hasattr(
  160. sock,
  161. 'TCP_KEEPIDLE') and 'TCP_KEEPIDLE' in self.alive_opts:
  162. sock.setsockopt(socket.IPPROTO_TCP,
  163. socket.TCP_KEEPIDLE, # pylint: disable=E1101
  164. self.alive_opts['TCP_KEEPIDLE'])
  165. if hasattr(
  166. sock,
  167. 'TCP_KEEPINTVL') and 'TCP_KEEPINTVL' in self.alive_opts:
  168. sock.setsockopt(socket.IPPROTO_TCP,
  169. socket.TCP_KEEPINTVL, # pylint: disable=E1101
  170. self.alive_opts['TCP_KEEPINTVL'])
  171. if hasattr(
  172. sock,
  173. 'TCP_KEEPCNT') and 'TCP_KEEPCNT' in self.alive_opts:
  174. sock.setsockopt(socket.IPPROTO_TCP,
  175. socket.TCP_KEEPCNT,
  176. self.alive_opts['TCP_KEEPCNT'])
  177. def close(self) -> None:
  178. """Closes connection to GPSD server."""
  179. if self.writer:
  180. try:
  181. self.writer.close()
  182. except Exception: # pylint: disable=W0703
  183. pass
  184. self.writer = None
  185. def waiting(self) -> bool: # pylint: disable=W0221
  186. """Mask the blocking waiting method from gpscommon."""
  187. return True
  188. async def read(self) -> Union[dictwrapper, str]:
  189. """Reads data from GPSD server."""
  190. while True:
  191. await self.connect()
  192. try:
  193. rx_timeout = self.alive_opts.get('rx_timeout', None)
  194. reader = self.reader.readuntil(separator=b'\n')
  195. self.bresponse = await asyncio.wait_for(reader,
  196. rx_timeout,
  197. loop=self.loop)
  198. self.response = polystr(self.bresponse)
  199. if self.response.startswith(
  200. "{") and self.response.endswith("}\r\n"):
  201. self.unpack(self.response)
  202. self._oldstyle_shim()
  203. self.valid |= PACKET_SET
  204. return self.data
  205. return self.response
  206. except asyncio.CancelledError:
  207. self.close()
  208. raise
  209. except Exception as exc: # pylint: disable=W0703
  210. error = 'timeout' if isinstance(
  211. exc, asyncio.TimeoutError) else exc
  212. self.logger.warning(
  213. f'Failed to get message from GPSD: {error}')
  214. self.close()
  215. if self.reconnect:
  216. # Try again later
  217. await asyncio.sleep(self.reconnect)
  218. else:
  219. raise
  220. async def connect(self) -> None: # pylint: disable=W0221
  221. """Connects to GPSD server and starts streaming data."""
  222. while not self.writer:
  223. try:
  224. await self._open_connection()
  225. await self.stream()
  226. self.logger.info('Connected to gpsd')
  227. except asyncio.CancelledError:
  228. self.close()
  229. raise
  230. except Exception as exc: # pylint: disable=W0703
  231. error = 'timeout' if isinstance(
  232. exc, asyncio.TimeoutError) else exc
  233. self.logger.error(f'Failed to connect to GPSD: {error}')
  234. self.close()
  235. if self.reconnect:
  236. # Try again later
  237. await asyncio.sleep(self.reconnect)
  238. else:
  239. raise
  240. async def send(self, commands) -> None:
  241. """Sends commands."""
  242. bcommands = polybytes(commands + "\n")
  243. if self.writer:
  244. self.writer.write(bcommands)
  245. await self.writer.drain()
  246. async def stream(self, flags: Optional[int] = 0,
  247. devpath: Optional[str] = None) -> None:
  248. """Creates and sends the stream command."""
  249. if flags > 0:
  250. # Update the stream command
  251. self.stream_command = self.generate_stream_command(flags, devpath)
  252. if self.stream_command:
  253. self.logger.info(f'Sent stream as: {self.stream_command}')
  254. await self.send(self.stream_command)
  255. else:
  256. raise TypeError(f'Invalid streaming command: {flags}')
  257. async def __aenter__(self) -> 'aiogps':
  258. """Context manager entry."""
  259. return self
  260. async def __aexit__(self, exc_type, exc, traceback) -> None:
  261. """Context manager exit: close connection."""
  262. self.close()
  263. def __aiter__(self) -> 'aiogps':
  264. """Async iterator interface."""
  265. return self
  266. async def __anext__(self) -> Union[dictwrapper, str]:
  267. """Returns next message from GPSD."""
  268. data = await self.read()
  269. return data
  270. def __next__(self) -> Awaitable:
  271. """Reimplementation of the blocking iterator from gps.
  272. Returns an awaitable which returns the next message from GPSD.
  273. """
  274. return self.read()
  275. # vim: set expandtab shiftwidth=4