raise_for_status.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from __future__ import annotations
  2. from typing import Union
  3. from aiohttp import ClientResponse
  4. from requests import Response as RequestsResponse
  5. from ..errors import ResponseStatusError, RateLimitError
  6. from . import Response, StreamResponse
  7. class CloudflareError(ResponseStatusError):
  8. ...
  9. def is_cloudflare(text: str) -> bool:
  10. if "Generated by cloudfront" in text or '<p id="cf-spinner-please-wait">' in text:
  11. return True
  12. elif "<title>Attention Required! | Cloudflare</title>" in text or 'id="cf-cloudflare-status"' in text:
  13. return True
  14. return '<div id="cf-please-wait">' in text or "<title>Just a moment...</title>" in text
  15. def is_openai(text: str) -> bool:
  16. return "<p>Unable to load site</p>" in text or 'id="challenge-error-text"' in text
  17. async def raise_for_status_async(response: Union[StreamResponse, ClientResponse], message: str = None):
  18. if response.ok:
  19. return
  20. if message is None:
  21. content_type = response.headers.get("content-type", "")
  22. # if content_type.startswith("application/json"):
  23. # try:
  24. # data = await response.json()
  25. # message = data.get("error")
  26. # if isinstance(message, dict):
  27. # message = data.get("message")
  28. # except Exception:
  29. # pass
  30. # else:
  31. text = (await response.text()).strip()
  32. is_html = content_type.startswith("text/html") or text.startswith("<!DOCTYPE")
  33. message = "HTML content" if is_html else text
  34. if message is None or message == "HTML content":
  35. if response.status == 520:
  36. message = "Unknown error (Cloudflare)"
  37. elif response.status in (429, 402):
  38. message = "Rate limit"
  39. if response.status == 403 and is_cloudflare(text):
  40. raise CloudflareError(f"Response {response.status}: Cloudflare detected")
  41. elif response.status == 403 and is_openai(text):
  42. raise ResponseStatusError(f"Response {response.status}: OpenAI Bot detected")
  43. elif response.status == 502:
  44. raise ResponseStatusError(f"Response {response.status}: Bad gateway")
  45. else:
  46. raise ResponseStatusError(f"Response {response.status}: {message}")
  47. def raise_for_status(response: Union[Response, StreamResponse, ClientResponse, RequestsResponse], message: str = None):
  48. if hasattr(response, "status"):
  49. return raise_for_status_async(response, message)
  50. if response.ok:
  51. return
  52. if message is None:
  53. is_html = response.headers.get("content-type", "").startswith("text/html") or response.text.startswith("<!DOCTYPE")
  54. message = "HTML content" if is_html else response.text
  55. if message == "HTML content":
  56. if response.status_code == 520:
  57. message = "Unknown error (Cloudflare)"
  58. elif response.status_code in (429, 402):
  59. message = "Rate limit"
  60. raise RateLimitError(f"Response {response.status_code}: {message}")
  61. if response.status_code == 403 and is_cloudflare(response.text):
  62. raise CloudflareError(f"Response {response.status_code}: Cloudflare detected")
  63. elif response.status_code == 403 and is_openai(response.text):
  64. raise ResponseStatusError(f"Response {response.status_code}: OpenAI Bot detected")
  65. elif response.status_code == 502:
  66. raise ResponseStatusError(f"Response {response.status_code}: Bad gateway")
  67. else:
  68. raise ResponseStatusError(f"Response {response.status_code}: {message}")