aiohttp.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from __future__ import annotations
  2. import json
  3. from aiohttp import ClientSession, ClientResponse, ClientTimeout, BaseConnector, FormData
  4. from typing import AsyncIterator, Any, Optional
  5. from .defaults import DEFAULT_HEADERS
  6. from ..errors import MissingRequirementsError
  7. class StreamResponse(ClientResponse):
  8. async def iter_lines(self) -> AsyncIterator[bytes]:
  9. async for line in self.content:
  10. yield line.rstrip(b"\r\n")
  11. async def iter_content(self) -> AsyncIterator[bytes]:
  12. async for chunk in self.content.iter_any():
  13. yield chunk
  14. async def json(self, content_type: str = None) -> Any:
  15. return await super().json(content_type=content_type)
  16. async def sse(self) -> AsyncIterator[dict]:
  17. """Asynchronously iterate over the Server-Sent Events of the response."""
  18. async for line in self.content:
  19. if line.startswith(b"data: "):
  20. chunk = line[6:]
  21. if chunk.startswith(b"[DONE]"):
  22. break
  23. try:
  24. yield json.loads(chunk)
  25. except json.JSONDecodeError:
  26. continue
  27. class StreamSession(ClientSession):
  28. def __init__(
  29. self,
  30. headers: dict = {},
  31. timeout: int = None,
  32. connector: BaseConnector = None,
  33. proxy: str = None,
  34. proxies: dict = {},
  35. impersonate = None,
  36. **kwargs
  37. ):
  38. if impersonate:
  39. headers = {
  40. **DEFAULT_HEADERS,
  41. **headers
  42. }
  43. connect = None
  44. if isinstance(timeout, tuple):
  45. connect, timeout = timeout;
  46. if timeout is not None:
  47. timeout = ClientTimeout(timeout, connect)
  48. if proxy is None:
  49. proxy = proxies.get("all", proxies.get("https"))
  50. super().__init__(
  51. **kwargs,
  52. timeout=timeout,
  53. response_class=StreamResponse,
  54. connector=get_connector(connector, proxy),
  55. headers=headers
  56. )
  57. def get_connector(connector: BaseConnector = None, proxy: str = None, rdns: bool = False) -> Optional[BaseConnector]:
  58. if proxy and not connector:
  59. try:
  60. from aiohttp_socks import ProxyConnector
  61. if proxy.startswith("socks5h://"):
  62. proxy = proxy.replace("socks5h://", "socks5://")
  63. rdns = True
  64. connector = ProxyConnector.from_url(proxy, rdns=rdns)
  65. except ImportError:
  66. raise MissingRequirementsError('Install "aiohttp_socks" package for proxy support')
  67. return connector