# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, missing-class-docstring

from __future__ import annotations

import warnings
from collections import defaultdict
from threading import RLock
from typing import List, NamedTuple, Set

from searx import logger as log
import searx.engines
from searx.metrics import histogram_observe, counter_add
from searx.result_types import Result, LegacyResult, MainResult
from searx.result_types.answer import AnswerSet, BaseAnswer


def calculate_score(result, priority) -> float:
    weight = 1.0

    for result_engine in result['engines']:
        if hasattr(searx.engines.engines.get(result_engine), 'weight'):
            weight *= float(searx.engines.engines[result_engine].weight)

    weight *= len(result['positions'])
    score = 0

    for position in result['positions']:
        if priority == 'low':
            continue
        if priority == 'high':
            score += weight
        else:
            score += weight / position

    return score
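

# A minimal sketch of how the score behaves (values below are made up; an
# engine that is not registered in searx.engines.engines falls back to the
# default weight 1.0):
#
#     result = {"engines": ["example"], "positions": [1, 3]}
#     # weight = 1.0 * len(positions) = 2.0
#     # default priority: score = 2.0/1 + 2.0/3 ≈ 2.67
#     # priority 'high':  score = 2.0 + 2.0  = 4.0
#     # priority 'low':   score = 0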


class Timing(NamedTuple):
    engine: str
    total: float
    load: float


class UnresponsiveEngine(NamedTuple):
    engine: str
    error_type: str
    suspended: bool


class ResultContainer:
    """In the result container the results are collected, sorted and
    duplicates are merged."""
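
    # Typical life cycle, as a hedged sketch (the engine name and the raw
    # results below are hypothetical):
    #
    #     container = ResultContainer()
    #     container.extend("example engine", raw_results)  # once per engine
    #     container.close()
    #     ordered = container.get_ordered_results()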

    # pylint: disable=too-many-statements

    main_results_map: dict[int, MainResult | LegacyResult]
    infoboxes: list[LegacyResult]
    suggestions: set[str]
    answers: AnswerSet
    corrections: set[str]

    def __init__(self):
        self.main_results_map = {}
        self.infoboxes = []
        self.suggestions = set()
        self.answers = AnswerSet()
        self.corrections = set()

        self._number_of_results: list[int] = []
        self.engine_data: dict[str, dict[str, str]] = defaultdict(dict)
        self._closed: bool = False
        self.paging: bool = False
        self.unresponsive_engines: Set[UnresponsiveEngine] = set()
        self.timings: List[Timing] = []
        self.redirect_url: str | None = None
        self.on_result = lambda _: True
        self._lock = RLock()
        self._main_results_sorted: list[MainResult | LegacyResult] = None  # type: ignore

    def extend(self, engine_name: str | None, results):  # pylint: disable=too-many-branches
        if self._closed:
            log.debug("container is closed, ignoring results: %s", results)
            return
        main_count = 0

        for result in list(results):
            if isinstance(result, Result):
                result.engine = result.engine or engine_name
                result.normalize_result_fields()

                if isinstance(result, BaseAnswer) and self.on_result(result):
                    self.answers.add(result)
                elif isinstance(result, MainResult) and self.on_result(result):
                    main_count += 1
                    self._merge_main_result(result, main_count)
                else:
                    # more types need to be implemented in the future ..
                    raise NotImplementedError(f"no handler implemented to process the result of type {result}")

            else:
                result["engine"] = result.get("engine") or engine_name or ""
                result = LegacyResult(result)  # for backward compatibility, will be removed one day
                result.normalize_result_fields()

                if "suggestion" in result:
                    if self.on_result(result):
                        self.suggestions.add(result["suggestion"])
                    continue

                if "answer" in result:
                    if self.on_result(result):
                        warnings.warn(
                            f"answer results from engine {result.engine}"
                            " are without typification / migrate to Answer class.",
                            DeprecationWarning,
                        )
                        self.answers.add(result)  # type: ignore
                    continue

                if "correction" in result:
                    if self.on_result(result):
                        self.corrections.add(result["correction"])
                    continue

                if "infobox" in result:
                    if self.on_result(result):
                        self._merge_infobox(result)
                    continue

                if "number_of_results" in result:
                    if self.on_result(result):
                        self._number_of_results.append(result["number_of_results"])
                    continue

                if "engine_data" in result:
                    if self.on_result(result):
                        if result.engine:
                            self.engine_data[result.engine][result["key"]] = result["engine_data"]
                    continue

                if self.on_result(result):
                    main_count += 1
                    self._merge_main_result(result, main_count)
                    continue

        if engine_name in searx.engines.engines:
            eng = searx.engines.engines[engine_name]
            histogram_observe(main_count, "engine", eng.name, "result", "count")
            if not self.paging and eng.paging:
                self.paging = True
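
    # Hedged sketch of what ``extend`` accepts (the engine name and result
    # dicts below are hypothetical):
    #
    #     container.extend("example", [
    #         {"suggestion": "searxng"},
    #         {"url": "https://example.org/", "title": "Example", "content": ""},
    #     ])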

    def _merge_infobox(self, new_infobox: LegacyResult):
        add_infobox = True

        new_id = getattr(new_infobox, "id", None)
        if new_id is not None:
            with self._lock:
                for existing_infobox in self.infoboxes:
                    if new_id == getattr(existing_infobox, "id", None):
                        merge_two_infoboxes(existing_infobox, new_infobox)
                        add_infobox = False

        if add_infobox:
            self.infoboxes.append(new_infobox)

    def _merge_main_result(self, result: MainResult | LegacyResult, position):
        result_hash = hash(result)

        with self._lock:
            merged = self.main_results_map.get(result_hash)
            if not merged:
                # if there is no duplicate in the merged results, append result
                result.positions = [position]
                self.main_results_map[result_hash] = result
                return
            merge_two_main_results(merged, result)
            # add the new position
            merged.positions.append(position)
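
    # Deduplication sketch: two results count as duplicates when their
    # ``hash()`` values match (the result types define the hash, typically
    # over normalized URL fields); a duplicate never adds a second entry to
    # ``main_results_map``, it only merges fields and appends its position.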

    def close(self):
        self._closed = True

        for result in self.main_results_map.values():
            result.score = calculate_score(result, result.priority)
            for eng_name in result.engines:
                counter_add(result.score, 'engine', eng_name, 'score')

    def get_ordered_results(self) -> list[MainResult | LegacyResult]:
        """Returns a sorted list of results to be displayed in the main result
        area (:ref:`result types`)."""

        if not self._closed:
            self.close()

        if self._main_results_sorted:
            return self._main_results_sorted

        # first pass, sort results by "score" (descending)
        results = sorted(self.main_results_map.values(), key=lambda x: x.score, reverse=True)

        # pass 2 : group results by category and template
        gresults = []
        categoryPositions = {}
        max_count = 8
        max_distance = 20

        for res in results:
            # do we need to handle more than one category per engine?
            engine = searx.engines.engines.get(res.engine or "")
            if engine:
                res.category = engine.categories[0] if len(engine.categories) > 0 else ""

            # do we need to handle more than one category per engine?
            category = f"{res.category}:{res.template}:{'img_src' if (res.thumbnail or res.img_src) else ''}"
            grp = categoryPositions.get(category)

            # group with previous results using the same category, if the
            # group can accept more results and is not too far from the
            # current position
            if (grp is not None) and (grp["count"] > 0) and (len(gresults) - grp["index"] < max_distance):
                # group this result with the previous results of the same
                # category
                index = grp["index"]
                gresults.insert(index, res)

                # update every index after the current one (including the
                # current one)
                for item in categoryPositions.values():
                    v = item["index"]
                    if v >= index:
                        item["index"] = v + 1

                # update this category
                grp["count"] -= 1

            else:
                gresults.append(res)
                # update categoryPositions
                categoryPositions[category] = {"index": len(gresults), "count": max_count}
                continue

        self._main_results_sorted = gresults
        return self._main_results_sorted
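
    # Grouping sketch: with max_count=8 and max_distance=20, a new result of
    # an already seen (category, template) pair is inserted right behind the
    # last member of that group, as long as the group holds fewer than 8
    # results and its tail is less than 20 positions away from the end of
    # ``gresults``; otherwise the result simply starts a new group at the end.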

    @property
    def number_of_results(self) -> int:
        """Returns the average of the result numbers reported by the engines;
        returns zero if this average is smaller than the number of results
        actually displayed."""
        if not self._closed:
            log.error("call to ResultContainer.number_of_results before ResultContainer.close")
            return 0

        with self._lock:
            resultnum_sum = sum(self._number_of_results)
            if not resultnum_sum or not self._number_of_results:
                return 0

            average = int(resultnum_sum / len(self._number_of_results))
            if average < len(self.get_ordered_results()):
                average = 0
            return average
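
    # Hedged example (made-up numbers): engines reporting 1000 and 2000 total
    # results yield int(3000 / 2) = 1500; if the container actually holds
    # more ordered results than that, the estimate is implausible and the
    # property returns 0 instead.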

    def add_unresponsive_engine(self, engine_name: str, error_type: str, suspended: bool = False):
        with self._lock:
            if self._closed:
                log.error("call to ResultContainer.add_unresponsive_engine after ResultContainer.close")
                return
            if searx.engines.engines[engine_name].display_error_messages:
                self.unresponsive_engines.add(UnresponsiveEngine(engine_name, error_type, suspended))

    def add_timing(self, engine_name: str, engine_time: float, page_load_time: float):
        with self._lock:
            if self._closed:
                log.error("call to ResultContainer.add_timing after ResultContainer.close")
                return
            self.timings.append(Timing(engine_name, total=engine_time, load=page_load_time))

    def get_timings(self):
        with self._lock:
            if not self._closed:
                log.error("call to ResultContainer.get_timings before ResultContainer.close")
                return []
            return self.timings


def merge_two_infoboxes(origin: LegacyResult, other: LegacyResult):
    """Merges the values from ``other`` into ``origin``."""
    # pylint: disable=too-many-branches
    weight1 = getattr(searx.engines.engines[origin.engine], "weight", 1)
    weight2 = getattr(searx.engines.engines[other.engine], "weight", 1)

    if weight2 > weight1:
        origin.engine = other.engine

    origin.engines |= other.engines

    if other.urls:
        url_items = origin.get("urls", [])

        for url2 in other.urls:
            unique_url = True
            entity_url2 = url2.get("entity")

            for url1 in origin.get("urls", []):
                if (entity_url2 is not None and entity_url2 == url1.get("entity")) or (
                    url1.get("url") == url2.get("url")
                ):
                    unique_url = False
                    break
            if unique_url:
                url_items.append(url2)

        origin.urls = url_items

    if other.img_src:
        if not origin.img_src:
            origin.img_src = other.img_src
        elif weight2 > weight1:
            origin.img_src = other.img_src

    if other.attributes:
        if not origin.attributes:
            origin.attributes = other.attributes
        else:
            attr_names_1 = set()
            for attr in origin.attributes:
                label = attr.get("label")
                if label:
                    attr_names_1.add(label)
                entity = attr.get("entity")
                if entity:
                    attr_names_1.add(entity)

            for attr in other.attributes:
                if attr.get("label") not in attr_names_1 and attr.get('entity') not in attr_names_1:
                    origin.attributes.append(attr)

    if other.content:
        if not origin.content:
            origin.content = other.content
        elif len(other.content) > len(origin.content):
            origin.content = other.content
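
# Merge-priority sketch (weights here are made up): an infobox from an engine
# with weight 2.0 overrides the engine name and img_src of one with the
# default weight 1.0, while URLs and attributes are deduplicated by their
# "entity"/"label" keys and the longer content always wins.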


def merge_two_main_results(origin: MainResult | LegacyResult, other: MainResult | LegacyResult):
    """Merges the values from ``other`` into ``origin``."""
    if len(other.content) > len(origin.content):
        # use content with more text
        origin.content = other.content

    # use title with more text
    if len(other.title) > len(origin.title):
        origin.title = other.title

    # merge all of the result's parameters not found in origin
    if isinstance(other, MainResult) and isinstance(origin, MainResult):
        origin.defaults_from(other)
    elif isinstance(other, LegacyResult) and isinstance(origin, LegacyResult):
        origin.defaults_from(other)

    # add engine to list of result-engines
    origin.engines.add(other.engine or "")

    # use https, ftps, .. if possible
    if origin.parsed_url and not origin.parsed_url.scheme.endswith("s"):
        if other.parsed_url and other.parsed_url.scheme.endswith("s"):
            origin.parsed_url = origin.parsed_url._replace(scheme=other.parsed_url.scheme)
            origin.url = origin.parsed_url.geturl()
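
# Scheme-upgrade sketch (hypothetical URLs): merging a duplicate found at
# https://example.org/page into an origin found at http://example.org/page
# rewrites only the scheme, so origin.url becomes "https://example.org/page".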