http.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857
  1. ########################################################################
  2. # Searx-Qt - Lightweight desktop application for Searx.
  3. # Copyright (C) 2020-2024 CYBERDEViL
  4. #
  5. # This file is part of Searx-Qt.
  6. #
  7. # Searx-Qt is free software: you can redistribute it and/or modify
  8. # it under the terms of the GNU General Public License as published by
  9. # the Free Software Foundation, either version 3 of the License, or
  10. # (at your option) any later version.
  11. #
  12. # Searx-Qt is distributed in the hope that it will be useful,
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. # GNU General Public License for more details.
  16. #
  17. # You should have received a copy of the GNU General Public License
  18. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  19. #
  20. ########################################################################
  21. from threading import Thread, Lock
  22. import pycurl
  23. import random
  24. import json
  25. import time
  26. from jsonschema import validate as JsonValidate
  27. from jsonschema.exceptions import ValidationError, SchemaError
  28. from urllib.parse import urlparse
  29. from searxqt.core import log
  30. ## @see https://github.com/pycurl/pycurl/blob/master/examples/retriever-multi.py
  31. # We should ignore SIGPIPE when using pycurl.NOSIGNAL - see
  32. # the libcurl tutorial for more info.
  33. try:
  34. import signal
  35. from signal import SIGPIPE, SIG_IGN
  36. except ImportError:
  37. pass
  38. else:
  39. signal.signal(SIGPIPE, SIG_IGN)
  40. HAVE_SOCKS = False
  41. try:
  42. import socks
  43. HAVE_SOCKS = True
  44. del socks
  45. except ImportError:
  46. log.debug("pysocks not installed! No socks proxy support.")
  47. import ssl
  48. import os
  49. capath = ssl.get_default_verify_paths().capath
  50. if not capath:
  51. raise Exception("Could not find ca path TODO")
  52. CA_CRT_PATH = os.path.join(capath, "ca-certificates.crt")
  53. del ssl
  54. del os
  55. del capath
  56. from .schema import Schemas
  57. ## Our error type definitions.
  58. class ErrorType:
  59. Success = 0
  60. HttpError = 1
  61. ConnectionError = 2
  62. Timeout = 3
  63. WrongStatus = 4
  64. DecodeError = 5
  65. NoResults = 6
  66. ProxyError = 7
  67. SSLError = 8
  68. InvalidSchema = 9
  69. ContentSizeExceeded = 10
  70. CorruptImage = 11
  71. Cancelled = 12
  72. Other = 13
  73. ## cURL error map to our own `ErrorType`.
  74. ## @see https://curl.se/libcurl/c/libcurl-errors.html
  75. CurlErrorMap = {
  76. pycurl.E_HTTP_RETURNED_ERROR : ErrorType.HttpError,
  77. pycurl.E_COULDNT_CONNECT : ErrorType.ConnectionError,
  78. pycurl.E_COULDNT_RESOLVE_HOST : ErrorType.ConnectionError,
  79. pycurl.E_OPERATION_TIMEDOUT : ErrorType.Timeout,
  80. pycurl.E_BAD_CONTENT_ENCODING : ErrorType.DecodeError,
  81. 97 : ErrorType.ProxyError, # CURLE_PROXY
  82. pycurl.E_COULDNT_RESOLVE_PROXY : ErrorType.ProxyError,
  83. pycurl.E_SSL_CONNECT_ERROR : ErrorType.SSLError,
  84. pycurl.E_PEER_FAILED_VERIFICATION: ErrorType.SSLError,
  85. pycurl.E_SSL_SHUTDOWN_FAILED : ErrorType.SSLError,
  86. pycurl.E_SSL_INVALIDCERTSTATUS : ErrorType.SSLError,
  87. pycurl.E_FILESIZE_EXCEEDED : ErrorType.ContentSizeExceeded
  88. }
  89. ## `ErrorType` -> `str` map.
  90. ## Use this to get a string of the error name.
  91. ErrorTypeStr = {
  92. ErrorType.Success: "Success",
  93. ErrorType.HttpError: "HttpError",
  94. ErrorType.ConnectionError: "ConnectionError",
  95. ErrorType.Timeout: "Timeout",
  96. ErrorType.WrongStatus: "WrongStatus",
  97. ErrorType.DecodeError: "DecodeError",
  98. ErrorType.NoResults: "NoResults",
  99. ErrorType.ProxyError: "ProxyError",
  100. ErrorType.SSLError: "SSLError",
  101. ErrorType.InvalidSchema: "InvalidSchema",
  102. ErrorType.ContentSizeExceeded: "ContentSizeExceeded",
  103. ErrorType.CorruptImage: "CorruptImage",
  104. ErrorType.Cancelled: "Cancelled",
  105. ErrorType.Other: "Other"
  106. }
  107. class ProxyProtocol:
  108. HTTP = 1
  109. SOCKS4 = 2
  110. SOCKS5 = 4
  111. ProxyProtocolString = {
  112. 0: "none",
  113. 1: "http",
  114. 2: "socks4",
  115. 4: "socks5"
  116. }
  117. ProxyTypes = {
  118. ## @see https://github.com/pycurl/pycurl/blob/master/src/module.c
  119. "HTTP 1.1": pycurl.PROXYTYPE_HTTP,
  120. "HTTP 1.0": pycurl.PROXYTYPE_HTTP_1_0,
  121. # These are not included in pycurl, see https://github.com/pycurl/pycurl/issues/622
  122. # CURLPROXY_HTTPS = 2, /* HTTPS but stick to HTTP/1 added in 7.52.0 */
  123. # CURLPROXY_HTTPS2 = 3, /* HTTPS and attempt HTTP/2 added in 8.2.0 */
  124. "HTTPS": 2,
  125. "HTTPS2": 3,
  126. }
  127. if HAVE_SOCKS:
  128. ProxyTypes.update({
  129. "Socks 4": pycurl.PROXYTYPE_SOCKS4,
  130. "Socks 4a (with DNS)": pycurl.PROXYTYPE_SOCKS4A,
  131. "Socks 5": pycurl.PROXYTYPE_SOCKS5,
  132. "Socks 5h (with DNS)": pycurl.PROXYTYPE_SOCKS5_HOSTNAME
  133. })
  134. class HttpNewRequestSettings:
  135. """! Read-only request settings
  136. """
  137. def __init__(self, data):
  138. """
  139. @param data A dict object that has no references to other objects
  140. """
  141. self._data = data
  142. def __str__(self):
  143. return str(self._data)
  144. def __getattr__(self, key):
  145. return self._data[key]
  146. class HttpRequestSettings:
  147. """! Request settings
  148. """
  149. def __init__(self):
  150. self._verifySsl = True
  151. self._timeout = 10
  152. self._recvLimit = 5 * 1024 * 1024
  153. self._chunkLimit = 512
  154. self._useragents = []
  155. self._randomUseragent = False
  156. self._extraHeaders = [
  157. "Accept: text/html;q=0.8",
  158. "Accept-Encoding: gzip, deflate",
  159. "Accept-Language: en-US,en;q=0.5",
  160. "Connection: keep-alive",
  161. "DNT: 1",
  162. "Upgrade-Insecure-Requests: 1"
  163. ]
  164. self._proxyEnabled = False
  165. ## "netloc:port" or None
  166. self._proxyAddress = None
  167. ## "username:password" or None
  168. self._proxyAuth = None
  169. ## @see https://github.com/pycurl/pycurl/blob/master/src/module.c
  170. ## pycurl.PROXYTYPE_HTTP
  171. ## pycurl.PROXYTYPE_HTTP_1_0
  172. ## pycurl.PROXYTYPE_SOCKS4
  173. ## pycurl.PROXYTYPE_SOCKS4A
  174. ## pycurl.PROXYTYPE_SOCKS5
  175. ## pycurl.PROXYTYPE_SOCKS5_HOSTNAME
  176. self._proxyType = pycurl.PROXYTYPE_HTTP
  177. def getData(self):
  178. return {
  179. "verifySsl": self.verifySsl,
  180. "timeout": self.timeout,
  181. "recvLimit": self.recvLimit,
  182. "chunkLimit": self.chunkLimit,
  183. "useragents": self.useragents,
  184. "randomUseragent": self.randomUseragent,
  185. "extraHeaders": self._extraHeaders,
  186. "proxyEnabled": self.proxyEnabled,
  187. "proxyAddress": self.proxyAddress,
  188. "proxyAuth": self.proxyAuth,
  189. "proxyType": self.proxyType
  190. }
  191. def setData(self, data):
  192. self.verifySsl = data.get("verifySsl", self.verifySsl)
  193. self.timeout = data.get("timeout", self.timeout)
  194. self.recvLimit = data.get("recvLimit", self.recvLimit)
  195. self.chunkLimit = data.get("chunkLimit", self.chunkLimit)
  196. self.randomUseragent = data.get("randomUseragent", self.randomUseragent)
  197. self.proxyEnabled = data.get("proxyEnabled", self.proxyEnabled)
  198. self.proxyAddress = data.get("proxyAddress", self.proxyAddress)
  199. self.proxyAuth = data.get("proxyAuth", self.proxyAuth)
  200. self.proxyType = data.get("proxyType", self.proxyType)
  201. self.useragents.clear()
  202. for ua in data.get("useragents", []):
  203. self.useragents.append(ua)
  204. self._extraHeaders.clear()
  205. for eh in data.get("extraHeaders", []):
  206. self._extraHeaders.append(eh)
  207. @property
  208. def verifySsl(self):
  209. return self._verifySsl
  210. @verifySsl.setter
  211. def verifySsl(self, state):
  212. self._verifySsl = state
  213. @property
  214. def timeout(self):
  215. return self._timeout
  216. @timeout.setter
  217. def timeout(self, seconds):
  218. self._timeout = seconds
  219. @property
  220. def recvLimit(self):
  221. return self._recvLimit
  222. @recvLimit.setter
  223. def recvLimit(self, bytes):
  224. self._recvLimit = bytes
  225. @property
  226. def chunkLimit(self):
  227. return self._chunkLimit
  228. @chunkLimit.setter
  229. def chunkLimit(self, bytes):
  230. self._chunkLimit = bytes
  231. @property
  232. def useragents(self):
  233. return self._useragents
  234. @property
  235. def randomUseragent(self):
  236. return self._randomUseragent
  237. @randomUseragent.setter
  238. def randomUseragent(self, state):
  239. self._randomUseragent = state
  240. @property
  241. def extraHeaders(self):
  242. return self._extraHeaders
  243. @property
  244. def proxyEnabled(self):
  245. return self._proxyEnabled
  246. @proxyEnabled.setter
  247. def proxyEnabled(self, state):
  248. self._proxyEnabled = state
  249. @property
  250. def proxyAddress(self):
  251. return self._proxyAddress
  252. @proxyAddress.setter
  253. def proxyAddress(self, address):
  254. self._proxyAddress = address
  255. @property
  256. def proxyAuth(self):
  257. return self._proxyAuth
  258. @proxyAuth.setter
  259. def proxyAuth(self, auth):
  260. self._proxyAuth = auth
  261. @property
  262. def proxyType(self):
  263. return self._proxyType
  264. @proxyType.setter
  265. def proxyType(self, protocol):
  266. self._proxyType = protocol
  267. def getUseragent(self):
  268. if not self.useragents:
  269. return None
  270. # Return first useragent string
  271. if len(self.useragents) == 1 or not self.randomUseragent:
  272. return self.useragents[0]
  273. # Return random useragent
  274. return random.choice(self.useragents)
  275. def newRequestSettings(self):
  276. """! Returns a <HttpNewRequestSettings> object that has no references,
  277. so it can be safely used inside another thread.
  278. """
  279. # Make a copy of our self._extraHeaders
  280. header = self._extraHeaders[:]
  281. # Get a User-Agent string or None
  282. useragent = self.getUseragent()
  283. data = {
  284. "verifySsl": self.verifySsl,
  285. "timeout": self.timeout,
  286. "recvLimit": self.recvLimit,
  287. "chunkLimit": self.chunkLimit,
  288. "header": header,
  289. "useragent": useragent,
  290. "proxyEnabled": self.proxyEnabled,
  291. "proxyType": self.proxyType,
  292. "proxyAddress": self.proxyAddress,
  293. "proxyAuth": self.proxyAuth
  294. }
  295. return HttpNewRequestSettings(data)
  296. class HttpRequest:
  297. """! HTTP request
  298. """
  299. def __init__(self, url, settings, data=None):
  300. """
  301. @param url The URL to request.
  302. @param settings Read-only request settings, <HttpNewRequestSettings>
  303. @param data Dict
  304. """
  305. self._url = url
  306. self._parsedUrl = urlparse(url)
  307. self._settings = settings
  308. self._data = data
  309. @property
  310. def url(self):
  311. return self._url
  312. @property
  313. def parsedUrl(self):
  314. return self._parsedUrl
  315. @property
  316. def settings(self):
  317. return self._settings
  318. @property
  319. def data(self):
  320. return self._data
  321. # @NOTE: check that data is not None before calling!
  322. def dataString(self):
  323. return "&".join([f"{k}={v}" for k, v in self.data.items()])
  324. class HttpReponse:
  325. """! HTTP response base class
  326. """
  327. def __init__(self, request, callback):
  328. """
  329. @param request <HttpRequest> object
  330. @param callback Callback function that will be called when the request
  331. has finished and the response is available. The
  332. function/method should accept this object as first
  333. argument.
  334. """
  335. self._request = request
  336. self._callback = callback
  337. self._header = ""
  338. self._content = b''
  339. self._text = ""
  340. self._error = ErrorType.Success
  341. self._errorMsg = None
  342. self._statusCode = 0
  343. self._recvLen = 0
  344. @property
  345. def request(self):
  346. return self._request
  347. @property
  348. def callback(self):
  349. return self._callback
  350. @property
  351. def content(self):
  352. return self._content
  353. @property
  354. def text(self):
  355. return self._text
  356. @property
  357. def statusCode(self):
  358. return self._statusCode
  359. @property
  360. def error(self):
  361. return self._error
  362. @property
  363. def errorMessage(self):
  364. return self._errorMsg
  365. def cancel(self):
  366. self.setError(ErrorType.Cancelled, "Cancelled")
  367. def isCancelled(self):
  368. return self.error == ErrorType.Cancelled
  369. def writeContent(self, data):
  370. if self.error != ErrorType.Success:
  371. # This request got cancelled
  372. return 0
  373. dataLen = len(data)
  374. if (self._recvLen + dataLen) > self.request.settings.recvLimit:
  375. self.setError(ErrorType.ContentSizeExceeded, "Max content size exceeded")
  376. return 0
  377. self._content += data
  378. return dataLen
  379. def writeHeader(self, data):
  380. self._header += data
  381. def setError(self, error, message):
  382. self._errorMsg = message
  383. self._error = error
  384. def setStatusCode(self, statusCode):
  385. self._statusCode = statusCode
  386. if statusCode != 200:
  387. self.setError(ErrorType.WrongStatus, "")
  388. def verifyContent(self, httpThread):
  389. text = None
  390. try:
  391. text = self.content.decode("utf-8")
  392. except UnicodeDecodeError as err:
  393. self.setError(ErrorType.DecodeError, str(err))
  394. else:
  395. self._text = text
  396. class HttpJsonReponse(HttpReponse):
  397. """! JSON response base class
  398. """
  399. ## JSON Schema to validate the response. Subclasses should set this.
  400. Schema = {}
  401. def __init__(self, request, callback):
  402. HttpReponse.__init__(self, request, callback)
  403. self._json = {}
  404. def verifySchema(self):
  405. try:
  406. JsonValidate(instance=self._json, schema=self.Schema)
  407. except ValidationError as err:
  408. self.setError(ErrorType.InvalidSchema, f"InvalidSchema: `{err}`")
  409. except SchemaError as err:
  410. self.setError(ErrorType.InvalidSchema, f"InvalidSchema: `{err}`")
  411. def verifyContent(self, httpThread):
  412. HttpReponse.verifyContent(self, httpThread)
  413. if self.error != ErrorType.Success:
  414. return
  415. try:
  416. self._json = json.loads(self.text)
  417. except json.JSONDecodeError as err:
  418. self.setError(ErrorType.DecodeError, f"DecodeError: `{err}`")
  419. return
  420. self.verifySchema()
  421. def json(self):
  422. return self._json
  423. class InstancesJsonReponse(HttpJsonReponse):
  424. Schema = Schemas["searx_space_instances"]
  425. class HttpImageReponse(HttpReponse):
  426. def verifyContent(self, httpThread):
  427. pass
  428. class SearxngHtmlReponse(HttpReponse):
  429. @property
  430. def callback(self):
  431. #if self._linktoken:
  432. # return None
  433. if self.error == ErrorType.Cancelled:
  434. return None
  435. return self._callback
  436. def verifyContent(self, httpThread):
  437. pass
  438. class HttpThread(Thread):
  439. def __init__(self):
  440. Thread.__init__(self)
  441. self._curlMulti = pycurl.CurlMulti()
  442. ## list with <pycurl.Curl> objects
  443. self._curlMulti.handles = []
  444. ## list with <pycurl.Curl> objects, handles that are idle.
  445. self._freeHandles = []
  446. ## Maximum concurrent connections (not host dependent)
  447. self._maxConn = 10
  448. ## list with <HttpReponse> objects
  449. self._queue = []
  450. ## list with completed <HttpReponse> objects waiting to be
  451. ## processed by the main thread (on processCallbacks() call).
  452. self._respQueue = []
  453. ## Loop breaks when set to True
  454. self._exit = False
  455. ## This will be set on cancelAll()
  456. self._cancelled = False
  457. ## Limiting concurrent connections to the same host
  458. self._maxHost = 2
  459. self._curHosts = {} # {netloc: open-connection-count}
  460. # Locks
  461. self._queueLock = Lock() # Adding new requests
  462. self._respQueueLock = Lock() # Getting done requests
  463. self._exitLock = Lock()
  464. self._cancelLock = Lock()
  465. # Init `self._maxConn` amount of handles
  466. for i in range(self._maxConn):
  467. handle = pycurl.Curl()
  468. handle.fp = None
  469. handle.setopt(pycurl.FOLLOWLOCATION, 1)
  470. handle.setopt(pycurl.MAXREDIRS, 2)
  471. handle.setopt(pycurl.NOSIGNAL, 1)
  472. #handle.setopt(pycurl.CAINFO, "/etc/ssl/certs/ca-certificates.crt") # make settable
  473. handle.setopt(pycurl.CAINFO, CA_CRT_PATH)
  474. self._curlMulti.handles.append(handle)
  475. # Copy all handles to _freeHandles, since all are idle.
  476. self._freeHandles = self._curlMulti.handles[:]
  477. def __subtractOpenHost(self, response):
  478. """! This is called when a response is ready, it will subtract one
  479. from the open connection counter of the requested host.
  480. """
  481. netloc = response.request.parsedUrl.netloc
  482. self._curHosts[netloc] -= 1
  483. if self._curHosts[netloc] == 0:
  484. del self._curHosts[netloc]
  485. def exit(self):
  486. """! Stop the thread loop.
  487. @note Call .join() on this object to wait till the thread is finished.
  488. @note This is thread safe and may be called from the main thread, or
  489. any other thread.
  490. """
  491. self._exitLock.acquire()
  492. self._exit = True
  493. self._exitLock.release()
  494. def cancelAll(self):
  495. """! Cancel all current requests.
  496. Response callbacks won't be called on cancelled requests.
  497. @note This is thread safe and may be called from the main thread, or
  498. any other thread.
  499. """
  500. self._cancelLock.acquire()
  501. self._cancelled = True
  502. self._cancelLock.release()
  503. def run(self):
  504. """! The actual loop that will handle requests.
  505. @note Do not call this! Call .start() instead.
  506. """
  507. while True:
  508. # These are on hold because there are to many connections to the
  509. # host.
  510. doLater = []
  511. # Add new jobs from the queue
  512. new = []
  513. self._queueLock.acquire()
  514. while self._queue and self._freeHandles:
  515. response = self._queue.pop(0)
  516. request = response.request
  517. # Honor maximum amount of concurrent connections to the same
  518. # host.
  519. netloc = request.parsedUrl.netloc
  520. hostCount = self._curHosts.get(netloc, None)
  521. if hostCount is None:
  522. self._curHosts.update({netloc: 1})
  523. elif hostCount == self._maxHost:
  524. doLater.append(response)
  525. continue
  526. else:
  527. self._curHosts[netloc] += 1
  528. handle = self._freeHandles.pop()
  529. new.append((response, handle))
  530. self._queueLock.release()
  531. for response, handle in new:
  532. request = response.request
  533. # Set the requested URL
  534. handle.setopt(pycurl.URL, request.url)
  535. # Set POST data
  536. if request.data:
  537. handle.setopt(pycurl.POSTFIELDS, request.dataString())
  538. # Option: Timeout
  539. handle.setopt(pycurl.CONNECTTIMEOUT, request.settings.timeout)
  540. handle.setopt(pycurl.TIMEOUT, request.settings.timeout)
  541. # Option: Verify SSL
  542. if request.settings.verifySsl:
  543. handle.setopt(pycurl.SSL_VERIFYPEER, 1)
  544. handle.setopt(pycurl.SSL_VERIFYHOST, 2)
  545. else:
  546. handle.setopt(pycurl.SSL_VERIFYPEER, 0)
  547. handle.setopt(pycurl.SSL_VERIFYHOST, 0)
  548. # Option: Receive limit
  549. handle.setopt(pycurl.MAXFILESIZE, request.settings.recvLimit)
  550. # Option: Proxy
  551. # @see https://curl.se/libcurl/c/CURLOPT_PROXY.html
  552. # @see https://curl.se/libcurl/c/CURLOPT_PROXYTYPE.html
  553. # @see https://curl.se/libcurl/c/CURLOPT_PROXYUSERPWD.html
  554. if request.settings.proxyEnabled:
  555. handle.setopt(pycurl.PROXY, request.settings.proxyAddress) # netloc:port
  556. handle.setopt(pycurl.PROXYUSERPWD, request.settings.proxyAuth) # username:password
  557. handle.setopt(pycurl.PROXYTYPE, request.settings.proxyType)
  558. else:
  559. handle.setopt(pycurl.PROXY, None)
  560. handle.setopt(pycurl.PROXYUSERPWD, None)
  561. # User-Agent
  562. if request.settings.useragent:
  563. handle.setopt(pycurl.USERAGENT, request.settings.useragent)
  564. else:
  565. handle.setopt(pycurl.USERAGENT, "")
  566. # Accept compression
  567. handle.setopt(pycurl.ACCEPT_ENCODING, "gzip, deflate")
  568. # Header
  569. handle.setopt(pycurl.HTTPHEADER, request.settings.header)
  570. handle.setopt(pycurl.WRITEFUNCTION, response.writeContent)
  571. #handle.setopt(pycurl.HEADERFUNCTION, response.writeHeader)
  572. handle.response = response
  573. self._curlMulti.add_handle(handle)
  574. # Perform
  575. while True:
  576. ret, num_handles = self._curlMulti.perform()
  577. if ret != pycurl.E_CALL_MULTI_PERFORM:
  578. break
  579. # Handle completed requests
  580. while True:
  581. num_q, ok_list, err_list = self._curlMulti.info_read()
  582. # Success
  583. for handle in ok_list:
  584. response = handle.response
  585. # Subtract this host from open connections
  586. self.__subtractOpenHost(response)
  587. if not response.isCancelled():
  588. response.setStatusCode(handle.getinfo(handle.RESPONSE_CODE))
  589. # handle.getinfo(pycurl.EFFECTIVE_URL)
  590. if response.error == ErrorType.Success:
  591. response.verifyContent(self)
  592. # Add the completed task so it can be processed
  593. # by the main thread.
  594. self._respQueueLock.acquire()
  595. self._respQueue.append(response)
  596. self._respQueueLock.release()
  597. self._curlMulti.remove_handle(handle)
  598. self._freeHandles.append(handle)
  599. # Error
  600. for handle, errno, errmsg in err_list:
  601. response = handle.response
  602. # Subtract this host from open connections
  603. self.__subtractOpenHost(response)
  604. if not response.isCancelled():
  605. response.setStatusCode(handle.getinfo(handle.RESPONSE_CODE))
  606. if not response.error:
  607. if errno in CurlErrorMap:
  608. response.setError(CurlErrorMap[errno], errmsg)
  609. else:
  610. response.setError(ErrorType.Other, errmsg)
  611. # Add the completed task so it can be processed
  612. # by the main thread.
  613. self._respQueueLock.acquire()
  614. self._respQueue.append(response)
  615. self._respQueueLock.release()
  616. self._curlMulti.remove_handle(handle)
  617. self._freeHandles.append(handle)
  618. if num_q == 0:
  619. break
  620. # Exit
  621. self._exitLock.acquire()
  622. if self._exit:
  623. self._exitLock.release()
  624. # Wait
  625. num_q = 1
  626. while num_q:
  627. num_q, ok_list, err_list = self._curlMulti.info_read()
  628. time.sleep(0.2)
  629. break
  630. self._exitLock.release()
  631. # Cancel all current requests
  632. self._cancelLock.acquire()
  633. if self._cancelled:
  634. # Clear the queue
  635. self._queueLock.acquire()
  636. self._queue.clear()
  637. self._queueLock.release()
  638. # These thread will finish but don't callback because they
  639. # are cancelled.
  640. for handle in self._curlMulti.handles:
  641. if handle in self._freeHandles:
  642. continue
  643. handle.response.cancel()
  644. # Clear doLater
  645. doLater.clear()
  646. # Callback
  647. self._cancelled = False
  648. self._cancelLock.release()
  649. # Add back requests that are on hold because there are to many
  650. # open connections to the same host.
  651. self._queueLock.acquire()
  652. for response in doLater:
  653. self._queue.append(response)
  654. self._queueLock.release()
  655. # Wait for activity
  656. self._curlMulti.select(1.0)
  657. time.sleep(0.2)
  658. """ Below should be called from main thread """
  659. def get(self, response):
  660. """! Add a new GET request to the queue.
  661. @param response <HttpReponse> or derivative.
  662. """
  663. request = response.request
  664. log.debug(f"NEW REQUEST to <{request.url}>")
  665. log.debug("--------------------------------")
  666. log.debug(f"data : {request.data}")
  667. log.debug(f"settings: {request.settings}")
  668. log.debug("")
  669. self._queueLock.acquire()
  670. self._queue.append(response)
  671. self._queueLock.release()
  672. def processCallbacks(self):
  673. """! Callback completed requests, this should be run from your
  674. main thread, not this thread!
  675. """
  676. self._respQueueLock.acquire()
  677. responses = self._respQueue[:]
  678. self._respQueue.clear()
  679. self._respQueueLock.release()
  680. for response in responses:
  681. request = response.request
  682. log.debug(f"RESPONSE status {response.statusCode} for "
  683. f"<{request.url}> with content size of "
  684. f"{len(response.content)}")
  685. log.debug(f" - Error: {response.error} {response.errorMessage}")
  686. if response.callback:
  687. response.callback(response)
  688. return len(responses)