query.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. # pylint: disable=invalid-name, missing-module-docstring, missing-class-docstring
  3. from __future__ import annotations
  4. from abc import abstractmethod, ABC
  5. import re
  6. from searx import settings
  7. from searx.sxng_locales import sxng_locales
  8. from searx.engines import categories, engines, engine_shortcuts
  9. from searx.external_bang import get_bang_definition_and_autocomplete
  10. from searx.search import EngineRef
  11. from searx.webutils import VALID_LANGUAGE_CODE
  12. class QueryPartParser(ABC):
  13. __slots__ = "raw_text_query", "enable_autocomplete"
  14. @staticmethod
  15. @abstractmethod
  16. def check(raw_value):
  17. """Check if raw_value can be parsed"""
  18. def __init__(self, raw_text_query, enable_autocomplete):
  19. self.raw_text_query = raw_text_query
  20. self.enable_autocomplete = enable_autocomplete
  21. @abstractmethod
  22. def __call__(self, raw_value):
  23. """Try to parse raw_value: set the self.raw_text_query properties
  24. return True if raw_value has been parsed
  25. self.raw_text_query.autocomplete_list is also modified
  26. if self.enable_autocomplete is True
  27. """
  28. def _add_autocomplete(self, value):
  29. if value not in self.raw_text_query.autocomplete_list:
  30. self.raw_text_query.autocomplete_list.append(value)
  31. class TimeoutParser(QueryPartParser):
  32. @staticmethod
  33. def check(raw_value):
  34. return raw_value[0] == '<'
  35. def __call__(self, raw_value):
  36. value = raw_value[1:]
  37. found = self._parse(value) if len(value) > 0 else False
  38. if self.enable_autocomplete and not value:
  39. self._autocomplete()
  40. return found
  41. def _parse(self, value):
  42. if not value.isdigit():
  43. return False
  44. raw_timeout_limit = int(value)
  45. if raw_timeout_limit < 100:
  46. # below 100, the unit is the second ( <3 = 3 seconds timeout )
  47. self.raw_text_query.timeout_limit = float(raw_timeout_limit)
  48. else:
  49. # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
  50. self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
  51. return True
  52. def _autocomplete(self):
  53. for suggestion in ['<3', '<850']:
  54. self._add_autocomplete(suggestion)
  55. class LanguageParser(QueryPartParser):
  56. @staticmethod
  57. def check(raw_value):
  58. return raw_value[0] == ':'
  59. def __call__(self, raw_value):
  60. value = raw_value[1:].lower().replace('_', '-')
  61. found = self._parse(value) if len(value) > 0 else False
  62. if self.enable_autocomplete and not found:
  63. self._autocomplete(value)
  64. return found
  65. def _parse(self, value):
  66. found = False
  67. # check if any language-code is equal with
  68. # declared language-codes
  69. for lc in sxng_locales:
  70. lang_id, lang_name, country, english_name, _flag = map(str.lower, lc)
  71. # if correct language-code is found
  72. # set it as new search-language
  73. if (
  74. value == lang_id or value == lang_name or value == english_name or value.replace('-', ' ') == country
  75. ) and value not in self.raw_text_query.languages:
  76. found = True
  77. lang_parts = lang_id.split('-')
  78. if len(lang_parts) == 2:
  79. self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
  80. else:
  81. self.raw_text_query.languages.append(lang_id)
  82. # to ensure best match (first match is not necessarily the best one)
  83. if value == lang_id:
  84. break
  85. # user may set a valid, yet not selectable language
  86. if VALID_LANGUAGE_CODE.match(value) or value == 'auto':
  87. lang_parts = value.split('-')
  88. if len(lang_parts) > 1:
  89. value = lang_parts[0].lower() + '-' + lang_parts[1].upper()
  90. if value not in self.raw_text_query.languages:
  91. self.raw_text_query.languages.append(value)
  92. found = True
  93. return found
  94. def _autocomplete(self, value):
  95. if not value:
  96. # show some example queries
  97. if len(settings['search']['languages']) < 10:
  98. for lang in settings['search']['languages']:
  99. self.raw_text_query.autocomplete_list.append(':' + lang)
  100. else:
  101. for lang in [":en", ":en_us", ":english", ":united_kingdom"]:
  102. self.raw_text_query.autocomplete_list.append(lang)
  103. return
  104. for lc in sxng_locales:
  105. if lc[0] not in settings['search']['languages']:
  106. continue
  107. lang_id, lang_name, country, english_name, _flag = map(str.lower, lc)
  108. # check if query starts with language-id
  109. if lang_id.startswith(value):
  110. if len(value) <= 2:
  111. self._add_autocomplete(':' + lang_id.split('-')[0])
  112. else:
  113. self._add_autocomplete(':' + lang_id)
  114. # check if query starts with language name
  115. if lang_name.startswith(value) or english_name.startswith(value):
  116. self._add_autocomplete(':' + lang_name)
  117. # check if query starts with country
  118. # here "new_zealand" is "new-zealand" (see __call__)
  119. if country.startswith(value.replace('-', ' ')):
  120. self._add_autocomplete(':' + country.replace(' ', '_'))
  121. class ExternalBangParser(QueryPartParser):
  122. @staticmethod
  123. def check(raw_value):
  124. return raw_value.startswith('!!') and len(raw_value) > 2
  125. def __call__(self, raw_value):
  126. value = raw_value[2:]
  127. found, bang_ac_list = self._parse(value) if len(value) > 0 else (False, [])
  128. if self.enable_autocomplete:
  129. self._autocomplete(bang_ac_list)
  130. return found
  131. def _parse(self, value):
  132. found = False
  133. bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
  134. if bang_definition is not None:
  135. self.raw_text_query.external_bang = value
  136. found = True
  137. return found, bang_ac_list
  138. def _autocomplete(self, bang_ac_list):
  139. if not bang_ac_list:
  140. bang_ac_list = ['g', 'ddg', 'bing']
  141. for external_bang in bang_ac_list:
  142. self._add_autocomplete('!!' + external_bang)
  143. class BangParser(QueryPartParser):
  144. @staticmethod
  145. def check(raw_value):
  146. # make sure it's not any bang with double '!!'
  147. return raw_value[0] == '!' and (len(raw_value) < 2 or raw_value[1] != '!')
  148. def __call__(self, raw_value):
  149. value = raw_value[1:].replace('-', ' ').replace('_', ' ')
  150. found = self._parse(value) if len(value) > 0 else False
  151. if found and raw_value[0] == '!':
  152. self.raw_text_query.specific = True
  153. if self.enable_autocomplete:
  154. self._autocomplete(raw_value[0], value)
  155. return found
  156. def _parse(self, value):
  157. # check if prefix is equal with engine shortcut
  158. if value in engine_shortcuts: # pylint: disable=consider-using-get
  159. value = engine_shortcuts[value]
  160. # check if prefix is equal with engine name
  161. if value in engines:
  162. self.raw_text_query.enginerefs.append(EngineRef(value, 'none'))
  163. return True
  164. # check if prefix is equal with category name
  165. if value in categories:
  166. # using all engines for that search, which
  167. # are declared under that category name
  168. self.raw_text_query.enginerefs.extend(
  169. EngineRef(engine.name, value)
  170. for engine in categories[value]
  171. if (engine.name, value) not in self.raw_text_query.disabled_engines
  172. )
  173. return True
  174. return False
  175. def _autocomplete(self, first_char, value):
  176. if not value:
  177. # show some example queries
  178. for suggestion in ['images', 'wikipedia', 'osm']:
  179. if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
  180. self._add_autocomplete(first_char + suggestion)
  181. return
  182. # check if query starts with category name
  183. for category in categories:
  184. if category.startswith(value):
  185. self._add_autocomplete(first_char + category.replace(' ', '_'))
  186. # check if query starts with engine name
  187. for engine in engines:
  188. if engine.startswith(value):
  189. self._add_autocomplete(first_char + engine.replace(' ', '_'))
  190. # check if query starts with engine shortcut
  191. for engine_shortcut in engine_shortcuts:
  192. if engine_shortcut.startswith(value):
  193. self._add_autocomplete(first_char + engine_shortcut)
  194. class FeelingLuckyParser(QueryPartParser):
  195. @staticmethod
  196. def check(raw_value):
  197. return raw_value == '!!'
  198. def __call__(self, raw_value):
  199. self.raw_text_query.redirect_to_first_result = True
  200. return True
  201. class RawTextQuery:
  202. """parse raw text query (the value from the html input)"""
  203. PARSER_CLASSES = [
  204. TimeoutParser, # force the timeout
  205. LanguageParser, # force a language
  206. ExternalBangParser, # external bang (must be before BangParser)
  207. BangParser, # force an engine or category
  208. FeelingLuckyParser, # redirect to the first link in the results list
  209. ]
  210. def __init__(self, query: str, disabled_engines: list):
  211. assert isinstance(query, str)
  212. # input parameters
  213. self.query = query
  214. self.disabled_engines = disabled_engines if disabled_engines else []
  215. # parsed values
  216. self.enginerefs = []
  217. self.languages = []
  218. self.timeout_limit = None
  219. self.external_bang = None
  220. self.specific = False
  221. self.autocomplete_list = []
  222. # internal properties
  223. self.query_parts = [] # use self.getFullQuery()
  224. self.user_query_parts = [] # use self.getQuery()
  225. self.autocomplete_location = None
  226. self.redirect_to_first_result = False
  227. self._parse_query()
  228. def _parse_query(self):
  229. """
  230. parse self.query, if tags are set, which
  231. change the search engine or search-language
  232. """
  233. # split query, including whitespaces
  234. raw_query_parts = re.split(r'(\s+)', self.query)
  235. last_index_location = None
  236. autocomplete_index = len(raw_query_parts) - 1
  237. for i, query_part in enumerate(raw_query_parts):
  238. # part does only contain spaces, skip
  239. if query_part.isspace() or query_part == '':
  240. continue
  241. # parse special commands
  242. special_part = False
  243. for parser_class in RawTextQuery.PARSER_CLASSES:
  244. if parser_class.check(query_part):
  245. special_part = parser_class(self, i == autocomplete_index)(query_part)
  246. break
  247. # append query part to query_part list
  248. qlist = self.query_parts if special_part else self.user_query_parts
  249. qlist.append(query_part)
  250. last_index_location = (qlist, len(qlist) - 1)
  251. self.autocomplete_location = last_index_location
  252. def get_autocomplete_full_query(self, text):
  253. qlist, position = self.autocomplete_location
  254. qlist[position] = text
  255. return self.getFullQuery()
  256. def changeQuery(self, query):
  257. self.user_query_parts = query.strip().split()
  258. self.query = self.getFullQuery()
  259. self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
  260. self.autocomplete_list = []
  261. return self
  262. def getQuery(self):
  263. return ' '.join(self.user_query_parts)
  264. def getFullQuery(self):
  265. """
  266. get full query including whitespaces
  267. """
  268. return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip()
  269. def __str__(self):
  270. return self.getFullQuery()
  271. def __repr__(self):
  272. return (
  273. f"<{self.__class__.__name__} "
  274. + f"query={self.query!r} "
  275. + f"disabled_engines={self.disabled_engines!r}\n "
  276. + f"languages={self.languages!r} "
  277. + f"timeout_limit={self.timeout_limit!r} "
  278. + f"external_bang={self.external_bang!r} "
  279. + f"specific={self.specific!r} "
  280. + f"enginerefs={self.enginerefs!r}\n "
  281. + f"autocomplete_list={self.autocomplete_list!r}\n "
  282. + f"query_parts={self.query_parts!r}\n "
  283. + f"user_query_parts={self.user_query_parts!r} >\n"
  284. + f"redirect_to_first_result={self.redirect_to_first_result!r}"
  285. )