test_networking.py 89 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093
  1. #!/usr/bin/env python3
  2. # Allow direct execution
  3. import os
  4. import sys
  5. import pytest
  6. from yt_dlp.networking.common import Features, DEFAULT_TIMEOUT
  7. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  8. import gzip
  9. import http.client
  10. import http.cookiejar
  11. import http.server
  12. import io
  13. import logging
  14. import pathlib
  15. import random
  16. import ssl
  17. import tempfile
  18. import threading
  19. import time
  20. import urllib.error
  21. import urllib.request
  22. import warnings
  23. import zlib
  24. from email.message import Message
  25. from http.cookiejar import CookieJar
  26. from test.helper import (
  27. FakeYDL,
  28. http_server_port,
  29. validate_and_send,
  30. verify_address_availability,
  31. )
  32. from yt_dlp.cookies import YoutubeDLCookieJar
  33. from yt_dlp.dependencies import brotli, curl_cffi, requests, urllib3
  34. from yt_dlp.networking import (
  35. HEADRequest,
  36. PUTRequest,
  37. Request,
  38. RequestDirector,
  39. RequestHandler,
  40. Response,
  41. )
  42. from yt_dlp.networking._urllib import UrllibRH
  43. from yt_dlp.networking.exceptions import (
  44. CertificateVerifyError,
  45. HTTPError,
  46. IncompleteRead,
  47. NoSupportingHandlers,
  48. ProxyError,
  49. RequestError,
  50. SSLError,
  51. TransportError,
  52. UnsupportedRequest,
  53. )
  54. from yt_dlp.networking.impersonate import (
  55. ImpersonateRequestHandler,
  56. ImpersonateTarget,
  57. )
  58. from yt_dlp.utils import YoutubeDLError
  59. from yt_dlp.utils._utils import _YDLLogger as FakeLogger
  60. from yt_dlp.utils.networking import HTTPHeaderDict, std_headers
