external_bang.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. # pylint: disable=missing-module-docstring
  3. from urllib.parse import quote_plus, urlparse
  4. from searx.data import EXTERNAL_BANGS
  5. LEAF_KEY = chr(16)
  6. def get_node(external_bangs_db, bang):
  7. node = external_bangs_db['trie']
  8. after = ''
  9. before = ''
  10. for bang_letter in bang:
  11. after += bang_letter
  12. if after in node and isinstance(node, dict):
  13. node = node[after]
  14. before += after
  15. after = ''
  16. return node, before, after
  17. def get_bang_definition_and_ac(external_bangs_db, bang):
  18. node, before, after = get_node(external_bangs_db, bang)
  19. bang_definition = None
  20. bang_ac_list = []
  21. if after != '':
  22. for k in node:
  23. if k.startswith(after):
  24. bang_ac_list.append(before + k)
  25. elif isinstance(node, dict):
  26. bang_definition = node.get(LEAF_KEY)
  27. bang_ac_list = [before + k for k in node.keys() if k != LEAF_KEY]
  28. elif isinstance(node, str):
  29. bang_definition = node
  30. bang_ac_list = []
  31. return bang_definition, bang_ac_list
  32. def resolve_bang_definition(bang_definition, query):
  33. url, rank = bang_definition.split(chr(1))
  34. if url.startswith('//'):
  35. url = 'https:' + url
  36. if query:
  37. url = url.replace(chr(2), quote_plus(query))
  38. else:
  39. # go to main instead of search page
  40. o = urlparse(url)
  41. url = o.scheme + '://' + o.netloc
  42. rank = int(rank) if len(rank) > 0 else 0
  43. return (url, rank)
  44. def get_bang_definition_and_autocomplete(bang, external_bangs_db=None): # pylint: disable=invalid-name
  45. if external_bangs_db is None:
  46. external_bangs_db = EXTERNAL_BANGS
  47. bang_definition, bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang)
  48. new_autocomplete = []
  49. current = [*bang_ac_list]
  50. done = set()
  51. while len(current) > 0:
  52. bang_ac = current.pop(0)
  53. done.add(bang_ac)
  54. current_bang_definition, current_bang_ac_list = get_bang_definition_and_ac(external_bangs_db, bang_ac)
  55. if current_bang_definition:
  56. _, order = resolve_bang_definition(current_bang_definition, '')
  57. new_autocomplete.append((bang_ac, order))
  58. for new_bang in current_bang_ac_list:
  59. if new_bang not in done and new_bang not in current:
  60. current.append(new_bang)
  61. new_autocomplete.sort(key=lambda t: (-t[1], t[0]))
  62. new_autocomplete = list(map(lambda t: t[0], new_autocomplete))
  63. return bang_definition, new_autocomplete
  64. def get_bang_url(search_query, external_bangs_db=None):
  65. """
  66. Redirects if the user supplied a correct bang search.
  67. :param search_query: This is a search_query object which contains preferences and the submitted queries.
  68. :return: None if the bang was invalid, else a string of the redirect url.
  69. """
  70. ret_val = None
  71. if external_bangs_db is None:
  72. external_bangs_db = EXTERNAL_BANGS
  73. if search_query.external_bang:
  74. bang_definition, _ = get_bang_definition_and_ac(external_bangs_db, search_query.external_bang)
  75. if bang_definition and isinstance(bang_definition, str):
  76. ret_val = resolve_bang_definition(bang_definition, search_query.query)[0]
  77. return ret_val