123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 |
- # SPDX-License-Identifier: AGPL-3.0-or-later
- # pylint: disable=invalid-name, missing-module-docstring, missing-class-docstring
- from __future__ import annotations
- from abc import abstractmethod, ABC
- import re
- from searx import settings
- from searx.sxng_locales import sxng_locales
- from searx.engines import categories, engines, engine_shortcuts
- from searx.external_bang import get_bang_definition_and_autocomplete
- from searx.search import EngineRef
- from searx.webutils import VALID_LANGUAGE_CODE
class QueryPartParser(ABC):
    """Abstract base of the parsers that handle one special query part.

    A parser instance is bound to the raw text query object it updates and
    to a flag telling whether it may contribute autocomplete suggestions.
    """

    __slots__ = ("raw_text_query", "enable_autocomplete")

    def __init__(self, raw_text_query, enable_autocomplete):
        """Store the query object to update and the autocomplete flag."""
        self.enable_autocomplete = enable_autocomplete
        self.raw_text_query = raw_text_query

    @staticmethod
    @abstractmethod
    def check(raw_value):
        """Return whether ``raw_value`` is a query part this parser handles."""

    @abstractmethod
    def __call__(self, raw_value):
        """Try to parse ``raw_value`` and update ``self.raw_text_query``.

        Return True if ``raw_value`` has been parsed.

        ``self.raw_text_query.autocomplete_list`` is modified as well when
        ``self.enable_autocomplete`` is True.
        """

    def _add_autocomplete(self, value):
        """Append ``value`` to the autocomplete list unless already present."""
        suggestions = self.raw_text_query.autocomplete_list
        if value in suggestions:
            return
        suggestions.append(value)
class TimeoutParser(QueryPartParser):
    """Parse the ``<N`` query part which forces a timeout limit.

    Values below 100 are read as seconds, values of 100 or above as
    milliseconds; the result is stored in
    ``raw_text_query.timeout_limit`` as a number of seconds.
    """

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with ``<``."""
        # startswith() also copes with an empty string instead of raising
        return raw_value.startswith('<')

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if a timeout has been set.

        When autocomplete is enabled and nothing follows the ``<``,
        example values are added to the autocomplete list.
        """
        value = raw_value[1:]
        found = self._parse(value) if value else False
        if self.enable_autocomplete and not value:
            self._autocomplete()
        return found

    def _parse(self, value):
        """Set the timeout limit from ``value``; return False if not a number."""
        # str.isdigit() also accepts characters such as superscript digits
        # that int() rejects with ValueError; str.isdecimal() only accepts
        # characters int() is able to convert.
        if not value.isdecimal():
            return False
        raw_timeout_limit = int(value)
        if raw_timeout_limit < 100:
            # below 100, the unit is the second ( <3 = 3 seconds timeout )
            self.raw_text_query.timeout_limit = float(raw_timeout_limit)
        else:
            # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
            self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
        return True

    def _autocomplete(self):
        """Suggest one example in seconds and one in milliseconds."""
        for suggestion in ['<3', '<850']:
            self._add_autocomplete(suggestion)
class LanguageParser(QueryPartParser):
    """Parse the ``:language`` query part which forces a search language."""

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with ``:``."""
        return raw_value[0] == ':'

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if a language has been selected.

        The text after the colon is lower-cased and ``_`` is replaced by
        ``-`` before matching.  Suggestions are only produced when
        autocomplete is enabled and nothing has been selected.
        """
        normalized = raw_value[1:].lower().replace('_', '-')
        found = False
        if normalized:
            found = self._parse(normalized)
        if self.enable_autocomplete and not found:
            self._autocomplete(normalized)
        return found

    def _parse(self, value):
        """Append the language(s) matching ``value`` to the query languages.

        Return True if at least one language has been appended.
        """
        selected = self.raw_text_query.languages
        found = False

        # compare value with every declared locale: language code,
        # language name, english name or country
        for locale in sxng_locales:
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            matches = value in (lang_id, lang_name, english_name) or value.replace('-', ' ') == country
            if not matches or value in selected:
                continue

            found = True
            # two-part codes are stored with an upper-cased second part
            parts = lang_id.split('-')
            selected.append(parts[0] + '-' + parts[1].upper() if len(parts) == 2 else lang_id)

            # to ensure best match (first match is not necessarily the best one)
            if value == lang_id:
                break

        # user may set a valid, yet not selectable language
        if VALID_LANGUAGE_CODE.match(value) or value == 'auto':
            parts = value.split('-')
            if len(parts) > 1:
                value = parts[0].lower() + '-' + parts[1].upper()
            if value not in selected:
                selected.append(value)
                found = True

        return found

    def _autocomplete(self, value):
        """Add language suggestions for the (possibly empty) prefix ``value``."""
        if not value:
            # show some example queries
            if len(settings['search']['languages']) < 10:
                examples = [':' + lang for lang in settings['search']['languages']]
            else:
                examples = [":en", ":en_us", ":english", ":united_kingdom"]
            self.raw_text_query.autocomplete_list.extend(examples)
            return

        # here "new_zealand" is "new-zealand" (see __call__)
        country_prefix = value.replace('-', ' ')

        for locale in sxng_locales:
            # only suggest languages listed in the settings
            if locale[0] not in settings['search']['languages']:
                continue
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            # query starts with language-id: for short prefixes suggest only
            # the part of the id before the first '-'
            if lang_id.startswith(value):
                suggestion = lang_id.split('-')[0] if len(value) <= 2 else lang_id
                self._add_autocomplete(':' + suggestion)

            # query starts with language name
            if lang_name.startswith(value) or english_name.startswith(value):
                self._add_autocomplete(':' + lang_name)

            # query starts with country
            if country.startswith(country_prefix):
                self._add_autocomplete(':' + country.replace(' ', '_'))