# Absolute path of the directory containing this test module.
# Used below to locate the 'testcert.pem' certificate for the HTTPS test servers.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  62. class HTTPTestRequestHandler(http.server.BaseHTTPRequestHandler):
  63. protocol_version = 'HTTP/1.1'
  64. default_request_version = 'HTTP/1.1'
  65. def log_message(self, format, *args):
  66. pass
  67. def _headers(self):
  68. payload = str(self.headers).encode()
  69. self.send_response(200)
  70. self.send_header('Content-Type', 'application/json')
  71. self.send_header('Content-Length', str(len(payload)))
  72. self.end_headers()
  73. self.wfile.write(payload)
  74. def _redirect(self):
  75. self.send_response(int(self.path[len('/redirect_'):]))
  76. self.send_header('Location', '/method')
  77. self.send_header('Content-Length', '0')
  78. self.end_headers()
  79. def _method(self, method, payload=None):
  80. self.send_response(200)
  81. self.send_header('Content-Length', str(len(payload or '')))
  82. self.send_header('Method', method)
  83. self.end_headers()
  84. if payload:
  85. self.wfile.write(payload)
  86. def _status(self, status):
  87. payload = f'<html>{status} NOT FOUND</html>'.encode()
  88. self.send_response(int(status))
  89. self.send_header('Content-Type', 'text/html; charset=utf-8')
  90. self.send_header('Content-Length', str(len(payload)))
  91. self.end_headers()
  92. self.wfile.write(payload)
  93. def _read_data(self):
  94. if 'Content-Length' in self.headers:
  95. return self.rfile.read(int(self.headers['Content-Length']))
  96. else:
  97. return b''
  98. def do_POST(self):
  99. data = self._read_data() + str(self.headers).encode()
  100. if self.path.startswith('/redirect_'):
  101. self._redirect()
  102. elif self.path.startswith('/method'):
  103. self._method('POST', data)
  104. elif self.path.startswith('/headers'):
  105. self._headers()
  106. else:
  107. self._status(404)
  108. def do_HEAD(self):
  109. if self.path.startswith('/redirect_'):
  110. self._redirect()
  111. elif self.path.startswith('/method'):
  112. self._method('HEAD')
  113. else:
  114. self._status(404)
  115. def do_PUT(self):
  116. data = self._read_data() + str(self.headers).encode()
  117. if self.path.startswith('/redirect_'):
  118. self._redirect()
  119. elif self.path.startswith('/method'):
  120. self._method('PUT', data)
  121. else:
  122. self._status(404)
  123. def do_GET(self):
  124. if self.path == '/video.html':
  125. payload = b'<html><video src="/vid.mp4" /></html>'
  126. self.send_response(200)
  127. self.send_header('Content-Type', 'text/html; charset=utf-8')
  128. self.send_header('Content-Length', str(len(payload)))
  129. self.end_headers()
  130. self.wfile.write(payload)
  131. elif self.path == '/vid.mp4':
  132. payload = b'\x00\x00\x00\x00\x20\x66\x74[video]'
  133. self.send_response(200)
  134. self.send_header('Content-Type', 'video/mp4')
  135. self.send_header('Content-Length', str(len(payload)))
  136. self.end_headers()
  137. self.wfile.write(payload)
  138. elif self.path == '/%E4%B8%AD%E6%96%87.html':
  139. payload = b'<html><video src="/vid.mp4" /></html>'
  140. self.send_response(200)
  141. self.send_header('Content-Type', 'text/html; charset=utf-8')
  142. self.send_header('Content-Length', str(len(payload)))
  143. self.end_headers()
  144. self.wfile.write(payload)
  145. elif self.path == '/%c7%9f':
  146. payload = b'<html><video src="/vid.mp4" /></html>'
  147. self.send_response(200)
  148. self.send_header('Content-Type', 'text/html; charset=utf-8')
  149. self.send_header('Content-Length', str(len(payload)))
  150. self.end_headers()
  151. self.wfile.write(payload)
  152. elif self.path.startswith('/redirect_loop'):
  153. self.send_response(301)
  154. self.send_header('Location', self.path)
  155. self.send_header('Content-Length', '0')
  156. self.end_headers()
  157. elif self.path == '/redirect_dotsegments':
  158. self.send_response(301)
  159. # redirect to /headers but with dot segments before
  160. self.send_header('Location', '/a/b/./../../headers')
  161. self.send_header('Content-Length', '0')
  162. self.end_headers()
  163. elif self.path == '/redirect_dotsegments_absolute':
  164. self.send_response(301)
  165. # redirect to /headers but with dot segments before - absolute url
  166. self.send_header('Location', f'http://127.0.0.1:{http_server_port(self.server)}/a/b/./../../headers')
  167. self.send_header('Content-Length', '0')
  168. self.end_headers()
  169. elif self.path.startswith('/redirect_'):
  170. self._redirect()
  171. elif self.path.startswith('/method'):
  172. self._method('GET', str(self.headers).encode())
  173. elif self.path.startswith('/headers'):
  174. self._headers()
  175. elif self.path.startswith('/308-to-headers'):
  176. self.send_response(308)
  177. # redirect to "localhost" for testing cookie redirection handling
  178. self.send_header('Location', f'http://localhost:{self.connection.getsockname()[1]}/headers')
  179. self.send_header('Content-Length', '0')
  180. self.end_headers()
  181. elif self.path == '/trailing_garbage':
  182. payload = b'<html><video src="/vid.mp4" /></html>'
  183. self.send_response(200)
  184. self.send_header('Content-Type', 'text/html; charset=utf-8')
  185. self.send_header('Content-Encoding', 'gzip')
  186. buf = io.BytesIO()
  187. with gzip.GzipFile(fileobj=buf, mode='wb') as f:
  188. f.write(payload)
  189. compressed = buf.getvalue() + b'trailing garbage'
  190. self.send_header('Content-Length', str(len(compressed)))
  191. self.end_headers()
  192. self.wfile.write(compressed)
  193. elif self.path == '/302-non-ascii-redirect':
  194. new_url = f'http://127.0.0.1:{http_server_port(self.server)}/中文.html'
  195. self.send_response(301)
  196. self.send_header('Location', new_url)
  197. self.send_header('Content-Length', '0')
  198. self.end_headers()
  199. elif self.path == '/content-encoding':
  200. encodings = self.headers.get('ytdl-encoding', '')
  201. payload = b'<html><video src="/vid.mp4" /></html>'
  202. for encoding in filter(None, (e.strip() for e in encodings.split(','))):
  203. if encoding == 'br' and brotli:
  204. payload = brotli.compress(payload)
  205. elif encoding == 'gzip':
  206. buf = io.BytesIO()
  207. with gzip.GzipFile(fileobj=buf, mode='wb') as f:
  208. f.write(payload)
  209. payload = buf.getvalue()
  210. elif encoding == 'deflate':
  211. payload = zlib.compress(payload)
  212. elif encoding == 'unsupported':
  213. payload = b'raw'
  214. break
  215. else:
  216. self._status(415)
  217. return
  218. self.send_response(200)
  219. self.send_header('Content-Encoding', encodings)
  220. self.send_header('Content-Length', str(len(payload)))
  221. self.end_headers()
  222. self.wfile.write(payload)
  223. elif self.path.startswith('/gen_'):
  224. payload = b'<html></html>'
  225. self.send_response(int(self.path[len('/gen_'):]))
  226. self.send_header('Content-Type', 'text/html; charset=utf-8')
  227. self.send_header('Content-Length', str(len(payload)))
  228. self.end_headers()
  229. self.wfile.write(payload)
  230. elif self.path.startswith('/incompleteread'):
  231. payload = b'<html></html>'
  232. self.send_response(200)
  233. self.send_header('Content-Type', 'text/html; charset=utf-8')
  234. self.send_header('Content-Length', '234234')
  235. self.end_headers()
  236. self.wfile.write(payload)
  237. self.finish()
  238. elif self.path.startswith('/timeout_'):
  239. time.sleep(int(self.path[len('/timeout_'):]))
  240. self._headers()
  241. elif self.path == '/source_address':
  242. payload = str(self.client_address[0]).encode()
  243. self.send_response(200)
  244. self.send_header('Content-Type', 'text/html; charset=utf-8')
  245. self.send_header('Content-Length', str(len(payload)))
  246. self.end_headers()
  247. self.wfile.write(payload)
  248. self.finish()
  249. elif self.path == '/get_cookie':
  250. self.send_response(200)
  251. self.send_header('Set-Cookie', 'test=ytdlp; path=/')
  252. self.end_headers()
  253. self.finish()
  254. else:
  255. self._status(404)
  256. def send_header(self, keyword, value):
  257. """
  258. Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers.
  259. This is against what is defined in RFC 3986, however we need to test we support this
  260. since some sites incorrectly do this.
  261. """
  262. if keyword.lower() == 'connection':
  263. return super().send_header(keyword, value)
  264. if not hasattr(self, '_headers_buffer'):
  265. self._headers_buffer = []
  266. self._headers_buffer.append(f'{keyword}: {value}\r\n'.encode())
  267. class TestRequestHandlerBase:
  268. @classmethod
  269. def setup_class(cls):
  270. cls.http_httpd = http.server.ThreadingHTTPServer(
  271. ('127.0.0.1', 0), HTTPTestRequestHandler)
  272. cls.http_port = http_server_port(cls.http_httpd)
  273. cls.http_server_thread = threading.Thread(target=cls.http_httpd.serve_forever)
  274. # FIXME: we should probably stop the http server thread after each test
  275. # See: https://github.com/yt-dlp/yt-dlp/pull/7094#discussion_r1199746041
  276. cls.http_server_thread.daemon = True
  277. cls.http_server_thread.start()
  278. # HTTPS server
  279. certfn = os.path.join(TEST_DIR, 'testcert.pem')
  280. cls.https_httpd = http.server.ThreadingHTTPServer(
  281. ('127.0.0.1', 0), HTTPTestRequestHandler)
  282. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  283. sslctx.load_cert_chain(certfn, None)
  284. cls.https_httpd.socket = sslctx.wrap_socket(cls.https_httpd.socket, server_side=True)
  285. cls.https_port = http_server_port(cls.https_httpd)
  286. cls.https_server_thread = threading.Thread(target=cls.https_httpd.serve_forever)
  287. cls.https_server_thread.daemon = True
  288. cls.https_server_thread.start()
  289. @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
  290. class TestHTTPRequestHandler(TestRequestHandlerBase):
  291. def test_verify_cert(self, handler):
  292. with handler() as rh:
  293. with pytest.raises(CertificateVerifyError):
  294. validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
  295. with handler(verify=False) as rh:
  296. r = validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))
  297. assert r.status == 200
  298. r.close()
  299. def test_ssl_error(self, handler):
  300. # HTTPS server with too old TLS version
  301. # XXX: is there a better way to test this than to create a new server?
  302. https_httpd = http.server.ThreadingHTTPServer(
  303. ('127.0.0.1', 0), HTTPTestRequestHandler)
  304. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  305. https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
  306. https_port = http_server_port(https_httpd)
  307. https_server_thread = threading.Thread(target=https_httpd.serve_forever)
  308. https_server_thread.daemon = True
  309. https_server_thread.start()
  310. with handler(verify=False) as rh:
  311. with pytest.raises(SSLError, match=r'(?i)ssl(?:v3|/tls).alert.handshake.failure') as exc_info:
  312. validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
  313. assert not issubclass(exc_info.type, CertificateVerifyError)
  314. @pytest.mark.skip_handler('CurlCFFI', 'legacy_ssl ignored by CurlCFFI')
  315. def test_legacy_ssl_extension(self, handler):
  316. # HTTPS server with old ciphers
  317. # XXX: is there a better way to test this than to create a new server?
  318. https_httpd = http.server.ThreadingHTTPServer(
  319. ('127.0.0.1', 0), HTTPTestRequestHandler)
  320. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  321. sslctx.maximum_version = ssl.TLSVersion.TLSv1_2
  322. sslctx.set_ciphers('SHA1:AESCCM:aDSS:eNULL:aNULL')
  323. sslctx.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
  324. https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
  325. https_port = http_server_port(https_httpd)
  326. https_server_thread = threading.Thread(target=https_httpd.serve_forever)
  327. https_server_thread.daemon = True
  328. https_server_thread.start()
  329. with handler(verify=False) as rh:
  330. res = validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers', extensions={'legacy_ssl': True}))
  331. assert res.status == 200
  332. res.close()
  333. # Ensure only applies to request extension
  334. with pytest.raises(SSLError):
  335. validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
  336. @pytest.mark.skip_handler('CurlCFFI', 'legacy_ssl ignored by CurlCFFI')
  337. def test_legacy_ssl_support(self, handler):
  338. # HTTPS server with old ciphers
  339. # XXX: is there a better way to test this than to create a new server?
  340. https_httpd = http.server.ThreadingHTTPServer(
  341. ('127.0.0.1', 0), HTTPTestRequestHandler)
  342. sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
  343. sslctx.maximum_version = ssl.TLSVersion.TLSv1_2
  344. sslctx.set_ciphers('SHA1:AESCCM:aDSS:eNULL:aNULL')
  345. sslctx.load_cert_chain(os.path.join(TEST_DIR, 'testcert.pem'), None)
  346. https_httpd.socket = sslctx.wrap_socket(https_httpd.socket, server_side=True)
  347. https_port = http_server_port(https_httpd)
  348. https_server_thread = threading.Thread(target=https_httpd.serve_forever)
  349. https_server_thread.daemon = True
  350. https_server_thread.start()
  351. with handler(verify=False, legacy_ssl_support=True) as rh:
  352. res = validate_and_send(rh, Request(f'https://127.0.0.1:{https_port}/headers'))
  353. assert res.status == 200
  354. res.close()
  355. def test_percent_encode(self, handler):
  356. with handler() as rh:
  357. # Unicode characters should be encoded with uppercase percent-encoding
  358. res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/中文.html'))
  359. assert res.status == 200
  360. res.close()
  361. # don't normalize existing percent encodings
  362. res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/%c7%9f'))
  363. assert res.status == 200
  364. res.close()
  365. @pytest.mark.parametrize('path', [
  366. '/a/b/./../../headers',
  367. '/redirect_dotsegments',
  368. # https://github.com/yt-dlp/yt-dlp/issues/9020
  369. '/redirect_dotsegments_absolute',
  370. ])
  371. def test_remove_dot_segments(self, handler, path):
  372. with handler(verbose=True) as rh:
  373. # This isn't a comprehensive test,
  374. # but it should be enough to check whether the handler is removing dot segments in required scenarios
  375. res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}{path}'))
  376. assert res.status == 200
  377. assert res.url == f'http://127.0.0.1:{self.http_port}/headers'
  378. res.close()
  379. @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi (non-standard)')
  380. def test_unicode_path_redirection(self, handler):
  381. with handler() as rh:
  382. r = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/302-non-ascii-redirect'))
  383. assert r.url == f'http://127.0.0.1:{self.http_port}/%E4%B8%AD%E6%96%87.html'
  384. r.close()
  385. def test_raise_http_error(self, handler):
  386. with handler() as rh:
  387. for bad_status in (400, 500, 599, 302):
  388. with pytest.raises(HTTPError):
  389. validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_{bad_status}'))
  390. # Should not raise an error
  391. validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200')).close()
  392. def test_response_url(self, handler):
  393. with handler() as rh:
  394. # Response url should be that of the last url in redirect chain
  395. res = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_301'))
  396. assert res.url == f'http://127.0.0.1:{self.http_port}/method'
  397. res.close()
  398. res2 = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_200'))
  399. assert res2.url == f'http://127.0.0.1:{self.http_port}/gen_200'
  400. res2.close()
  401. # Covers some basic cases we expect some level of consistency between request handlers for
  402. @pytest.mark.parametrize('redirect_status,method,expected', [
  403. # A 303 must either use GET or HEAD for subsequent request
  404. (303, 'POST', ('', 'GET', False)),
  405. (303, 'HEAD', ('', 'HEAD', False)),
  406. # 301 and 302 turn POST only into a GET
  407. (301, 'POST', ('', 'GET', False)),
  408. (301, 'HEAD', ('', 'HEAD', False)),
  409. (302, 'POST', ('', 'GET', False)),
  410. (302, 'HEAD', ('', 'HEAD', False)),
  411. # 307 and 308 should not change method
  412. (307, 'POST', ('testdata', 'POST', True)),
  413. (308, 'POST', ('testdata', 'POST', True)),
  414. (307, 'HEAD', ('', 'HEAD', False)),
  415. (308, 'HEAD', ('', 'HEAD', False)),
  416. ])
  417. def test_redirect(self, handler, redirect_status, method, expected):
  418. with handler() as rh:
  419. data = b'testdata' if method == 'POST' else None
  420. headers = {}
  421. if data is not None:
  422. headers['Content-Type'] = 'application/test'
  423. res = validate_and_send(
  424. rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_{redirect_status}', method=method, data=data,
  425. headers=headers))
  426. headers = b''
  427. data_recv = b''
  428. if data is not None:
  429. data_recv += res.read(len(data))
  430. if data_recv != data:
  431. headers += data_recv
  432. data_recv = b''
  433. headers += res.read()
  434. assert expected[0] == data_recv.decode()
  435. assert expected[1] == res.headers.get('method')
  436. assert expected[2] == ('content-length' in headers.decode().lower())
  437. def test_request_cookie_header(self, handler):
  438. # We should accept a Cookie header being passed as in normal headers and handle it appropriately.
  439. with handler() as rh:
  440. # Specified Cookie header should be used
  441. res = validate_and_send(
  442. rh, Request(
  443. f'http://127.0.0.1:{self.http_port}/headers',
  444. headers={'Cookie': 'test=test'})).read().decode()
  445. assert 'cookie: test=test' in res.lower()
  446. # Specified Cookie header should be removed on any redirect
  447. res = validate_and_send(
  448. rh, Request(
  449. f'http://127.0.0.1:{self.http_port}/308-to-headers',
  450. headers={'Cookie': 'test=test2'})).read().decode()
  451. assert 'cookie: test=test2' not in res.lower()
  452. # Specified Cookie header should override global cookiejar for that request
  453. # Whether cookies from the cookiejar is applied on the redirect is considered undefined for now
  454. cookiejar = YoutubeDLCookieJar()
  455. cookiejar.set_cookie(http.cookiejar.Cookie(
  456. version=0, name='test', value='ytdlp', port=None, port_specified=False,
  457. domain='127.0.0.1', domain_specified=True, domain_initial_dot=False, path='/',
  458. path_specified=True, secure=False, expires=None, discard=False, comment=None,
  459. comment_url=None, rest={}))
  460. with handler(cookiejar=cookiejar) as rh:
  461. data = validate_and_send(
  462. rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'cookie': 'test=test3'})).read()
  463. assert b'cookie: test=ytdlp' not in data.lower()
  464. assert b'cookie: test=test3' in data.lower()
  465. def test_redirect_loop(self, handler):
  466. with handler() as rh:
  467. with pytest.raises(HTTPError, match='redirect loop'):
  468. validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/redirect_loop'))
  469. def test_incompleteread(self, handler):
  470. with handler(timeout=2) as rh:
  471. with pytest.raises(IncompleteRead, match='13 bytes read, 234221 more expected'):
  472. validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/incompleteread')).read()
  473. def test_cookies(self, handler):
  474. cookiejar = YoutubeDLCookieJar()
  475. cookiejar.set_cookie(http.cookiejar.Cookie(
  476. 0, 'test', 'ytdlp', None, False, '127.0.0.1', True,
  477. False, '/headers', True, False, None, False, None, None, {}))
  478. with handler(cookiejar=cookiejar) as rh:
  479. data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
  480. assert b'cookie: test=ytdlp' in data.lower()
  481. # Per request
  482. with handler() as rh:
  483. data = validate_and_send(
  484. rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': cookiejar})).read()
  485. assert b'cookie: test=ytdlp' in data.lower()
  486. def test_cookie_sync_only_cookiejar(self, handler):
  487. # Ensure that cookies are ONLY being handled by the cookiejar
  488. with handler() as rh:
  489. validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/get_cookie', extensions={'cookiejar': YoutubeDLCookieJar()}))
  490. data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={'cookiejar': YoutubeDLCookieJar()})).read()
  491. assert b'cookie: test=ytdlp' not in data.lower()
  492. def test_cookie_sync_delete_cookie(self, handler):
  493. # Ensure that cookies are ONLY being handled by the cookiejar
  494. cookiejar = YoutubeDLCookieJar()
  495. with handler(cookiejar=cookiejar) as rh:
  496. validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/get_cookie'))
  497. data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
  498. assert b'cookie: test=ytdlp' in data.lower()
  499. cookiejar.clear_session_cookies()
  500. data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read()
  501. assert b'cookie: test=ytdlp' not in data.lower()
  502. def test_headers(self, handler):
  503. with handler(headers=HTTPHeaderDict({'test1': 'test', 'test2': 'test2'})) as rh:
  504. # Global Headers
  505. data = validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/headers')).read().lower()
  506. assert b'test1: test' in data
  507. # Per request headers, merged with global
  508. data = validate_and_send(rh, Request(
  509. f'http://127.0.0.1:{self.http_port}/headers', headers={'test2': 'changed', 'test3': 'test3'})).read().lower()
  510. assert b'test1: test' in data
  511. assert b'test2: changed' in data
  512. assert b'test2: test2' not in data
  513. assert b'test3: test3' in data
  514. def test_read_timeout(self, handler):
  515. with handler() as rh:
  516. # Default timeout is 20 seconds, so this should go through
  517. validate_and_send(
  518. rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1'))
  519. with handler(timeout=0.1) as rh:
  520. with pytest.raises(TransportError):
  521. validate_and_send(
  522. rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_5'))
  523. # Per request timeout, should override handler timeout
  524. validate_and_send(
  525. rh, Request(f'http://127.0.0.1:{self.http_port}/timeout_1', extensions={'timeout': 4}))
  def test_connect_timeout(self, handler):
      """Check the connect timeout set on the handler and its per-request override.

      In both cases the request must fail with TransportError in less time
      than DEFAULT_TIMEOUT.
      """
      # nothing should be listening on this port
      connect_timeout_url = 'http://10.255.255.255'

      # Handler-level timeout
      with handler(timeout=0.01) as rh, pytest.raises(TransportError):
          started = time.time()
          validate_and_send(rh, Request(connect_timeout_url))
      # Checked after the raises-block: nothing following the raising call runs inside it
      assert time.time() - started < DEFAULT_TIMEOUT

      # Per request timeout, should override handler timeout
      short_request = Request(connect_timeout_url, extensions={'timeout': 0.01})
      with handler() as rh, pytest.raises(TransportError):
          started = time.time()
          validate_and_send(rh, short_request)
      assert time.time() - started < DEFAULT_TIMEOUT
  def test_source_address(self, handler):
      """Check that the /source_address response matches the configured source address."""
      source_address = f'127.0.0.{random.randint(5, 255)}'
      # on some systems these loopback addresses we need for testing may not be available
      # see: https://github.com/yt-dlp/yt-dlp/issues/8890
      verify_address_availability(source_address)
      with handler(source_address=source_address) as rh:
          response = validate_and_send(
              rh, Request(f'http://127.0.0.1:{self.http_port}/source_address'))
          reported_address = response.read().decode()
          assert source_address == reported_address
  @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
  def test_gzip_trailing_garbage(self, handler):
      """Check that the /trailing_garbage response still reads as the plain HTML payload."""
      garbage_url = f'http://localhost:{self.http_port}/trailing_garbage'
      with handler() as rh:
          response = validate_and_send(rh, Request(garbage_url))
          text = response.read().decode()
          assert text == '<html><video src="/vid.mp4" /></html>'
  @pytest.mark.skip_handler('CurlCFFI', 'not applicable to curl-cffi')
  @pytest.mark.skipif(not brotli, reason='brotli support is not installed')
  def test_brotli(self, handler):
      """Check that a 'br' response keeps its Content-Encoding header and reads as plain HTML."""
      encoded_request = Request(
          f'http://127.0.0.1:{self.http_port}/content-encoding',
          headers={'ytdl-encoding': 'br'})
      with handler() as rh:
          response = validate_and_send(rh, encoded_request)
          assert response.headers.get('Content-Encoding') == 'br'
          assert response.read() == b'<html><video src="/vid.mp4" /></html>'
  def test_deflate(self, handler):
      """Check that a 'deflate' response keeps its Content-Encoding header and reads as plain HTML."""
      encoded_request = Request(
          f'http://127.0.0.1:{self.http_port}/content-encoding',
          headers={'ytdl-encoding': 'deflate'})
      with handler() as rh:
          response = validate_and_send(rh, encoded_request)
          assert response.headers.get('Content-Encoding') == 'deflate'
          assert response.read() == b'<html><video src="/vid.mp4" /></html>'
  def test_gzip(self, handler):
      """Check that a 'gzip' response keeps its Content-Encoding header and reads as plain HTML."""
      encoded_request = Request(
          f'http://127.0.0.1:{self.http_port}/content-encoding',
          headers={'ytdl-encoding': 'gzip'})
      with handler() as rh:
          response = validate_and_send(rh, encoded_request)
          assert response.headers.get('Content-Encoding') == 'gzip'
          assert response.read() == b'<html><video src="/vid.mp4" /></html>'
  def test_multiple_encodings(self, handler):
      """Check that responses with two stacked content encodings read as plain HTML."""
      encoding_url = f'http://127.0.0.1:{self.http_port}/content-encoding'
      encoding_pairs = ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate')
      with handler() as rh:
          for pair in encoding_pairs:
              response = validate_and_send(
                  rh, Request(encoding_url, headers={'ytdl-encoding': pair}))
              # The header must come back exactly as requested, spacing included
              assert response.headers.get('Content-Encoding') == pair
              assert response.read() == b'<html><video src="/vid.mp4" /></html>'
  @pytest.mark.skip_handler('CurlCFFI', 'not supported by curl-cffi')
  def test_unsupported_encoding(self, handler):
      """Check that a body with an unknown content encoding is returned as-is."""
      unsupported_request = Request(
          f'http://127.0.0.1:{self.http_port}/content-encoding',
          headers={'ytdl-encoding': 'unsupported', 'Accept-Encoding': '*'})
      with handler() as rh:
          response = validate_and_send(rh, unsupported_request)
          assert response.headers.get('Content-Encoding') == 'unsupported'
          assert response.read() == b'raw'
  def test_read(self, handler):
      """Check sized reads, a read of the remainder, and a read at end of stream."""
      headers_url = f'http://127.0.0.1:{self.http_port}/headers'
      with handler() as rh:
          response = validate_and_send(rh, Request(headers_url))
          assert response.readable()
          # Sized reads consume the body in order: b'H' followed by b'ost'
          assert response.read(1) == b'H'
          assert response.read(3) == b'ost'
          # An unsized read returns the rest; afterwards the stream is exhausted
          remainder = response.read().decode()
          assert remainder.endswith('\n\n')
          assert response.read() == b''
  def test_request_disable_proxy(self, handler):
      """Check that a request-level `None` proxy disables the handler-level proxy."""
      target_url = f'http://127.0.0.1:{self.http_port}/headers'
      proxy_schemes = handler._SUPPORTED_PROXY_SCHEMES or ['http']
      for proxy_proto in proxy_schemes:
          # Given the handler is configured with a proxy
          handler_proxies = {'http': f'{proxy_proto}://10.255.255.255'}
          with handler(proxies=handler_proxies, timeout=5) as rh:
              # When a proxy is explicitly set to None for the request
              direct_request = Request(target_url, proxies={'http': None})
              response = validate_and_send(rh, direct_request)
              # Then no proxy should be used
              response.close()
              assert response.status == 200
  @pytest.mark.skip_handlers_if(
      lambda _, handler: Features.NO_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support NO_PROXY')
  def test_noproxy(self, handler):
      """Check that a request-level 'no' proxy entry matching the host bypasses the proxy."""
      target_url = f'http://127.0.0.1:{self.http_port}/headers'
      no_proxy_values = (f'127.0.0.1:{self.http_port}', '127.0.0.1', 'localhost')
      proxy_schemes = handler._SUPPORTED_PROXY_SCHEMES or ['http']
      for proxy_proto in proxy_schemes:
          # Given the handler is configured with a proxy
          handler_proxies = {'http': f'{proxy_proto}://10.255.255.255'}
          with handler(proxies=handler_proxies, timeout=5) as rh:
              for no_proxy in no_proxy_values:
                  # When request no proxy includes the request url host
                  bypass_request = Request(target_url, proxies={'no': no_proxy})
                  nop_response = validate_and_send(rh, bypass_request)
                  # Then the proxy should not be used
                  assert nop_response.status == 200
                  nop_response.close()
  @pytest.mark.skip_handlers_if(
      lambda _, handler: Features.ALL_PROXY not in handler._SUPPORTED_FEATURES, 'handler does not support ALL_PROXY')
  def test_allproxy(self, handler):
      """Check that an 'all' proxy is used, whether set on the handler or on the request."""
      # This is a bit of a hacky test, but it should be enough to check whether the handler is using the proxy.
      # 0.1s might not be enough of a timeout if proxy is not used in all cases, but should still get failures.
      target_url = f'http://127.0.0.1:{self.http_port}/headers'
      proxy_url = 'http://10.255.255.255'

      # Proxy configured on the handler
      with handler(proxies={'all': proxy_url}, timeout=0.1) as rh:
          with pytest.raises(TransportError):
              validate_and_send(rh, Request(target_url)).close()

      # Proxy configured on the request
      with handler(timeout=0.1) as rh:
          with pytest.raises(TransportError):
              proxied_request = Request(target_url, proxies={'all': proxy_url})
              validate_and_send(rh, proxied_request).close()
  # NOTE(review): the other skip_handlers_if predicates in this file read
  # `handler._SUPPORTED_FEATURES`, i.e. they treat `handler` as a handler class.
  # If that holds here too, comparing it against name strings is always true
  # and this test is always skipped - confirm against the fixture.
  @pytest.mark.skip_handlers_if(lambda _, handler: handler not in ['Urllib', 'CurlCFFI'], 'handler does not support keep_header_casing')
  def test_keep_header_casing(self, handler):
      """Check that `keep_header_casing` sends the header name with its original casing."""
      cased_request = Request(
          f'http://127.0.0.1:{self.http_port}/headers',
          headers={'X-test-heaDer': 'test'},
          extensions={'keep_header_casing': True})
      with handler() as rh:
          echoed_headers = validate_and_send(rh, cased_request).read().decode()
          assert 'X-test-heaDer: test' in echoed_headers
  @pytest.mark.parametrize('handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
  class TestClientCertificate:
      """Client-certificate tests against an HTTPS server that requires a client certificate."""

      @classmethod
      def setup_class(cls):
          """Start a threaded HTTPS test server whose TLS context demands a client certificate."""
          cls.certdir = os.path.join(TEST_DIR, 'testdata', 'certificate')
          server_cert = os.path.join(TEST_DIR, 'testcert.pem')
          ca_cert = os.path.join(cls.certdir, 'ca.crt')

          # CERT_REQUIRED: the server verifies the client certificate against ca.crt
          sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
          sslctx.verify_mode = ssl.CERT_REQUIRED
          sslctx.load_verify_locations(cafile=ca_cert)
          sslctx.load_cert_chain(server_cert, None)

          # Port 0 lets the OS pick a free port; the chosen one is stored on the class
          cls.httpd = http.server.ThreadingHTTPServer(('127.0.0.1', 0), HTTPTestRequestHandler)
          cls.httpd.socket = sslctx.wrap_socket(cls.httpd.socket, server_side=True)
          cls.port = http_server_port(cls.httpd)

          cls.server_thread = threading.Thread(target=cls.httpd.serve_forever, daemon=True)
          cls.server_thread.start()

      def _run_test(self, handler, **handler_kwargs):
          """Fetch /video.html from the TLS server using a handler built with `handler_kwargs`."""
          video_url = f'https://127.0.0.1:{self.port}/video.html'
          with handler(
              # Disable client-side validation of unacceptable self-signed testcert.pem
              # The test is of a check on the server side, so unaffected
              verify=False,
              **handler_kwargs,
          ) as rh:
              validate_and_send(rh, Request(video_url)).read().decode()

      def test_certificate_combined_nopass(self, handler):
          """Certificate and unencrypted key in a single file."""
          client_cert = {
              'client_certificate': os.path.join(self.certdir, 'clientwithkey.crt'),
          }
          self._run_test(handler, client_cert=client_cert)

      def test_certificate_nocombined_nopass(self, handler):
          """Certificate and unencrypted key in separate files."""
          client_cert = {
              'client_certificate': os.path.join(self.certdir, 'client.crt'),
              'client_certificate_key': os.path.join(self.certdir, 'client.key'),
          }
          self._run_test(handler, client_cert=client_cert)

      def test_certificate_combined_pass(self, handler):
          """Certificate and password-protected key in a single file."""
          client_cert = {
              'client_certificate': os.path.join(self.certdir, 'clientwithencryptedkey.crt'),
              'client_certificate_password': 'foobar',
          }
          self._run_test(handler, client_cert=client_cert)

      def test_certificate_nocombined_pass(self, handler):
          """Certificate and password-protected key in separate files."""
          client_cert = {
              'client_certificate': os.path.join(self.certdir, 'client.crt'),
              'client_certificate_key': os.path.join(self.certdir, 'clientencrypted.key'),
              'client_certificate_password': 'foobar',
          }
          self._run_test(handler, client_cert=client_cert)
  @pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
  class TestHTTPImpersonateRequestHandler(TestRequestHandlerBase):
      """Tests for the `impersonate` extension, run against the CurlCFFI handler."""

      def test_supported_impersonate_targets(self, handler):
          """Every supported target yields a 200 response that does not carry our own user agent."""
          with handler(headers=std_headers) as rh:
              # note: this assumes the impersonate request handler supports the impersonate extension
              for target in rh.supported_targets:
                  res = validate_and_send(rh, Request(
                      f'http://127.0.0.1:{self.http_port}/headers', extensions={'impersonate': target}))
                  assert res.status == 200
                  assert std_headers['user-agent'].lower() not in res.read().decode().lower()

      def test_response_extensions(self, handler):
          """A successful response reports the impersonate target used for the request."""
          with handler() as rh:
              for target in rh.supported_targets:
                  request = Request(
                      f'http://127.0.0.1:{self.http_port}/gen_200', extensions={'impersonate': target})
                  res = validate_and_send(rh, request)
                  assert res.extensions['impersonate'] == rh._get_request_target(request)

      def test_http_error_response_extensions(self, handler):
          """The response attached to an HTTPError reports the impersonate target as well."""
          with handler() as rh:
              for target in rh.supported_targets:
                  request = Request(
                      f'http://127.0.0.1:{self.http_port}/gen_404', extensions={'impersonate': target})
                  # Require the error for every target: with a bare try/except a request
                  # that did not raise would leave `res` unbound or reuse the response
                  # from the previous iteration, letting the assertion pass by accident.
                  with pytest.raises(HTTPError) as exc_info:
                      validate_and_send(rh, request)
                  res = exc_info.value.response
                  assert res.extensions['impersonate'] == rh._get_request_target(request)
  class TestRequestHandlerMisc:
      """Misc generic tests for request handlers, not related to request or validation testing"""

      @pytest.mark.parametrize('handler,logger_name', [
          ('Requests', 'urllib3'),
          ('Websockets', 'websockets.client'),
          ('Websockets', 'websockets.server'),
      ], indirect=['handler'])
      def test_remove_logging_handler(self, handler, logger_name):
          """Creating a handler adds one logging handler to the logger; closing it removes it again."""
          # Ensure any logging handlers, which may contain a YoutubeDL instance,
          # are removed when we close the request handler
          # See: https://github.com/yt-dlp/yt-dlp/issues/8922
          attached = logging.getLogger(logger_name).handlers
          count_before = len(attached)

          rh = handler()
          assert len(attached) == count_before + 1

          rh.close()
          assert len(attached) == count_before

      def test_wrap_request_errors(self):
          """Errors raised by `_validate` and `_send` reach the caller with `.handler` set to the handler."""

          class TestRequestHandler(RequestHandler):
              def _validate(self, request):
                  if request.headers.get('x-fail'):
                      raise UnsupportedRequest('test error')

              def _send(self, request: Request):
                  raise RequestError('test error')

          with TestRequestHandler(logger=FakeLogger()) as rh:
              failing_request = Request('http://example.com', headers={'x-fail': '1'})
              with pytest.raises(UnsupportedRequest, match='test error') as exc_info:
                  rh.validate(failing_request)
              assert exc_info.value.handler is rh

              plain_request = Request('http://example.com')
              with pytest.raises(RequestError, match='test error') as exc_info:
                  rh.send(plain_request)
              assert exc_info.value.handler is rh
  @pytest.mark.parametrize('handler', ['Urllib'], indirect=True)
  class TestUrllibRequestHandler(TestRequestHandlerBase):
      """Tests specific to the Urllib request handler."""

      def test_file_urls(self, handler):
          """file:// URLs are rejected unless the handler is created with `enable_file_urls=True`."""
          # See https://github.com/ytdl-org/youtube-dl/issues/8227
          with tempfile.NamedTemporaryFile(delete=False) as tf:
              tf.write(b'foobar')
          try:
              req = Request(pathlib.Path(tf.name).as_uri())
              with handler() as rh:
                  with pytest.raises(UnsupportedRequest):
                      rh.validate(req)

                  # Test that urllib never loaded FileHandler
                  with pytest.raises(TransportError):
                      rh.send(req)

              with handler(enable_file_urls=True) as rh:
                  res = validate_and_send(rh, req)
                  try:
                      assert res.read() == b'foobar'
                  finally:
                      # Close even on failure so the file below can always be removed
                      res.close()
          finally:
              # Do not leave the temporary file behind when an assertion fails
              os.unlink(tf.name)

      def test_http_error_returns_content(self, handler):
          """The response of an HTTPError stays readable after the exception object is gone."""
          # urllib HTTPError will try close the underlying response if reference to the HTTPError object is lost
          def get_response():
              with handler() as rh:
                  # headers url
                  try:
                      validate_and_send(rh, Request(f'http://127.0.0.1:{self.http_port}/gen_404'))
                  except HTTPError as e:
                      return e.response
              # Fail explicitly instead of returning None and crashing on `.read()` below
              pytest.fail('expected an HTTPError for /gen_404')

          assert get_response().read() == b'<html></html>'

      def test_verify_cert_error_text(self, handler):
          """A certificate verification failure carries the expected error text."""
          # Check the output of the error message
          with handler() as rh:
              with pytest.raises(
                  CertificateVerifyError,
                  match=r'\[SSL: CERTIFICATE_VERIFY_FAILED\] certificate verify failed: self.signed certificate',
              ):
                  validate_and_send(rh, Request(f'https://127.0.0.1:{self.https_port}/headers'))

      @pytest.mark.parametrize('req,match,version_check', [
          # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1256
          # bpo-39603: Check implemented in 3.7.9+, 3.8.5+
          (
              Request('http://127.0.0.1', method='GET\n'),
              'method can\'t contain control characters',
              lambda v: v < (3, 7, 9) or (3, 8, 0) <= v < (3, 8, 5),
          ),
          # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1265
          # bpo-38576: Check implemented in 3.7.8+, 3.8.3+
          (
              Request('http://127.0.0. 1', method='GET'),
              'URL can\'t contain control characters',
              lambda v: v < (3, 7, 8) or (3, 8, 0) <= v < (3, 8, 3),
          ),
          # https://github.com/python/cpython/blob/987b712b4aeeece336eed24fcc87a950a756c3e2/Lib/http/client.py#L1288C31-L1288C50
          (Request('http://127.0.0.1', headers={'foo\n': 'bar'}), 'Invalid header name', None),
      ])
      def test_httplib_validation_errors(self, handler, req, match, version_check):
          """Invalid requests raise a RequestError that is not a TransportError.

          `version_check`, when given, receives `sys.version_info` and returns
          True for Python versions on which the test is skipped.
          """
          if version_check and version_check(sys.version_info):
              pytest.skip(f'Python {sys.version} version does not have the required validation for this test.')

          with handler() as rh:
              with pytest.raises(RequestError, match=match) as exc_info:
                  validate_and_send(rh, req)
              assert not isinstance(exc_info.value, TransportError)
  @pytest.mark.parametrize('handler', ['Requests'], indirect=True)
  class TestRequestsRequestHandler(TestRequestHandlerBase):
      """Tests specific to the Requests request handler."""

      # NOTE(review): the exceptions are wrapped in lambdas so they are only built when
      # the test runs - presumably so collection works without the dependency; confirm.
      @pytest.mark.parametrize('raised,expected', [
          (lambda: requests.exceptions.ConnectTimeout(), TransportError),
          (lambda: requests.exceptions.ReadTimeout(), TransportError),
          (lambda: requests.exceptions.Timeout(), TransportError),
          (lambda: requests.exceptions.ConnectionError(), TransportError),
          (lambda: requests.exceptions.ProxyError(), ProxyError),
          (lambda: requests.exceptions.SSLError('12[CERTIFICATE_VERIFY_FAILED]34'), CertificateVerifyError),
          (lambda: requests.exceptions.SSLError(), SSLError),
          (lambda: requests.exceptions.InvalidURL(), RequestError),
          (lambda: requests.exceptions.InvalidHeader(), RequestError),
          # catch-all: https://github.com/psf/requests/blob/main/src/requests/adapters.py#L535
          (lambda: urllib3.exceptions.HTTPError(), TransportError),
          (lambda: requests.exceptions.RequestException(), RequestError),
          # (lambda: requests.exceptions.TooManyRedirects(), HTTPError) - Needs a response object
      ])
      def test_request_error_mapping(self, handler, monkeypatch, raised, expected):
          """An exception raised by the session's `request` surfaces from `send` as exactly `expected`."""

          class MockSession:
              def request(self, *args, **kwargs):
                  raise raised()

          def fake_get_instance(*args, **kwargs):
              return MockSession()

          with handler() as rh:
              monkeypatch.setattr(rh, '_get_instance', fake_get_instance)

              with pytest.raises(expected) as exc_info:
                  rh.send(Request('http://fake'))

              # Exact type match: a subclass of `expected` is not accepted
              assert exc_info.type is expected

      @pytest.mark.parametrize('raised,expected,match', [
          (lambda: urllib3.exceptions.SSLError(), SSLError, None),
          (lambda: urllib3.exceptions.TimeoutError(), TransportError, None),
          (lambda: urllib3.exceptions.ReadTimeoutError(None, None, None), TransportError, None),
          (lambda: urllib3.exceptions.ProtocolError(), TransportError, None),
          (lambda: urllib3.exceptions.DecodeError(), TransportError, None),
          (lambda: urllib3.exceptions.HTTPError(), TransportError, None),  # catch-all
          (
              lambda: urllib3.exceptions.ProtocolError('error', http.client.IncompleteRead(partial=b'abc', expected=4)),
              IncompleteRead,
              '3 bytes read, 4 more expected',
          ),
          (
              lambda: urllib3.exceptions.ProtocolError('error', urllib3.exceptions.IncompleteRead(partial=3, expected=5)),
              IncompleteRead,
              '3 bytes read, 5 more expected',
          ),
      ])
      def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
          """An exception raised while reading the raw response surfaces from `read` as exactly `expected`."""
          from requests.models import Response as RequestsResponse
          from urllib3.response import HTTPResponse as Urllib3Response

          from yt_dlp.networking._requests import RequestsResponseAdapter

          wrapped = RequestsResponse()
          wrapped.raw = Urllib3Response(body=b'', status=200)
          adapter = RequestsResponseAdapter(wrapped)

          def failing_read(*args, **kwargs):
              raise raised()

          monkeypatch.setattr(adapter.fp, 'read', failing_read)

          with pytest.raises(expected, match=match) as exc_info:
              adapter.read()

          assert exc_info.type is expected

      def test_close(self, handler, monkeypatch):
          """Closing the handler closes the session returned by `_get_instance`."""
          rh = handler()
          session = rh._get_instance(cookiejar=rh.cookiejar)
          real_close = session.close
          close_calls = []

          def recording_close(*args, **kwargs):
              # Record the call, then delegate so the session is really closed
              close_calls.append(True)
              return real_close(*args, **kwargs)

          monkeypatch.setattr(session, 'close', recording_close)
          rh.close()

          assert close_calls
  @pytest.mark.parametrize('handler', ['CurlCFFI'], indirect=True)
  class TestCurlCFFIRequestHandler(TestRequestHandlerBase):
      """Tests specific to the CurlCFFI request handler."""

      @pytest.mark.parametrize('params,extensions', [
          ({'impersonate': ImpersonateTarget('chrome', '110')}, {}),
          ({'impersonate': ImpersonateTarget('chrome', '99')}, {'impersonate': ImpersonateTarget('chrome', '110')}),
      ])
      def test_impersonate(self, handler, params, extensions):
          """The Chrome 110 target applies whether set on the handler or overriding it per request."""
          with handler(headers=std_headers, **params) as rh:
              res = validate_and_send(
                  rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions=extensions)).read().decode()
              assert 'sec-ch-ua: "Chromium";v="110"' in res
              # Check that user agent is added over ours
              assert 'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36' in res

      def test_headers(self, handler):
          """std_headers are dropped when impersonating and kept otherwise; custom headers are always kept."""
          with handler(headers=std_headers) as rh:
              # Ensure curl-impersonate overrides our standard headers
              res = validate_and_send(
                  rh, Request(f'http://127.0.0.1:{self.http_port}/headers', extensions={
                      'impersonate': ImpersonateTarget('safari')}, headers={'x-custom': 'test', 'sec-fetch-mode': 'custom'})).read().decode().lower()

              assert std_headers['user-agent'].lower() not in res
              assert std_headers['accept-language'].lower() not in res
              assert std_headers['sec-fetch-mode'].lower() not in res
              # other than UA, custom headers that differ from std_headers should be kept
              assert 'sec-fetch-mode: custom' in res
              assert 'x-custom: test' in res

              # but when not impersonating don't remove std_headers
              res = validate_and_send(
                  rh, Request(f'http://127.0.0.1:{self.http_port}/headers', headers={'x-custom': 'test'})).read().decode().lower()
              # std_headers should be present
              for k, v in std_headers.items():
                  assert f'{k}: {v}'.lower() in res

      @pytest.mark.parametrize('raised,expected,match', [
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.PARTIAL_FILE), IncompleteRead, None),
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.RECV_ERROR), TransportError, None),
      ])
      def test_response_error_mapping(self, handler, monkeypatch, raised, expected, match):
          """An error raised while reading the response surfaces from `read` as exactly `expected`."""
          import curl_cffi.requests

          from yt_dlp.networking._curlcffi import CurlCFFIResponseAdapter
          curl_res = curl_cffi.requests.Response()
          res = CurlCFFIResponseAdapter(curl_res)

          def mock_read(*args, **kwargs):
              # Attach the response to the error before raising it
              error = raised()
              error.response = curl_res
              raise error

          monkeypatch.setattr(res.fp, 'read', mock_read)

          with pytest.raises(expected, match=match) as exc_info:
              res.read()

          assert exc_info.type is expected

      @pytest.mark.parametrize('raised,expected,match', [
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.OPERATION_TIMEDOUT), TransportError, None),
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.PEER_FAILED_VERIFICATION), CertificateVerifyError, None),
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.SSL_CONNECT_ERROR), SSLError, None),
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.TOO_MANY_REDIRECTS), HTTPError, None),
          (lambda: curl_cffi.requests.errors.RequestsError(
              '', code=curl_cffi.const.CurlECode.PROXY), ProxyError, None),
      ])
      def test_request_error_mapping(self, handler, monkeypatch, raised, expected, match):
          """An error raised by the session's `request` surfaces from `send` as exactly `expected`."""
          import curl_cffi.requests
          curl_res = curl_cffi.requests.Response()
          curl_res.status_code = 301

          with handler() as rh:
              original_get_instance = rh._get_instance

              def mock_get_instance(*args, **kwargs):
                  instance = original_get_instance(*args, **kwargs)

                  def request(*_, **__):
                      # Attach the response to the error before raising it
                      error = raised()
                      error.response = curl_res
                      raise error

                  monkeypatch.setattr(instance, 'request', request)
                  return instance

              monkeypatch.setattr(rh, '_get_instance', mock_get_instance)

              # `match` is honoured here as it is in test_response_error_mapping
              with pytest.raises(expected, match=match) as exc_info:
                  rh.send(Request('http://fake'))

              assert exc_info.type is expected

      def test_response_reader(self, handler):
          """Exercise CurlCFFIResponseReader buffering, byte counting and closing on a fake response."""

          class FakeResponse:
              """Yields b'foo', b'bar', b'z', then optionally raises; records `close()`."""

              def __init__(self, raise_error=False):
                  self.raise_error = raise_error
                  self.closed = False

              def iter_content(self):
                  yield b'foo'
                  yield b'bar'
                  yield b'z'
                  if self.raise_error:
                      raise Exception('test')

              def close(self):
                  self.closed = True

          from yt_dlp.networking._curlcffi import CurlCFFIResponseReader

          res = CurlCFFIResponseReader(FakeResponse())
          # Call readable(): asserting the bound method itself would always pass
          assert res.readable()
          assert res.bytes_read == 0

          # The asserts below show bytes_read growing by whole chunks while read()
          # returns at most the requested size; the surplus stays in _buffer
          assert res.read(1) == b'f'
          assert res.bytes_read == 3
          assert res._buffer == b'oo'

          assert res.read(2) == b'oo'
          assert res.bytes_read == 3
          assert res._buffer == b''

          assert res.read(2) == b'ba'
          assert res.bytes_read == 6
          assert res._buffer == b'r'

          # Reading past the end returns what is left and closes reader and response
          assert res.read(3) == b'rz'
          assert res.bytes_read == 7
          assert res._buffer == b''
          assert res.closed
          assert res._response.closed

          # should handle no size param
          res2 = CurlCFFIResponseReader(FakeResponse())
          assert res2.read() == b'foobarz'
          assert res2.bytes_read == 7
          assert res2._buffer == b''
          assert res2.closed

          # should close on an exception
          res3 = CurlCFFIResponseReader(FakeResponse(raise_error=True))
          with pytest.raises(Exception, match='test'):
              res3.read()
          assert res3._buffer == b''
          assert res3.bytes_read == 7
          assert res3.closed

          # buffer should be cleared on close
          res4 = CurlCFFIResponseReader(FakeResponse())
          res4.read(2)
          assert res4._buffer == b'o'
          res4.close()
          assert res4.closed
          assert res4._buffer == b''
  def run_validation(handler, error, req, **handler_kwargs):
      """Validate `req` with a handler built from `handler_kwargs`.

      When `error` is truthy it is the exception type that `validate` must
      raise; otherwise validation must complete without raising.
      """
      with handler(**handler_kwargs) as rh:
          if not error:
              rh.validate(req)
              return
          with pytest.raises(error):
              rh.validate(req)
  1029. class TestRequestHandlerValidation:
  class ValidationRH(RequestHandler):
      """Request handler used for validation tests only; sending always raises RequestError."""

      def _send(self, request):
          raise RequestError('test')
  class NoCheckRH(ValidationRH):
      """ValidationRH variant with every supported-set attribute set to None."""

      _SUPPORTED_URL_SCHEMES = None
      _SUPPORTED_PROXY_SCHEMES = None
      _SUPPORTED_FEATURES = None

      def _check_extensions(self, extensions):
          # Empty the mapping passed in, whatever it contains
          extensions.clear()
  class HTTPSupportedRH(ValidationRH):
      """ValidationRH variant that declares 'http' as its only supported URL scheme."""

      _SUPPORTED_URL_SCHEMES = ('http',)
  # One entry per handler: (handler, [(url scheme, expected error or False, handler kwargs), ...])
  URL_SCHEME_TESTS = [
      ('Urllib', [
          ('http', False, {}),
          ('https', False, {}),
          ('data', False, {}),
          ('ftp', False, {}),
          # file:// is only accepted when explicitly enabled
          ('file', UnsupportedRequest, {}),
          ('file', False, {'enable_file_urls': True}),
      ]),
      ('Requests', [
          ('http', False, {}),
          ('https', False, {}),
      ]),
      ('Websockets', [
          ('ws', False, {}),
          ('wss', False, {}),
      ]),
      ('CurlCFFI', [
          ('http', False, {}),
          ('https', False, {}),
      ]),
      (NoCheckRH, [('http', False, {})]),
      (ValidationRH, [('http', UnsupportedRequest, {})]),
  ]
  # One entry per handler: (handler, request scheme, [(proxy scheme, expected error or False), ...])
  PROXY_SCHEME_TESTS = [
      ('Urllib', 'http', [
          ('http', False),
          ('https', UnsupportedRequest),
          ('socks4', False),
          ('socks4a', False),
          ('socks5', False),
          ('socks5h', False),
          ('socks', UnsupportedRequest),
      ]),
      ('Requests', 'http', [
          ('http', False),
          ('https', False),
          ('socks4', False),
          ('socks4a', False),
          ('socks5', False),
          ('socks5h', False),
      ]),
      ('CurlCFFI', 'http', [
          ('http', False),
          ('https', False),
          ('socks4', False),
          ('socks4a', False),
          ('socks5', False),
          ('socks5h', False),
      ]),
      ('Websockets', 'ws', [
          ('http', UnsupportedRequest),
          ('https', UnsupportedRequest),
          ('socks4', False),
          ('socks4a', False),
          ('socks5', False),
          ('socks5h', False),
      ]),
      # These two rows used to be listed twice, which only re-ran identical cases
      (NoCheckRH, 'http', [('http', False)]),
      (HTTPSupportedRH, 'http', [('http', UnsupportedRequest)]),
  ]
  # One entry per handler:
  # (handler, request scheme, [(proxy key, proxy scheme, expected error or False), ...])
  PROXY_KEY_TESTS = [
      ('Urllib', 'http', [
          ('all', 'http', False),
          ('unrelated', 'http', False),
      ]),
      ('Requests', 'http', [
          ('all', 'http', False),
          ('unrelated', 'http', False),
      ]),
      ('CurlCFFI', 'http', [
          ('all', 'http', False),
          ('unrelated', 'http', False),
      ]),
      ('Websockets', 'ws', [
          ('all', 'socks5', False),
          ('unrelated', 'socks5', False),
      ]),
      (NoCheckRH, 'http', [('all', 'http', False)]),
      (HTTPSupportedRH, 'http', [('all', 'http', UnsupportedRequest)]),
      (HTTPSupportedRH, 'http', [('no', 'http', UnsupportedRequest)]),
  ]
  # One entry per handler: (handler, request scheme, [(extensions, expected error or False), ...])
  EXTENSION_TESTS = [
      ('Urllib', 'http', [
          ({'cookiejar': 'notacookiejar'}, AssertionError),
          ({'cookiejar': YoutubeDLCookieJar()}, False),
          ({'cookiejar': CookieJar()}, AssertionError),
          ({'timeout': 1}, False),
          ({'timeout': 'notatimeout'}, AssertionError),
          ({'unsupported': 'value'}, UnsupportedRequest),
          ({'legacy_ssl': False}, False),
          ({'legacy_ssl': True}, False),
          ({'legacy_ssl': 'notabool'}, AssertionError),
          ({'keep_header_casing': True}, UnsupportedRequest),
      ]),
      ('Requests', 'http', [
          ({'cookiejar': 'notacookiejar'}, AssertionError),
          ({'cookiejar': YoutubeDLCookieJar()}, False),
          ({'timeout': 1}, False),
          ({'timeout': 'notatimeout'}, AssertionError),
          ({'unsupported': 'value'}, UnsupportedRequest),
          ({'legacy_ssl': False}, False),
          ({'legacy_ssl': True}, False),
          ({'legacy_ssl': 'notabool'}, AssertionError),
          ({'keep_header_casing': False}, False),
          ({'keep_header_casing': True}, False),
          ({'keep_header_casing': 'notabool'}, AssertionError),
      ]),
      ('CurlCFFI', 'http', [
          ({'cookiejar': 'notacookiejar'}, AssertionError),
          ({'cookiejar': YoutubeDLCookieJar()}, False),
          ({'timeout': 1}, False),
          ({'timeout': 'notatimeout'}, AssertionError),
          ({'unsupported': 'value'}, UnsupportedRequest),
          ({'impersonate': ImpersonateTarget('badtarget', None, None, None)}, UnsupportedRequest),
          ({'impersonate': 123}, AssertionError),
          ({'impersonate': ImpersonateTarget('chrome', None, None, None)}, False),
          ({'impersonate': ImpersonateTarget(None, None, None, None)}, False),
          ({'impersonate': ImpersonateTarget()}, False),
          ({'impersonate': 'chrome'}, AssertionError),
          ({'legacy_ssl': False}, False),
          ({'legacy_ssl': True}, False),
          ({'legacy_ssl': 'notabool'}, AssertionError),
      ]),
      (NoCheckRH, 'http', [
          ({'cookiejar': 'notacookiejar'}, False),
          ({'somerandom': 'test'}, False),  # but any extension is allowed through
      ]),
      ('Websockets', 'ws', [
          ({'cookiejar': YoutubeDLCookieJar()}, False),
          ({'timeout': 2}, False),
          ({'legacy_ssl': False}, False),
          ({'legacy_ssl': True}, False),
          ({'legacy_ssl': 'notabool'}, AssertionError),
      ]),
  ]
  @pytest.mark.parametrize('handler,fail,scheme', [
      ('Urllib', False, 'http'),
      ('Requests', False, 'http'),
      ('CurlCFFI', False, 'http'),
      ('Websockets', False, 'ws'),
  ], indirect=['handler'])
  def test_no_proxy(self, handler, fail, scheme):
      """Validate a 'no' proxy entry given on the request and given on the handler."""
      no_proxy_hosts = '127.0.0.1,github.com'
      # 'no' entry supplied with the request
      run_validation(handler, fail, Request(f'{scheme}://', proxies={'no': no_proxy_hosts}))
      # 'no' entry supplied to the handler
      run_validation(handler, fail, Request(f'{scheme}://'), proxies={'no': no_proxy_hosts})
  @pytest.mark.parametrize('handler,scheme', [
      ('Urllib', 'http'),
      (HTTPSupportedRH, 'http'),
      ('Requests', 'http'),
      ('CurlCFFI', 'http'),
      ('Websockets', 'ws'),
  ], indirect=['handler'])
  def test_empty_proxy(self, handler, scheme):
      """A proxy explicitly set to None passes validation, on the request or on the handler."""
      request_url = f'{scheme}://'
      # None proxy supplied with the request
      run_validation(handler, False, Request(request_url, proxies={scheme: None}))
      # None proxy supplied to the handler
      run_validation(handler, False, Request(request_url), proxies={scheme: None})
  @pytest.mark.parametrize('proxy_url', ['//example.com', 'example.com', '127.0.0.1', '/a/b/c'])
  @pytest.mark.parametrize('handler,scheme', [
      ('Urllib', 'http'),
      (HTTPSupportedRH, 'http'),
      ('Requests', 'http'),
      ('CurlCFFI', 'http'),
      ('Websockets', 'ws'),
  ], indirect=['handler'])
  def test_invalid_proxy_url(self, handler, scheme, proxy_url):
      """Each listed proxy URL is rejected as an UnsupportedRequest."""
      bad_proxy_request = Request(f'{scheme}://', proxies={scheme: proxy_url})
      run_validation(handler, UnsupportedRequest, bad_proxy_request)
  @pytest.mark.parametrize(
      'handler,scheme,fail,handler_kwargs',
      [
          (table_row[0], scheme, fail, handler_kwargs)
          for table_row in URL_SCHEME_TESTS
          for scheme, fail, handler_kwargs in table_row[1]
      ],
      indirect=['handler'],
  )
  def test_url_scheme(self, handler, scheme, fail, handler_kwargs):
      """Validate a bare ``<scheme>://`` request for each URL_SCHEME_TESTS case."""
      # A falsy ``handler_kwargs`` entry means "no extra keyword arguments".
      extra_kwargs = handler_kwargs or {}
      request = Request(f'{scheme}://')
      run_validation(handler, fail, request, **extra_kwargs)
  @pytest.mark.parametrize(
      'handler,scheme,proxy_key,proxy_scheme,fail',
      [
          (table_row[0], table_row[1], proxy_key, proxy_scheme, fail)
          for table_row in PROXY_KEY_TESTS
          for proxy_key, proxy_scheme, fail in table_row[2]
      ],
      indirect=['handler'],
  )
  def test_proxy_key(self, handler, scheme, proxy_key, proxy_scheme, fail):
      """Validate each PROXY_KEY_TESTS proxy key, on the request and as a keyword."""
      bare_url = f'{scheme}://'
      proxy_address = f'{proxy_scheme}://example.com'

      # Proxy mapping attached to the request itself.
      request_with_proxies = Request(bare_url, proxies={proxy_key: proxy_address})
      run_validation(handler, fail, request_with_proxies)

      # Proxy mapping forwarded to run_validation as a keyword argument.
      plain_request = Request(bare_url)
      run_validation(handler, fail, plain_request, proxies={proxy_key: proxy_address})
  @pytest.mark.parametrize(
      'handler,req_scheme,scheme,fail',
      [
          (table_row[0], table_row[1], scheme, fail)
          for table_row in PROXY_SCHEME_TESTS
          for scheme, fail in table_row[2]
      ],
      indirect=['handler'],
  )
  def test_proxy_scheme(self, handler, req_scheme, scheme, fail):
      """Validate each PROXY_SCHEME_TESTS proxy scheme, on the request and as a keyword."""
      bare_url = f'{req_scheme}://'
      proxy_address = f'{scheme}://example.com'

      # Proxy mapping attached to the request itself.
      request_with_proxies = Request(bare_url, proxies={req_scheme: proxy_address})
      run_validation(handler, fail, request_with_proxies)

      # Proxy mapping forwarded to run_validation as a keyword argument.
      plain_request = Request(bare_url)
      run_validation(handler, fail, plain_request, proxies={req_scheme: proxy_address})
  @pytest.mark.parametrize(
      'handler,scheme,extensions,fail',
      [
          (table_row[0], table_row[1], extensions, fail)
          for table_row in EXTENSION_TESTS
          for extensions, fail in table_row[2]
      ],
      indirect=['handler'],
  )
  def test_extension(self, handler, scheme, extensions, fail):
      """Validate a request carrying each EXTENSION_TESTS extensions mapping."""
      request = Request(f'{scheme}://', extensions=extensions)
      run_validation(handler, fail, request)
  def test_invalid_request_type(self):
      """Both ``validate`` and ``send`` raise ``TypeError`` for a non-Request."""
      validation_handler = self.ValidationRH(logger=FakeLogger())
      entry_points = (validation_handler.validate, validation_handler.send)
      for entry_point in entry_points:
          with pytest.raises(TypeError, match='Expected an instance of Request'):
              entry_point('not a request')
  class FakeResponse(Response):
      """Empty-bodied response that keeps a reference to its originating request."""

      def __init__(self, request):
          # XXX: we could make request part of standard response interface
          self.request = request
          empty_body = io.BytesIO(b'')
          super().__init__(fp=empty_body, headers={}, url=request.url)
  class FakeRH(RequestHandler):
      """Request handler stub: skips validation and answers with ``FakeResponse``.

      URLs starting with ``ssl://`` raise ``SSLError`` carrying the rest of the URL.
      """

      def __init__(self, *args, **params):
          # Keep the raw keyword arguments around before delegating to the base class.
          self.params = params
          super().__init__(*args, **params)

      def _validate(self, request):
          """Perform no validation at all."""
          return

      def _send(self, request: Request):
          """Return a ``FakeResponse``, or raise ``SSLError`` for ``ssl://`` URLs."""
          ssl_prefix = 'ssl://'
          if not request.url.startswith(ssl_prefix):
              return FakeResponse(request)
          # Everything after the prefix becomes the error message.
          raise SSLError(request.url[len(ssl_prefix):])
  class FakeRHYDL(FakeYDL):
      """``FakeYDL`` whose request director is built from ``FakeRH`` only."""

      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
          fake_only_director = self.build_request_director([FakeRH])
          self._request_director = fake_only_director
  class AllUnsupportedRHYDL(FakeYDL):
      """``FakeYDL`` whose only handler declares empty feature/proxy/URL scheme sets."""

      def __init__(self, *args, **kwargs):

          class UnsupportedRH(RequestHandler):
              # Empty tuples: no features, proxy schemes or URL schemes are declared.
              _SUPPORTED_FEATURES = ()
              _SUPPORTED_PROXY_SCHEMES = ()
              _SUPPORTED_URL_SCHEMES = ()

              def _send(self, request: Request):
                  pass

          super().__init__(*args, **kwargs)
          unsupported_director = self.build_request_director([UnsupportedRH])
          self._request_director = unsupported_director
  class TestRequestDirector:
      """Tests for ``RequestDirector``: handler registry, dispatch, preferences, close."""

      def test_handler_operations(self):
          """Adding, overwriting and removing handlers keeps ``director.handlers`` in sync."""
          director = RequestDirector(logger=FakeLogger())
          handler = FakeRH(logger=FakeLogger())
          director.add_handler(handler)
          assert director.handlers.get(FakeRH.RH_KEY) is handler

          # Handler should overwrite
          handler2 = FakeRH(logger=FakeLogger())
          director.add_handler(handler2)
          assert director.handlers.get(FakeRH.RH_KEY) is not handler
          assert director.handlers.get(FakeRH.RH_KEY) is handler2
          assert len(director.handlers) == 1

          class AnotherFakeRH(FakeRH):
              pass

          director.add_handler(AnotherFakeRH(logger=FakeLogger()))
          assert len(director.handlers) == 2
          assert director.handlers.get(AnotherFakeRH.RH_KEY).RH_KEY == AnotherFakeRH.RH_KEY

          director.handlers.pop(FakeRH.RH_KEY, None)
          assert director.handlers.get(FakeRH.RH_KEY) is None
          assert len(director.handlers) == 1

          # RequestErrors should passthrough
          with pytest.raises(SSLError):
              director.send(Request('ssl://something'))

      def test_send(self):
          """Sending without handlers raises ``RequestError``; with ``FakeRH`` it succeeds."""
          director = RequestDirector(logger=FakeLogger())
          with pytest.raises(RequestError):
              director.send(Request('any://'))
          director.add_handler(FakeRH(logger=FakeLogger()))
          assert isinstance(director.send(Request('http://')), FakeResponse)

      def test_unsupported_handlers(self):
          """Requests fall through to another handler, or ``NoSupportingHandlers``."""

          class SupportedRH(RequestHandler):
              _SUPPORTED_URL_SCHEMES = ['http']

              def _send(self, request: Request):
                  return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

          director = RequestDirector(logger=FakeLogger())
          director.add_handler(SupportedRH(logger=FakeLogger()))
          director.add_handler(FakeRH(logger=FakeLogger()))

          # First should take preference
          assert director.send(Request('http://')).read() == b'supported'
          assert director.send(Request('any://')).read() == b''

          director.handlers.pop(FakeRH.RH_KEY)
          with pytest.raises(NoSupportingHandlers):
              director.send(Request('any://'))

      def test_unexpected_error(self):
          """A handler raising a non-request error is reported, and is not fatal
          when another registered handler can serve the request."""
          director = RequestDirector(logger=FakeLogger())

          class UnexpectedRH(FakeRH):
              def _send(self, request: Request):
                  raise TypeError('something')

          # Pass a logger *instance*, consistent with every other handler in this file
          # (the class object itself was being passed here before).
          director.add_handler(UnexpectedRH(logger=FakeLogger()))
          with pytest.raises(NoSupportingHandlers, match=r'1 unexpected error'):
              director.send(Request('any://'))

          director.handlers.clear()
          assert len(director.handlers) == 0

          # Should not be fatal
          director.add_handler(FakeRH(logger=FakeLogger()))
          director.add_handler(UnexpectedRH(logger=FakeLogger()))
          assert director.send(Request('any://'))

      def test_preference(self):
          """A registered preference function decides which handler serves a request."""
          director = RequestDirector(logger=FakeLogger())
          director.add_handler(FakeRH(logger=FakeLogger()))

          class SomeRH(RequestHandler):
              _SUPPORTED_URL_SCHEMES = ['http']

              def _send(self, request: Request):
                  return Response(fp=io.BytesIO(b'supported'), headers={}, url=request.url)

          def some_preference(rh, request):
              # 0 for any other handler; for SomeRH, 100 when the request
              # carries a 'prefer' header and -1 otherwise.
              return (0 if not isinstance(rh, SomeRH)
                      else 100 if 'prefer' in request.headers
                      else -1)

          director.add_handler(SomeRH(logger=FakeLogger()))
          director.preferences.add(some_preference)

          assert director.send(Request('http://')).read() == b''
          assert director.send(Request('http://', headers={'prefer': '1'})).read() == b'supported'

      def test_close(self, monkeypatch):
          """``director.close()`` calls ``close`` on the registered handler."""
          director = RequestDirector(logger=FakeLogger())
          director.add_handler(FakeRH(logger=FakeLogger()))
          called = False

          def mock_close(*args, **kwargs):
              nonlocal called
              called = True

          monkeypatch.setattr(director.handlers[FakeRH.RH_KEY], 'close', mock_close)
          director.close()
          assert called
  1358. # XXX: do we want to move this to test_YoutubeDL.py?
  class TestYoutubeDLNetworking:
      """Tests for how the ``FakeYDL`` objects build handlers and pre-process requests."""

      @staticmethod
      def build_handler(ydl, handler: RequestHandler = FakeRH):
          """Build a director containing only ``handler`` and return that handler instance."""
          director = ydl.build_request_director([handler])
          return director.handlers.get(handler.RH_KEY)

      def test_compat_opener(self):
          """``ydl._opener`` is a urllib ``OpenerDirector`` (DeprecationWarning ignored)."""
          with FakeYDL() as ydl, warnings.catch_warnings():
              warnings.simplefilter('ignore', category=DeprecationWarning)
              assert isinstance(ydl._opener, urllib.request.OpenerDirector)

      @pytest.mark.parametrize(
          'proxy,expected',
          [
              ('http://127.0.0.1:8080', {'all': 'http://127.0.0.1:8080'}),
              ('', {'all': '__noproxy__'}),
              (None, {'http': 'http://127.0.0.1:8081', 'https': 'http://127.0.0.1:8081'}),  # env, set https
          ],
      )
      def test_proxy(self, proxy, expected, monkeypatch):
          """``ydl.proxies`` reflects the ``proxy`` param with ``HTTP_PROXY`` set in the env."""
          monkeypatch.setenv('HTTP_PROXY', 'http://127.0.0.1:8081')
          ydl_params = {'proxy': proxy}
          with FakeYDL(ydl_params) as ydl:
              assert ydl.proxies == expected

      def test_compat_request(self):
          """``urlopen`` accepts a str URL and a urllib ``Request``; ``None`` asserts."""
          with FakeRHYDL() as ydl:
              assert ydl.urlopen('test://')

              legacy_request = urllib.request.Request(
                  'http://foo.bar', data=b'test', method='PUT', headers={'X-Test': '1'})
              legacy_request.add_unredirected_header('Cookie', 'bob=bob')
              legacy_request.timeout = 2

              with warnings.catch_warnings():
                  warnings.simplefilter('ignore', category=DeprecationWarning)
                  converted = ydl.urlopen(legacy_request).request
                  assert converted.url == legacy_request.get_full_url()
                  assert converted.data == legacy_request.data
                  assert converted.method == legacy_request.get_method()
                  assert 'X-Test' in converted.headers
                  assert 'Cookie' in converted.headers
                  assert converted.extensions.get('timeout') == 2

              with pytest.raises(AssertionError):
                  ydl.urlopen(None)

      def test_extract_basic_auth(self):
          """Credentials embedded in the URL end up in a Basic ``Authorization`` header."""
          with FakeRHYDL() as ydl:
              response = ydl.urlopen(Request('http://user:pass@foo.bar'))
              sent_headers = response.request.headers
              assert sent_headers['Authorization'] == 'Basic dXNlcjpwYXNz'

      def test_sanitize_url(self):
          """An ``httpss://`` URL is sent as ``https://``."""
          with FakeRHYDL() as ydl:
              response = ydl.urlopen(Request('httpss://foo.bar'))
              sent_url = response.request.url
              assert sent_url == 'https://foo.bar'

      def test_file_urls_error(self):
          """``file://`` URLs raise ``RequestError`` with the 'disabled by default' message."""
          # use urllib handler
          expected_message = r'file:// URLs are disabled by default'
          with FakeYDL() as ydl, pytest.raises(RequestError, match=expected_message):
              ydl.urlopen('file://')

      @pytest.mark.parametrize('scheme', ['ws', 'wss'])
      def test_websocket_unavailable_error(self, scheme):
          """ws/wss URLs raise the 'requires WebSocket support' ``RequestError``."""
          expected_message = r'This request requires WebSocket support'
          with AllUnsupportedRHYDL() as ydl, pytest.raises(RequestError, match=expected_message):
              ydl.urlopen(f'{scheme}://')

      def test_legacy_server_connect_error(self):
          """The two listed SSL errors get the --legacy-server-connect hint; others do not."""
          hinted_errors = (
              'UNSAFE_LEGACY_RENEGOTIATION_DISABLED',
              'SSLV3_ALERT_HANDSHAKE_FAILURE',
          )
          with FakeRHYDL() as ydl:
              for ssl_error_name in hinted_errors:
                  with pytest.raises(RequestError, match=r'Try using --legacy-server-connect'):
                      ydl.urlopen(f'ssl://{ssl_error_name}')

              # Any other SSL error is raised as SSLError with its own message.
              with pytest.raises(SSLError, match='testerror'):
                  ydl.urlopen('ssl://testerror')

      def test_unsupported_impersonate_target(self):
          """A plain ``RequestHandler`` yields 'target ... not available' for impersonation."""

          class FakeImpersonationRHYDL(FakeYDL):
              def __init__(self, *args, **kwargs):

                  class HTTPRH(RequestHandler):
                      _SUPPORTED_URL_SCHEMES = ('http',)
                      _SUPPORTED_PROXY_SCHEMES = None

                      def _send(self, request: Request):
                          pass

                  super().__init__(*args, **kwargs)
                  self._request_director = self.build_request_director([HTTPRH])

          with FakeImpersonationRHYDL() as ydl:
              wanted_target = ImpersonateTarget('test', None, None, None)
              with pytest.raises(
                  RequestError,
                  match=r'Impersonate target "test" is not available',
              ):
                  ydl.urlopen(Request('http://', extensions={'impersonate': wanted_target}))

      def test_unsupported_impersonate_extension(self):
          """An impersonating handler that only maps 'abc' rejects target 'test'."""

          class FakeHTTPRHYDL(FakeYDL):
              def __init__(self, *args, **kwargs):

                  class IRH(ImpersonateRequestHandler):
                      _SUPPORTED_URL_SCHEMES = ('http',)
                      _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}
                      _SUPPORTED_PROXY_SCHEMES = None

                      def _send(self, request: Request):
                          pass

                  super().__init__(*args, **kwargs)
                  self._request_director = self.build_request_director([IRH])

          with FakeHTTPRHYDL() as ydl:
              wanted_target = ImpersonateTarget('test', None, None, None)
              with pytest.raises(
                  RequestError,
                  match=r'Impersonate target "test" is not available',
              ):
                  ydl.urlopen(Request('http://', extensions={'impersonate': wanted_target}))

      def test_raise_impersonate_error(self):
          """Constructing ``FakeYDL`` with an unavailable target raises ``YoutubeDLError``."""
          ydl_params = {'impersonate': ImpersonateTarget('test', None, None, None)}
          with pytest.raises(
              YoutubeDLError,
              match=r'Impersonate target "test" is not available',
          ):
              FakeYDL(ydl_params)

      def test_pass_impersonate_param(self, monkeypatch):
          """The ``impersonate`` param is handed to the built handler."""

          class IRH(ImpersonateRequestHandler):
              _SUPPORTED_URL_SCHEMES = ('http',)
              _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget('abc'): 'test'}

              def _send(self, request: Request):
                  pass

          # Bypass the check on initialize
          original_build = FakeYDL.build_request_director

          def build_with_irh_only(cls, handlers, preferences=None):
              # Ignore the requested handlers and always build with IRH.
              return original_build(cls, handlers=[IRH])

          monkeypatch.setattr(FakeYDL, 'build_request_director', build_with_irh_only)

          ydl_params = {
              'impersonate': ImpersonateTarget('abc', None, None, None),
          }
          with FakeYDL(ydl_params) as ydl:
              built_handler = self.build_handler(ydl, IRH)
              assert built_handler.impersonate == ImpersonateTarget('abc', None, None, None)

      def test_get_impersonate_targets(self):
          """Available impersonate targets are collected from every registered handler."""
          handler_classes = []
          for client_name in ('abc', 'xyz', 'asd'):

              class TestRH(ImpersonateRequestHandler):
                  _SUPPORTED_URL_SCHEMES = ('http',)
                  _SUPPORTED_IMPERSONATE_TARGET_MAP = {ImpersonateTarget(client_name): 'test'}
                  # Distinct key/name per iteration.
                  RH_KEY = client_name
                  RH_NAME = client_name

                  def _send(self, request: Request):
                      pass

              handler_classes.append(TestRH)

          with FakeYDL() as ydl:
              ydl._request_director = ydl.build_request_director(handler_classes)
              available = set(ydl._get_available_impersonate_targets())
              assert available == {
                  (ImpersonateTarget('xyz'), 'xyz'),
                  (ImpersonateTarget('abc'), 'abc'),
                  (ImpersonateTarget('asd'), 'asd'),
              }
              assert ydl._impersonate_target_available(ImpersonateTarget('abc'))
              assert ydl._impersonate_target_available(ImpersonateTarget())
              assert not ydl._impersonate_target_available(ImpersonateTarget('zxy'))

      @pytest.mark.parametrize(
          'proxy_key,proxy_url,expected',
          [
              ('http', '__noproxy__', None),
              ('no', '127.0.0.1,foo.bar', '127.0.0.1,foo.bar'),
              ('https', 'example.com', 'http://example.com'),
              ('https', '//example.com', 'http://example.com'),
              ('https', 'socks5://example.com', 'socks5h://example.com'),
              ('http', 'socks://example.com', 'socks4://example.com'),
              ('http', 'socks4://example.com', 'socks4://example.com'),
              ('unrelated', '/bad/proxy', '/bad/proxy'),  # clean_proxies should ignore bad proxies
          ],
      )
      def test_clean_proxy(self, proxy_key, proxy_url, expected, monkeypatch):
          """Proxy values are normalised both in ``urlopen`` and when building a handler."""
          # proxies should be cleaned in urlopen()
          with FakeRHYDL() as ydl:
              request = Request('test://', proxies={proxy_key: proxy_url})
              sent_request = ydl.urlopen(request).request
              assert sent_request.proxies[proxy_key] == expected

          # and should also be cleaned when building the handler
          env_name = f'{proxy_key.upper()}_PROXY'
          monkeypatch.setenv(env_name, proxy_url)
          with FakeYDL() as ydl:
              built_handler = self.build_handler(ydl)
              assert built_handler.proxies[proxy_key] == expected

      def test_clean_proxy_header(self):
          """The ``ytdl-request-proxy`` header is removed and becomes the ``all`` proxy."""
          proxy_header = {'ytdl-request-proxy': '//foo.bar'}

          with FakeRHYDL() as ydl:
              sent_request = ydl.urlopen(Request('test://', headers=dict(proxy_header))).request
              assert 'ytdl-request-proxy' not in sent_request.headers
              assert sent_request.proxies == {'all': 'http://foo.bar'}

          with FakeYDL({'http_headers': dict(proxy_header)}) as ydl:
              built_handler = self.build_handler(ydl)
              assert 'ytdl-request-proxy' not in built_handler.headers
              assert built_handler.proxies == {'all': 'http://foo.bar'}

      def test_clean_header(self):
          """Internal pseudo-headers are stripped from request and handler headers."""
          with FakeRHYDL() as ydl:
              response = ydl.urlopen(Request('test://', headers={'Youtubedl-no-compression': True}))
              sent_headers = response.request.headers
              assert 'Youtubedl-no-compression' not in sent_headers
              assert sent_headers.get('Accept-Encoding') == 'identity'

          with FakeYDL({'http_headers': {'Youtubedl-no-compression': True}}) as ydl:
              handler_headers = self.build_handler(ydl).headers
              assert 'Youtubedl-no-compression' not in handler_headers
              assert handler_headers.get('Accept-Encoding') == 'identity'

          with FakeYDL({'http_headers': {'Ytdl-socks-proxy': 'socks://localhost:1080'}}) as ydl:
              handler_headers = self.build_handler(ydl).headers
              assert 'Ytdl-socks-proxy' not in handler_headers

      def test_build_handler_params(self):
          """YDL params are mapped onto the corresponding handler attributes."""
          ydl_params = {
              'http_headers': {'test': 'testtest'},
              'socket_timeout': 2,
              'proxy': 'http://127.0.0.1:8080',
              'source_address': '127.0.0.45',
              'debug_printtraffic': True,
              'compat_opts': ['no-certifi'],
              'nocheckcertificate': True,
              'legacyserverconnect': True,
          }
          with FakeYDL(ydl_params) as ydl:
              built_handler = self.build_handler(ydl)
              assert built_handler.headers.get('test') == 'testtest'
              assert 'Accept' in built_handler.headers  # ensure std_headers are still there
              assert built_handler.timeout == 2
              assert built_handler.proxies.get('all') == 'http://127.0.0.1:8080'
              assert built_handler.source_address == '127.0.0.45'
              assert built_handler.verbose is True
              assert built_handler.prefer_system_certs is True
              assert built_handler.verify is False
              assert built_handler.legacy_ssl_support is True

      @pytest.mark.parametrize(
          'ydl_params',
          [
              {'client_certificate': 'fakecert.crt'},
              {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key'},
              {'client_certificate': 'fakecert.crt', 'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
              {'client_certificate_key': 'fakekey.key', 'client_certificate_password': 'foobar'},
          ],
      )
      def test_client_certificate(self, ydl_params):
          """Client certificate params are passed through to the handler unchanged."""
          with FakeYDL(ydl_params) as ydl:
              built_handler = self.build_handler(ydl)
              assert built_handler._client_cert == ydl_params  # XXX: Too bound to implementation

      def test_urllib_file_urls(self):
          """``enable_file_urls`` is forwarded to ``UrllibRH`` for both boolean values."""
          for enabled in (False, True):
              with FakeYDL({'enable_file_urls': enabled}) as ydl:
                  urllib_handler = self.build_handler(ydl, UrllibRH)
                  assert urllib_handler.enable_file_urls is enabled

      def test_compat_opt_prefer_urllib(self):
          """The prefer-legacy-http-handler compat opt registers one truthy preference."""
          # This assumes urllib only has a preference when this compat opt is given
          ydl_params = {'compat_opts': ['prefer-legacy-http-handler']}
          with FakeYDL(ydl_params) as ydl:
              director = ydl.build_request_director([UrllibRH])
              assert len(director.preferences) == 1
              only_preference = director.preferences.pop()
              assert only_preference(UrllibRH, None)
  class TestRequest:
      """Tests for the attribute semantics of ``Request``."""

      def test_query(self):
          """``query`` is merged into the URL at construction and on ``update``."""
          request = Request('http://example.com?q=something', query={'v': 'xyz'})
          assert request.url == 'http://example.com?q=something&v=xyz'

          # Updating an existing key replaces its value.
          request.update(query={'v': '123'})
          assert request.url == 'http://example.com?q=something&v=123'

          # A new URL together with a query starts from the new URL.
          request.update(url='http://example.com', query={'v': 'xyz'})
          assert request.url == 'http://example.com?v=xyz'

      def test_method(self):
          """``method`` follows ``data`` until set explicitly; non-str raises."""
          request = Request('http://example.com')
          assert request.method == 'GET'

          request.data = b'test'
          assert request.method == 'POST'

          request.data = None
          assert request.method == 'GET'

          # Once set explicitly, the method no longer follows the payload.
          request.data = b'test2'
          request.method = 'PUT'
          assert request.method == 'PUT'

          request.data = None
          assert request.method == 'PUT'

          with pytest.raises(TypeError):
              request.method = 1

      def test_request_helpers(self):
          """``HEADRequest`` / ``PUTRequest`` preset their HTTP method."""
          example_url = 'http://example.com'
          assert HEADRequest(example_url).method == 'HEAD'
          assert PUTRequest(example_url).method == 'PUT'

      def test_headers(self):
          """Headers compare case-insensitively; plain dicts are converted."""
          request = Request('http://example.com', headers={'tesT': 'test'})
          assert request.headers == HTTPHeaderDict({'test': 'test'})

          request.update(headers={'teSt2': 'test2'})
          assert request.headers == HTTPHeaderDict({'test': 'test', 'test2': 'test2'})

          # Assigning an HTTPHeaderDict keeps that very object.
          replacement = HTTPHeaderDict({'test': 'test'})
          request.headers = replacement
          assert request.headers == HTTPHeaderDict({'test': 'test'})
          assert request.headers is replacement

          # test converts dict to case insensitive dict
          plain_mapping = {'test2': 'test2'}
          request.headers = plain_mapping
          assert isinstance(request.headers, HTTPHeaderDict)
          assert request.headers is not plain_mapping

          with pytest.raises(TypeError):
              request.headers = None

      def test_data_type(self):
          """bytes, iterables of bytes and file-likes are accepted; str and dict are not."""
          request = Request('http://example.com')
          assert request.data is None

          # test bytes is allowed
          request.data = b'test'
          assert request.data == b'test'

          # test iterable of bytes is allowed
          chunks = [b'test', b'test2']
          request.data = chunks
          assert request.data == chunks

          # test file-like object is allowed
          stream = io.BytesIO(b'test')
          request.data = stream
          assert request.data == stream

          # common mistakes: str and dict are not allowed
          for rejected_payload in ('test', {'test': 'test'}):
              with pytest.raises(TypeError):
                  request.data = rejected_payload
              assert request.data != rejected_payload

      def test_content_length_header(self):
          """``Content-Length`` is dropped when data changes or when there is no data."""
          request = Request('http://example.com', headers={'Content-Length': '0'}, data=b'')
          assert request.headers.get('Content-Length') == '0'

          request.data = b'test'
          assert 'Content-Length' not in request.headers

          request = Request('http://example.com', headers={'Content-Length': '10'})
          assert 'Content-Length' not in request.headers

      def test_content_type_header(self):
          """``Content-Type`` survives data changes, is cleared with the data, then defaults."""
          request = Request('http://example.com', headers={'Content-Type': 'test'}, data=b'test')
          assert request.headers.get('Content-Type') == 'test'

          request.data = b'test2'
          assert request.headers.get('Content-Type') == 'test'

          request.data = None
          assert 'Content-Type' not in request.headers

          request.data = b'test3'
          assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

      def test_update_req(self):
          """``update(data=b'')`` switches to POST and sets the default content type."""
          request = Request('http://example.com')
          assert request.data is None
          assert request.method == 'GET'
          assert 'Content-Type' not in request.headers

          # Test that zero-byte payloads will be sent
          request.update(data=b'')
          assert request.data == b''
          assert request.method == 'POST'
          assert request.headers.get('Content-Type') == 'application/x-www-form-urlencoded'

      def test_proxies(self):
          """``proxies`` given at construction is readable back unchanged."""
          proxy_map = {'http': 'http://127.0.0.1:8080'}
          request = Request(url='http://example.com', proxies=proxy_map)
          assert request.proxies == {'http': 'http://127.0.0.1:8080'}

      def test_extensions(self):
          """``extensions`` given at construction is readable back unchanged."""
          extension_map = {'timeout': 2}
          request = Request(url='http://example.com', extensions=extension_map)
          assert request.extensions == {'timeout': 2}

      def test_copy(self):
          """``copy()`` duplicates headers/proxies/extensions containers but shares data."""
          original = Request(
              url='http://example.com',
              extensions={'cookiejar': CookieJar()},
              headers={'Accept-Encoding': 'br'},
              proxies={'http': 'http://127.0.0.1'},
              data=[b'123'],
          )
          duplicate = original.copy()
          assert duplicate is not original
          assert duplicate.url == original.url
          assert duplicate.headers == original.headers
          assert duplicate.headers is not original.headers
          assert duplicate.proxies == original.proxies
          assert duplicate.proxies is not original.proxies

          # Data is not able to be copied
          assert duplicate.data == original.data
          assert duplicate.data is original.data

          # Shallow copy extensions
          assert duplicate.extensions is not original.extensions
          assert duplicate.extensions['cookiejar'] == original.extensions['cookiejar']

          # Subclasses are copied by default
          class AnotherRequest(Request):
              pass

          subclass_request = AnotherRequest(url='http://127.0.0.1')
          assert isinstance(subclass_request.copy(), AnotherRequest)

      def test_url(self):
          """URLs are escaped/IDNA-encoded, scheme-less URLs get http, ``None`` raises."""
          request = Request(url='https://фtest.example.com/ some spaceв?ä=c')
          assert request.url == 'https://xn--test-z6d.example.com/%20some%20space%D0%B2?%C3%A4=c'

          assert Request(url='//example.com').url == 'http://example.com'

          with pytest.raises(TypeError):
              Request(url='https://').url = None
  class TestResponse:
      """Tests for ``Response`` reason/headers handling and its compat accessors."""

      @pytest.mark.parametrize(
          'reason,status,expected',
          [
              ('custom', 200, 'custom'),
              (None, 404, 'Not Found'),  # fallback status
              ('', 403, 'Forbidden'),
              (None, 999, None),
          ],
      )
      def test_reason(self, reason, status, expected):
          """``reason`` is the given text, or the expected fallback for the status."""
          response = Response(io.BytesIO(b''), url='test://', headers={}, status=status, reason=reason)
          assert response.reason == expected

      def test_headers(self):
          """Repeated headers are all retrievable; lookups are case-insensitive."""
          raw_headers = Message()
          for name, value in (
              ('Test', 'test'),
              ('Test', 'test2'),
              ('content-encoding', 'br'),
          ):
              raw_headers.add_header(name, value)

          response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
          assert response.headers.get_all('test') == ['test', 'test2']
          assert 'Content-Encoding' in response.headers

      def test_get_header(self):
          """``get_header`` joins repeats, returns the first Set-Cookie, honours defaults."""
          raw_headers = Message()
          for name, value in (
              ('Set-Cookie', 'cookie1'),
              ('Set-cookie', 'cookie2'),
              ('Test', 'test'),
              ('Test', 'test2'),
          ):
              raw_headers.add_header(name, value)

          response = Response(io.BytesIO(b''), headers=raw_headers, url='test://')
          assert response.get_header('test') == 'test, test2'
          assert response.get_header('set-Cookie') == 'cookie1'
          assert response.get_header('notexist', 'default') == 'default'

      def test_compat(self):
          """Legacy accessors agree with the current attributes."""
          response = Response(io.BytesIO(b''), url='test://', status=404, headers={'test': 'test'})
          with warnings.catch_warnings():
              # The legacy accessors are exercised with DeprecationWarning silenced.
              warnings.simplefilter('ignore', category=DeprecationWarning)
              assert response.code == response.getcode() == response.status
              assert response.geturl() == response.url
              assert response.info() is response.headers
              assert response.getheader('test') == response.get_header('test')
  class TestImpersonateTarget:
      """Tests for ``ImpersonateTarget`` parsing, formatting, validation and comparison."""

      @pytest.mark.parametrize(
          'target_str,expected',
          [
              ('abc', ImpersonateTarget('abc', None, None, None)),
              ('abc-120_esr', ImpersonateTarget('abc', '120_esr', None, None)),
              ('abc-120:xyz', ImpersonateTarget('abc', '120', 'xyz', None)),
              ('abc-120:xyz-5.6', ImpersonateTarget('abc', '120', 'xyz', '5.6')),
              ('abc:xyz', ImpersonateTarget('abc', None, 'xyz', None)),
              ('abc:', ImpersonateTarget('abc', None, None, None)),
              ('abc-120:', ImpersonateTarget('abc', '120', None, None)),
              (':xyz', ImpersonateTarget(None, None, 'xyz', None)),
              (':xyz-6.5', ImpersonateTarget(None, None, 'xyz', '6.5')),
              (':', ImpersonateTarget(None, None, None, None)),
              ('', ImpersonateTarget(None, None, None, None)),
          ],
      )
      def test_target_from_str(self, target_str, expected):
          """``from_str`` parses each string into the expected target."""
          parsed = ImpersonateTarget.from_str(target_str)
          assert parsed == expected

      @pytest.mark.parametrize(
          'target_str',
          [
              '-120', ':-12.0', '-12:-12', '-:-',
              '::', 'a-c-d:', 'a-c-d:e-f-g', 'a:b:',
          ],
      )
      def test_target_from_invalid_str(self, target_str):
          """``from_str`` raises ``ValueError`` for each malformed string."""
          with pytest.raises(ValueError):
              ImpersonateTarget.from_str(target_str)

      @pytest.mark.parametrize(
          'target,expected',
          [
              (ImpersonateTarget('abc', None, None, None), 'abc'),
              (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
              (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
              (ImpersonateTarget('abc', '120', 'xyz', '5'), 'abc-120:xyz-5'),
              (ImpersonateTarget('abc', None, 'xyz', None), 'abc:xyz'),
              (ImpersonateTarget('abc', '120', None, None), 'abc-120'),
              (ImpersonateTarget('abc', '120', 'xyz', None), 'abc-120:xyz'),
              (ImpersonateTarget('abc', None, 'xyz'), 'abc:xyz'),
              (ImpersonateTarget(None, None, 'xyz', '6.5'), ':xyz-6.5'),
              (ImpersonateTarget('abc'), 'abc'),
              (ImpersonateTarget(None, None, None, None), ''),
          ],
      )
      def test_str(self, target, expected):
          """``str(target)`` produces the expected textual form."""
          rendered = str(target)
          assert rendered == expected

      @pytest.mark.parametrize(
          'args',
          [
              ('abc', None, None, '5'),
              ('abc', '120', None, '5'),
              (None, '120', None, None),
              (None, '120', None, '5'),
              (None, None, None, '5'),
              (None, '120', 'xyz', '5'),
          ],
      )
      def test_invalid_impersonate_target(self, args):
          """Each listed argument combination is rejected with ``ValueError``."""
          with pytest.raises(ValueError):
              ImpersonateTarget(*args)

      @pytest.mark.parametrize(
          'target1,target2,is_in,is_eq',
          [
              (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', None, None, None), True, True),
              (ImpersonateTarget('abc', None, None, None), ImpersonateTarget('abc', '120', None, None), True, False),
              (ImpersonateTarget('abc', None, 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', None), True, False),
              (ImpersonateTarget('abc', '121', 'xyz', 'test'), ImpersonateTarget('abc', '120', 'xyz', 'test'), False, False),
              (ImpersonateTarget('abc'), ImpersonateTarget('abc', '120', 'xyz', 'test'), True, False),
              (ImpersonateTarget('abc', '120', 'xyz', 'test'), ImpersonateTarget('abc'), True, False),
              (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
              (ImpersonateTarget(), ImpersonateTarget(), True, True),
          ],
      )
      def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
          """``in`` and ``==`` between two targets give the expected booleans."""
          contained = target1 in target2
          assert contained is is_in

          equal = target1 == target2
          assert equal is is_eq
  1795. (ImpersonateTarget(), ImpersonateTarget('abc', '120', 'xyz'), True, False),
  1796. (ImpersonateTarget(), ImpersonateTarget(), True, True),
  1797. ])
  1798. def test_impersonate_target_in(self, target1, target2, is_in, is_eq):
  1799. assert (target1 in target2) is is_in
  1800. assert (target1 == target2) is is_eq