123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- from __future__ import annotations
- import os
- import time
- import random
- from urllib.parse import urlparse
- from typing import Iterator
- from http.cookies import Morsel
- from pathlib import Path
- import asyncio
- try:
- from curl_cffi.requests import Session, Response
- from .curl_cffi import StreamResponse, StreamSession, FormData
- has_curl_cffi = True
- except ImportError:
- from typing import Type as Response
- from .aiohttp import StreamResponse, StreamSession, FormData
- has_curl_cffi = False
- try:
- import webview
- has_webview = True
- except ImportError:
- has_webview = False
- try:
- import nodriver
- from nodriver.cdp.network import CookieParam
- from nodriver.core.config import find_chrome_executable
- from nodriver import Browser, Tab, util
- has_nodriver = True
- except ImportError:
- from typing import Type as Browser
- from typing import Type as Tab
- has_nodriver = False
- try:
- from platformdirs import user_config_dir
- has_platformdirs = True
- except ImportError:
- has_platformdirs = False
- from .. import debug
- from .raise_for_status import raise_for_status
- from ..errors import MissingRequirementsError
- from ..typing import Cookies
- from ..cookies import get_cookies_dir
- from .defaults import DEFAULT_HEADERS, WEBVIEW_HAEDERS
if not has_curl_cffi:
    # curl_cffi is optional at import time: provide a stub so the module
    # imports cleanly, but fail loudly (with install instructions) the
    # moment anyone actually tries to construct a Session.
    class Session:
        def __init__(self, **kwargs):
            raise MissingRequirementsError('Install "curl_cffi" package | pip install -U curl_cffi')
async def get_args_from_webview(url: str) -> dict:
    """Open *url* in a hidden webview window and capture request arguments.

    Loads the page, polls until the client-side JS has bootstrapped (the
    ``no-js`` class is removed from ``body``), then collects the headers
    and cookies a real browser session would send.

    Args:
        url: The page to load.

    Returns:
        Dict with "headers" and "cookies" keys suitable as request kwargs.

    Raises:
        MissingRequirementsError: If the "webview" package is not installed.
    """
    if not has_webview:
        raise MissingRequirementsError('Install "webview" package')
    window = webview.create_window("", url, hidden=True)
    await asyncio.sleep(2)
    body = None
    # Poll until the page's JS removed the "no-js" marker class.
    while body is None:
        try:
            await asyncio.sleep(1)
            body = window.dom.get_element("body:not(.no-js)")
        except Exception:
            # BUG FIX: was a bare `except:`, which also swallowed
            # asyncio.CancelledError / KeyboardInterrupt and made the
            # loop uncancellable. DOM may simply not be ready yet.
            ...
    headers = {
        **WEBVIEW_HAEDERS,
        "User-Agent": window.evaluate_js("this.navigator.userAgent"),
        "Accept-Language": window.evaluate_js("this.navigator.language"),
        "Referer": window.real_url
    }
    # Each element of get_cookies() is assumed to hold a single
    # (name, Morsel) pair -- TODO confirm against pywebview's API.
    cookies = [list(*cookie.items()) for cookie in window.get_cookies()]
    cookies = {name: cookie.value for name, cookie in cookies}
    window.destroy()
    return {"headers": headers, "cookies": cookies}
def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str = None) -> list[CookieParam]:
    """Convert a name->value cookie mapping into nodriver ``CookieParam``s.

    Args:
        cookies: Mapping of cookie names to values.
        url: Optional URL to attach to every cookie.
        domain: Optional domain to attach to every cookie.

    Returns:
        One ``CookieParam`` per entry in *cookies*.
    """
    # BUG FIX: the list was built but never returned, so the function
    # returned None and browser.cookies.set_all() received nothing.
    return [CookieParam.from_json({
        "name": key,
        "value": value,
        "url": url,
        "domain": domain
    }) for key, value in cookies.items()]
async def get_args_from_nodriver(
    url: str,
    proxy: str = None,
    timeout: int = 120,
    wait_for: str = None,
    callback: callable = None,
    cookies: Cookies = None,
    browser: Browser = None
) -> dict:
    """Drive a real browser via nodriver to collect request arguments for *url*.

    Opens the page, waits for it to finish bootstrapping, then returns the
    cookies/headers/proxy needed to replay requests with curl_cffi.

    Args:
        url: Page to open.
        proxy: Optional proxy server for the launched browser.
        timeout: Seconds to wait for page elements.
        wait_for: Optional extra CSS selector to wait for before returning.
        callback: Optional async callable invoked with the loaded page.
        cookies: Optional cookies to seed the browser with; the same dict is
            updated with cookies read back from the page.
        browser: Optional already-running browser; when omitted, one is
            started here and stopped in the ``finally`` block.

    Returns:
        Dict with "impersonate", "cookies", "headers" and "proxy" keys.
    """
    if browser is None:
        browser, stop_browser = await get_nodriver(proxy=proxy, timeout=timeout)
    else:
        # Caller owns the browser instance, so stopping is a no-op.
        def stop_browser():
            ...
    try:
        if debug.logging:
            print(f"Open nodriver with url: {url}")
        domain = urlparse(url).netloc
        if cookies is None:
            cookies = {}
        else:
            # Seed the provided cookies into the browser before navigation.
            await browser.cookies.set_all(get_cookie_params_from_dict(cookies, url=url, domain=domain))
        page = await browser.get(url)
        user_agent = await page.evaluate("window.navigator.userAgent")
        # Wait until client-side JS has removed the "no-js" marker class.
        await page.wait_for("body:not(.no-js)", timeout=timeout)
        if wait_for is not None:
            await page.wait_for(wait_for, timeout=timeout)
        if callback is not None:
            await callback(page)
        # Read back every cookie the page set for this URL via CDP
        # (e.g. anti-bot clearance cookies).
        for c in await page.send(nodriver.cdp.network.get_cookies([url])):
            cookies[c.name] = c.value
        await page.close()
        return {
            "impersonate": "chrome",
            "cookies": cookies,
            "headers": {
                **DEFAULT_HEADERS,
                "user-agent": user_agent,
                "referer": url,
            },
            "proxy": proxy,
        }
    finally:
        # Always release the browser we started, even when waiting failed.
        stop_browser()