class ExternalBangParser(QueryPartParser):
    """Parse the ``!!name`` query part which selects an external bang."""

    @staticmethod
    def check(raw_value):
        """Return True for ``!!`` followed by at least one more character."""
        return len(raw_value) > 2 and raw_value.startswith('!!')

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if an external bang has been set.

        When autocomplete is enabled, suggestions are added whether or not
        a bang has been found.
        """
        bang = raw_value[2:]
        if bang:
            found, suggestions = self._parse(bang)
        else:
            found, suggestions = False, []
        if self.enable_autocomplete:
            self._autocomplete(suggestions)
        return found

    def _parse(self, value):
        """Look up ``value``; return ``(found, autocomplete candidates)``."""
        definition, suggestions = get_bang_definition_and_autocomplete(value)
        if definition is None:
            return False, suggestions
        self.raw_text_query.external_bang = value
        return True, suggestions

    def _autocomplete(self, bang_ac_list):
        """Suggest ``bang_ac_list`` entries, or fixed examples if it is empty."""
        for external_bang in bang_ac_list or ['g', 'ddg', 'bing']:
            self._add_autocomplete('!!' + external_bang)
class BangParser(QueryPartParser):
    """Parse the ``!name`` query part which forces an engine or a category."""

    @staticmethod
    def check(raw_value):
        """Return True for a single leading ``!`` (not a double ``!!``)."""
        # the slice is empty when nothing follows the first character
        return raw_value[0] == '!' and raw_value[1:2] != '!'

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if engines have been selected.

        ``-`` and ``_`` in the name are treated as spaces.  On success the
        query is flagged as ``specific``.
        """
        value = raw_value[1:].replace('-', ' ').replace('_', ' ')
        found = bool(value) and self._parse(value)
        if found and raw_value[0] == '!':
            self.raw_text_query.specific = True
        if self.enable_autocomplete:
            self._autocomplete(raw_value[0], value)
        return found

    def _parse(self, value):
        """Resolve ``value`` as shortcut, engine or category name.

        Matching engine references are appended to
        ``raw_text_query.enginerefs``; return True if ``value`` is known.
        """
        query = self.raw_text_query

        # an engine shortcut is translated to the engine name first
        if value in engine_shortcuts:  # pylint: disable=consider-using-get
            value = engine_shortcuts[value]

        # exact engine name
        if value in engines:
            query.enginerefs.append(EngineRef(value, 'none'))
            return True

        if value not in categories:
            return False

        # category name: use all engines declared under that category,
        # except those listed in the disabled engines
        for engine in categories[value]:
            if (engine.name, value) not in query.disabled_engines:
                query.enginerefs.append(EngineRef(engine.name, value))
        return True

    def _autocomplete(self, first_char, value):
        """Add suggestions for the prefix ``value``, prefixed by ``first_char``."""
        if not value:
            # show some example queries
            for suggestion in ['images', 'wikipedia', 'osm']:
                if suggestion not in self.raw_text_query.disabled_engines or suggestion in categories:
                    self._add_autocomplete(first_char + suggestion)
            return

        # categories, engine names, then engine shortcuts; spaces are shown
        # as underscores for the first two only
        sources = (
            (categories, True),
            (engines, True),
            (engine_shortcuts, False),
        )
        for names, underscore_spaces in sources:
            for name in names:
                if not name.startswith(value):
                    continue
                shown = name.replace(' ', '_') if underscore_spaces else name
                self._add_autocomplete(first_char + shown)
