plugin_xapian.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. from contextlib import closing
  2. import fcntl
  3. import os.path
  4. import shutil
  5. import time
  6. import xapian
"""
Index and query MAPI fields via python-xapian.

Useful reading to understand the code:
http://xapian.org/docs/omega/termprefixes.html
http://xapian.org/docs/apidoc/html/classXapian_1_1TermGenerator.html

Each store gets its own Xapian database, stored under the index path as
'<server_guid>-<store_guid>'. Things would be a lot simpler if we stored
everything in one Xapian database, but, as in the old situation, we keep
one database per store, which is nice.

Term prefixes used: 'XK:' (lowercased sourcekey, unique per document),
'XF:' (folder id) and 'XM<n>:' (text of MAPI property <n>). Value slot 0
holds the document id that searches return.
"""
  15. class Plugin:
  16. def __init__(self, index_path, log):
  17. self.index_path = index_path
  18. self.log = log
  19. self.data = []
  20. self.deletes = []
  21. def open_db(self, server_guid, store_guid, writable=False, log=None):
  22. """ open xapian database; if locked, wait until unlocked """
  23. dbpath = os.path.join(self.index_path, '%s-%s' % (server_guid, store_guid))
  24. try:
  25. if writable:
  26. with open(os.path.join(dbpath+'.lock'), 'w') as lockfile: # avoid compaction using an external lock to be safe
  27. fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX)
  28. while True:
  29. try:
  30. return xapian.WritableDatabase(dbpath, xapian.DB_CREATE_OR_OPEN)
  31. except xapian.DatabaseLockError:
  32. time.sleep(0.1)
  33. else:
  34. return xapian.Database(dbpath)
  35. except xapian.DatabaseOpeningError:
  36. if log:
  37. log.warn('could not open database: %s', dbpath)
  38. def extract_terms(self, text):
  39. """ extract terms as if we are indexing """
  40. doc = xapian.Document()
  41. tg = xapian.TermGenerator()
  42. tg.set_document(doc)
  43. text = text.replace('_', ' ') # xapian sees '_' as a word-character (to search for identifiers in source code)
  44. tg.index_text(text)
  45. return [t.term.decode('utf-8') for t in doc.termlist()]
  46. def search(self, server_guid, store_guid, folder_ids, fields_terms, query, log):
  47. """ handle query; see links in the top for a description of the Xapian API """
  48. db = self.open_db(server_guid, store_guid, log=log)
  49. if not db:
  50. return []
  51. qp = xapian.QueryParser()
  52. qp.add_prefix("sourcekey", "XK:")
  53. qp.add_prefix("folderid", "XF:")
  54. for fields, terms in fields_terms:
  55. for field in fields:
  56. qp.add_prefix('mapi%d' % field, "XM%d:" % field)
  57. log.info('performing query: %s', query)
  58. qp.set_database(db)
  59. query = qp.parse_query(query, xapian.QueryParser.FLAG_BOOLEAN|xapian.QueryParser.FLAG_PHRASE|xapian.QueryParser.FLAG_WILDCARD)
  60. enquire = xapian.Enquire(db)
  61. enquire.set_query(query)
  62. matches = []
  63. for match in enquire.get_mset(0, db.get_doccount()): # XXX catch exception if database is being updated?
  64. matches.append(match.document.get_value(0))
  65. db.close()
  66. return matches
  67. def suggest(self, server_guid, store_guid, terms, orig, log):
  68. """ update original search text with suggested terms """
  69. db = self.open_db(server_guid, store_guid, log=log)
  70. if not db:
  71. return orig
  72. with closing(db) as db:
  73. # XXX revisit later. looks like xapian cannot do this for us? :S
  74. for term in sorted(terms, key=lambda s: len(s), reverse=True):
  75. suggestion = db.get_spelling_suggestion(term).decode('utf8') or term
  76. orig = orig.replace(term, suggestion, 1)
  77. return orig
  78. def update(self, doc):
  79. """ new/changed document """
  80. self.data.append(doc)
  81. def delete(self, doc):
  82. """ deleted document """
  83. self.deletes.append(doc)
  84. def commit(self, suggestions):
  85. """ index pending documents; see links in the top for a description of the Xapian API """
  86. if not self.data and not self.deletes:
  87. return
  88. t0 = time.time()
  89. nitems = len(self.data)
  90. try:
  91. # XXX we assume here that all data is from the same store
  92. doc = (self.data or self.deletes)[0]
  93. with closing(self.open_db(doc['serverid'], doc['storeid'], writable=True, log=self.log)) as db:
  94. termgenerator = xapian.TermGenerator()
  95. termgenerator.set_database(db)
  96. flags = 0
  97. if suggestions:
  98. flags |= termgenerator.FLAG_SPELLING
  99. termgenerator.set_flags(flags)
  100. for doc in self.data:
  101. xdoc = xapian.Document()
  102. termgenerator.set_document(xdoc)
  103. for key, value in doc.items():
  104. if key.startswith('mapi'):
  105. value = value.replace('_', ' ') # xapian sees '_' as a word-character (to search for identifiers in source code)
  106. termgenerator.index_text_without_positions(value) # add to full-text, needed for spelling dict?
  107. termgenerator.index_text_without_positions(value, 1, 'XM%s:' % key[4:])
  108. xdoc.add_value(0, str(doc['docid']))
  109. sourcekey_term = 'XK:'+doc['sourcekey'].lower()
  110. xdoc.add_term(sourcekey_term)
  111. xdoc.add_term('XF:'+str(doc['folderid'])) #XXX
  112. xdoc.set_data(doc['data'])
  113. db.replace_document(sourcekey_term, xdoc)
  114. for doc in self.deletes:
  115. db.delete_document('XK:'+doc['sourcekey'].lower())
  116. self.log.debug('commit took %.2f seconds (%d items)', time.time()-t0, nitems)
  117. finally:
  118. self.data = []
  119. self.deletes = []
  120. def reindex(self, server_guid, store_guid):
  121. """ remove database so we can cleanly reindex the store """
  122. dbpath = os.path.join(self.index_path, '%s-%s' % (server_guid, store_guid))
  123. self.log.info('removing %s', dbpath)
  124. shutil.rmtree(dbpath, ignore_errors=True) # may not exist yet (no items to index)