def merge_cookies(cookies: Iterator[Morsel], response: Response) -> Cookies:
    """Merge the cookies set by *response* into *cookies* and return the result.

    Handles both response flavors used in this module: curl_cffi responses
    expose a cookiejar via ``response.cookies.jar``; aiohttp-style responses
    expose a plain name->value mapping.

    Args:
        cookies: Existing cookie mapping to update, or ``None`` to start fresh.
        response: Response object whose cookies are merged in.

    Returns:
        The merged name->value cookie mapping.
    """
    if cookies is None:
        cookies = {}
    if hasattr(response.cookies, "jar"):
        # curl_cffi style: iterate the underlying cookiejar.
        for cookie in response.cookies.jar:
            cookies[cookie.name] = cookie.value
    else:
        # aiohttp style: a plain mapping of name -> value.
        for key, value in response.cookies.items():
            cookies[key] = value
    # BUG FIX: the mapping was mutated but never returned, so callers
    # got None despite the declared Cookies return type.
    return cookies
async def get_nodriver(
    proxy: str = None,
    user_data_dir = "nodriver",
    timeout: int = 120,
    browser_executable_path=None,
    **kwargs
) -> tuple[Browser, callable]:
    """Start (or reuse) a nodriver browser instance, serialized by a lock file.

    Args:
        proxy: Optional proxy server passed as a browser argument.
        user_data_dir: Profile directory name (placed under the user config
            dir when platformdirs is available, else an ephemeral profile).
        timeout: Seconds to wait for a concurrently-open browser to close.
        browser_executable_path: Explicit browser binary; when omitted, Chrome
            is located automatically, falling back to Edge on Windows.
        **kwargs: Extra arguments forwarded to ``nodriver.start``.

    Returns:
        Tuple of the started ``Browser`` and a ``stop()`` callable that shuts
        it down and releases the lock file.

    Raises:
        MissingRequirementsError: If nodriver is not installed.
    """
    if not has_nodriver:
        raise MissingRequirementsError('Install "nodriver" and "platformdirs" package | pip install -U nodriver platformdirs')
    user_data_dir = user_config_dir(f"g4f-{user_data_dir}") if has_platformdirs else None
    if browser_executable_path is None:
        try:
            browser_executable_path = find_chrome_executable()
        except FileNotFoundError:
            # Default to Edge if Chrome is not available.
            browser_executable_path = "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe"
            if not os.path.exists(browser_executable_path):
                # Let nodriver perform its own lookup.
                browser_executable_path = None
    lock_file = Path(get_cookies_dir()) / ".nodriver_is_open"
    # Implement a short random delay (up to ~5s) to reduce the chance of two
    # processes checking the lock file at the same instant.
    await asyncio.sleep(0.1 * random.randint(0, 50))
    if lock_file.exists():
        opened_at = float(lock_file.read_text())
        time_open = time.time() - opened_at
        # Only wait if the lock is recent; a stale lock (older than
        # 2*timeout) is assumed to be left over from a crashed run.
        if timeout * 2 > time_open:
            debug.log(f"Nodriver: Browser is already in use since {time_open} secs.")
            for _ in range(timeout):
                if lock_file.exists():
                    await asyncio.sleep(1)
                else:
                    break
    lock_file.write_text(str(time.time()))
    debug.log(f"Open nodriver with user_dir: {user_data_dir}")
    try:
        browser = await nodriver.start(
            user_data_dir=user_data_dir,
            browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
            browser_executable_path=browser_executable_path,
            **kwargs
        )
    except Exception:
        # BUG FIX: was a bare `except:`, which also caught
        # KeyboardInterrupt/SystemExit. Fall back to an already-registered
        # browser instance if one exists; otherwise propagate the error.
        if util.get_registered_instances():
            browser = util.get_registered_instances().pop()
        else:
            raise
    def on_stop():
        # Stop the browser (if still connected) and always release the lock.
        try:
            if browser.connection:
                browser.stop()
        finally:
            lock_file.unlink(missing_ok=True)
    return browser, on_stop
|