# poolrequests.py — outgoing HTTP for searx: shared connection pools,
# proxy rotation and per-thread timeout accounting.
  1. import sys
  2. from time import time
  3. from itertools import cycle
  4. from threading import local
  5. import requests
  6. from searx import settings
  7. from searx import logger
  8. from searx.raise_for_httperror import raise_for_httperror
  9. logger = logger.getChild('poolrequests')
# Fail fast on an OpenSSL older than 1.0.2: certifi no longer ships 1024-bit
# root certificates, so older OpenSSL cannot validate against its bundle.
# https://github.com/certifi/python-certifi#1024-bit-root-certificates
try:
    import ssl
    if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
        logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
                        .format(ssl.OPENSSL_VERSION))
        sys.exit(1)
except ImportError:
    ssl = None

# Without SNI, HTTPS hosts that share one IP cannot be told apart; pyopenssl
# can supply SNI when the stdlib ssl module lacks it, so require one of the two.
if not getattr(ssl, "HAS_SNI", False):
    try:
        import OpenSSL  # pylint: disable=unused-import
    except ImportError:
        logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
                        "Some HTTPS connections will fail")
        sys.exit(1)
class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
    """HTTPAdapter that forwards extra connection parameters (e.g.
    ``source_address``) to the underlying urllib3 pool manager, and keeps
    them so the pool manager can be rebuilt after unpickling."""

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
                 max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK,
                 **conn_params):
        # Same retry policy as the stock HTTPAdapter: no retries by default,
        # and never retry on read errors (the request may not be idempotent).
        if max_retries == requests.adapters.DEFAULT_RETRIES:
            self.max_retries = requests.adapters.Retry(0, read=False)
        else:
            self.max_retries = requests.adapters.Retry.from_int(max_retries)
        self.config = {}
        self.proxy_manager = {}

        # NOTE(review): a bare super().__init__() runs HTTPAdapter.__init__,
        # which resets self.max_retries (discarding a custom value set above)
        # and builds a pool manager that is rebuilt again below — confirm this
        # is intended rather than a base-class-skipping super(...) call.
        super().__init__()

        self._pool_connections = pool_connections
        self._pool_maxsize = pool_maxsize
        self._pool_block = pool_block
        # kept so __setstate__ can recreate an identical pool manager
        self._conn_params = conn_params

        self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)

    def __setstate__(self, state):
        # Can't handle by adding 'proxy_manager' to self.__attrs__ because
        # self.poolmanager uses a lambda function, which isn't pickleable.
        self.proxy_manager = {}
        self.config = {}

        for attr, value in state.items():
            setattr(self, attr, value)

        # Rebuild the (unpicklable) pool manager from the saved parameters.
        self.init_poolmanager(self._pool_connections, self._pool_maxsize,
                              block=self._pool_block, **self._conn_params)
# Per-thread storage: timeout bookkeeping and the adapters bound to the thread.
threadLocal = local()
connect = settings['outgoing'].get('pool_connections', 100)  # Magic number kept from previous code
maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor
if settings['outgoing'].get('source_ips'):
    # One adapter per configured source IP; threads draw from these cycles
    # so outgoing connections rotate over the source addresses.
    http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                    source_address=(source_ip, 0))
                          for source_ip in settings['outgoing']['source_ips'])
    https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                                     source_address=(source_ip, 0))
                           for source_ip in settings['outgoing']['source_ips'])
else:
    # Single shared adapter per scheme (still wrapped in a cycle for a uniform API).
    http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
    https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
class SessionSinglePool(requests.Session):
    """requests.Session that mounts the process-wide, per-thread adapters
    instead of creating fresh connection pools for every session."""

    def __init__(self):
        super().__init__()

        # reuse the same adapters
        self.adapters.clear()

        # each thread picks its adapter once from the global cycle, then reuses it
        https_adapter = threadLocal.__dict__.setdefault('https_adapter', next(https_adapters))
        self.mount('https://', https_adapter)
        # plain http is mounted only when the thread explicitly enabled it
        if get_enable_http_protocol():
            http_adapter = threadLocal.__dict__.setdefault('http_adapter', next(http_adapters))
            self.mount('http://', http_adapter)

    def close(self):
        """Call super, but clear adapters since they are managed globally"""
        self.adapters.clear()
        super().close()
  80. def set_timeout_for_thread(timeout, start_time=None):
  81. threadLocal.timeout = timeout
  82. threadLocal.start_time = start_time
