_curlcffi.py 12 KB


  1. from __future__ import annotations
  2. import io
  3. import itertools
  4. import math
  5. import re
  6. import urllib.parse
  7. from ._helper import InstanceStoreMixin, select_proxy
  8. from .common import (
  9. Features,
  10. Request,
  11. Response,
  12. register_preference,
  13. register_rh,
  14. )
  15. from .exceptions import (
  16. CertificateVerifyError,
  17. HTTPError,
  18. IncompleteRead,
  19. ProxyError,
  20. SSLError,
  21. TransportError,
  22. )
  23. from .impersonate import ImpersonateRequestHandler, ImpersonateTarget
  24. from ..dependencies import curl_cffi, certifi
  25. from ..utils import int_or_none
  26. if curl_cffi is None:
  27. raise ImportError('curl_cffi is not installed')
  28. curl_cffi_version = tuple(map(int, re.split(r'[^\d]+', curl_cffi.__version__)[:3]))
  29. if curl_cffi_version != (0, 5, 10) and not (0, 10) <= curl_cffi_version:
  30. curl_cffi._yt_dlp__version = f'{curl_cffi.__version__} (unsupported)'
  31. raise ImportError('Only curl_cffi versions 0.5.10 and 0.10.x are supported')
  32. import curl_cffi.requests
  33. from curl_cffi.const import CurlECode, CurlOpt
  34. class CurlCFFIResponseReader(io.IOBase):
  35. def __init__(self, response: curl_cffi.requests.Response):
  36. self._response = response
  37. self._iterator = response.iter_content()
  38. self._buffer = b''
  39. self.bytes_read = 0
  40. def readable(self):
  41. return True
  42. def read(self, size=None):
  43. exception_raised = True
  44. try:
  45. while self._iterator and (size is None or len(self._buffer) < size):
  46. chunk = next(self._iterator, None)
  47. if chunk is None:
  48. self._iterator = None
  49. break
  50. self._buffer += chunk
  51. self.bytes_read += len(chunk)
  52. if size is None:
  53. size = len(self._buffer)
  54. data = self._buffer[:size]
  55. self._buffer = self._buffer[size:]
  56. # "free" the curl instance if the response is fully read.
  57. # curl_cffi doesn't do this automatically and only allows one open response per thread
  58. if not self._iterator and not self._buffer:
  59. self.close()
  60. exception_raised = False
  61. return data
  62. finally:
  63. if exception_raised:
  64. self.close()
  65. def close(self):
  66. if not self.closed:
  67. self._response.close()
  68. self._buffer = b''
  69. super().close()
  70. class CurlCFFIResponseAdapter(Response):
  71. fp: CurlCFFIResponseReader
  72. def __init__(self, response: curl_cffi.requests.Response):
  73. super().__init__(
  74. fp=CurlCFFIResponseReader(response),
  75. headers=response.headers,
  76. url=response.url,
  77. status=response.status_code)
  78. def read(self, amt=None):
  79. try:
  80. return self.fp.read(amt)
  81. except curl_cffi.requests.errors.RequestsError as e:
  82. if e.code == CurlECode.PARTIAL_FILE:
  83. content_length = e.response and int_or_none(e.response.headers.get('Content-Length'))
  84. raise IncompleteRead(
  85. partial=self.fp.bytes_read,
  86. expected=content_length - self.fp.bytes_read if content_length is not None else None,
  87. cause=e) from e
  88. raise TransportError(cause=e) from e
  89. # See: https://github.com/lexiforest/curl_cffi?tab=readme-ov-file#supported-impersonate-browsers
  90. # https://github.com/lexiforest/curl-impersonate?tab=readme-ov-file#supported-browsers
  91. BROWSER_TARGETS: dict[tuple[int, ...], dict[str, ImpersonateTarget]] = {
  92. (0, 5): {
  93. 'chrome99': ImpersonateTarget('chrome', '99', 'windows', '10'),
  94. 'chrome99_android': ImpersonateTarget('chrome', '99', 'android', '12'),
  95. 'chrome100': ImpersonateTarget('chrome', '100', 'windows', '10'),
  96. 'chrome101': ImpersonateTarget('chrome', '101', 'windows', '10'),
  97. 'chrome104': ImpersonateTarget('chrome', '104', 'windows', '10'),
  98. 'chrome107': ImpersonateTarget('chrome', '107', 'windows', '10'),
  99. 'chrome110': ImpersonateTarget('chrome', '110', 'windows', '10'),
  100. 'edge99': ImpersonateTarget('edge', '99', 'windows', '10'),
  101. 'edge101': ImpersonateTarget('edge', '101', 'windows', '10'),
  102. 'safari15_3': ImpersonateTarget('safari', '15.3', 'macos', '11'),
  103. 'safari15_5': ImpersonateTarget('safari', '15.5', 'macos', '12'),
  104. },
  105. (0, 7): {
  106. 'chrome116': ImpersonateTarget('chrome', '116', 'windows', '10'),
  107. 'chrome119': ImpersonateTarget('chrome', '119', 'macos', '14'),
  108. 'chrome120': ImpersonateTarget('chrome', '120', 'macos', '14'),
  109. 'chrome123': ImpersonateTarget('chrome', '123', 'macos', '14'),
  110. 'chrome124': ImpersonateTarget('chrome', '124', 'macos', '14'),
  111. 'safari17_0': ImpersonateTarget('safari', '17.0', 'macos', '14'),
  112. 'safari17_2_ios': ImpersonateTarget('safari', '17.2', 'ios', '17.2'),
  113. },
  114. (0, 9): {
  115. 'safari15_3': ImpersonateTarget('safari', '15.3', 'macos', '14'),
  116. 'safari15_5': ImpersonateTarget('safari', '15.5', 'macos', '14'),
  117. 'chrome119': ImpersonateTarget('chrome', '119', 'macos', '14'),
  118. 'chrome120': ImpersonateTarget('chrome', '120', 'macos', '14'),
  119. 'chrome123': ImpersonateTarget('chrome', '123', 'macos', '14'),
  120. 'chrome124': ImpersonateTarget('chrome', '124', 'macos', '14'),
  121. 'chrome131': ImpersonateTarget('chrome', '131', 'macos', '14'),
  122. 'chrome131_android': ImpersonateTarget('chrome', '131', 'android', '14'),
  123. 'chrome133a': ImpersonateTarget('chrome', '133', 'macos', '15'),
  124. 'firefox133': ImpersonateTarget('firefox', '133', 'macos', '14'),
  125. 'safari18_0': ImpersonateTarget('safari', '18.0', 'macos', '15'),
  126. 'safari18_0_ios': ImpersonateTarget('safari', '18.0', 'ios', '18.0'),
  127. },
  128. (0, 10): {
  129. 'firefox135': ImpersonateTarget('firefox', '135', 'macos', '14'),
  130. },
  131. }
  132. @register_rh
  133. class CurlCFFIRH(ImpersonateRequestHandler, InstanceStoreMixin):
  134. RH_NAME = 'curl_cffi'
  135. _SUPPORTED_URL_SCHEMES = ('http', 'https')
  136. _SUPPORTED_FEATURES = (Features.NO_PROXY, Features.ALL_PROXY)
  137. _SUPPORTED_PROXY_SCHEMES = ('http', 'https', 'socks4', 'socks4a', 'socks5', 'socks5h')
  138. _SUPPORTED_IMPERSONATE_TARGET_MAP = {
  139. target: name if curl_cffi_version >= (0, 9) else curl_cffi.requests.BrowserType[name]
  140. for name, target in dict(sorted(itertools.chain.from_iterable(
  141. targets.items()
  142. for version, targets in BROWSER_TARGETS.items()
  143. if curl_cffi_version >= version
  144. ), key=lambda x: (
  145. # deprioritize mobile targets since they give very different behavior
  146. x[1].os not in ('ios', 'android'),
  147. # prioritize edge < firefox < safari < chrome
  148. ('edge', 'firefox', 'safari', 'chrome').index(x[1].client),
  149. # prioritize newest version
  150. float(x[1].version) if x[1].version else 0,
  151. # group by os name
  152. x[1].os,
  153. ), reverse=True)).items()
  154. }
  155. def _create_instance(self, cookiejar=None):
  156. return curl_cffi.requests.Session(cookies=cookiejar)
  157. def _check_extensions(self, extensions):
  158. super()._check_extensions(extensions)
  159. extensions.pop('impersonate', None)
  160. extensions.pop('cookiejar', None)
  161. extensions.pop('timeout', None)
  162. # CurlCFFIRH ignores legacy ssl options currently.
  163. # Impersonation generally uses a looser SSL configuration than urllib/requests.
  164. extensions.pop('legacy_ssl', None)
  165. def send(self, request: Request) -> Response:
  166. target = self._get_request_target(request)
  167. try:
  168. response = super().send(request)
  169. except HTTPError as e:
  170. e.response.extensions['impersonate'] = target
  171. raise
  172. response.extensions['impersonate'] = target
  173. return response
  174. def _send(self, request: Request):
  175. max_redirects_exceeded = False
  176. session: curl_cffi.requests.Session = self._get_instance(
  177. cookiejar=self._get_cookiejar(request) if 'cookie' not in request.headers else None)
  178. if self.verbose:
  179. session.curl.setopt(CurlOpt.VERBOSE, 1)
  180. proxies = self._get_proxies(request)
  181. if 'no' in proxies:
  182. session.curl.setopt(CurlOpt.NOPROXY, proxies['no'])
  183. proxies.pop('no', None)
  184. # curl doesn't support per protocol proxies, so we select the one that matches the request protocol
  185. proxy = select_proxy(request.url, proxies=proxies)
  186. if proxy:
  187. session.curl.setopt(CurlOpt.PROXY, proxy)
  188. scheme = urllib.parse.urlparse(request.url).scheme.lower()
  189. if scheme != 'http':
  190. # Enable HTTP CONNECT for HTTPS urls.
  191. # Don't use CONNECT for http for compatibility with urllib behaviour.
  192. # See: https://curl.se/libcurl/c/CURLOPT_HTTPPROXYTUNNEL.html
  193. session.curl.setopt(CurlOpt.HTTPPROXYTUNNEL, 1)
  194. # curl_cffi does not currently set these for proxies
  195. session.curl.setopt(CurlOpt.PROXY_CAINFO, certifi.where())
  196. if not self.verify:
  197. session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYPEER, 0)
  198. session.curl.setopt(CurlOpt.PROXY_SSL_VERIFYHOST, 0)
  199. headers = self._get_impersonate_headers(request)
  200. if self._client_cert:
  201. session.curl.setopt(CurlOpt.SSLCERT, self._client_cert['client_certificate'])
  202. client_certificate_key = self._client_cert.get('client_certificate_key')
  203. client_certificate_password = self._client_cert.get('client_certificate_password')
  204. if client_certificate_key:
  205. session.curl.setopt(CurlOpt.SSLKEY, client_certificate_key)
  206. if client_certificate_password:
  207. session.curl.setopt(CurlOpt.KEYPASSWD, client_certificate_password)
  208. timeout = self._calculate_timeout(request)
  209. # set CURLOPT_LOW_SPEED_LIMIT and CURLOPT_LOW_SPEED_TIME to act as a read timeout. [1]
  210. # This is required only for 0.5.10 [2]
  211. # Note: CURLOPT_LOW_SPEED_TIME is in seconds, so we need to round up to the nearest second. [3]
  212. # [1] https://unix.stackexchange.com/a/305311
  213. # [2] https://github.com/yifeikong/curl_cffi/issues/156
  214. # [3] https://curl.se/libcurl/c/CURLOPT_LOW_SPEED_TIME.html
  215. session.curl.setopt(CurlOpt.LOW_SPEED_LIMIT, 1) # 1 byte per second
  216. session.curl.setopt(CurlOpt.LOW_SPEED_TIME, math.ceil(timeout))
  217. try:
  218. curl_response = session.request(
  219. method=request.method,
  220. url=request.url,
  221. headers=headers,
  222. data=request.data,
  223. verify=self.verify,
  224. max_redirects=5,
  225. timeout=(timeout, timeout),
  226. impersonate=self._SUPPORTED_IMPERSONATE_TARGET_MAP.get(
  227. self._get_request_target(request)),
  228. interface=self.source_address,
  229. stream=True,
  230. )
  231. except curl_cffi.requests.errors.RequestsError as e:
  232. if e.code == CurlECode.PEER_FAILED_VERIFICATION:
  233. raise CertificateVerifyError(cause=e) from e
  234. elif e.code == CurlECode.SSL_CONNECT_ERROR:
  235. raise SSLError(cause=e) from e
  236. elif e.code == CurlECode.TOO_MANY_REDIRECTS:
  237. max_redirects_exceeded = True
  238. curl_response = e.response
  239. elif (
  240. e.code == CurlECode.PROXY
  241. or (e.code == CurlECode.RECV_ERROR and 'CONNECT' in str(e))
  242. ):
  243. raise ProxyError(cause=e) from e
  244. else:
  245. raise TransportError(cause=e) from e
  246. response = CurlCFFIResponseAdapter(curl_response)
  247. if not 200 <= response.status < 300:
  248. raise HTTPError(response, redirect_loop=max_redirects_exceeded)
  249. return response
  250. @register_preference(CurlCFFIRH)
  251. def curl_cffi_preference(rh, request):
  252. return -100