query.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. from abc import abstractmethod, ABC
  3. import re
  4. from searx.languages import language_codes
  5. from searx.engines import categories, engines, engine_shortcuts
  6. from searx.external_bang import get_bang_definition_and_autocomplete
  7. from searx.search import EngineRef
  8. from searx.webutils import VALID_LANGUAGE_CODE
  9. class QueryPartParser(ABC):
  10. __slots__ = "raw_text_query", "enable_autocomplete"
  11. @staticmethod
  12. @abstractmethod
  13. def check(raw_value):
  14. """Check if raw_value can be parsed"""
  15. def __init__(self, raw_text_query, enable_autocomplete):
  16. self.raw_text_query = raw_text_query
  17. self.enable_autocomplete = enable_autocomplete
  18. @abstractmethod
  19. def __call__(self, raw_value):
  20. """Try to parse raw_value: set the self.raw_text_query properties
  21. return True if raw_value has been parsed
  22. self.raw_text_query.autocomplete_list is also modified
  23. if self.enable_autocomplete is True
  24. """
  25. def _add_autocomplete(self, value):
  26. if value not in self.raw_text_query.autocomplete_list:
  27. self.raw_text_query.autocomplete_list.append(value)
  28. class TimeoutParser(QueryPartParser):
  29. @staticmethod
  30. def check(raw_value):
  31. return raw_value[0] == '<'
  32. def __call__(self, raw_value):
  33. value = raw_value[1:]
  34. found = self._parse(value) if len(value) > 0 else False
  35. if self.enable_autocomplete and not value:
  36. self._autocomplete()
  37. return found
  38. def _parse(self, value):
  39. if not value.isdigit():
  40. return False
  41. raw_timeout_limit = int(value)
  42. if raw_timeout_limit < 100:
  43. # below 100, the unit is the second ( <3 = 3 seconds timeout )
  44. self.raw_text_query.timeout_limit = float(raw_timeout_limit)
  45. else:
  46. # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
  47. self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
  48. return True
  49. def _autocomplete(self):
  50. for suggestion in ['<3', '<850']:
  51. self._add_autocomplete(suggestion)
  52. class LanguageParser(QueryPartParser):
  53. @staticmethod
  54. def check(raw_value):
  55. return raw_value[0] == ':'
  56. def __call__(self, raw_value):
  57. value = raw_value[1:].lower().replace('_', '-')
  58. found = self._parse(value) if len(value) > 0 else False
  59. if self.enable_autocomplete and not found:
  60. self._autocomplete(value)
  61. return found
  62. def _parse(self, value):
  63. found = False
  64. # check if any language-code is equal with
  65. # declared language-codes
  66. for lc in language_codes:
  67. lang_id, lang_name, country, english_name = map(str.lower, lc)
  68. # if correct language-code is found
  69. # set it as new search-language
  70. if (value == lang_id
  71. or value == lang_name
  72. or value == english_name
  73. or value.replace('-', ' ') == country)\
  74. and value not in self.raw_text_query.languages:
  75. found = True
  76. lang_parts = lang_id.split('-')
  77. if len(lang_parts) == 2:
  78. self.raw_text_query.languages.append(lang_parts[0] + '-' + lang_parts[1].upper())
  79. else:
  80. self.raw_text_query.languages.append(lang_id)
  81. # to ensure best match (first match is not necessarily the best one)
  82. if value == lang_id:
  83. break
  84. # user may set a valid, yet not selectable language
  85. if VALID_LANGUAGE_CODE.match(value):
  86. lang_parts = value.split('-')
  87. if len(lang_parts) > 1:
  88. value = lang_parts[0].lower() + '-' + lang_parts[1].upper()
  89. if value not in self.raw_text_query.languages:
  90. self.raw_text_query.languages.append(value)
  91. found = True
  92. return found
  93. def _autocomplete(self, value):
  94. if not value:
  95. # show some example queries
  96. for lang in [":en", ":en_us", ":english", ":united_kingdom"]:
  97. self.raw_text_query.autocomplete_list.append(lang)
  98. return
  99. for lc in language_codes:
  100. lang_id, lang_name, country, english_name = map(str.lower, lc)
  101. # check if query starts with language-id
  102. if lang_id.startswith(value):
  103. if len(value) <= 2:
  104. self._add_autocomplete(':' + lang_id.split('-')[0])
  105. else:
  106. self._add_autocomplete(':' + lang_id)
  107. # check if query starts with language name
  108. if lang_name.startswith(value) or english_name.startswith(value):
  109. self._add_autocomplete(':' + lang_name)
  110. # check if query starts with country
  111. # here "new_zealand" is "new-zealand" (see __call__)
  112. if country.startswith(value.replace('-', ' ')):
  113. self._add_autocomplete(':' + country.replace(' ', '_'))
  114. class ExternalBangParser(QueryPartParser):
  115. @staticmethod
  116. def check(raw_value):
  117. return raw_value.startswith('!!')
  118. def __call__(self, raw_value):
  119. value = raw_value[2:]
  120. found, bang_ac_list = self._parse(value) if len(value) > 0 else (False, [])
  121. if self.enable_autocomplete:
  122. self._autocomplete(bang_ac_list)
  123. return found
  124. def _parse(self, value):
  125. found = False
  126. bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
  127. if bang_definition is not None:
  128. self.raw_text_query.external_bang = value
  129. found = True
  130. return found, bang_ac_list
  131. def _autocomplete(self, bang_ac_list):
  132. if not bang_ac_list:
  133. bang_ac_list = ['g', 'ddg', 'bing']
  134. for external_bang in bang_ac_list:
  135. self._add_autocomplete('!!' + external_bang)
  136. class BangParser(QueryPartParser):
  137. @staticmethod
  138. def check(raw_value):
  139. return raw_value[0] == '!' or raw_value[0] == '?'
  140. def __call__(self, raw_value):
  141. value = raw_value[1:].replace('-', ' ').replace('_', ' ')
  142. found = self._parse(value) if len(value) > 0 else False
  143. if found and raw_value[0] == '!':
  144. self.raw_text_query.specific = True
  145. if self.enable_autocomplete:
  146. self._autocomplete(raw_value[0], value)
  147. return found
  148. def _parse(self, value):
  149. # check if prefix is equal with engine shortcut
  150. if value in engine_shortcuts:
  151. value = engine_shortcuts[value]
  152. # check if prefix is equal with engine name
  153. if value in engines:
  154. self.raw_text_query.enginerefs.append(EngineRef(value, 'none'))
  155. return True
  156. # check if prefix is equal with category name
  157. if value in categories:
  158. # using all engines for that search, which
  159. # are declared under that category name
  160. self.raw_text_query.enginerefs.extend(EngineRef(engine.name, value)
  161. for engine in categories[value]
  162. if (engine.name, value) not in self.raw_text_query.disabled_engines)
  163. return True
  164. return False
  165. def _autocomplete(self, first_char, value):
  166. if not value:
  167. # show some example queries
  168. for suggestion in ['images', 'wikipedia', 'osm']:
  169. if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
  170. self._add_autocomplete(first_char + suggestion)
  171. return
  172. # check if query starts with category name
  173. for category in categories:
  174. if category.startswith(value):
  175. self._add_autocomplete(first_char + category)
  176. # check if query starts with engine name
  177. for engine in engines:
  178. if engine.startswith(value):
  179. self._add_autocomplete(first_char + engine.replace(' ', '_'))
  180. # check if query starts with engine shortcut
  181. for engine_shortcut in engine_shortcuts:
  182. if engine_shortcut.startswith(value):
  183. self._add_autocomplete(first_char + engine_shortcut)
  184. class RawTextQuery:
  185. """parse raw text query (the value from the html input)"""
  186. PARSER_CLASSES = [
  187. TimeoutParser, # this force the timeout
  188. LanguageParser, # this force a language
  189. ExternalBangParser, # external bang (must be before BangParser)
  190. BangParser # this force a engine or category
  191. ]
  192. def __init__(self, query, disabled_engines):
  193. assert isinstance(query, str)
  194. # input parameters
  195. self.query = query
  196. self.disabled_engines = disabled_engines if disabled_engines else []
  197. # parsed values
  198. self.enginerefs = []
  199. self.languages = []
  200. self.timeout_limit = None
  201. self.external_bang = None
  202. self.specific = False
  203. self.autocomplete_list = []
  204. # internal properties
  205. self.query_parts = [] # use self.getFullQuery()
  206. self.user_query_parts = [] # use self.getQuery()
  207. self.autocomplete_location = None
  208. self._parse_query()
  209. def _parse_query(self):
  210. """
  211. parse self.query, if tags are set, which
  212. change the search engine or search-language
  213. """
  214. # split query, including whitespaces
  215. raw_query_parts = re.split(r'(\s+)', self.query)
  216. last_index_location = None
  217. autocomplete_index = len(raw_query_parts) - 1
  218. for i, query_part in enumerate(raw_query_parts):
  219. # part does only contain spaces, skip
  220. if query_part.isspace()\
  221. or query_part == '':
  222. continue
  223. # parse special commands
  224. special_part = False
  225. for parser_class in RawTextQuery.PARSER_CLASSES:
  226. if parser_class.check(query_part):
  227. special_part = parser_class(self, i == autocomplete_index)(query_part)
  228. break
  229. # append query part to query_part list
  230. qlist = self.query_parts if special_part else self.user_query_parts
  231. qlist.append(query_part)
  232. last_index_location = (qlist, len(qlist) - 1)
  233. self.autocomplete_location = last_index_location
  234. def get_autocomplete_full_query(self, text):
  235. qlist, position = self.autocomplete_location
  236. qlist[position] = text
  237. return self.getFullQuery()
  238. def changeQuery(self, query):
  239. self.user_query_parts = query.strip().split()
  240. self.query = self.getFullQuery()
  241. self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
  242. self.autocomplete_list = []
  243. return self
  244. def getQuery(self):
  245. return ' '.join(self.user_query_parts)
  246. def getFullQuery(self):
  247. """
  248. get full query including whitespaces
  249. """
  250. return '{0} {1}'.format(' '.join(self.query_parts), self.getQuery()).strip()
  251. def __str__(self):
  252. return self.getFullQuery()
  253. def __repr__(self):
  254. return f"<{self.__class__.__name__} " \
  255. + f"query={self.query!r} " \
  256. + f"disabled_engines={self.disabled_engines!r}\n " \
  257. + f"languages={self.languages!r} " \
  258. + f"timeout_limit={self.timeout_limit!r} "\
  259. + f"external_bang={self.external_bang!r} " \
  260. + f"specific={self.specific!r} " \
  261. + f"enginerefs={self.enginerefs!r}\n " \
  262. + f"autocomplete_list={self.autocomplete_list!r}\n " \
  263. + f"query_parts={self.query_parts!r}\n " \
  264. + f"user_query_parts={self.user_query_parts!r} >"