def set_enable_http_protocol(enable_http):
    """Allow or forbid plain http:// requests for the current thread."""
    threadLocal.enable_http = enable_http
  85. def get_enable_http_protocol():
  86. try:
  87. return threadLocal.enable_http
  88. except AttributeError:
  89. return False
def reset_time_for_thread():
    """Zero the accumulated outgoing-request time for the current thread."""
    threadLocal.total_time = 0
def get_time_for_thread():
    """Return the time this thread spent in outgoing requests.

    Raises AttributeError if reset_time_for_thread() was never called here."""
    return threadLocal.total_time
  94. def get_proxy_cycles(proxy_settings):
  95. if not proxy_settings:
  96. return None
  97. # Backwards compatibility for single proxy in settings.yml
  98. for protocol, proxy in proxy_settings.items():
  99. if isinstance(proxy, str):
  100. proxy_settings[protocol] = [proxy]
  101. for protocol in proxy_settings:
  102. proxy_settings[protocol] = cycle(proxy_settings[protocol])
  103. return proxy_settings
# Proxy rotation shared by every outgoing request, built once at import time.
GLOBAL_PROXY_CYCLES = get_proxy_cycles(settings['outgoing'].get('proxies'))
  105. def get_proxies(proxy_cycles):
  106. if proxy_cycles:
  107. return {protocol: next(proxy_cycle) for protocol, proxy_cycle in proxy_cycles.items()}
  108. return None
def get_global_proxies():
    """Return the next proxy set from the module-wide rotation (or None)."""
    return get_proxies(GLOBAL_PROXY_CYCLES)
  111. def request(method, url, **kwargs):
  112. """same as requests/requests/api.py request(...)"""
  113. time_before_request = time()
  114. # session start
  115. session = SessionSinglePool()
  116. # proxies
  117. if not kwargs.get('proxies'):
  118. kwargs['proxies'] = get_global_proxies()
  119. # timeout
  120. if 'timeout' in kwargs:
  121. timeout = kwargs['timeout']
  122. else:
  123. timeout = getattr(threadLocal, 'timeout', None)
  124. if timeout is not None:
  125. kwargs['timeout'] = timeout
  126. # raise_for_error
  127. check_for_httperror = True
  128. if 'raise_for_httperror' in kwargs:
  129. check_for_httperror = kwargs['raise_for_httperror']
  130. del kwargs['raise_for_httperror']
  131. # do request
  132. response = session.request(method=method, url=url, **kwargs)
  133. time_after_request = time()
  134. # is there a timeout for this engine ?
  135. if timeout is not None:
  136. timeout_overhead = 0.2 # seconds
  137. # start_time = when the user request started
  138. start_time = getattr(threadLocal, 'start_time', time_before_request)
  139. search_duration = time_after_request - start_time
  140. if search_duration > timeout + timeout_overhead:
  141. raise requests.exceptions.Timeout(response=response)
  142. # session end
  143. session.close()
  144. if hasattr(threadLocal, 'total_time'):
  145. threadLocal.total_time += time_after_request - time_before_request
  146. # raise an exception
  147. if check_for_httperror:
  148. raise_for_httperror(response)
  149. return response
  150. def get(url, **kwargs):
  151. kwargs.setdefault('allow_redirects', True)
  152. return request('get', url, **kwargs)
  153. def options(url, **kwargs):
  154. kwargs.setdefault('allow_redirects', True)
  155. return request('options', url, **kwargs)
  156. def head(url, **kwargs):
  157. kwargs.setdefault('allow_redirects', False)
  158. return request('head', url, **kwargs)
  159. def post(url, data=None, **kwargs):
  160. return request('post', url, data=data, **kwargs)
  161. def put(url, data=None, **kwargs):
  162. return request('put', url, data=data, **kwargs)
  163. def patch(url, data=None, **kwargs):
  164. return request('patch', url, data=data, **kwargs)
  165. def delete(url, **kwargs):
  166. return request('delete', url, **kwargs)