__init__.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. from __future__ import annotations
  2. import os
  3. from urllib.parse import urlparse
  4. from typing import Iterator
  5. from http.cookies import Morsel
  6. try:
  7. from curl_cffi.requests import Session, Response
  8. from .curl_cffi import StreamResponse, StreamSession, FormData
  9. has_curl_cffi = True
  10. except ImportError:
  11. from typing import Type as Response
  12. from .aiohttp import StreamResponse, StreamSession, FormData
  13. has_curl_cffi = False
  14. try:
  15. import webview
  16. import asyncio
  17. has_webview = True
  18. except ImportError:
  19. has_webview = False
  20. try:
  21. import nodriver
  22. from nodriver.cdp.network import CookieParam
  23. from nodriver.core.config import find_chrome_executable
  24. from nodriver import Browser
  25. has_nodriver = True
  26. except ImportError:
  27. has_nodriver = False
  28. try:
  29. from platformdirs import user_config_dir
  30. has_platformdirs = True
  31. except ImportError:
  32. has_platformdirs = False
  33. from .. import debug
  34. from .raise_for_status import raise_for_status
  35. from ..errors import MissingRequirementsError
  36. from ..typing import Cookies
  37. from .defaults import DEFAULT_HEADERS, WEBVIEW_HAEDERS
  38. if not has_curl_cffi:
  39. class Session:
  40. def __init__(self, **kwargs):
  41. raise MissingRequirementsError('Install "curl_cffi" package | pip install -U curl_cffi')
  42. async def get_args_from_webview(url: str) -> dict:
  43. if not has_webview:
  44. raise MissingRequirementsError('Install "webview" package')
  45. window = webview.create_window("", url, hidden=True)
  46. await asyncio.sleep(2)
  47. body = None
  48. while body is None:
  49. try:
  50. await asyncio.sleep(1)
  51. body = window.dom.get_element("body:not(.no-js)")
  52. except:
  53. ...
  54. headers = {
  55. **WEBVIEW_HAEDERS,
  56. "User-Agent": window.evaluate_js("this.navigator.userAgent"),
  57. "Accept-Language": window.evaluate_js("this.navigator.language"),
  58. "Referer": window.real_url
  59. }
  60. cookies = [list(*cookie.items()) for cookie in window.get_cookies()]
  61. cookies = {name: cookie.value for name, cookie in cookies}
  62. window.destroy()
  63. return {"headers": headers, "cookies": cookies}
  64. def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str = None) -> list[CookieParam]:
  65. [CookieParam.from_json({
  66. "name": key,
  67. "value": value,
  68. "url": url,
  69. "domain": domain
  70. }) for key, value in cookies.items()]
  71. async def get_args_from_nodriver(
  72. url: str,
  73. proxy: str = None,
  74. timeout: int = 120,
  75. cookies: Cookies = None
  76. ) -> dict:
  77. if not has_nodriver:
  78. raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
  79. if debug.logging:
  80. print(f"Open nodriver with url: {url}")
  81. browser = await nodriver.start(
  82. browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
  83. )
  84. domain = urlparse(url).netloc
  85. if cookies is None:
  86. cookies = {}
  87. else:
  88. await browser.cookies.set_all(get_cookie_params_from_dict(cookies, url=url, domain=domain))
  89. page = await browser.get(url)
  90. for c in await page.send(nodriver.cdp.network.get_cookies([url])):
  91. cookies[c.name] = c.value
  92. user_agent = await page.evaluate("window.navigator.userAgent")
  93. await page.wait_for("body:not(.no-js)", timeout=timeout)
  94. for c in await page.send(nodriver.cdp.network.get_cookies([url])):
  95. cookies[c.name] = c.value
  96. await page.close()
  97. browser.stop()
  98. return {
  99. "impersonate": "chrome",
  100. "cookies": cookies,
  101. "headers": {
  102. **DEFAULT_HEADERS,
  103. "user-agent": user_agent,
  104. "referer": url,
  105. },
  106. "proxy": proxy
  107. }
  108. def merge_cookies(cookies: Iterator[Morsel], response: Response) -> Cookies:
  109. if cookies is None:
  110. cookies = {}
  111. for cookie in response.cookies.jar:
  112. cookies[cookie.name] = cookie.value
  113. async def get_nodriver(proxy: str = None, user_data_dir = "nodriver", browser_executable_path=None, **kwargs)-> Browser:
  114. if not has_nodriver:
  115. raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
  116. user_data_dir = user_config_dir(f"g4f-{user_data_dir}") if has_platformdirs else None
  117. if browser_executable_path is None:
  118. try:
  119. browser_executable_path = find_chrome_executable()
  120. except FileNotFoundError:
  121. # Default to Edge if Chrome is not found
  122. browser_executable_path = "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe"
  123. if not os.path.exists(browser_executable_path):
  124. browser_executable_path = None
  125. debug.log(f"Open nodriver with user_dir: {user_data_dir}")
  126. return await nodriver.start(
  127. user_data_dir=user_data_dir,
  128. browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
  129. browser_executable_path=browser_executable_path,
  130. **kwargs
  131. )