# network.py
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. # pylint: disable=global-statement
  3. # pylint: disable=missing-module-docstring, missing-class-docstring
  4. from __future__ import annotations
  5. import typing
  6. import atexit
  7. import asyncio
  8. import ipaddress
  9. from itertools import cycle
  10. from typing import Dict
  11. import httpx
  12. from searx import logger, searx_debug
  13. from searx.extended_types import SXNG_Response
  14. from .client import new_client, get_loop, AsyncHTTPTransportNoHttp
  15. from .raise_for_httperror import raise_for_httperror
  16. logger = logger.getChild('network')
  17. DEFAULT_NAME = '__DEFAULT__'
  18. NETWORKS: Dict[str, 'Network'] = {}
  19. # requests compatibility when reading proxy settings from settings.yml
  20. PROXY_PATTERN_MAPPING = {
  21. 'http': 'http://',
  22. 'https': 'https://',
  23. 'socks4': 'socks4://',
  24. 'socks5': 'socks5://',
  25. 'socks5h': 'socks5h://',
  26. 'http:': 'http://',
  27. 'https:': 'https://',
  28. 'socks4:': 'socks4://',
  29. 'socks5:': 'socks5://',
  30. 'socks5h:': 'socks5h://',
  31. }
  32. ADDRESS_MAPPING = {'ipv4': '0.0.0.0', 'ipv6': '::'}
  33. class Network:
  34. __slots__ = (
  35. 'enable_http',
  36. 'verify',
  37. 'enable_http2',
  38. 'max_connections',
  39. 'max_keepalive_connections',
  40. 'keepalive_expiry',
  41. 'local_addresses',
  42. 'proxies',
  43. 'using_tor_proxy',
  44. 'max_redirects',
  45. 'retries',
  46. 'retry_on_http_error',
  47. '_local_addresses_cycle',
  48. '_proxies_cycle',
  49. '_clients',
  50. '_logger',
  51. )
  52. _TOR_CHECK_RESULT = {}
  53. def __init__(
  54. # pylint: disable=too-many-arguments
  55. self,
  56. enable_http=True,
  57. verify=True,
  58. enable_http2=False,
  59. max_connections=None,
  60. max_keepalive_connections=None,
  61. keepalive_expiry=None,
  62. proxies=None,
  63. using_tor_proxy=False,
  64. local_addresses=None,
  65. retries=0,
  66. retry_on_http_error=None,
  67. max_redirects=30,
  68. logger_name=None,
  69. ):
  70. self.enable_http = enable_http
  71. self.verify = verify
  72. self.enable_http2 = enable_http2
  73. self.max_connections = max_connections
  74. self.max_keepalive_connections = max_keepalive_connections
  75. self.keepalive_expiry = keepalive_expiry
  76. self.proxies = proxies
  77. self.using_tor_proxy = using_tor_proxy
  78. self.local_addresses = local_addresses
  79. self.retries = retries
  80. self.retry_on_http_error = retry_on_http_error
  81. self.max_redirects = max_redirects
  82. self._local_addresses_cycle = self.get_ipaddress_cycle()
  83. self._proxies_cycle = self.get_proxy_cycles()
  84. self._clients = {}
  85. self._logger = logger.getChild(logger_name) if logger_name else logger
  86. self.check_parameters()
  87. def check_parameters(self):
  88. for address in self.iter_ipaddresses():
  89. if '/' in address:
  90. ipaddress.ip_network(address, False)
  91. else:
  92. ipaddress.ip_address(address)
  93. if self.proxies is not None and not isinstance(self.proxies, (str, dict)):
  94. raise ValueError('proxies type has to be str, dict or None')
  95. def iter_ipaddresses(self):
  96. local_addresses = self.local_addresses
  97. if not local_addresses:
  98. return
  99. if isinstance(local_addresses, str):
  100. local_addresses = [local_addresses]
  101. yield from local_addresses
  102. def get_ipaddress_cycle(self):
  103. while True:
  104. count = 0
  105. for address in self.iter_ipaddresses():
  106. if '/' in address:
  107. for a in ipaddress.ip_network(address, False).hosts():
  108. yield str(a)
  109. count += 1
  110. else:
  111. a = ipaddress.ip_address(address)
  112. yield str(a)
  113. count += 1
  114. if count == 0:
  115. yield None
  116. def iter_proxies(self):
  117. if not self.proxies:
  118. return
  119. # https://www.python-httpx.org/compatibility/#proxy-keys
  120. if isinstance(self.proxies, str):
  121. yield 'all://', [self.proxies]
  122. else:
  123. for pattern, proxy_url in self.proxies.items():
  124. pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern)
  125. if isinstance(proxy_url, str):
  126. proxy_url = [proxy_url]
  127. yield pattern, proxy_url
  128. def get_proxy_cycles(self):
  129. proxy_settings = {}
  130. for pattern, proxy_urls in self.iter_proxies():
  131. proxy_settings[pattern] = cycle(proxy_urls)
  132. while True:
  133. # pylint: disable=stop-iteration-return
  134. yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
  135. async def log_response(self, response: httpx.Response):
  136. request = response.request
  137. status = f"{response.status_code} {response.reason_phrase}"
  138. response_line = f"{response.http_version} {status}"
  139. content_type = response.headers.get("Content-Type")
  140. content_type = f' ({content_type})' if content_type else ''
  141. self._logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}')
  142. @staticmethod
  143. async def check_tor_proxy(client: httpx.AsyncClient, proxies) -> bool:
  144. if proxies in Network._TOR_CHECK_RESULT:
  145. return Network._TOR_CHECK_RESULT[proxies]
  146. result = True
  147. # ignore client._transport because it is not used with all://
  148. for transport in client._mounts.values(): # pylint: disable=protected-access
  149. if isinstance(transport, AsyncHTTPTransportNoHttp):
  150. continue
  151. if getattr(transport, "_pool") and getattr(
  152. transport._pool, "_rdns", False # pylint: disable=protected-access
  153. ):
  154. continue
  155. return False
  156. response = await client.get("https://check.torproject.org/api/ip", timeout=60)
  157. if not response.json()["IsTor"]:
  158. result = False
  159. Network._TOR_CHECK_RESULT[proxies] = result
  160. return result
  161. async def get_client(self, verify=None, max_redirects=None):
  162. verify = self.verify if verify is None else verify
  163. max_redirects = self.max_redirects if max_redirects is None else max_redirects
  164. local_address = next(self._local_addresses_cycle)
  165. proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
  166. key = (verify, max_redirects, local_address, proxies)
  167. hook_log_response = self.log_response if searx_debug else None
  168. if key not in self._clients or self._clients[key].is_closed:
  169. client = new_client(
  170. self.enable_http,
  171. verify,
  172. self.enable_http2,
  173. self.max_connections,
  174. self.max_keepalive_connections,
  175. self.keepalive_expiry,
  176. dict(proxies),
  177. local_address,
  178. 0,
  179. max_redirects,
  180. hook_log_response,
  181. )
  182. if self.using_tor_proxy and not await self.check_tor_proxy(client, proxies):
  183. await client.aclose()
  184. raise httpx.ProxyError('Network configuration problem: not using Tor')
  185. self._clients[key] = client
  186. return self._clients[key]
  187. async def aclose(self):
  188. async def close_client(client):
  189. try:
  190. await client.aclose()
  191. except httpx.HTTPError:
  192. pass
  193. await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)
  194. @staticmethod
  195. def extract_kwargs_clients(kwargs):
  196. kwargs_clients = {}
  197. if 'verify' in kwargs:
  198. kwargs_clients['verify'] = kwargs.pop('verify')
  199. if 'max_redirects' in kwargs:
  200. kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
  201. if 'allow_redirects' in kwargs:
  202. # see https://github.com/encode/httpx/pull/1808
  203. kwargs['follow_redirects'] = kwargs.pop('allow_redirects')
  204. return kwargs_clients
  205. @staticmethod
  206. def extract_do_raise_for_httperror(kwargs):
  207. do_raise_for_httperror = True
  208. if 'raise_for_httperror' in kwargs:
  209. do_raise_for_httperror = kwargs['raise_for_httperror']
  210. del kwargs['raise_for_httperror']
  211. return do_raise_for_httperror
  212. def patch_response(self, response, do_raise_for_httperror) -> SXNG_Response:
  213. if isinstance(response, httpx.Response):
  214. response = typing.cast(SXNG_Response, response)
  215. # requests compatibility (response is not streamed)
  216. # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
  217. response.ok = not response.is_error
  218. # raise an exception
  219. if do_raise_for_httperror:
  220. try:
  221. raise_for_httperror(response)
  222. except:
  223. self._logger.warning(f"HTTP Request failed: {response.request.method} {response.request.url}")
  224. raise
  225. return response
  226. def is_valid_response(self, response):
  227. # pylint: disable=too-many-boolean-expressions
  228. if (
  229. (self.retry_on_http_error is True and 400 <= response.status_code <= 599)
  230. or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error)
  231. or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error)
  232. ):
  233. return False
  234. return True
  235. async def call_client(self, stream, method, url, **kwargs) -> SXNG_Response:
  236. retries = self.retries
  237. was_disconnected = False
  238. do_raise_for_httperror = Network.extract_do_raise_for_httperror(kwargs)
  239. kwargs_clients = Network.extract_kwargs_clients(kwargs)
  240. while retries >= 0: # pragma: no cover
  241. client = await self.get_client(**kwargs_clients)
  242. try:
  243. if stream:
  244. response = client.stream(method, url, **kwargs)
  245. else:
  246. response = await client.request(method, url, **kwargs)
  247. if self.is_valid_response(response) or retries <= 0:
  248. return self.patch_response(response, do_raise_for_httperror)
  249. except httpx.RemoteProtocolError as e:
  250. if not was_disconnected:
  251. # the server has closed the connection:
  252. # try again without decreasing the retries variable & with a new HTTP client
  253. was_disconnected = True
  254. await client.aclose()
  255. self._logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying')
  256. continue
  257. if retries <= 0:
  258. raise e
  259. except (httpx.RequestError, httpx.HTTPStatusError) as e:
  260. if retries <= 0:
  261. raise e
  262. retries -= 1
  263. async def request(self, method, url, **kwargs):
  264. return await self.call_client(False, method, url, **kwargs)
  265. async def stream(self, method, url, **kwargs):
  266. return await self.call_client(True, method, url, **kwargs)
  267. @classmethod
  268. async def aclose_all(cls):
  269. await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False)
  270. def get_network(name=None):
  271. return NETWORKS.get(name or DEFAULT_NAME)
  272. def check_network_configuration():
  273. async def check():
  274. exception_count = 0
  275. for network in NETWORKS.values():
  276. if network.using_tor_proxy:
  277. try:
  278. await network.get_client()
  279. except Exception: # pylint: disable=broad-except
  280. network._logger.exception('Error') # pylint: disable=protected-access
  281. exception_count += 1
  282. return exception_count
  283. future = asyncio.run_coroutine_threadsafe(check(), get_loop())
  284. exception_count = future.result()
  285. if exception_count > 0:
  286. raise RuntimeError("Invalid network configuration")
  287. def initialize(settings_engines=None, settings_outgoing=None):
  288. # pylint: disable=import-outside-toplevel)
  289. from searx.engines import engines
  290. from searx import settings
  291. # pylint: enable=import-outside-toplevel)
  292. settings_engines = settings_engines or settings['engines']
  293. settings_outgoing = settings_outgoing or settings['outgoing']
  294. # default parameters for AsyncHTTPTransport
  295. # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # pylint: disable=line-too-long
  296. default_params = {
  297. 'enable_http': False,
  298. 'verify': settings_outgoing['verify'],
  299. 'enable_http2': settings_outgoing['enable_http2'],
  300. 'max_connections': settings_outgoing['pool_connections'],
  301. 'max_keepalive_connections': settings_outgoing['pool_maxsize'],
  302. 'keepalive_expiry': settings_outgoing['keepalive_expiry'],
  303. 'local_addresses': settings_outgoing['source_ips'],
  304. 'using_tor_proxy': settings_outgoing['using_tor_proxy'],
  305. 'proxies': settings_outgoing['proxies'],
  306. 'max_redirects': settings_outgoing['max_redirects'],
  307. 'retries': settings_outgoing['retries'],
  308. 'retry_on_http_error': None,
  309. }
  310. def new_network(params, logger_name=None):
  311. nonlocal default_params
  312. result = {}
  313. result.update(default_params)
  314. result.update(params)
  315. if logger_name:
  316. result['logger_name'] = logger_name
  317. return Network(**result)
  318. def iter_networks():
  319. nonlocal settings_engines
  320. for engine_spec in settings_engines:
  321. engine_name = engine_spec['name']
  322. engine = engines.get(engine_name)
  323. if engine is None:
  324. continue
  325. network = getattr(engine, 'network', None)
  326. yield engine_name, engine, network
  327. if NETWORKS:
  328. done()
  329. NETWORKS.clear()
  330. NETWORKS[DEFAULT_NAME] = new_network({}, logger_name='default')
  331. NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4')
  332. NETWORKS['ipv6'] = new_network({'local_addresses': '::'}, logger_name='ipv6')
  333. # define networks from outgoing.networks
  334. for network_name, network in settings_outgoing['networks'].items():
  335. NETWORKS[network_name] = new_network(network, logger_name=network_name)
  336. # define networks from engines.[i].network (except references)
  337. for engine_name, engine, network in iter_networks():
  338. if network is None:
  339. network = {}
  340. for attribute_name, attribute_value in default_params.items():
  341. if hasattr(engine, attribute_name):
  342. network[attribute_name] = getattr(engine, attribute_name)
  343. else:
  344. network[attribute_name] = attribute_value
  345. NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
  346. elif isinstance(network, dict):
  347. NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
  348. # define networks from engines.[i].network (references)
  349. for engine_name, engine, network in iter_networks():
  350. if isinstance(network, str):
  351. NETWORKS[engine_name] = NETWORKS[network]
  352. # the /image_proxy endpoint has a dedicated network.
  353. # same parameters than the default network, but HTTP/2 is disabled.
  354. # It decreases the CPU load average, and the total time is more or less the same
  355. if 'image_proxy' not in NETWORKS:
  356. image_proxy_params = default_params.copy()
  357. image_proxy_params['enable_http2'] = False
  358. NETWORKS['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy')
  359. @atexit.register
  360. def done():
  361. """Close all HTTP client
  362. Avoid a warning at exit
  363. See https://github.com/encode/httpx/pull/2026
  364. Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__
  365. So Network.aclose is called here using atexit.register
  366. """
  367. try:
  368. loop = get_loop()
  369. if loop:
  370. future = asyncio.run_coroutine_threadsafe(Network.aclose_all(), loop)
  371. # wait 3 seconds to close the HTTP clients
  372. future.result(3)
  373. finally:
  374. NETWORKS.clear()
  375. NETWORKS[DEFAULT_NAME] = Network()