123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- from __future__ import annotations
- from urllib.parse import urlparse
- from typing import Iterator
- from http.cookies import Morsel
- try:
- from curl_cffi.requests import Session, Response
- from .curl_cffi import StreamResponse, StreamSession, FormData
- has_curl_cffi = True
- except ImportError:
- from typing import Type as Session, Type as Response
- from .aiohttp import StreamResponse, StreamSession, FormData
- has_curl_cffi = False
- try:
- import webview
- import asyncio
- has_webview = True
- except ImportError:
- has_webview = False
- try:
- import nodriver
- from nodriver.cdp.network import CookieParam
- from nodriver import Browser
- has_nodriver = True
- except ImportError:
- has_nodriver = False
- try:
- from platformdirs import user_config_dir
- has_platformdirs = True
- except ImportError:
- has_platformdirs = False
- from .. import debug
- from .raise_for_status import raise_for_status
- from ..webdriver import WebDriver, WebDriverSession
- from ..webdriver import bypass_cloudflare, get_driver_cookies
- from ..errors import MissingRequirementsError
- from ..typing import Cookies
- from .defaults import DEFAULT_HEADERS, WEBVIEW_HAEDERS
async def get_args_from_webview(url: str) -> dict:
    """
    Open ``url`` in a hidden webview window and collect request arguments.

    After an initial two second delay, the window is polled once per second
    until an element matching ``body:not(.no-js)`` is found. The headers are
    then built from ``WEBVIEW_HAEDERS`` plus the user agent, language and
    ``real_url`` reported by the window, and the window's cookies are read.

    Args:
        url (str): The URL to load in the webview window.

    Returns:
        dict: A dict with the keys "headers" and "cookies".

    Raises:
        MissingRequirementsError: If the "webview" package is not installed.
    """
    if not has_webview:
        raise MissingRequirementsError('Install "webview" package')
    window = webview.create_window("", url, hidden=True)
    try:
        await asyncio.sleep(2)
        body = None
        while body is None:
            # Sleep outside the try block so that a task cancellation
            # (asyncio.CancelledError) is never swallowed by the retry logic.
            await asyncio.sleep(1)
            try:
                body = window.dom.get_element("body:not(.no-js)")
            except Exception:
                # Best effort: the lookup may fail while the page is still
                # loading, so simply retry on the next poll.
                continue
        headers = {
            **WEBVIEW_HAEDERS,
            "User-Agent": window.evaluate_js("this.navigator.userAgent"),
            "Accept-Language": window.evaluate_js("this.navigator.language"),
            "Referer": window.real_url
        }
        # Each entry of get_cookies() is treated as a mapping of
        # cookie name -> object with a ``value`` attribute.
        cookies = {
            name: morsel.value
            for cookie in window.get_cookies()
            for name, morsel in cookie.items()
        }
    finally:
        # Always release the window, even when polling or evaluation fails.
        window.destroy()
    return {"headers": headers, "cookies": cookies}
def get_args_from_browser(
    url: str,
    webdriver: WebDriver = None,
    proxy: str = None,
    timeout: int = 120,
    do_bypass_cloudflare: bool = True,
    virtual_display: bool = False
) -> dict:
    """
    Collect cookies and request headers from a WebDriver session.

    Args:
        url (str): The URL used for the Cloudflare bypass, the referer header
            and for matching recorded requests.
        webdriver (WebDriver, optional): The WebDriver instance to use.
        proxy (str, optional): Proxy server passed to the WebDriverSession.
        timeout (int, optional): Timeout passed to ``bypass_cloudflare``.
        do_bypass_cloudflare (bool, optional): Whether to call
            ``bypass_cloudflare`` before reading headers and cookies.
        virtual_display (bool, optional): Passed through to the
            WebDriverSession.

    Returns:
        dict: A dict with the keys 'cookies' and 'headers'.
    """
    # Request headers that are copied over from a recorded browser request.
    copied_header_names = (
        "accept-encoding",
        "accept-language",
        "user-agent",
        "sec-ch-ua",
        "sec-ch-ua-platform",
        "sec-ch-ua-arch",
        "sec-ch-ua-full-version",
        "sec-ch-ua-platform-version",
        "sec-ch-ua-bitness"
    )
    with WebDriverSession(webdriver, "", proxy=proxy, virtual_display=virtual_display) as driver:
        if do_bypass_cloudflare:
            bypass_cloudflare(driver, url, timeout)
        headers = {**DEFAULT_HEADERS, 'referer': url}
        if hasattr(driver, "requests"):
            # Only the first recorded request whose URL starts with `url`
            # contributes headers.
            first_match = next(
                (recorded for recorded in driver.requests if recorded.url.startswith(url)),
                None
            )
            if first_match is not None:
                for header_name, header_value in first_match.headers.items():
                    if header_name in copied_header_names:
                        headers[header_name] = header_value
        else:
            # Without recorded requests, ask the browser for its user agent.
            headers["user-agent"] = driver.execute_script("return navigator.userAgent")
        cookies = get_driver_cookies(driver)
    return {'cookies': cookies, 'headers': headers}
