test_socks.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. #!/usr/bin/env python3
  2. # Allow direct execution
  3. import os
  4. import sys
  5. import threading
  6. import unittest
  7. import pytest
  8. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  9. import abc
  10. import contextlib
  11. import enum
  12. import functools
  13. import http.server
  14. import json
  15. import random
  16. import socket
  17. import struct
  18. import time
  19. from socketserver import (
  20. BaseRequestHandler,
  21. StreamRequestHandler,
  22. ThreadingTCPServer,
  23. )
  24. from test.helper import http_server_port
  25. from hypervideo_dl.networking import Request
  26. from hypervideo_dl.networking.exceptions import ProxyError, TransportError
  27. from hypervideo_dl.socks import (
  28. SOCKS4_REPLY_VERSION,
  29. SOCKS4_VERSION,
  30. SOCKS5_USER_AUTH_SUCCESS,
  31. SOCKS5_USER_AUTH_VERSION,
  32. SOCKS5_VERSION,
  33. Socks5AddressType,
  34. Socks5Auth,
  35. )
  36. SOCKS5_USER_AUTH_FAILURE = 0x1
  37. class Socks4CD(enum.IntEnum):
  38. REQUEST_GRANTED = 90
  39. REQUEST_REJECTED_OR_FAILED = 91
  40. REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD = 92
  41. REQUEST_REJECTED_DIFFERENT_USERID = 93
  42. class Socks5Reply(enum.IntEnum):
  43. SUCCEEDED = 0x0
  44. GENERAL_FAILURE = 0x1
  45. CONNECTION_NOT_ALLOWED = 0x2
  46. NETWORK_UNREACHABLE = 0x3
  47. HOST_UNREACHABLE = 0x4
  48. CONNECTION_REFUSED = 0x5
  49. TTL_EXPIRED = 0x6
  50. COMMAND_NOT_SUPPORTED = 0x7
  51. ADDRESS_TYPE_NOT_SUPPORTED = 0x8
  52. class SocksTestRequestHandler(BaseRequestHandler):
  53. def __init__(self, *args, socks_info=None, **kwargs):
  54. self.socks_info = socks_info
  55. super().__init__(*args, **kwargs)
  56. class SocksProxyHandler(BaseRequestHandler):
  57. def __init__(self, request_handler_class, socks_server_kwargs, *args, **kwargs):
  58. self.socks_kwargs = socks_server_kwargs or {}
  59. self.request_handler_class = request_handler_class
  60. super().__init__(*args, **kwargs)
  61. class Socks5ProxyHandler(StreamRequestHandler, SocksProxyHandler):
  62. # SOCKS5 protocol https://tools.ietf.org/html/rfc1928
  63. # SOCKS5 username/password authentication https://tools.ietf.org/html/rfc1929
  64. def handle(self):
  65. sleep = self.socks_kwargs.get('sleep')
  66. if sleep:
  67. time.sleep(sleep)
  68. version, nmethods = self.connection.recv(2)
  69. assert version == SOCKS5_VERSION
  70. methods = list(self.connection.recv(nmethods))
  71. auth = self.socks_kwargs.get('auth')
  72. if auth is not None and Socks5Auth.AUTH_USER_PASS not in methods:
  73. self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
  74. self.server.close_request(self.request)
  75. return
  76. elif Socks5Auth.AUTH_USER_PASS in methods:
  77. self.connection.sendall(struct.pack("!BB", SOCKS5_VERSION, Socks5Auth.AUTH_USER_PASS))
  78. _, user_len = struct.unpack('!BB', self.connection.recv(2))
  79. username = self.connection.recv(user_len).decode()
  80. pass_len = ord(self.connection.recv(1))
  81. password = self.connection.recv(pass_len).decode()
  82. if username == auth[0] and password == auth[1]:
  83. self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_SUCCESS))
  84. else:
  85. self.connection.sendall(struct.pack('!BB', SOCKS5_USER_AUTH_VERSION, SOCKS5_USER_AUTH_FAILURE))
  86. self.server.close_request(self.request)
  87. return
  88. elif Socks5Auth.AUTH_NONE in methods:
  89. self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NONE))
  90. else:
  91. self.connection.sendall(struct.pack('!BB', SOCKS5_VERSION, Socks5Auth.AUTH_NO_ACCEPTABLE))
  92. self.server.close_request(self.request)
  93. return
  94. version, command, _, address_type = struct.unpack('!BBBB', self.connection.recv(4))
  95. socks_info = {
  96. 'version': version,
  97. 'auth_methods': methods,
  98. 'command': command,
  99. 'client_address': self.client_address,
  100. 'ipv4_address': None,
  101. 'domain_address': None,
  102. 'ipv6_address': None,
  103. }
  104. if address_type == Socks5AddressType.ATYP_IPV4:
  105. socks_info['ipv4_address'] = socket.inet_ntoa(self.connection.recv(4))
  106. elif address_type == Socks5AddressType.ATYP_DOMAINNAME:
  107. socks_info['domain_address'] = self.connection.recv(ord(self.connection.recv(1))).decode()
  108. elif address_type == Socks5AddressType.ATYP_IPV6:
  109. socks_info['ipv6_address'] = socket.inet_ntop(socket.AF_INET6, self.connection.recv(16))
  110. else:
  111. self.server.close_request(self.request)
  112. socks_info['port'] = struct.unpack('!H', self.connection.recv(2))[0]
  113. # dummy response, the returned IP is just a placeholder
  114. self.connection.sendall(struct.pack(
  115. '!BBBBIH', SOCKS5_VERSION, self.socks_kwargs.get('reply', Socks5Reply.SUCCEEDED), 0x0, 0x1, 0x7f000001, 40000))
  116. self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
  117. class Socks4ProxyHandler(StreamRequestHandler, SocksProxyHandler):
  118. # SOCKS4 protocol http://www.openssh.com/txt/socks4.protocol
  119. # SOCKS4A protocol http://www.openssh.com/txt/socks4a.protocol
  120. def _read_until_null(self):
  121. return b''.join(iter(functools.partial(self.connection.recv, 1), b'\x00'))
  122. def handle(self):
  123. sleep = self.socks_kwargs.get('sleep')
  124. if sleep:
  125. time.sleep(sleep)
  126. socks_info = {
  127. 'version': SOCKS4_VERSION,
  128. 'command': None,
  129. 'client_address': self.client_address,
  130. 'ipv4_address': None,
  131. 'port': None,
  132. 'domain_address': None,
  133. }
  134. version, command, dest_port, dest_ip = struct.unpack('!BBHI', self.connection.recv(8))
  135. socks_info['port'] = dest_port
  136. socks_info['command'] = command
  137. if version != SOCKS4_VERSION:
  138. self.server.close_request(self.request)
  139. return
  140. use_remote_dns = False
  141. if 0x0 < dest_ip <= 0xFF:
  142. use_remote_dns = True
  143. else:
  144. socks_info['ipv4_address'] = socket.inet_ntoa(struct.pack("!I", dest_ip))
  145. user_id = self._read_until_null().decode()
  146. if user_id != (self.socks_kwargs.get('user_id') or ''):
  147. self.connection.sendall(struct.pack(
  148. '!BBHI', SOCKS4_REPLY_VERSION, Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID, 0x00, 0x00000000))
  149. self.server.close_request(self.request)
  150. return
  151. if use_remote_dns:
  152. socks_info['domain_address'] = self._read_until_null().decode()
  153. # dummy response, the returned IP is just a placeholder
  154. self.connection.sendall(
  155. struct.pack(
  156. '!BBHI', SOCKS4_REPLY_VERSION,
  157. self.socks_kwargs.get('cd_reply', Socks4CD.REQUEST_GRANTED), 40000, 0x7f000001))
  158. self.request_handler_class(self.request, self.client_address, self.server, socks_info=socks_info)
  159. class IPv6ThreadingTCPServer(ThreadingTCPServer):
  160. address_family = socket.AF_INET6
  161. class SocksHTTPTestRequestHandler(http.server.BaseHTTPRequestHandler, SocksTestRequestHandler):
  162. def do_GET(self):
  163. if self.path == '/socks_info':
  164. payload = json.dumps(self.socks_info.copy())
  165. self.send_response(200)
  166. self.send_header('Content-Type', 'application/json; charset=utf-8')
  167. self.send_header('Content-Length', str(len(payload)))
  168. self.end_headers()
  169. self.wfile.write(payload.encode())
  170. @contextlib.contextmanager
  171. def socks_server(socks_server_class, request_handler, bind_ip=None, **socks_server_kwargs):
  172. server = server_thread = None
  173. try:
  174. bind_address = bind_ip or '127.0.0.1'
  175. server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
  176. server = server_type(
  177. (bind_address, 0), functools.partial(socks_server_class, request_handler, socks_server_kwargs))
  178. server_port = http_server_port(server)
  179. server_thread = threading.Thread(target=server.serve_forever)
  180. server_thread.daemon = True
  181. server_thread.start()
  182. if '.' not in bind_address:
  183. yield f'[{bind_address}]:{server_port}'
  184. else:
  185. yield f'{bind_address}:{server_port}'
  186. finally:
  187. server.shutdown()
  188. server.server_close()
  189. server_thread.join(2.0)
  190. class SocksProxyTestContext(abc.ABC):
  191. REQUEST_HANDLER_CLASS = None
  192. def socks_server(self, server_class, *args, **kwargs):
  193. return socks_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
  194. @abc.abstractmethod
  195. def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
  196. """return a dict of socks_info"""
  197. class HTTPSocksTestProxyContext(SocksProxyTestContext):
  198. REQUEST_HANDLER_CLASS = SocksHTTPTestRequestHandler
  199. def socks_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
  200. request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/socks_info', **req_kwargs)
  201. handler.validate(request)
  202. return json.loads(handler.send(request).read().decode())
  203. CTX_MAP = {
  204. 'http': HTTPSocksTestProxyContext,
  205. }
  206. @pytest.fixture(scope='module')
  207. def ctx(request):
  208. return CTX_MAP[request.param]()
  209. class TestSocks4Proxy:
  210. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  211. def test_socks4_no_auth(self, handler, ctx):
  212. with handler() as rh:
  213. with ctx.socks_server(Socks4ProxyHandler) as server_address:
  214. response = ctx.socks_info_request(
  215. rh, proxies={'all': f'socks4://{server_address}'})
  216. assert response['version'] == 4
  217. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  218. def test_socks4_auth(self, handler, ctx):
  219. with handler() as rh:
  220. with ctx.socks_server(Socks4ProxyHandler, user_id='user') as server_address:
  221. with pytest.raises(ProxyError):
  222. ctx.socks_info_request(rh, proxies={'all': f'socks4://{server_address}'})
  223. response = ctx.socks_info_request(
  224. rh, proxies={'all': f'socks4://user:@{server_address}'})
  225. assert response['version'] == 4
  226. @pytest.mark.parametrize('handler,ctx', [
  227. pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
  228. reason='socks4a implementation currently broken when destination is not a domain name'))
  229. ], indirect=True)
  230. def test_socks4a_ipv4_target(self, handler, ctx):
  231. with ctx.socks_server(Socks4ProxyHandler) as server_address:
  232. with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
  233. response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
  234. assert response['version'] == 4
  235. assert response['ipv4_address'] == '127.0.0.1'
  236. assert response['domain_address'] is None
  237. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  238. def test_socks4a_domain_target(self, handler, ctx):
  239. with ctx.socks_server(Socks4ProxyHandler) as server_address:
  240. with handler(proxies={'all': f'socks4a://{server_address}'}) as rh:
  241. response = ctx.socks_info_request(rh, target_domain='localhost')
  242. assert response['version'] == 4
  243. assert response['ipv4_address'] is None
  244. assert response['domain_address'] == 'localhost'
  245. @pytest.mark.parametrize('handler,ctx', [
  246. pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
  247. reason='source_address is not yet supported for socks4 proxies'))
  248. ], indirect=True)
  249. def test_ipv4_client_source_address(self, handler, ctx):
  250. with ctx.socks_server(Socks4ProxyHandler) as server_address:
  251. source_address = f'127.0.0.{random.randint(5, 255)}'
  252. with handler(proxies={'all': f'socks4://{server_address}'},
  253. source_address=source_address) as rh:
  254. response = ctx.socks_info_request(rh)
  255. assert response['client_address'][0] == source_address
  256. assert response['version'] == 4
  257. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  258. @pytest.mark.parametrize('reply_code', [
  259. Socks4CD.REQUEST_REJECTED_OR_FAILED,
  260. Socks4CD.REQUEST_REJECTED_CANNOT_CONNECT_TO_IDENTD,
  261. Socks4CD.REQUEST_REJECTED_DIFFERENT_USERID,
  262. ])
  263. def test_socks4_errors(self, handler, ctx, reply_code):
  264. with ctx.socks_server(Socks4ProxyHandler, cd_reply=reply_code) as server_address:
  265. with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
  266. with pytest.raises(ProxyError):
  267. ctx.socks_info_request(rh)
  268. @pytest.mark.parametrize('handler,ctx', [
  269. pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
  270. reason='IPv6 socks4 proxies are not yet supported'))
  271. ], indirect=True)
  272. def test_ipv6_socks4_proxy(self, handler, ctx):
  273. with ctx.socks_server(Socks4ProxyHandler, bind_ip='::1') as server_address:
  274. with handler(proxies={'all': f'socks4://{server_address}'}) as rh:
  275. response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
  276. assert response['client_address'][0] == '::1'
  277. assert response['ipv4_address'] == '127.0.0.1'
  278. assert response['version'] == 4
  279. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  280. def test_timeout(self, handler, ctx):
  281. with ctx.socks_server(Socks4ProxyHandler, sleep=2) as server_address:
  282. with handler(proxies={'all': f'socks4://{server_address}'}, timeout=1) as rh:
  283. with pytest.raises(TransportError):
  284. ctx.socks_info_request(rh)
  285. class TestSocks5Proxy:
  286. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  287. def test_socks5_no_auth(self, handler, ctx):
  288. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  289. with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
  290. response = ctx.socks_info_request(rh)
  291. assert response['auth_methods'] == [0x0]
  292. assert response['version'] == 5
  293. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  294. def test_socks5_user_pass(self, handler, ctx):
  295. with ctx.socks_server(Socks5ProxyHandler, auth=('test', 'testpass')) as server_address:
  296. with handler() as rh:
  297. with pytest.raises(ProxyError):
  298. ctx.socks_info_request(rh, proxies={'all': f'socks5://{server_address}'})
  299. response = ctx.socks_info_request(
  300. rh, proxies={'all': f'socks5://test:testpass@{server_address}'})
  301. assert response['auth_methods'] == [Socks5Auth.AUTH_NONE, Socks5Auth.AUTH_USER_PASS]
  302. assert response['version'] == 5
  303. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  304. def test_socks5_ipv4_target(self, handler, ctx):
  305. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  306. with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
  307. response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
  308. assert response['ipv4_address'] == '127.0.0.1'
  309. assert response['version'] == 5
  310. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  311. def test_socks5_domain_target(self, handler, ctx):
  312. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  313. with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
  314. response = ctx.socks_info_request(rh, target_domain='localhost')
  315. assert response['ipv4_address'] == '127.0.0.1'
  316. assert response['version'] == 5
  317. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  318. def test_socks5h_domain_target(self, handler, ctx):
  319. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  320. with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
  321. response = ctx.socks_info_request(rh, target_domain='localhost')
  322. assert response['ipv4_address'] is None
  323. assert response['domain_address'] == 'localhost'
  324. assert response['version'] == 5
  325. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  326. def test_socks5h_ip_target(self, handler, ctx):
  327. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  328. with handler(proxies={'all': f'socks5h://{server_address}'}) as rh:
  329. response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
  330. assert response['ipv4_address'] == '127.0.0.1'
  331. assert response['domain_address'] is None
  332. assert response['version'] == 5
  333. @pytest.mark.parametrize('handler,ctx', [
  334. pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
  335. reason='IPv6 destination addresses are not yet supported'))
  336. ], indirect=True)
  337. def test_socks5_ipv6_destination(self, handler, ctx):
  338. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  339. with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
  340. response = ctx.socks_info_request(rh, target_domain='[::1]')
  341. assert response['ipv6_address'] == '::1'
  342. assert response['port'] == 80
  343. assert response['version'] == 5
  344. @pytest.mark.parametrize('handler,ctx', [
  345. pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
  346. reason='IPv6 socks5 proxies are not yet supported'))
  347. ], indirect=True)
  348. def test_ipv6_socks5_proxy(self, handler, ctx):
  349. with ctx.socks_server(Socks5ProxyHandler, bind_ip='::1') as server_address:
  350. with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
  351. response = ctx.socks_info_request(rh, target_domain='127.0.0.1')
  352. assert response['client_address'][0] == '::1'
  353. assert response['ipv4_address'] == '127.0.0.1'
  354. assert response['version'] == 5
  355. # XXX: is there any feasible way of testing IPv6 source addresses?
  356. # Same would go for non-proxy source_address test...
  357. @pytest.mark.parametrize('handler,ctx', [
  358. pytest.param('Urllib', 'http', marks=pytest.mark.xfail(
  359. reason='source_address is not yet supported for socks5 proxies'))
  360. ], indirect=True)
  361. def test_ipv4_client_source_address(self, handler, ctx):
  362. with ctx.socks_server(Socks5ProxyHandler) as server_address:
  363. source_address = f'127.0.0.{random.randint(5, 255)}'
  364. with handler(proxies={'all': f'socks5://{server_address}'}, source_address=source_address) as rh:
  365. response = ctx.socks_info_request(rh)
  366. assert response['client_address'][0] == source_address
  367. assert response['version'] == 5
  368. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  369. @pytest.mark.parametrize('reply_code', [
  370. Socks5Reply.GENERAL_FAILURE,
  371. Socks5Reply.CONNECTION_NOT_ALLOWED,
  372. Socks5Reply.NETWORK_UNREACHABLE,
  373. Socks5Reply.HOST_UNREACHABLE,
  374. Socks5Reply.CONNECTION_REFUSED,
  375. Socks5Reply.TTL_EXPIRED,
  376. Socks5Reply.COMMAND_NOT_SUPPORTED,
  377. Socks5Reply.ADDRESS_TYPE_NOT_SUPPORTED,
  378. ])
  379. def test_socks5_errors(self, handler, ctx, reply_code):
  380. with ctx.socks_server(Socks5ProxyHandler, reply=reply_code) as server_address:
  381. with handler(proxies={'all': f'socks5://{server_address}'}) as rh:
  382. with pytest.raises(ProxyError):
  383. ctx.socks_info_request(rh)
  384. @pytest.mark.parametrize('handler,ctx', [('Urllib', 'http')], indirect=True)
  385. def test_timeout(self, handler, ctx):
  386. with ctx.socks_server(Socks5ProxyHandler, sleep=2) as server_address:
  387. with handler(proxies={'all': f'socks5://{server_address}'}, timeout=1) as rh:
  388. with pytest.raises(TransportError):
  389. ctx.socks_info_request(rh)
  390. if __name__ == '__main__':
  391. unittest.main()