class FeelingLuckyParser(QueryPartParser):
    """Parse the bare ``!!`` query part (redirect to the first result)."""

    @staticmethod
    def check(raw_value):
        """Return True only when ``raw_value`` is exactly ``!!``."""
        return raw_value == '!!'

    def __call__(self, raw_value):
        """Flag the query for redirection to the first result; always True."""
        self.raw_text_query.redirect_to_first_result = True
        return True
class RawTextQuery:
    """parse raw text query (the value from the html input)

    The query is split on whitespace; each part accepted by one of the
    ``PARSER_CLASSES`` is kept in ``query_parts``, every other part in
    ``user_query_parts``.
    """

    # the first parser whose check() accepts a part handles it
    PARSER_CLASSES = [
        TimeoutParser,  # force the timeout
        LanguageParser,  # force a language
        ExternalBangParser,  # external bang (must be before BangParser)
        BangParser,  # force an engine or category
        FeelingLuckyParser,  # redirect to the first link in the results list
    ]

    def __init__(self, query: str, disabled_engines: list):
        """Parse ``query``; ``disabled_engines`` may be empty or None."""
        assert isinstance(query, str)
        # input parameters
        self.query = query
        self.disabled_engines = disabled_engines if disabled_engines else []

        # parsed values
        self.enginerefs = []
        self.languages = []
        self.timeout_limit = None
        self.external_bang = None
        self.specific = False
        self.autocomplete_list = []

        # internal properties
        self.query_parts = []  # use self.getFullQuery()
        self.user_query_parts = []  # use self.getQuery()
        # (list, index) of the last non-blank part, None if there is none
        self.autocomplete_location = None
        self.redirect_to_first_result = False
        self._parse_query()

    def _parse_query(self):
        """
        parse self.query, if tags are set, which
        change the search engine or search-language
        """
        # split query, including whitespaces
        raw_query_parts = re.split(r'(\s+)', self.query)

        last_index_location = None
        # only the very last part may produce autocomplete suggestions
        autocomplete_index = len(raw_query_parts) - 1

        for i, query_part in enumerate(raw_query_parts):
            # part does only contain spaces, skip
            if not query_part or query_part.isspace():
                continue

            # parse special commands
            special_part = False
            for parser_class in RawTextQuery.PARSER_CLASSES:
                if parser_class.check(query_part):
                    special_part = parser_class(self, i == autocomplete_index)(query_part)
                    break

            # append query part to query_part list
            qlist = self.query_parts if special_part else self.user_query_parts
            qlist.append(query_part)
            last_index_location = (qlist, len(qlist) - 1)

        self.autocomplete_location = last_index_location

    def get_autocomplete_full_query(self, text):
        """Return the full query with the last part replaced by ``text``.

        The part list is modified in place.  If there is no part to replace
        (empty or blank query, or an empty query set by ``changeQuery``),
        ``text`` is appended instead of raising.
        """
        if self.autocomplete_location is None:
            # nothing has been parsed: fall back to the user query parts
            qlist, position = self.user_query_parts, -1
        else:
            qlist, position = self.autocomplete_location

        if qlist:
            qlist[position] = text
        else:
            qlist.append(text)
        return self.getFullQuery()

    def changeQuery(self, query):
        """Replace the user part of the query by ``query``; return self.

        The special parts are kept and the autocomplete list is reset.
        """
        self.user_query_parts = query.strip().split()
        self.query = self.getFullQuery()
        self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        self.autocomplete_list = []
        return self

    def getQuery(self):
        """Return the user query parts joined by single spaces."""
        return ' '.join(self.user_query_parts)

    def getFullQuery(self):
        """
        get full query: the special parts followed by the user query,
        joined by single spaces and stripped
        """
        special = ' '.join(self.query_parts)
        return f"{special} {self.getQuery()}".strip()

    def __str__(self):
        return self.getFullQuery()

    def __repr__(self):
        # the closing bracket comes after the last field
        return (
            f"<{self.__class__.__name__} "
            f"query={self.query!r} "
            f"disabled_engines={self.disabled_engines!r}\n "
            f"languages={self.languages!r} "
            f"timeout_limit={self.timeout_limit!r} "
            f"external_bang={self.external_bang!r} "
            f"specific={self.specific!r} "
            f"enginerefs={self.enginerefs!r}\n "
            f"autocomplete_list={self.autocomplete_list!r}\n "
            f"query_parts={self.query_parts!r}\n "
            f"user_query_parts={self.user_query_parts!r}\n "
            f"redirect_to_first_result={self.redirect_to_first_result!r} >"
        )
|