# poolrequests.py
  1. import sys
  2. from time import time
  3. from itertools import cycle
  4. from threading import local
  5. import requests
  6. from searx import settings
  7. from searx import logger
  8. from searx.raise_for_httperror import raise_for_httperror
  9. logger = logger.getChild('poolrequests')
  10. try:
  11. import ssl
  12. if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
  13. # https://github.com/certifi/python-certifi#1024-bit-root-certificates
  14. logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
  15. .format(ssl.OPENSSL_VERSION))
  16. sys.exit(1)
  17. except ImportError:
  18. ssl = None
  19. if not getattr(ssl, "HAS_SNI", False):
  20. try:
  21. import OpenSSL # pylint: disable=unused-import
  22. except ImportError:
  23. logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
  24. "Some HTTPS connections will fail")
  25. sys.exit(1)
  26. class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
  27. def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
  28. pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
  29. max_retries=requests.adapters.DEFAULT_RETRIES,
  30. pool_block=requests.adapters.DEFAULT_POOLBLOCK,
  31. **conn_params):
  32. if max_retries == requests.adapters.DEFAULT_RETRIES:
  33. self.max_retries = requests.adapters.Retry(0, read=False)
  34. else:
  35. self.max_retries = requests.adapters.Retry.from_int(max_retries)
  36. self.config = {}
  37. self.proxy_manager = {}
  38. super().__init__()
  39. self._pool_connections = pool_connections
  40. self._pool_maxsize = pool_maxsize
  41. self._pool_block = pool_block
  42. self._conn_params = conn_params
  43. self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)
  44. def __setstate__(self, state):
  45. # Can't handle by adding 'proxy_manager' to self.__attrs__ because
  46. # because self.poolmanager uses a lambda function, which isn't pickleable.
  47. self.proxy_manager = {}
  48. self.config = {}
  49. for attr, value in state.items():
  50. setattr(self, attr, value)
  51. self.init_poolmanager(self._pool_connections, self._pool_maxsize,
  52. block=self._pool_block, **self._conn_params)
  53. threadLocal = local()
  54. connect = settings['outgoing'].get('pool_connections', 100) # Magic number kept from previous code
  55. maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE) # Picked from constructor
  56. if settings['outgoing'].get('source_ips'):
  57. http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
  58. source_address=(source_ip, 0))
  59. for source_ip in settings['outgoing']['source_ips'])
  60. https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
  61. source_address=(source_ip, 0))
  62. for source_ip in settings['outgoing']['source_ips'])
  63. else:
  64. http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
  65. https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
  66. class SessionSinglePool(requests.Session):
  67. def __init__(self):
  68. super().__init__()
  69. # reuse the same adapters
  70. self.adapters.clear()
  71. https_adapter = threadLocal.__dict__.setdefault('https_adapter', next(https_adapters))
  72. http_adapter = threadLocal.__dict__.setdefault('http_adapter', next(http_adapters))
  73. self.mount('https://', https_adapter)
  74. self.mount('http://', http_adapter)
  75. def close(self):
  76. """Call super, but clear adapters since there are managed globaly"""
  77. self.adapters.clear()
  78. super().close()
  79. def set_timeout_for_thread(timeout, start_time=None):
  80. threadLocal.timeout = timeout
  81. threadLocal.start_time = start_time
  82. def reset_time_for_thread():
  83. threadLocal.total_time = 0
  84. def get_time_for_thread():
  85. return threadLocal.total_time
  86. def get_proxy_cycles(proxy_settings):
  87. if not proxy_settings:
  88. return None
  89. # Backwards compatibility for single proxy in settings.yml
  90. for protocol, proxy in proxy_settings.items():
  91. if isinstance(proxy, str):
  92. proxy_settings[protocol] = [proxy]
  93. for protocol in proxy_settings:
  94. proxy_settings[protocol] = cycle(proxy_settings[protocol])
  95. return proxy_settings
  96. GLOBAL_PROXY_CYCLES = get_proxy_cycles(settings['outgoing'].get('proxies'))
  97. def get_proxies(proxy_cycles):
  98. if proxy_cycles:
  99. return {protocol: next(proxy_cycle) for protocol, proxy_cycle in proxy_cycles.items()}
  100. return None
  101. def get_global_proxies():
  102. return get_proxies(GLOBAL_PROXY_CYCLES)
  103. def request(method, url, **kwargs):
  104. """same as requests/requests/api.py request(...)"""
  105. time_before_request = time()
  106. # session start
  107. session = SessionSinglePool()
  108. # proxies
  109. if not kwargs.get('proxies'):
  110. kwargs['proxies'] = get_global_proxies()
  111. # timeout
  112. if 'timeout' in kwargs:
  113. timeout = kwargs['timeout']
  114. else:
  115. timeout = getattr(threadLocal, 'timeout', None)
  116. if timeout is not None:
  117. kwargs['timeout'] = timeout
  118. # raise_for_error
  119. check_for_httperror = True
  120. if 'raise_for_httperror' in kwargs:
  121. check_for_httperror = kwargs['raise_for_httperror']
  122. del kwargs['raise_for_httperror']
  123. # do request
  124. response = session.request(method=method, url=url, **kwargs)
  125. time_after_request = time()
  126. # is there a timeout for this engine ?
  127. if timeout is not None:
  128. timeout_overhead = 0.2 # seconds
  129. # start_time = when the user request started
  130. start_time = getattr(threadLocal, 'start_time', time_before_request)
  131. search_duration = time_after_request - start_time
  132. if search_duration > timeout + timeout_overhead:
  133. raise requests.exceptions.Timeout(response=response)
  134. # session end
  135. session.close()
  136. if hasattr(threadLocal, 'total_time'):
  137. threadLocal.total_time += time_after_request - time_before_request
  138. # raise an exception
  139. if check_for_httperror:
  140. raise_for_httperror(response)
  141. return response
  142. def get(url, **kwargs):
  143. kwargs.setdefault('allow_redirects', True)
  144. return request('get', url, **kwargs)
  145. def options(url, **kwargs):
  146. kwargs.setdefault('allow_redirects', True)
  147. return request('options', url, **kwargs)
  148. def head(url, **kwargs):
  149. kwargs.setdefault('allow_redirects', False)
  150. return request('head', url, **kwargs)
  151. def post(url, data=None, **kwargs):
  152. return request('post', url, data=data, **kwargs)
  153. def put(url, data=None, **kwargs):
  154. return request('put', url, data=data, **kwargs)
  155. def patch(url, data=None, **kwargs):
  156. return request('patch', url, data=data, **kwargs)
  157. def delete(url, **kwargs):
  158. return request('delete', url, **kwargs)