__init__.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. from __future__ import annotations
  2. from urllib.parse import urlparse
  3. from typing import Iterator
  4. from http.cookies import Morsel
  5. try:
  6. from curl_cffi.requests import Session, Response
  7. from .curl_cffi import StreamResponse, StreamSession, FormData
  8. has_curl_cffi = True
  9. except ImportError:
  10. from typing import Type as Session, Type as Response
  11. from .aiohttp import StreamResponse, StreamSession, FormData
  12. has_curl_cffi = False
  13. try:
  14. import webview
  15. import asyncio
  16. has_webview = True
  17. except ImportError:
  18. has_webview = False
  19. try:
  20. import nodriver
  21. from nodriver.cdp.network import CookieParam
  22. from nodriver import Browser
  23. has_nodriver = True
  24. except ImportError:
  25. has_nodriver = False
  26. try:
  27. from platformdirs import user_config_dir
  28. has_platformdirs = True
  29. except ImportError:
  30. has_platformdirs = False
  31. from .. import debug
  32. from .raise_for_status import raise_for_status
  33. from ..webdriver import WebDriver, WebDriverSession
  34. from ..webdriver import bypass_cloudflare, get_driver_cookies
  35. from ..errors import MissingRequirementsError
  36. from ..typing import Cookies
  37. from .defaults import DEFAULT_HEADERS, WEBVIEW_HAEDERS
  async def get_args_from_webview(url: str) -> dict:
      """
      Load ``url`` in a hidden webview window and collect request arguments.

      After an initial two second delay the page is polled once per second
      until an element matching ``body:not(.no-js)`` can be fetched. The user
      agent, language, final URL and cookies are then read from the window.
      The window is always destroyed before returning or raising.

      Args:
          url (str): The URL to open in the webview window.

      Returns:
          dict: ``{"headers": ..., "cookies": ...}``. Headers are
              ``WEBVIEW_HAEDERS`` extended with "User-Agent",
              "Accept-Language" and "Referer"; cookies map names to values.

      Raises:
          MissingRequirementsError: If the "webview" package is not installed.
      """
      if not has_webview:
          raise MissingRequirementsError('Install "webview" package')
      window = webview.create_window("", url, hidden=True)
      try:
          await asyncio.sleep(2)
          body = None
          while body is None:
              # Sleep outside the try block so that task cancellation
              # (asyncio.CancelledError) is never swallowed by the retry logic.
              await asyncio.sleep(1)
              try:
                  body = window.dom.get_element("body:not(.no-js)")
              except Exception:
                  # The DOM lookup failed; try again on the next tick.
                  continue
          headers = {
              **WEBVIEW_HAEDERS,
              "User-Agent": window.evaluate_js("this.navigator.userAgent"),
              "Accept-Language": window.evaluate_js("this.navigator.language"),
              "Referer": window.real_url
          }
          # Each entry of get_cookies() is a mapping of name -> morsel; flatten
          # all of them into a plain name -> value dict.
          cookies = {
              name: morsel.value
              for cookie in window.get_cookies()
              for name, morsel in cookie.items()
          }
      finally:
          # Release the hidden window even when something above raised.
          window.destroy()
      return {"headers": headers, "cookies": cookies}
  def get_args_from_browser(
      url: str,
      webdriver: WebDriver = None,
      proxy: str = None,
      timeout: int = 120,
      do_bypass_cloudflare: bool = True,
      virtual_display: bool = False
  ) -> dict:
      """
      Collect cookies and request headers from a WebDriver session.

      Args:
          url (str): The URL to navigate to using the WebDriver.
          webdriver (WebDriver, optional): The WebDriver instance to use.
          proxy (str, optional): Proxy server passed to the WebDriver session.
          timeout (int, optional): Timeout in seconds for the WebDriver.
          do_bypass_cloudflare (bool, optional): Whether to call
              ``bypass_cloudflare`` for ``url`` before reading the headers.
          virtual_display (bool, optional): Passed through to the
              WebDriver session.

      Returns:
          dict: A dict with the keys 'cookies' and 'headers'.
      """
      # Header names copied over from a captured browser request.
      forwarded_headers = (
          "accept-encoding",
          "accept-language",
          "user-agent",
          "sec-ch-ua",
          "sec-ch-ua-platform",
          "sec-ch-ua-arch",
          "sec-ch-ua-full-version",
          "sec-ch-ua-platform-version",
          "sec-ch-ua-bitness"
      )
      with WebDriverSession(webdriver, "", proxy=proxy, virtual_display=virtual_display) as driver:
          if do_bypass_cloudflare:
              bypass_cloudflare(driver, url, timeout)
          headers = {**DEFAULT_HEADERS, 'referer': url}
          if hasattr(driver, "requests"):
              # The driver recorded its requests: take the headers from the
              # first request whose URL starts with the requested URL.
              first_match = next(
                  (request for request in driver.requests if request.url.startswith(url)),
                  None
              )
              if first_match is not None:
                  headers.update({
                      key: value
                      for key, value in first_match.headers.items()
                      if key in forwarded_headers
                  })
          else:
              # No recorded requests available: ask the page for its user agent.
              headers["user-agent"] = driver.execute_script("return navigator.userAgent")
          cookies = get_driver_cookies(driver)
      return {'cookies': cookies, 'headers': headers}
  def get_session_from_browser(url: str, webdriver: WebDriver = None, proxy: str = None, timeout: int = 120) -> Session:
      """
      Build a curl_cffi ``Session`` from cookies and headers read via a WebDriver.

      Args:
          url (str): The URL to navigate to using the WebDriver.
          webdriver (WebDriver, optional): The WebDriver instance to use.
          proxy (str, optional): Proxy used for both the WebDriver and the Session.
          timeout (int, optional): Timeout in seconds for the WebDriver and the Session.

      Returns:
          Session: A Session created with the collected cookies and headers,
              impersonating "chrome".

      Raises:
          MissingRequirementsError: If the "curl_cffi" package is not installed.
      """
      if not has_curl_cffi:
          raise MissingRequirementsError('Install "curl_cffi" package | pip install -U curl_cffi')
      browser_args = get_args_from_browser(url, webdriver, proxy, timeout)
      # The same proxy is applied to both schemes.
      proxies = {"https": proxy, "http": proxy}
      return Session(**browser_args, proxies=proxies, timeout=timeout, impersonate="chrome")
  def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str = None) -> list[CookieParam]:
      """
      Convert a name -> value cookie mapping into a list of ``CookieParam`` objects.

      Args:
          cookies (Cookies): Mapping of cookie names to cookie values.
          url (str, optional): Value used for the "url" field of every cookie.
          domain (str, optional): Value used for the "domain" field of every cookie.

      Returns:
          list[CookieParam]: One ``CookieParam`` per entry in ``cookies``,
              in iteration order; an empty list for an empty mapping.
      """
      # The original built this list but never returned it.
      return [CookieParam.from_json({
          "name": key,
          "value": value,
          "url": url,
          "domain": domain
      }) for key, value in cookies.items()]
  async def get_args_from_nodriver(
      url: str,
      proxy: str = None,
      timeout: int = 120,
      cookies: Cookies = None
  ) -> dict:
      """
      Open ``url`` in a nodriver browser and collect request arguments.

      Any given cookies are set in the browser before the page is opened.
      Once an element matching ``body:not(.no-js)`` is present, the browser
      cookies whose domain ends with the URL's network location are merged
      into ``cookies`` (the passed dict is updated in place). The browser is
      always stopped, even when a step fails.

      Args:
          url (str): The URL to open.
          proxy (str, optional): Proxy server passed to the browser via
              ``--proxy-server``.
          timeout (int, optional): Timeout passed to ``page.wait_for``.
          cookies (Cookies, optional): Cookies to set before loading the page.

      Returns:
          dict: A dict with the keys "cookies", "headers" and "proxy".

      Raises:
          MissingRequirementsError: If the "nodriver" package is not installed.
      """
      if not has_nodriver:
          raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
      if debug.logging:
          print(f"Open nodriver with url: {url}")
      browser = await nodriver.start(
          browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
      )
      try:
          domain = urlparse(url).netloc
          if cookies is None:
              cookies = {}
          else:
              await browser.cookies.set_all(get_cookie_params_from_dict(cookies, url=url, domain=domain))
          page = await browser.get(url)
          user_agent = await page.evaluate("window.navigator.userAgent")
          # Wait for the page first so cookies set while it loads are captured.
          await page.wait_for("body:not(.no-js)", timeout=timeout)
          for c in await browser.cookies.get_all():
              if c.domain.endswith(domain):
                  cookies[c.name] = c.value
          await page.close()
      finally:
          # Do not leave the browser running when a step above raised.
          browser.stop()
      return {
          "cookies": cookies,
          "headers": {
              **DEFAULT_HEADERS,
              "user-agent": user_agent,
              "referer": url,
          },
          "proxy": proxy
      }
  def merge_cookies(cookies: Iterator[Morsel], response: Response) -> Cookies:
      """
      Merge the cookies of ``response`` into ``cookies`` and return the result.

      Args:
          cookies: Existing name -> value cookie mapping, or None to start
              from an empty dict. A passed mapping is updated in place.
          response (Response): Response whose ``cookies.jar`` entries
              (objects with ``name`` and ``value``) are merged in; they
              overwrite existing entries with the same name.

      Returns:
          Cookies: The merged mapping (the same object as ``cookies`` when
              one was passed).
      """
      if cookies is None:
          cookies = {}
      for cookie in response.cookies.jar:
          cookies[cookie.name] = cookie.value
      # Return the mapping; without this a dict created for None would be lost.
      return cookies
  async def get_nodriver(proxy: str = None, user_data_dir = "nodriver", **kwargs)-> Browser:
      """
      Start a nodriver browser, optionally with a proxy and a user data directory.

      Args:
          proxy (str, optional): Proxy server passed to the browser via
              ``--proxy-server``.
          user_data_dir (str, optional): Name used to build the user config
              directory ``g4f-<name>``; only applied when "platformdirs" is
              available, otherwise no user data directory is passed.
          **kwargs: Additional keyword arguments forwarded to ``nodriver.start``.

      Returns:
          Browser: The started browser.

      Raises:
          MissingRequirementsError: If the "nodriver" package is not installed.
      """
      if not has_nodriver:
          raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
      if has_platformdirs:
          user_data_dir = user_config_dir(f"g4f-{user_data_dir}")
      else:
          user_data_dir = None
      debug.log(f"Open nodriver with user_dir: {user_data_dir}")
      browser_args = [f"--proxy-server={proxy}"] if proxy is not None else None
      return await nodriver.start(
          user_data_dir=user_data_dir,
          browser_args=browser_args,
          **kwargs
      )