webdriver.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from __future__ import annotations
  2. from platformdirs import user_config_dir
  3. from selenium.webdriver.remote.webdriver import WebDriver
  4. from undetected_chromedriver import Chrome, ChromeOptions
  5. from selenium.webdriver.common.by import By
  6. from selenium.webdriver.support.ui import WebDriverWait
  7. from selenium.webdriver.support import expected_conditions as EC
  8. from os import path
  9. from . import debug
  10. try:
  11. from pyvirtualdisplay import Display
  12. has_pyvirtualdisplay = True
  13. except ImportError:
  14. has_pyvirtualdisplay = False
  15. def get_browser(
  16. user_data_dir: str = None,
  17. headless: bool = False,
  18. proxy: str = None,
  19. options: ChromeOptions = None
  20. ) -> WebDriver:
  21. if user_data_dir == None:
  22. user_data_dir = user_config_dir("g4f")
  23. if user_data_dir and debug.logging:
  24. print("Open browser with config dir:", user_data_dir)
  25. if not options:
  26. options = ChromeOptions()
  27. if proxy:
  28. options.add_argument(f'--proxy-server={proxy}')
  29. driver = '/usr/bin/chromedriver'
  30. if not path.isfile(driver):
  31. driver = None
  32. return Chrome(
  33. options=options,
  34. user_data_dir=user_data_dir,
  35. driver_executable_path=driver,
  36. headless=headless
  37. )
  38. def get_driver_cookies(driver: WebDriver):
  39. return dict([(cookie["name"], cookie["value"]) for cookie in driver.get_cookies()])
  40. def bypass_cloudflare(driver: WebDriver, url: str, timeout: int) -> None:
  41. # Open website
  42. driver.get(url)
  43. # Is cloudflare protection
  44. if driver.find_element(By.TAG_NAME, "body").get_attribute("class") == "no-js":
  45. if debug.logging:
  46. print("Cloudflare protection detected:", url)
  47. try:
  48. # Click button in iframe
  49. WebDriverWait(driver, 5).until(
  50. EC.presence_of_element_located((By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
  51. )
  52. driver.switch_to.frame(driver.find_element(By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
  53. WebDriverWait(driver, 5).until(
  54. EC.presence_of_element_located((By.CSS_SELECTOR, "#challenge-stage input"))
  55. )
  56. driver.find_element(By.CSS_SELECTOR, "#challenge-stage input").click()
  57. except:
  58. pass
  59. finally:
  60. driver.switch_to.default_content()
  61. # No cloudflare protection
  62. WebDriverWait(driver, timeout).until(
  63. EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
  64. )
  65. class WebDriverSession():
  66. def __init__(
  67. self,
  68. webdriver: WebDriver = None,
  69. user_data_dir: str = None,
  70. headless: bool = False,
  71. virtual_display: bool = False,
  72. proxy: str = None,
  73. options: ChromeOptions = None
  74. ):
  75. self.webdriver = webdriver
  76. self.user_data_dir = user_data_dir
  77. self.headless = headless
  78. self.virtual_display = None
  79. if has_pyvirtualdisplay and virtual_display:
  80. self.virtual_display = Display(size=(1920, 1080))
  81. self.proxy = proxy
  82. self.options = options
  83. self.default_driver = None
  84. def reopen(
  85. self,
  86. user_data_dir: str = None,
  87. headless: bool = False,
  88. virtual_display: bool = False
  89. ) -> WebDriver:
  90. if user_data_dir == None:
  91. user_data_dir = self.user_data_dir
  92. if self.default_driver:
  93. self.default_driver.quit()
  94. if not virtual_display and self.virtual_display:
  95. self.virtual_display.stop()
  96. self.virtual_display = None
  97. self.default_driver = get_browser(user_data_dir, headless, self.proxy)
  98. return self.default_driver
  99. def __enter__(self) -> WebDriver:
  100. if self.webdriver:
  101. return self.webdriver
  102. if self.virtual_display:
  103. self.virtual_display.start()
  104. self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
  105. return self.default_driver
  106. def __exit__(self, exc_type, exc_val, exc_tb):
  107. if self.default_driver:
  108. try:
  109. self.default_driver.close()
  110. except:
  111. pass
  112. self.default_driver.quit()
  113. if self.virtual_display:
  114. self.virtual_display.stop()