processor.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. # Copyright 2015 The Distro Tracker Developers
  2. # See the COPYRIGHT file at the top-level directory of this distribution and
  3. # at http://deb.li/DTAuthors
  4. #
  5. # This file is part of Distro Tracker. It is subject to the license terms
  6. # in the LICENSE file found in the top-level directory of this
  7. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  8. # including this file, may be copied, modified, propagated, or distributed
  9. # except according to the terms contained in the LICENSE file.
  10. """
  11. Module implementing the processing of incoming email messages.
  12. """
  13. from __future__ import unicode_literals
  14. import asyncore
  15. from datetime import timedelta
  16. import email
  17. from itertools import chain
  18. from multiprocessing import Pool
  19. import os
  20. from django.conf import settings
  21. import pyinotify
  22. import logging
  23. from distro_tracker.core.utils import message_from_bytes
  24. import distro_tracker.mail.control
  25. import distro_tracker.mail.dispatch
  26. logger = logging.getLogger(__name__)
  27. class MailProcessorException(Exception):
  28. pass
  29. class ConflictingDeliveryAddresses(MailProcessorException):
  30. """
  31. The message contained multiple headers with possible delivery addresses
  32. for the domain defined in settings.DISTRO_TRACKER_FQDN.
  33. """
  34. pass
  35. class MissingDeliveryAddress(MailProcessorException):
  36. """
  37. The message contained no header with a delivery address for the domain
  38. defined in settings.DISTRO_TRACKER_FQDN.
  39. """
  40. pass
  41. class InvalidDeliveryAddress(MailProcessorException):
  42. """
  43. The message contained a delivery address for the domain defined in
  44. settings.DISTRO_TRACKER_FQDN but it did not match any known Distro Tracker
  45. service.
  46. """
  47. pass
  48. class MailProcessor(object):
  49. """
  50. Takes an incoming email and do something useful out of it.
  51. To this end, it must find out where the email was sent
  52. and adjust the processing depending on the role of
  53. the target address.
  54. """
  55. def __init__(self, message_or_filename):
  56. if isinstance(message_or_filename, email.message.Message):
  57. self.message = message_or_filename
  58. else:
  59. self.load_mail_from_file(message_or_filename)
  60. def load_mail_from_file(self, filename):
  61. """
  62. Load the mail to process from a file.
  63. :param str filename: Path of the file to parse as mail.
  64. """
  65. with open(filename, 'rb') as f:
  66. self.message = message_from_bytes(f.read())
  67. @staticmethod
  68. def find_delivery_address(message):
  69. """
  70. Identify the email address the message was delivered to.
  71. The message headers Delivered-To, Envelope-To, X-Original-To, and
  72. X-Envelope-To are scanned to find out an email that matches the FQDN of
  73. the Distro Tracker setup.
  74. """
  75. addresses = []
  76. for field in chain(message.get_all('Delivered-To', []),
  77. message.get_all('Envelope-To', []),
  78. message.get_all('X-Original-To', []),
  79. message.get_all('X-Envelope-To', [])):
  80. if field.endswith('@' + settings.DISTRO_TRACKER_FQDN):
  81. if field not in addresses:
  82. addresses.append(field)
  83. if len(addresses) > 1:
  84. raise ConflictingDeliveryAddresses()
  85. elif len(addresses) == 1:
  86. return addresses[0]
  87. @staticmethod
  88. def identify_service(address):
  89. """
  90. Identify service associated to target email and extract optional args.
  91. The address has the generic form <service>+<details>@<fqdn>.
  92. """
  93. local_part = address.split('@', 1)[0]
  94. if '+' in local_part:
  95. return local_part.split('+', 1)
  96. else:
  97. return (local_part, None)
  98. @staticmethod
  99. def do_nothing(self):
  100. """Just used by unit tests to disable process()"""
  101. def process(self):
  102. """
  103. Process the message stored in self.message.
  104. Find out the delivery address and identify the associated service.
  105. Then defer to handle_*() for service-specific processing. Can raise
  106. MissingDeliveryAddress and UnknownService
  107. """
  108. addr = self.find_delivery_address(self.message)
  109. if addr is None:
  110. raise MissingDeliveryAddress()
  111. service, details = self.identify_service(addr)
  112. if service == 'dispatch':
  113. package, keyword = (details, None)
  114. if details and '_' in details:
  115. package, keyword = details.split('_', 1)
  116. self.handle_dispatch(package, keyword)
  117. elif service == 'bounces':
  118. self.handle_bounces(details)
  119. elif service == 'control':
  120. self.handle_control()
  121. elif settings.DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS:
  122. package, keyword = (addr.split('@', 1)[0], None)
  123. if package and '_' in package:
  124. package, keyword = package.split('_', 1)
  125. self.handle_dispatch(package, keyword)
  126. else:
  127. raise InvalidDeliveryAddress(
  128. '{} is not a valid Distro Tracker address'.format(addr))
  129. @staticmethod
  130. def build_delivery_address(service, details):
  131. local_part = service
  132. if details:
  133. local_part += '+' + details
  134. return '{}@{}'.format(local_part, settings.DISTRO_TRACKER_FQDN)
  135. def handle_control(self):
  136. distro_tracker.mail.control.process(self.message)
  137. def handle_bounces(self, details):
  138. sent_to_addr = self.build_delivery_address('bounces', details)
  139. distro_tracker.mail.dispatch.handle_bounces(sent_to_addr)
  140. def handle_dispatch(self, package=None, keyword=None):
  141. distro_tracker.mail.dispatch.process(self.message, package=package,
  142. keyword=keyword)
  143. def run_mail_processor(mail_path, log_failure=False):
  144. try:
  145. processor = MailProcessor(mail_path)
  146. processor.process()
  147. except Exception:
  148. if log_failure:
  149. logger.exception("Failed to process incoming mail %s", mail_path)
  150. raise
  151. class MailQueue(object):
  152. """
  153. A queue of mails to process. The mails are identified by their filename
  154. within `DISTRO_TRACKER_MAILDIR_DIRECTORY`.
  155. """
  156. #: The maximum number of sub-process used to process the mail queue
  157. MAX_WORKERS = 4
  158. SLEEP_TIMEOUT_EMPTY = 30.0
  159. SLEEP_TIMEOUT_TASK_RUNNING = 0.010
  160. SLEEP_TIMEOUT_TASK_FINISHED = 0.0
  161. SLEEP_TIMEOUT_TASK_RUNNABLE = 0.0
  162. def __init__(self):
  163. self.queue = []
  164. self.entries = {}
  165. self.processed_count = 0
  166. def add(self, identifier):
  167. """
  168. Add a new mail in the queue.
  169. :param str identifiername: Filename identifying the mail.
  170. """
  171. if identifier in self.entries:
  172. return
  173. entry = MailQueueEntry(self, identifier)
  174. self.queue.append(entry)
  175. self.entries[identifier] = entry
  176. return entry
  177. def remove(self, identifier):
  178. """
  179. Remove a mail from the queue. This does not unlink the file.
  180. :param str identifier: Filename identifying the mail.
  181. """
  182. if identifier not in self.entries:
  183. return
  184. self.queue.remove(self.entries[identifier])
  185. self.entries.pop(identifier)
  186. self.processed_count += 1
  187. @staticmethod
  188. def _get_maildir(subfolder=None):
  189. if subfolder:
  190. return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY,
  191. subfolder, 'new')
  192. return os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY, 'new')
  193. @classmethod
  194. def _get_mail_path(cls, entry, subfolder=None):
  195. return os.path.join(cls._get_maildir(subfolder), entry)
  196. def initialize(self):
  197. """Scan the Maildir and fill the queue with the mails in it."""
  198. for mail in os.listdir(self._get_maildir()):
  199. self.add(mail)
  200. @property
  201. def pool(self):
  202. if getattr(self, '_pool', None):
  203. return self._pool
  204. self._pool = Pool(self.MAX_WORKERS, maxtasksperchild=100)
  205. return self._pool
  206. def close_pool(self):
  207. """Wait until all worker processes are finished and destroy the pool"""
  208. if getattr(self, '_pool', None) is None:
  209. return
  210. self._pool.close()
  211. self._pool.join()
  212. self._pool = None
  213. def process_queue(self):
  214. """
  215. Iterate over messages in the queue and do whateever is appropriate.
  216. """
  217. # Work on a snapshot of the queue as it will be modified each time
  218. # a task is finished
  219. queue = [item for item in self.queue]
  220. for entry in queue:
  221. if not entry.processing_task_started():
  222. entry.start_processing_task()
  223. if entry.processing_task_finished():
  224. entry.handle_processing_task_result()
  225. def sleep_timeout(self):
  226. """
  227. Return the maximum delay we can sleep before we process the queue
  228. again.
  229. """
  230. timeout = 86400.0
  231. for entry in self.queue:
  232. next_try_time = entry.get_data('next_try_time')
  233. if entry.processing_task_finished():
  234. timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_FINISHED)
  235. elif entry.processing_task_started():
  236. timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_RUNNING)
  237. elif next_try_time is not None:
  238. wait_time = next_try_time - distro_tracker.core.utils.now()
  239. timeout = min(timeout, wait_time.total_seconds())
  240. else:
  241. timeout = min(timeout, self.SLEEP_TIMEOUT_TASK_RUNNABLE)
  242. timeout = self.SLEEP_TIMEOUT_EMPTY if not len(self.queue) else timeout
  243. return timeout
  244. def process_loop(self, stop_after=None, ready_cb=None):
  245. """
  246. Process all messages as they are delivered. Also processes pre-existing
  247. messages. This method never returns.
  248. :param int stop_after: Stop the loop after having processed the given
  249. number of messages. Used mainly by unit tests.
  250. :param ready_cb: a callback executed after setup of filesystem
  251. monitoring and initial scan of the mail queue, but before the
  252. start of the loop.
  253. """
  254. watcher = MailQueueWatcher(self)
  255. watcher.start()
  256. self.initialize()
  257. if ready_cb:
  258. ready_cb()
  259. while True:
  260. watcher.process_events(timeout=self.sleep_timeout())
  261. self.process_queue()
  262. if stop_after is not None and self.processed_count >= stop_after:
  263. break
  264. class MailQueueEntry(object):
  265. """
  266. An entry in a :py:class:MailQueue.
  267. Contains the following public attributes:
  268. .. :py:attr: queue
  269. The parent :py:class:MailQueue.
  270. .. :py:attr: identifier
  271. The entry identifier, it's the name of the file within the directory
  272. of the MailQueue. Used to uniquely identify the entry in the MailQueue.
  273. .. :py:attr: path
  274. The full path to the mail file.
  275. """
  276. def __init__(self, queue, identifier):
  277. self.queue = queue
  278. self.identifier = identifier
  279. self.path = os.path.join(self.queue._get_maildir(), self.identifier)
  280. self.data = {
  281. 'creation_time': distro_tracker.core.utils.now(),
  282. }
  283. def set_data(self, key, value):
  284. self.data[key] = value
  285. def get_data(self, key):
  286. return self.data.get(key)
  287. def move_to_subfolder(self, folder):
  288. """
  289. Move an entry from the mailqueue to the given subfolder.
  290. """
  291. new_maildir = self.queue._get_maildir(folder)
  292. if not os.path.exists(new_maildir):
  293. os.makedirs(new_maildir)
  294. os.rename(self.path, os.path.join(new_maildir, self.identifier))
  295. def _processed_cb(self, _):
  296. """Callback executed when a worker completes successfully"""
  297. self.queue.remove(self.identifier)
  298. if os.path.exists(self.path):
  299. os.unlink(self.path)
  300. def start_processing_task(self):
  301. """
  302. Create a MailProcessor and schedule its execution in the worker pool.
  303. """
  304. next_try_time = self.get_data('next_try_time')
  305. log_failure = self.get_data('log_failure')
  306. now = distro_tracker.core.utils.now()
  307. if next_try_time and next_try_time > now:
  308. return
  309. result = self.queue.pool.apply_async(run_mail_processor,
  310. (self.path, log_failure),
  311. callback=self._processed_cb)
  312. self.set_data('task_result', result)
  313. def processing_task_started(self):
  314. """
  315. Returns True when the entry has been fed to workers doing mail
  316. processing. Returns False otherwise.
  317. :return: an indication whether the mail processing is on-going.
  318. :rtype: bool
  319. """
  320. return self.get_data('task_result') is not None
  321. def processing_task_finished(self):
  322. """
  323. Returns True when the worker processing the mail has finished its work.
  324. Returns False otherwise, notably when the entry has not been fed to
  325. any worker yet.
  326. :return: an indication whether the mail processing has finished.
  327. :rtype: bool
  328. """
  329. if not self.processing_task_started():
  330. return False
  331. return self.get_data('task_result').ready()
  332. def handle_processing_task_result(self):
  333. """
  334. Called with mails that have been pushed to workers but that are
  335. still in the queue. The task likely failed and we need to handle
  336. the failure smartly.
  337. Mails whose task raised an exception derived from
  338. :py:class:MailProcessorException are directly moved to a "broken"
  339. subfolder and the corresponding entry is dropped from the queue.
  340. Mails whose task raised other exceptions are kept around for
  341. multiple retries and after some time they are moved to a "failed"
  342. subfolder and the corresponding entry is dropped from the queue.
  343. """
  344. task_result = self.get_data('task_result')
  345. if task_result is None:
  346. return
  347. try:
  348. task_result.get()
  349. self._processed_cb(task_result)
  350. except MailProcessorException:
  351. logger.warning('Failed processing %s', self.identifier)
  352. self.move_to_subfolder('failed')
  353. self.queue.remove(self.identifier)
  354. except Exception:
  355. if not self.schedule_next_try():
  356. logger.warning('Failed processing %s (and stop retrying)',
  357. self.identifier)
  358. self.move_to_subfolder('broken')
  359. self.queue.remove(self.identifier)
  360. else:
  361. logger.warning('Failed processing %s (but will retry later)',
  362. self.identifier)
  363. def schedule_next_try(self):
  364. """
  365. When the mail processing failed, schedule a new try for later.
  366. Progressively increase the delay between two tries. After 5 tries,
  367. refuse to schedule a new try and return False.
  368. :return: True if a new try has been scheduled, False otherwise.
  369. """
  370. count = self.get_data('tries') or 0
  371. delays = [
  372. timedelta(seconds=150),
  373. timedelta(seconds=300),
  374. timedelta(seconds=600),
  375. timedelta(seconds=1800),
  376. timedelta(seconds=3600),
  377. timedelta(seconds=7200),
  378. ]
  379. try:
  380. delay = delays[count]
  381. except IndexError:
  382. return False
  383. now = distro_tracker.core.utils.now()
  384. self.set_data('next_try_time', now + delay)
  385. self.set_data('tries', count + 1)
  386. self.set_data('task_result', None)
  387. self.set_data('log_failure', count + 1 == len(delays))
  388. return True
  389. class MailQueueWatcher(object):
  390. """Watch a mail queue and add entries as they appear on the filesystem"""
  391. class EventHandler(pyinotify.ProcessEvent):
  392. def my_init(self, queue=None):
  393. self.queue = queue
  394. def process_IN_CREATE(self, event):
  395. self.queue.add(event.name)
  396. def process_IN_MOVED_TO(self, event):
  397. self.queue.add(event.name)
  398. def __init__(self, queue):
  399. self.queue = queue
  400. def start(self):
  401. """Start watching the directory of the mail queue."""
  402. path = self.queue._get_maildir()
  403. self.wm = pyinotify.WatchManager()
  404. event_handler = self.EventHandler(queue=self.queue)
  405. pyinotify.AsyncNotifier(self.wm, event_handler)
  406. self.wm.add_watch(path, pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO,
  407. quiet=False)
  408. def process_events(self, timeout=0, count=1):
  409. """
  410. Process all pending events since last call of the function.
  411. :param float timeout: Maximum time to wait for an event to happen.
  412. :param int count: Number of processing loops to do.
  413. """
  414. asyncore.loop(timeout=timeout, count=count)