asyncio.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from __future__ import annotations
  2. import asyncio
  3. from asyncio import AbstractEventLoop, runners
  4. from typing import Union, Callable, AsyncGenerator, Generator
  5. from ..errors import NestAsyncioError
  6. try:
  7. import nest_asyncio
  8. has_nest_asyncio = True
  9. except ImportError:
  10. has_nest_asyncio = False
  11. try:
  12. import uvloop
  13. has_uvloop = True
  14. except ImportError:
  15. has_uvloop = False
  16. def get_running_loop(check_nested: bool) -> Union[AbstractEventLoop, None]:
  17. try:
  18. loop = asyncio.get_running_loop()
  19. # Do not patch uvloop loop because its incompatible.
  20. if has_uvloop:
  21. if isinstance(loop, uvloop.Loop):
  22. return loop
  23. if not hasattr(loop.__class__, "_nest_patched"):
  24. if has_nest_asyncio:
  25. nest_asyncio.apply(loop)
  26. elif check_nested:
  27. raise NestAsyncioError('Install "nest_asyncio" package | pip install -U nest_asyncio')
  28. return loop
  29. except RuntimeError:
  30. pass
  31. # Fix for RuntimeError: async generator ignored GeneratorExit
  32. async def await_callback(callback: Callable):
  33. return await callback()
  34. async def async_generator_to_list(generator: AsyncGenerator) -> list:
  35. return [item async for item in generator]
  36. def to_sync_generator(generator: AsyncGenerator) -> Generator:
  37. loop = get_running_loop(check_nested=False)
  38. new_loop = False
  39. if loop is None:
  40. loop = asyncio.new_event_loop()
  41. asyncio.set_event_loop(loop)
  42. new_loop = True
  43. gen = generator.__aiter__()
  44. try:
  45. while True:
  46. yield loop.run_until_complete(await_callback(gen.__anext__))
  47. except StopAsyncIteration:
  48. pass
  49. finally:
  50. if new_loop:
  51. try:
  52. runners._cancel_all_tasks(loop)
  53. loop.run_until_complete(loop.shutdown_asyncgens())
  54. if hasattr(loop, "shutdown_default_executor"):
  55. loop.run_until_complete(loop.shutdown_default_executor())
  56. finally:
  57. asyncio.set_event_loop(None)
  58. loop.close()