example_aiogps.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2019 Grand Joldes (grandwork2@yahoo.com).
  4. #
  5. # This file is Copyright 2019 by the GPSD project
  6. #
  7. # SPDX-License-Identifier: BSD-2-clause
  8. # This code runs compatibly under Python 3.x for x >= 6.
  9. """
  10. Example of using the asyncio Python interface to GPSD. This example
  11. demonstrates error handling by the application code when aiogps is not
  12. configured to handle automatic re-connection.
  13. """
  14. import asyncio
  15. import logging
  16. import gps.aiogps
  17. async def get_gps_updates(gpsd: gps.aiogps.aiogps) -> None:
  18. """ Receives and prints messages from GPSD.
  19. The GPS status information is updated within aiogps every time a new
  20. message is received.
  21. This function also demonstrates what error messages can be expected when
  22. auto reconnection is not used in aiogps (reconnect = 0).
  23. """
  24. while True:
  25. try:
  26. async for msg in gpsd:
  27. # Print received message
  28. logging.info(f'Received: {msg}')
  29. except asyncio.CancelledError:
  30. return
  31. except asyncio.IncompleteReadError:
  32. logging.info('Connection closed by server')
  33. except asyncio.TimeoutError:
  34. logging.error('Timeout waiting for gpsd to respond')
  35. except Exception as exc: # pylint: disable=W0703
  36. logging.error(f'Error: {exc}')
  37. # Try again in 1s
  38. await asyncio.sleep(1)
  39. async def print_gps_info(gpsd: gps.aiogps.aiogps) -> None:
  40. """ Prints GPS status every 5s """
  41. while True:
  42. try:
  43. await asyncio.sleep(5)
  44. logging.info(f'\nGPS status:\n{gpsd}')
  45. except asyncio.CancelledError:
  46. return
  47. except Exception as exc: # pylint: disable=W0703
  48. logging.error(f'Error: {exc}')
  49. async def main():
  50. """ Main coroutine - executes 2 asyncio tasks in parallel """
  51. try:
  52. # Example of using custom connection configuration
  53. async with gps.aiogps.aiogps(
  54. connection_args={
  55. 'host': '127.0.0.1',
  56. 'port': 2947
  57. },
  58. connection_timeout=5,
  59. reconnect=0, # do not reconnect, raise errors
  60. alive_opts={
  61. 'rx_timeout': 5
  62. }
  63. ) as gpsd:
  64. # These tasks will be executed in parallel
  65. await asyncio.gather(
  66. get_gps_updates(gpsd),
  67. print_gps_info(gpsd),
  68. return_exceptions=True
  69. )
  70. except asyncio.CancelledError:
  71. return
  72. except Exception as exc: # pylint: disable=W0703
  73. logging.error(f'Error: {exc}')
  74. def run():
  75. """
  76. Main function.
  77. Because this code only compiles on Python versions >= 3.6,
  78. it is not run directly, but through the example_aiogps_run wrapper,
  79. which fails gracefully on unsupported Python versions.
  80. """
  81. # Set up logging program logging
  82. logging.basicConfig()
  83. logging.root.setLevel(logging.INFO)
  84. # Example of setting up logging level for aiogps - this setting will
  85. # prevent all aiogps events from being logged
  86. logging.getLogger('gps.aiogps').setLevel(logging.CRITICAL)
  87. loop = asyncio.events.new_event_loop()
  88. try:
  89. asyncio.events.set_event_loop(loop)
  90. loop.run_until_complete(main())
  91. except KeyboardInterrupt:
  92. print('Got keyboard interrupt.')
  93. finally:
  94. print('Exiting.')
  95. try:
  96. for task in asyncio.Task.all_tasks():
  97. task.cancel()
  98. finally:
  99. loop.run_until_complete(loop.shutdown_asyncgens())
  100. asyncio.events.set_event_loop(None)
  101. loop.close()
  102. # vim: set expandtab shiftwidth=4