#!/usr/bin/env python
from contextlib import closing
import codecs
import fcntl
import os.path
from multiprocessing import Queue, Value
import time
import sys

# Upgrading from Python 2 to Python 3 is not supported
try:
    import dbhash
    from Queue import Empty
except ImportError:
    import dbm as dbhash
    from queue import Empty

from kopano_search import plaintext
import kopano
from kopano import log_exc, Config

sys.path.insert(0, os.path.dirname(__file__)) # XXX for __import__ to work
  20. """
  21. kopano-search v3 - a process-based indexer and query handler built on python-kopano
  22. initial indexing is performed in parallel using N instances of class IndexWorker, fed with a queue containing folder-ids.
  23. incremental indexing is later performed in the same fashion. when there is no pending work, we
  24. check if there are reindex requests and handle one of these (again parallellized). so incremental syncing is currently paused
  25. during reindexing.
  26. search queries from outlook/webapp are dealt with by a single instance of class SearchWorker.
  27. since ICS does not know for deletion changes which store they belong to, we remember ourselves using a berkeleydb file ("serverguid_mapping").
  28. we also maintain folder and server ICS states in a berkeleydb file, for example so we can resume initial indexing ("serverguid_state").
  29. after initial indexing folder states are not updated anymore.
  30. the used search engine is pluggable, but by default we use xapian (plugin_xapian.py).
  31. a tricky bit is that outlook/exchange perform 'prefix' searches by default and we want to be compatible with this, so terms get an
  32. implicit '*' at the end. not every search engine may perform well for this, but we could make this configurable perhaps.
  33. """
CONFIG = {
    'coredump_enabled': Config.ignore(),
    'index_attachments': Config.boolean(default=True),
    'index_attachment_extension_filter': Config.ignore(),
    'index_attachment_mime_filter': Config.ignore(),
    'index_attachment_max_size': Config.size(default=2**24),
    'index_attachment_parser': Config.ignore(),
    'index_attachment_parser_max_memory': Config.ignore(),
    'index_attachment_parser_max_cputime': Config.ignore(),
    # 0x007D: PR_TRANSPORT_MESSAGE_HEADERS
    # 0x0064: PR_SENT_REPRESENTING_ADDRTYPE
    # 0x0C1E: PR_SENDER_ADDRTYPE
    # 0x0075: PR_RECEIVED_BY_ADDRTYPE
    # 0x678E: PR_EC_IMAP_BODY
    # 0x678F: PR_EC_IMAP_BODYSTRUCTURE
    # 0x001A: PR_MESSAGE_CLASS # XXX add to cfg
    'index_exclude_properties': Config.integer(multiple=True, base=16, default=[0x007D, 0x0064, 0x0C1E, 0x0075, 0x678E, 0x678F, 0x001A]),
    'index_path': Config.string(default='/var/lib/kopano/search/'),
    'index_processes': Config.integer(default=1),
    'limit_results': Config.integer(default=0),
    'optimize_age': Config.ignore(),
    'optimize_start': Config.ignore(),
    'optimize_stop': Config.ignore(),
    'run_as_user': Config.string(default="kopano"),
    'run_as_group': Config.string(default="kopano"),
    'search_engine': Config.string(default='xapian'),
    'suggestions': Config.boolean(default=True),
    'index_junk': Config.boolean(default=True),
    'server_bind_name': Config.string(default='file:///var/run/kopano/search.sock'),
    'ssl_private_key_file': Config.path(default=None, check=False), # XXX don't check when default=None?
    'ssl_certificate_file': Config.path(default=None, check=False),
    'term_cache_size': Config.size(default=64000000),
}

def db_get(db_path, key):
    """ get value from db file """
    with closing(dbhash.open(db_path, 'c')) as db:
        return db.get(key)

def db_put(db_path, key, value):
    """ store key, value in db file """
    with open(db_path+'.lock', 'w') as lockfile:
        fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX)
        with closing(dbhash.open(db_path, 'c')) as db:
            db[key] = value

if sys.hexversion >= 0x03000000:
    def _is_unicode(s):
        return isinstance(s, str)

    def _decode(s):
        return s

    def _encode(s):
        return s.encode()
else:
    def _is_unicode(s):
        return isinstance(s, unicode)

    def _decode(s):
        return s.decode(getattr(sys.stdin, 'encoding', 'utf8') or 'utf8')

    def _encode(s):
        return s.encode(getattr(sys.stdin, 'encoding', 'utf8') or 'utf8')

class SearchWorker(kopano.Worker):
    """ process which handles search requests coming from outlook/webapp, according to our internal protocol """

    def main(self):
        config, plugin = self.service.config, self.service.plugin

        def response(conn, msg):
            self.log.info('Response: %s', msg)
            conn.sendall(_encode(msg) + _encode('\r\n'))

        s = kopano.server_socket(config['server_bind_name'], ssl_key=config['ssl_private_key_file'], ssl_cert=config['ssl_certificate_file'], log=self.log)
        while True:
            with log_exc(self.log):
                conn, _ = s.accept()
                fields_terms = []
                for data in conn.makefile():
                    data = _decode(data.strip())
                    self.log.info('Command: %s', data)
                    cmd, args = data.split()[0], data.split()[1:]
                    if cmd == 'PROPS':
                        response(conn, 'OK:'+' '.join(map(str, config['index_exclude_properties'])))
                        break
                    if cmd == 'SYNCRUN': # wait for syncing to be up-to-date (only used in tests)
                        self.syncrun.value = time.time()
                        while self.syncrun.value:
                            time.sleep(1)
                        response(conn, 'OK:')
                        break
                    elif cmd == 'SCOPE':
                        server_guid, store_guid, folder_ids = args[0], args[1], args[2:]
                        response(conn, 'OK:')
                    elif cmd == 'FIND':
                        pos = data.find(':')
                        fields = list(map(int, data[:pos].split()[1:])) # list() so the empty-fields check also works on python 3
                        orig = data[pos+1:].lower()
                        # Limit number of terms (32) so people do not
                        # inadvertently DoS it if they paste prose.
                        terms = plugin.extract_terms(orig)[:32]
                        if fields and terms:
                            fields_terms.append((fields, terms))
                        response(conn, 'OK:')
                    elif cmd == 'SUGGEST':
                        suggestion = u''
                        if config['suggestions'] and len(fields_terms) == 1:
                            for fields, terms in fields_terms:
                                suggestion = plugin.suggest(server_guid, store_guid, terms, orig, self.log)
                            if suggestion == orig:
                                suggestion = u''
                        response(conn, 'OK: '+suggestion)
                    elif cmd == 'QUERY':
                        t0 = time.time()
                        restrictions = []
                        if folder_ids:
                            restrictions.append('('+' OR '.join(['folderid:%s' % f for f in folder_ids])+')')
                        for fields, terms in fields_terms:
                            if fields:
                                restrictions.append('('+' OR '.join('('+' AND '.join('mapi%d:%s*' % (f, term) for term in terms)+')' for f in fields)+')')
                            else:
                                restrictions.append('('+' AND '.join('%s*' % term for term in terms)+')')
                        query = ' AND '.join(restrictions) # plugin doesn't have to use this relatively standard query format
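                        # e.g. with one scoped folder and terms ['foo', 'bar'] in field 4096 this yields
                        # (illustrative): (folderid:<folder_id>) AND ((mapi4096:foo* AND mapi4096:bar*))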
                        docids = plugin.search(server_guid, store_guid, folder_ids, fields_terms, query, self.log)
                        docids = docids[:config['limit_results'] or len(docids)]
                        response(conn, 'OK: '+' '.join(map(str, docids)))
                        self.log.info('found %d results in %.2f seconds', len(docids), time.time()-t0)
                        break
                    elif cmd == 'REINDEX':
                        self.reindex_queue.put(args[0])
                        response(conn, 'OK:')
                        self.log.info("queued store %s for reindexing", args[0])
                        break
                conn.close()

class IndexWorker(kopano.Worker):
    """ process which gets folders from input queue and indexes them, putting the nr of changes in output queue """

    def main(self):
        config, server, plugin = self.service.config, self.service.server, self.service.plugin
        state_db = os.path.join(config['index_path'], server.guid+'_state')
        while True:
            changes = 0
            with log_exc(self.log):
                (_, storeguid, folderid, reindex) = self.iqueue.get()
                store = server.store(storeguid)
                folder = kopano.Folder(store, folderid)
                if (folder not in (store.root, store.outbox, store.drafts)) and \
                   (folder != store.junk or config['index_junk']):
                    suggestions = config['suggestions'] and folder != store.junk
                    self.log.info('syncing folder: %s %s', storeguid, folder.name)
                    importer = FolderImporter(server.guid, config, plugin, suggestions, self.log)
                    state = db_get(state_db, folder.entryid) if not reindex else None
                    if state:
                        self.log.info('found previous folder sync state: %s', state)
                    t0 = time.time()
                    new_state = folder.sync(importer, state, log=self.log)
                    if new_state != state:
                        plugin.commit(suggestions)
                        db_put(state_db, folder.entryid, new_state)
                        self.log.info('saved folder sync state: %s', new_state)
                        changes = importer.changes + importer.deletes
                    self.log.info('syncing folder %s %s took %.2f seconds (%d changes, %d attachments)', storeguid, folder.name, time.time()-t0, changes, importer.attachments)
            self.oqueue.put(changes)

class FolderImporter:
    """ tracks changes for a given folder """

    def __init__(self, *args):
        self.serverid, self.config, self.plugin, self.suggestions, self.log = args
        self.changes = self.deletes = self.attachments = 0
        self.mapping_db = os.path.join(self.config['index_path'], self.serverid+'_mapping')
        self.excludes = set(self.config['index_exclude_properties']+[0x1000, 0x1009, 0x1013, 0x678C]) # PR_BODY, PR_RTF_COMPRESSED, PR_HTML, PR_EC_IMAP_EMAIL
        self.term_cache_size = 0

    def update(self, item, flags):
        """ called for a new or changed item; get mapi properties, attachments and pass to indexing plugin """
        with log_exc(self.log):
            self.changes += 1
            storeid, folderid, entryid, sourcekey, docid = item.storeid, item.folderid, item.entryid, item.sourcekey, item.docid
            self.log.debug('store %s, folder %d: new/updated document with entryid %s, sourcekey %s, docid %d', storeid, folderid, entryid, sourcekey, docid)
            doc = {'serverid': self.serverid, 'storeid': storeid, 'folderid': folderid, 'docid': docid, 'sourcekey': item.sourcekey}
            for prop in item.props():
                if prop.id_ not in self.excludes:
                    if _is_unicode(prop.value):
                        if prop.value:
                            doc['mapi%d' % prop.id_] = prop.value
                    elif isinstance(prop.value, list):
                        doc['mapi%d' % prop.id_] = u' '.join(x for x in prop.value if _is_unicode(x))
            attach_text = []
            if self.config['index_attachments']:
                for a in item.attachments():
                    self.log.debug('checking attachment (filename=%s, size=%d, mimetag=%s)', a.filename, len(a), a.mimetype)
                    if 0 < len(a) < self.config['index_attachment_max_size'] and a.filename != 'inline.txt': # XXX inline attachment check
                        self.attachments += 1
                        attach_text.append(plaintext.get(a, mimetype=a.mimetype, log=self.log))
                        attach_text.append(u' '+(a.filename or u''))
            doc['mapi4096'] = item.body.text + u' ' + u' '.join(attach_text) # PR_BODY
            doc['mapi3098'] = u' '.join([item.sender.name, item.sender.email, item.from_.name, item.from_.email]) # PR_SENDER_NAME
            doc['mapi3588'] = u' '.join([a.name + u' ' + a.email for a in item.to]) # PR_DISPLAY_TO
            doc['mapi3587'] = u' '.join([a.name + u' ' + a.email for a in item.cc]) # PR_DISPLAY_CC
            doc['mapi3586'] = u' '.join([a.name + u' ' + a.email for a in item.bcc]) # PR_DISPLAY_BCC
            doc['data'] = 'subject: %s\n' % item.subject
            db_put(self.mapping_db, item.sourcekey, '%s %s' % (storeid, item.folder.entryid)) # ICS doesn't remember which store a change belongs to..
            self.plugin.update(doc)
            self.term_cache_size += sum(len(v) for k, v in doc.items() if k.startswith('mapi'))
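            # flush the plugin's pending terms once the accumulated term text (times a rough
            # overhead factor of 8, presumably an estimate of per-character indexing cost)
            # exceeds the configured term_cache_size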
            if (8*self.term_cache_size) > self.config['term_cache_size']:
                self.plugin.commit(self.suggestions)
                self.term_cache_size = 0

    def delete(self, item, flags):
        """ for a deleted item, determine store and ask indexing plugin to delete """
        with log_exc(self.log):
            self.deletes += 1
            ids = db_get(self.mapping_db, item.sourcekey)
            if ids: # when a 'new' item is deleted right away (spooler?), the 'update' function may not have been called
                storeid, folderid = ids.split()
                doc = {'serverid': self.serverid, 'storeid': storeid, 'sourcekey': item.sourcekey}
                self.log.debug('store %s: deleted document with sourcekey %s', doc['storeid'], item.sourcekey)
                self.plugin.delete(doc)

class ServerImporter:
    """ tracks changes for a server node; queues encountered folders for updating """ # XXX improve ICS to track changed folders?

    def __init__(self, serverid, config, iqueue, log):
        self.mapping_db = os.path.join(config['index_path'], serverid+'_mapping')
        self.iqueue = iqueue
        self.queued = set() # sync each folder at most once

    def update(self, item, flags):
        self.queue((0, item.storeid, item.folder.entryid))

    def delete(self, item, flags):
        ids = db_get(self.mapping_db, item.sourcekey)
        if ids:
            self.queue((0,) + tuple(ids.split()))

    def queue(self, folder):
        if folder not in self.queued:
            self.iqueue.put(folder+(False,))
            self.queued.add(folder)

class Service(kopano.Service):
    """ main search process """

    def main(self):
        """ start initial syncing if no state found. then start query process and switch to incremental syncing """
        self.reindex_queue = Queue()
        index_path = self.config['index_path']
        os.umask(0o77)
        if not os.path.exists(index_path):
            os.makedirs(index_path)
        self.state_db = os.path.join(index_path, self.server.guid+'_state')
        self.plugin = __import__('plugin_%s' % self.config['search_engine']).Plugin(index_path, self.log)
        self.iqueue, self.oqueue = Queue(), Queue()
        self.index_processes = self.config['index_processes']
        workers = [IndexWorker(self, 'index%d'%i, nr=i, iqueue=self.iqueue, oqueue=self.oqueue) for i in range(self.index_processes)]
        for worker in workers:
            worker.start()
        self.state = db_get(self.state_db, 'SERVER')
        if self.state:
            self.log.info('found previous server sync state: %s', self.state)
        else:
            self.log.info('starting initial sync')
            new_state = self.server.state # syncing will reach at least the current state
            self.initial_sync(self.server.stores())
            self.state = new_state
            db_put(self.state_db, 'SERVER', self.state)
            self.log.info('saved server sync state = %s', self.state)
        self.syncrun = Value('d', 0)
        SearchWorker(self, 'query', reindex_queue=self.reindex_queue, syncrun=self.syncrun).start()
        self.log.info('starting incremental sync')
        self.incremental_sync()

    def initial_sync(self, stores, reindex=False):
        """ queue all folders for given stores """
        folders = [(f.count, s.guid, f.entryid, reindex) for s in stores for f in s.folders()]
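        # each queue entry starts with the folder's item count, so reverse-sorting below
        # hands the largest folders to the index workers first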
        for f in sorted(folders, reverse=True):
            self.iqueue.put(f)
        itemcount = sum(f[0] for f in folders)
        self.log.info('queued %d folders (~%d changes) for parallel indexing (%s processes)', len(folders), itemcount, self.index_processes)
        t0 = time.time()
        changes = sum([self.oqueue.get() for i in range(len(folders))]) # blocking
        self.log.info('queue processed in %.2f seconds (%d changes, ~%.2f/sec)', time.time()-t0, changes, changes/(time.time()-t0))

    def incremental_sync(self):
        """ process changes in real-time (not yet parallelized); if no pending changes handle reindex requests """
        while True:
            with log_exc(self.log):
                try:
                    storeid = self.reindex_queue.get(block=False)
                    self.log.info('handling reindex request for store %s', storeid)
                    store = self.server.store(storeid)
                    self.plugin.reindex(self.server.guid, store.guid)
                    self.initial_sync([store], reindex=True)
                except Empty:
                    pass
                importer = ServerImporter(self.server.guid, self.config, self.iqueue, self.log)
                t0 = time.time()
                new_state = self.server.sync(importer, self.state, log=self.log)
                if new_state != self.state:
                    changes = sum([self.oqueue.get() for i in range(len(importer.queued))]) # blocking
                    for f in importer.queued:
                        self.iqueue.put(f+(False,)) # make sure folders are at least synced to new_state
                    changes += sum([self.oqueue.get() for i in range(len(importer.queued))]) # blocking
                    self.log.info('queue processed in %.2f seconds (%d changes, ~%.2f/sec)', time.time()-t0, changes, changes/(time.time()-t0))
                    self.state = new_state
                    db_put(self.state_db, 'SERVER', self.state)
                    self.log.info('saved server sync state = %s', self.state)
                if t0 > self.syncrun.value+1:
                    self.syncrun.value = 0
            time.sleep(5)

    def reindex(self):
        """ pass usernames/store-ids given on command-line to running search process """
        for key in self.options.reindex: # usernames supported for convenience/backward compatibility
            store = self.server.get_store(key)
            if not store:
                user = self.server.get_user(key)
                if user:
                    store = user.store
            if store: # XXX check all keys first
                with closing(kopano.client_socket(self.config['server_bind_name'], ssl_cert=self.config['ssl_certificate_file'])) as s:
                    s.sendall(_encode('REINDEX %s\r\n' % store.guid)) # sockets want bytes on python 3
                    s.recv(1024)
            else:
                print("no such user/store: %s" % key)
                sys.exit(1)

def main():
    parser = kopano.parser('ckpsFl') # select common cmd-line options
    parser.add_option('-r', '--reindex', dest='reindex', action='append', default=[], help='Reindex user/store', metavar='USER')
    options, args = parser.parse_args()
    service = Service('search', config=CONFIG, options=options)
    if options.reindex:
        service.reindex()
    else:
        service.start()

if __name__ == '__main__':
    main()