def get_session_from_browser(url: str, webdriver: WebDriver = None, proxy: str = None, timeout: int = 120) -> Session:
    """
    Build a curl_cffi Session from cookies and headers gathered in a browser.

    Args:
        url (str): The URL passed to ``get_args_from_browser``.
        webdriver (WebDriver, optional): The WebDriver instance to use.
        proxy (str, optional): Proxy used for the browser and for both the
            "https" and "http" entries of the Session proxies.
        timeout (int, optional): Timeout for the browser step and the Session.

    Returns:
        Session: A Session created with the collected cookies and headers,
        impersonating "chrome".

    Raises:
        MissingRequirementsError: If the "curl_cffi" package is not installed.
    """
    if has_curl_cffi:
        browser_args = get_args_from_browser(url, webdriver, proxy, timeout)
        session_proxies = {"https": proxy, "http": proxy}
        return Session(
            **browser_args,
            proxies=session_proxies,
            timeout=timeout,
            impersonate="chrome"
        )
    raise MissingRequirementsError('Install "curl_cffi" package | pip install -U curl_cffi')
def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str = None) -> list[CookieParam]:
    """
    Convert a name -> value cookie mapping into a list of CookieParam objects.

    Args:
        cookies (Cookies): Mapping of cookie names to cookie values.
        url (str, optional): Value stored under "url" for every cookie.
        domain (str, optional): Value stored under "domain" for every cookie.

    Returns:
        list[CookieParam]: One CookieParam per entry of ``cookies``; an empty
        list when ``cookies`` is empty.
    """
    # The list must be returned: building it without `return` silently
    # yields None to every caller.
    return [
        CookieParam.from_json({
            "name": key,
            "value": value,
            "url": url,
            "domain": domain
        })
        for key, value in cookies.items()
    ]
async def get_args_from_nodriver(
    url: str,
    proxy: str = None,
    timeout: int = 120,
    cookies: Cookies = None
) -> dict:
    """
    Open ``url`` with nodriver and collect cookies and headers from the page.

    Args:
        url (str): The URL to open.
        proxy (str, optional): Proxy server passed to the browser via
            ``--proxy-server``; also returned under the "proxy" key.
        timeout (int, optional): Timeout passed to ``page.wait_for`` while
            waiting for ``body:not(.no-js)``.
        cookies (Cookies, optional): Cookies to set in the browser before the
            page is loaded. The mapping is updated in place with the cookies
            read back from the browser.

    Returns:
        dict: A dict with the keys "cookies", "headers" and "proxy".

    Raises:
        MissingRequirementsError: If the "nodriver" package is not installed.
    """
    if not has_nodriver:
        raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
    if debug.logging:
        print(f"Open nodriver with url: {url}")
    browser = await nodriver.start(
        browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
    )
    try:
        domain = urlparse(url).netloc
        if cookies is None:
            cookies = {}
        else:
            await browser.cookies.set_all(get_cookie_params_from_dict(cookies, url=url, domain=domain))
        page = await browser.get(url)
        user_agent = await page.evaluate("window.navigator.userAgent")
        await page.wait_for("body:not(.no-js)", timeout=timeout)
        # Read the cookies only after the wait has finished, so cookies that
        # are set while the page is still loading are included.
        for c in await browser.cookies.get_all():
            if c.domain.endswith(domain):
                cookies[c.name] = c.value
        await page.close()
    finally:
        # Stop the browser on every path so a failed navigation or a
        # wait timeout does not leave the browser running.
        browser.stop()
    return {
        "cookies": cookies,
        "headers": {
            **DEFAULT_HEADERS,
            "user-agent": user_agent,
            "referer": url,
        },
        "proxy": proxy
    }
def merge_cookies(cookies: Iterator[Morsel], response: Response) -> Cookies:
    """
    Merge the cookies of ``response`` into ``cookies``.

    Args:
        cookies: Mapping of cookie names to values that is updated in place,
            or None to start from an empty dict.
        response (Response): Response whose ``cookies.jar`` is iterated; each
            entry's ``name`` and ``value`` are copied into the mapping.

    Returns:
        Cookies: The updated mapping. This is the same object that was passed
        in, or a new dict when ``cookies`` was None.
    """
    if cookies is None:
        cookies = {}
    for cookie in response.cookies.jar:
        cookies[cookie.name] = cookie.value
    # Return the mapping so the dict created for a None argument is not lost.
    return cookies
async def get_nodriver(proxy: str = None, user_data_dir = "nodriver", **kwargs)-> Browser:
    """
    Start a nodriver browser, using a per-user profile directory if possible.

    Args:
        proxy (str, optional): Proxy server passed to the browser via
            ``--proxy-server``.
        user_data_dir (str, optional): Suffix of the profile directory name;
            the directory is resolved with ``user_config_dir`` as
            ``g4f-<user_data_dir>``. Ignored (None is used) when platformdirs
            is not installed.
        **kwargs: Additional keyword arguments forwarded to ``nodriver.start``.

    Returns:
        Browser: The browser returned by ``nodriver.start``.

    Raises:
        MissingRequirementsError: If the "nodriver" package is not installed.
    """
    if not has_nodriver:
        raise MissingRequirementsError('Install "nodriver" package | pip install -U nodriver')
    if has_platformdirs:
        profile_dir = user_config_dir(f"g4f-{user_data_dir}")
    else:
        profile_dir = None
    debug.log(f"Open nodriver with user_dir: {profile_dir}")
    if proxy is None:
        launch_args = None
    else:
        launch_args = [f"--proxy-server={proxy}"]
    return await nodriver.start(
        user_data_dir=profile_dir,
        browser_args=launch_args,
        **kwargs
    )
|