tests_processor.py 31 KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015 The Distro Tracker Developers
  3. # See the COPYRIGHT file at the top-level directory of this distribution and
  4. # at http://deb.li/DTAuthors
  5. #
  6. # This file is part of Distro Tracker. It is subject to the license terms
  7. # in the LICENSE file found in the top-level directory of this
  8. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  9. # including this file, may be copied, modified, propagated, or distributed
  10. # except according to the terms contained in the LICENSE file.
  11. """
  12. Tests for :mod:`distro_tracker.mail.processor`.
  13. """
  14. from __future__ import unicode_literals
  15. from email.message import Message
  16. from datetime import datetime
  17. from datetime import timedelta
  18. import multiprocessing
  19. import os.path
  20. import time
  21. from django.conf import settings
  22. from django.test.utils import override_settings
  23. from django.utils import six
  24. from django.utils.encoding import force_bytes
  25. from django.utils.six.moves import mock
  26. import pyinotify
  27. from distro_tracker.test import TestCase
  28. from distro_tracker.core.utils.email_messages import message_from_bytes
  29. from distro_tracker.mail.processor import MailProcessor
  30. from distro_tracker.mail.processor import MailQueue
  31. from distro_tracker.mail.processor import MailQueueEntry
  32. from distro_tracker.mail.processor import MailQueueWatcher
  33. from distro_tracker.mail.processor import ConflictingDeliveryAddresses
  34. from distro_tracker.mail.processor import InvalidDeliveryAddress
  35. from distro_tracker.mail.processor import MissingDeliveryAddress
  36. from distro_tracker.mail.processor import MailProcessorException
  37. class HelperMixin(object):
  38. @staticmethod
  39. def create_mail(filename, subject='A subject', **kwargs):
  40. with open(filename, 'wb') as msg:
  41. msg.write(b'Subject: ' + force_bytes(subject) + b'\n\nBody')
  42. @staticmethod
  43. def mkdir(directory):
  44. if not os.path.exists(directory):
  45. os.makedirs(directory)
  46. def patch_mail_processor(self, new=mock.DEFAULT):
  47. process_method = 'distro_tracker.mail.processor.MailProcessor.process'
  48. patcher = mock.patch(process_method)
  49. mock_process = patcher.start()
  50. self.addCleanup(patcher.stop)
  51. return mock_process
  52. def patch_methods(self, entry, **kwargs):
  53. mock_objects = {}
  54. for method in kwargs:
  55. patcher = mock.patch.object(entry, method)
  56. mocked_method = patcher.start()
  57. if callable(kwargs[method]):
  58. mocked_method.side_effect = kwargs[method]
  59. else:
  60. mocked_method.return_value = kwargs[method]
  61. mock_objects[method] = mocked_method
  62. self.addCleanup(patcher.stop)
  63. return mock_objects
  64. def patch_now(self):
  65. """
  66. Replace distro_tracker.core.utils.now() with a mocked version returning
  67. self.current_datetime
  68. """
  69. def get_datetime(*args, **kwargs):
  70. return self.current_datetime
  71. if not hasattr(self, 'current_datetime'):
  72. self.current_datetime = datetime.now()
  73. patcher = mock.patch('distro_tracker.core.utils.now')
  74. now = patcher.start()
  75. now.side_effect = get_datetime
  76. self.addCleanup(patcher.stop)
  77. @override_settings(DISTRO_TRACKER_FQDN='tracker.debian.org')
  78. class MailProcessorTest(TestCase, HelperMixin):
  79. def setUp(self):
  80. """Create a MailProcessor object"""
  81. self.msg = Message()
  82. self.processor = MailProcessor(self.msg)
  83. self.DOMAIN = settings.DISTRO_TRACKER_FQDN
  84. def _test_find_addr_with(self, field):
  85. to_addr = 'foo@{}'.format(self.DOMAIN)
  86. self.msg.add_header(field, to_addr)
  87. addr = self.processor.find_delivery_address(self.msg)
  88. self.assertEqual(to_addr, addr)
  89. def test_find_addr_with_delivered_to(self):
  90. """Delivered-To is found and used"""
  91. self._test_find_addr_with('Delivered-To')
  92. def test_find_addr_with_envelope_to(self):
  93. """Envelope-To is found and used"""
  94. self._test_find_addr_with('Envelope-To')
  95. def test_find_addr_with_x_original_to(self):
  96. """X-Original-To is found and used"""
  97. self._test_find_addr_with('X-Original-To')
  98. def test_find_addr_with_x_envelope_to(self):
  99. """X-Envelope-To is found and used"""
  100. self._test_find_addr_with('X-Envelope-To')
  101. @override_settings(DISTRO_TRACKER_FQDN='domain.test')
  102. def test_find_addr_ignores_bad_domain(self):
  103. """Headers pointing to domain that do not match the FQDN are ignored """
  104. to_addr = 'foo@{}'.format(self.DOMAIN)
  105. # Entirely different domain should be ignored
  106. self.msg.add_header('Envelope-To', to_addr)
  107. self.msg.add_header('Delivered-To', to_addr)
  108. # Subdomains should be ignored too
  109. self.msg.add_header('Delivered-To', 'foo@bar.domain.test')
  110. addr = self.processor.find_delivery_address(self.msg)
  111. self.assertIsNone(addr)
  112. def test_find_addr_with_multiple_field_copies(self):
  113. """All copies of the same fields are parsed"""
  114. to_addr = 'foo@{}'.format(self.DOMAIN)
  115. self.msg.add_header('Delivered-To', 'foo@bar')
  116. self.msg.add_header('Delivered-To', to_addr)
  117. self.msg.add_header('Delivered-To', 'foo@baz')
  118. addr = self.processor.find_delivery_address(self.msg)
  119. self.assertEqual(to_addr, addr)
  120. def test_find_addr_conflicting(self):
  121. """Fails when encountering multiple headers with the same domain"""
  122. self.msg.add_header('Delivered-To', 'foo@{}'.format(self.DOMAIN))
  123. self.msg.add_header('Delivered-To', 'bar@{}'.format(self.DOMAIN))
  124. with self.assertRaises(ConflictingDeliveryAddresses):
  125. self.processor.find_delivery_address(self.msg)
  126. def test_find_addr_duplicate_non_conflicting(self):
  127. """Does not fail when encountering same address in different headers"""
  128. to_addr = 'foo@{}'.format(self.DOMAIN)
  129. self.msg.add_header('Delivered-To', to_addr)
  130. self.msg.add_header('X-Original-To', to_addr)
  131. addr = self.processor.find_delivery_address(self.msg)
  132. self.assertEqual(to_addr, addr)
  133. def test_identify_service_without_details(self):
  134. """identify_service(foo@bar) returns (foo, None)"""
  135. (service, details) = self.processor.identify_service('foo@bar')
  136. self.assertEqual(service, 'foo')
  137. self.assertIsNone(details)
  138. def test_identify_service_with_details(self):
  139. """identify_service(foo+baz@bar) returns (foo, baz)"""
  140. (service, details) = self.processor.identify_service('foo+baz@bar')
  141. self.assertEqual(service, 'foo')
  142. self.assertEqual(details, 'baz')
  143. def test_identify_service_with_details_with_plus(self):
  144. """identify_service(foo+baz+baz@bar) returns (foo, baz+baz)"""
  145. (service, details) = self.processor.identify_service('foo+baz+baz@bar')
  146. self.assertEqual(service, 'foo')
  147. self.assertEqual(details, 'baz+baz')
  148. def _test_process_for_addr(self, local_part, method_name, *args, **kwargs):
  149. self.msg.add_header('Delivered-To',
  150. '{}@{}'.format(local_part, self.DOMAIN))
  151. with mock.patch.object(self.processor, method_name) as func:
  152. self.processor.process()
  153. func.assert_called_once_with(*args, **kwargs)
  154. def test_process_control(self):
  155. '''control@ is processed by handle_control()'''
  156. self._test_process_for_addr('control', 'handle_control')
  157. def test_process_dispatch(self):
  158. '''dispatch@ is processed by handle_dispatch(None, None)'''
  159. self._test_process_for_addr('dispatch', 'handle_dispatch', None, None)
  160. def test_process_dispatch_with_package(self):
  161. '''dispatch+foo@ is processed by handle_dispatch(foo, None)'''
  162. self._test_process_for_addr('dispatch+foo', 'handle_dispatch',
  163. 'foo', None)
  164. def test_process_dispatch_with_package_and_keyword(self):
  165. '''dispatch+foo_bar@ is processed by handle_dispatch(foo, bar)'''
  166. self._test_process_for_addr('dispatch+foo_bar', 'handle_dispatch',
  167. 'foo', 'bar')
  168. def test_process_bounces(self):
  169. '''bounces+foo@ is processed by handle_bounces()'''
  170. self._test_process_for_addr('bounces+foo', 'handle_bounces', 'foo')
  171. def test_process_without_delivery_address(self):
  172. '''process() fails when no delivery address can be identified'''
  173. with self.assertRaises(MissingDeliveryAddress):
  174. self.processor.process()
  175. @override_settings(DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS=False)
  176. def test_process_unknown_service_fails(self):
  177. '''process() fails when delivery address is not a known service'''
  178. self.msg.add_header('Delivered-To', 'unknown@{}'.format(self.DOMAIN))
  179. with self.assertRaises(InvalidDeliveryAddress):
  180. self.processor.process()
  181. @override_settings(DISTRO_TRACKER_ACCEPT_UNQUALIFIED_EMAILS=True)
  182. def test_process_unknown_service_works_as_dispatch(self):
  183. '''process() fails when delivery address is not a known service'''
  184. self._test_process_for_addr('unknown', 'handle_dispatch', 'unknown',
  185. None)
  186. def test_load_mail_from_file(self):
  187. '''loads the mail to process from a file'''
  188. mail_path = os.path.join(settings.DISTRO_TRACKER_DATA_PATH, 'a-mail')
  189. self.create_mail(mail_path, subject='load_mail')
  190. self.processor.load_mail_from_file(mail_path)
  191. self.assertEqual(self.processor.message['Subject'], 'load_mail')
  192. def test_init_with_filename(self):
  193. '''can create object with filename'''
  194. mail_path = os.path.join(settings.DISTRO_TRACKER_DATA_PATH, 'a-mail')
  195. self.create_mail(mail_path, subject='load_mail')
  196. mail_proc = MailProcessor(mail_path)
  197. self.assertIsInstance(mail_proc.message, Message)
  198. class QueueHelperMixin(HelperMixin):
  199. def create_mail(self, filename, **kwargs):
  200. """
  201. Creates a mail and stores it in the maildir.
  202. """
  203. hardlink_from_tmp = kwargs.get('hardlink_from_tmp', False)
  204. maildir_new = self.queue._get_maildir()
  205. self.mkdir(maildir_new)
  206. tmp_path = path = self.get_mail_path(filename)
  207. if hardlink_from_tmp:
  208. maildir_tmp = os.path.join(maildir_new, '..', 'tmp')
  209. self.mkdir(maildir_tmp)
  210. tmp_path = os.path.join(maildir_tmp, filename)
  211. super(QueueHelperMixin, self).create_mail(tmp_path, **kwargs)
  212. if hardlink_from_tmp:
  213. os.link(tmp_path, path)
  214. os.unlink(tmp_path)
  215. return path
  216. def get_mail_path(self, filename):
  217. maildir = self.queue._get_maildir()
  218. return os.path.join(maildir, filename)
  219. def add_mails_to_queue(self, *args, **kwargs):
  220. entries = []
  221. for entry in args:
  222. if kwargs.get('create_mail', True):
  223. self.create_mail(entry)
  224. entries.append(self.queue.add(entry))
  225. return entries
  226. def assertQueueIsEmpty(self):
  227. self.assertListEqual(self.queue.queue, [])
  228. def assertNotInQueue(self, entry):
  229. self.assertNotIn(entry, self.queue.queue)
  230. def assertInQueue(self, entry):
  231. self.assertIn(entry, self.queue.queue)
  232. class MailQueueTest(TestCase, QueueHelperMixin):
  233. def setUp(self):
  234. self.queue = MailQueue()
  235. self.queue.MAX_WORKERS = 1
  236. def test_default_attributes(self):
  237. """The needed attributes are there"""
  238. self.assertIsInstance(self.queue.queue, list)
  239. self.assertIsInstance(self.queue.entries, dict)
  240. self.assertEqual(self.queue.processed_count, 0)
  241. def test_add_returns_mail_queue_entry(self):
  242. identifier = 'a'
  243. entry = self.queue.add(identifier)
  244. self.assertIsInstance(entry, MailQueueEntry)
  245. self.assertEqual(entry.identifier, identifier)
  246. def test_add_twice(self):
  247. """Duplicate add() is ignored"""
  248. self.queue.add('a')
  249. self.queue.add('a')
  250. self.assertEqual(len(self.queue.queue), 1)
  251. def test_remove(self):
  252. """remove() cancels the effects of add()"""
  253. self.queue.add('a')
  254. self.queue.remove('a')
  255. self.assertQueueIsEmpty()
  256. self.assertEqual(len(self.queue.entries), 0)
  257. def test_remove_increases_processed_count(self):
  258. """remove() counts the number of processed mails"""
  259. self.queue.add('a')
  260. self.assertEqual(self.queue.processed_count, 0)
  261. self.queue.remove('a')
  262. self.assertEqual(self.queue.processed_count, 1)
  263. def test_remove_non_existing(self):
  264. """remove() is a no-op for a non-existing entry"""
  265. self.queue.remove('a')
  266. @mock.patch('os.listdir')
  267. def test_initialize(self, mock_listdir):
  268. """
  269. Initialize calls os.listdir() on the Maildir/new and populates
  270. the queue attribute with it
  271. """
  272. mock_listdir.return_value = ['a', 'b', 'c']
  273. new = os.path.join(settings.DISTRO_TRACKER_MAILDIR_DIRECTORY, 'new')
  274. self.queue.initialize()
  275. mock_listdir.assert_called_with(new)
  276. self.assertListEqual(
  277. list(map(lambda x: x.identifier, self.queue.queue)),
  278. mock_listdir.return_value)
  279. def test_pool_is_multiprocessing_pool(self):
  280. self.assertIsInstance(self.queue.pool, multiprocessing.pool.Pool)
  281. def test_pool_is_singleton(self):
  282. self.assertEqual(self.queue.pool, self.queue.pool)
  283. def test_close_pool_drops_cached_object(self):
  284. self.queue.pool
  285. self.queue.close_pool()
  286. self.assertIsNone(self.queue._pool)
  287. def test_close_pool_works_without_pool(self):
  288. self.queue.close_pool()
  289. def test_close_pool_really_closes_the_pool(self):
  290. pool = self.queue.pool
  291. self.queue.close_pool()
  292. if six.PY2:
  293. expected_exception = AssertionError # assert self._state == RUN
  294. else:
  295. expected_exception = ValueError # Pool not running exception
  296. with self.assertRaises(expected_exception):
  297. pool.apply_async(time.sleep, 0)
  298. def test_process_queue_handles_preexisting_mails(self):
  299. """Pre-existing mails are processed"""
  300. self.patch_mail_processor()
  301. self.add_mails_to_queue('a', 'b')
  302. self.queue.process_queue()
  303. self.queue.close_pool()
  304. self.assertQueueIsEmpty()
  305. def test_process_queue_does_not_start_tasks_for_entries_with_task(self):
  306. """Mails being processed are not re-queued"""
  307. self.patch_mail_processor()
  308. entry_a, entry_b = self.add_mails_to_queue('a', 'b')
  309. self.patch_methods(entry_a, processing_task_started=True,
  310. processing_task_finished=False,
  311. start_processing_task=None)
  312. self.patch_methods(entry_b, processing_task_started=False,
  313. processing_task_finished=False,
  314. start_processing_task=None)
  315. self.queue.process_queue()
  316. self.assertFalse(entry_a.start_processing_task.called)
  317. entry_b.start_processing_task.assert_called_once_with()
  318. def test_process_queue_handles_processing_task_result(self):
  319. """Mails being processed are handled when finished"""
  320. self.patch_mail_processor()
  321. entry_a, entry_b = self.add_mails_to_queue('a', 'b')
  322. self.patch_methods(entry_a, processing_task_started=True,
  323. processing_task_finished=False,
  324. handle_processing_task_result=None)
  325. self.patch_methods(entry_b, processing_task_started=True,
  326. processing_task_finished=True,
  327. handle_processing_task_result=None)
  328. self.queue.process_queue()
  329. entry_a.processing_task_finished.assert_called_once_with()
  330. entry_b.processing_task_finished.assert_called_once_with()
  331. self.assertFalse(entry_a.handle_processing_task_result.called)
  332. entry_b.handle_processing_task_result.assert_called_once_with()
  333. def test_process_queue_works_when_queue_items_are_removed(self):
  334. """The processing of entries results in entries being dropped. This
  335. should not confuse process_queue which should still properly
  336. process all entries"""
  337. queue = ['a', 'b', 'c', 'd', 'e', 'f']
  338. self.queue._count_mock_calls = 0
  339. for entry in self.add_mails_to_queue(*queue):
  340. def side_effect():
  341. entry.queue._count_mock_calls += 1
  342. entry.queue.remove(entry.identifier)
  343. return False
  344. self.patch_methods(entry, processing_task_started=True,
  345. processing_task_finished=side_effect)
  346. self.queue.process_queue()
  347. self.assertEqual(self.queue._count_mock_calls, len(queue))
  348. def test_sleep_timeout_mailqueue_empty(self):
  349. self.assertEqual(self.queue.sleep_timeout(),
  350. self.queue.SLEEP_TIMEOUT_EMPTY)
  351. def _add_entry_task_running(self, name):
  352. entry = self.add_mails_to_queue(name)[0]
  353. self.patch_methods(entry, processing_task_started=True,
  354. processing_task_finished=False)
  355. return entry
  356. def test_sleep_timeout_task_started_not_finished(self):
  357. self._add_entry_task_running('a')
  358. self.assertEqual(self.queue.sleep_timeout(),
  359. self.queue.SLEEP_TIMEOUT_TASK_RUNNING)
  360. def _add_entry_task_finished(self, name):
  361. entry = self.add_mails_to_queue(name)[0]
  362. self.patch_methods(entry, processing_task_started=True,
  363. processing_task_finished=True)
  364. return entry
  365. def test_sleep_timeout_task_finished(self):
  366. self._add_entry_task_finished('a')
  367. self.assertEqual(self.queue.sleep_timeout(),
  368. self.queue.SLEEP_TIMEOUT_TASK_FINISHED)
  369. def _add_entry_task_waiting_next_try(self, name):
  370. entry = self.add_mails_to_queue(name)[0]
  371. entry.schedule_next_try()
  372. return entry
  373. def _get_wait_time(self, entry):
  374. wait_time = entry.get_data('next_try_time') - self.current_datetime
  375. return wait_time.total_seconds()
  376. def test_sleep_timeout_task_waiting_next_try(self):
  377. self.patch_now()
  378. entry = self._add_entry_task_waiting_next_try('a')
  379. wait_time = self._get_wait_time(entry)
  380. self.assertEqual(self.queue.sleep_timeout(), wait_time)
  381. def _add_entry_task_runnable(self, name):
  382. entry = self.add_mails_to_queue(name)[0]
  383. return entry
  384. def test_sleep_timeout_task_runnable(self):
  385. self._add_entry_task_runnable('a')
  386. self.assertEqual(self.queue.sleep_timeout(),
  387. self.queue.SLEEP_TIMEOUT_TASK_RUNNABLE)
  388. def test_sleep_timeout_picks_the_shorter_wait_time(self):
  389. self.patch_now()
  390. self._add_entry_task_running('a')
  391. self._add_entry_task_runnable('b')
  392. self._add_entry_task_finished('c')
  393. entry_d = self._add_entry_task_waiting_next_try('d')
  394. wait_time = self._get_wait_time(entry_d)
  395. self.assertEqual(self.queue.sleep_timeout(),
  396. min(self.queue.SLEEP_TIMEOUT_TASK_RUNNING,
  397. self.queue.SLEEP_TIMEOUT_TASK_RUNNABLE,
  398. self.queue.SLEEP_TIMEOUT_TASK_FINISHED,
  399. wait_time))
  400. def start_process_loop(self, stop_after=None):
  401. """
  402. Start process_loop() in a dedicated process and ensure it's
  403. ready to proocess new files before returning
  404. """
  405. self.mkdir(self.queue._get_maildir())
  406. lock = multiprocessing.Lock()
  407. lock.acquire()
  408. def process_loop(lock):
  409. def release_lock():
  410. lock.release()
  411. queue = MailQueue()
  412. queue.process_loop(stop_after=stop_after, ready_cb=release_lock)
  413. process = multiprocessing.Process(target=process_loop, args=(lock,))
  414. process.start()
  415. lock.acquire()
  416. return process
  417. def test_process_loop_processes_one_mail(self):
  418. self.patch_mail_processor()
  419. process = self.start_process_loop(stop_after=1)
  420. # The mail is created after process_loop() is ready
  421. path = self.create_mail('a')
  422. # We wait the end of the task for max 1 second
  423. process.join(1)
  424. # Process finished successfully (and we're not here due to timeout)
  425. if process.is_alive():
  426. process.terminate()
  427. self.fail("process_loop did not terminate")
  428. self.assertFalse(process.is_alive())
  429. self.assertEqual(process.exitcode, 0)
  430. # And it did its job by handling the mail
  431. self.assertFalse(os.path.exists(path))
  432. @mock.patch('distro_tracker.mail.processor.MailQueueWatcher')
  433. def test_process_loop_calls_sleep_timeout(self, mock_watcher):
  434. """Ensure we feed the sleep timeout to watcher.process_events"""
  435. self.mkdir(self.queue._get_maildir())
  436. self.patch_methods(self.queue, sleep_timeout=mock.sentinel.DELAY)
  437. self.queue.process_loop(stop_after=0)
  438. mock_watcher.return_value.process_events.assert_called_with(
  439. timeout=mock.sentinel.DELAY)
  440. @mock.patch('distro_tracker.mail.processor.MailQueueWatcher')
  441. def test_process_loop_calls_initialize(self, mock_watcher):
  442. """Ensure we call the initialize method before announcing readyness"""
  443. self.patch_methods(self.queue, initialize=None)
  444. def check_when_ready():
  445. self.queue.initialize.assert_called_with()
  446. self.queue.process_loop(stop_after=0, ready_cb=check_when_ready)
  447. class MailQueueEntryTest(TestCase, QueueHelperMixin):
  448. def setUp(self):
  449. self.queue = MailQueue()
  450. self.identifier = 'mail-abc'
  451. self.entry = self.queue.add(self.identifier)
  452. def test_attributes(self):
  453. maildir = self.queue._get_maildir()
  454. self.assertEqual(self.entry.identifier, self.identifier)
  455. self.assertEqual(self.entry.queue, self.queue)
  456. self.assertEqual(self.entry.path,
  457. os.path.join(maildir, self.identifier))
  458. self.assertIsInstance(self.entry.data, dict)
  459. def test_entry_has_creation_time(self):
  460. self.patch_now()
  461. entry = MailQueueEntry(self.queue, self.identifier)
  462. self.assertEqual(entry.get_data('creation_time'), self.current_datetime)
  463. def test_set_get_data_cycle(self):
  464. self.entry.set_data('key', mock.sentinel.data_value)
  465. self.assertIs(self.entry.get_data('key'), mock.sentinel.data_value)
  466. def test_get_data_on_unset_data(self):
  467. self.assertIsNone(self.entry.get_data('key'))
  468. def test_move_to_subfolder(self):
  469. old_path = self.entry.path
  470. new_path = os.path.join(self.queue._get_maildir('subfolder'),
  471. self.identifier)
  472. self.mkdir(self.queue._get_maildir())
  473. self.create_mail(old_path, subject='move_to_subfolder')
  474. self.assertTrue(os.path.exists(old_path))
  475. self.assertFalse(os.path.exists(new_path))
  476. self.entry.move_to_subfolder('subfolder')
  477. self.assertFalse(os.path.exists(old_path))
  478. self.assertTrue(os.path.exists(new_path))
  479. with open(new_path, 'rb') as f:
  480. msg = message_from_bytes(f.read())
  481. self.assertEqual(msg['Subject'], 'move_to_subfolder')
  482. def test_start_processing_task_does_its_job(self):
  483. self.create_mail(self.identifier)
  484. self.patch_mail_processor()
  485. self.assertFalse(self.entry.processing_task_started())
  486. self.assertInQueue(self.entry)
  487. self.assertTrue(os.path.exists(self.entry.path))
  488. self.entry.start_processing_task()
  489. self.assertTrue(self.entry.processing_task_started())
  490. self.queue.close_pool() # Wais until the worker finished
  491. self.assertNotInQueue(self.entry)
  492. self.assertFalse(os.path.exists(self.entry.path))
  493. def test_start_processing_task_stores_task_result(self):
  494. self.patch_mail_processor()
  495. self.entry.start_processing_task()
  496. result = self.entry.get_data('task_result')
  497. self.assertIsInstance(result, multiprocessing.pool.AsyncResult)
  498. def test_start_processing_task_respects_next_try_time(self):
  499. self.patch_now()
  500. self.patch_mail_processor()
  501. self.entry.set_data('next_try_time',
  502. self.current_datetime + timedelta(seconds=10))
  503. self.entry.start_processing_task()
  504. self.assertFalse(self.entry.processing_task_started())
  505. self.current_datetime += timedelta(seconds=10)
  506. self.entry.start_processing_task()
  507. self.assertTrue(self.entry.processing_task_started())
  508. def test_processing_task_finished(self):
  509. self.patch_mail_processor()
  510. self.assertFalse(self.entry.processing_task_finished())
  511. self.entry.start_processing_task()
  512. self.entry.get_data('task_result').wait()
  513. self.assertTrue(self.entry.processing_task_finished())
  514. def test_task_result_get_forwards_exceptions(self):
  515. """
  516. Ensure that the get() method of a task's AsyncResult forwards
  517. exceptions thrown by the worker. This should be so as per
  518. documentation but ensuring that we can reproduce is better.
  519. """
  520. self.create_mail(self.identifier)
  521. mock_process = self.patch_mail_processor()
  522. mock_process.side_effect = MailProcessorException
  523. self.entry.start_processing_task()
  524. task_result = self.entry.get_data('task_result')
  525. with self.assertRaises(MailProcessorException):
  526. task_result.get()
  527. @staticmethod
  528. def _get_fake_task_result(side_effect=None):
  529. mock_task_result = mock.MagicMock()
  530. if side_effect:
  531. mock_task_result.get.side_effect = side_effect
  532. return mock_task_result
  533. def test_handle_processing_task_result(self):
  534. """Should not fail with a task that actually worked"""
  535. task_result = self._get_fake_task_result()
  536. self.entry.set_data('task_result', task_result)
  537. self.entry.handle_processing_task_result()
  538. task_result.get.assert_called_with()
  539. self.assertNotInQueue(self.entry)
  540. def test_handle_processing_task_result_without_results(self):
  541. """Should do nothing when task is not yet finished"""
  542. self.entry.handle_processing_task_result()
  543. self.assertInQueue(self.entry)
  544. def test_handle_processing_task_result_mail_processor_exception(self):
  545. '''Task failing with a MailProcessorException result in
  546. immediate failure and move to the failed subfolder'''
  547. task_result = self._get_fake_task_result(
  548. side_effect=MailProcessorException)
  549. self.entry.set_data('task_result', task_result)
  550. with mock.patch.object(self.entry, 'move_to_subfolder') as mock_move:
  551. self.entry.handle_processing_task_result()
  552. mock_move.assert_called_once_with('failed')
  553. self.assertNotInQueue(self.entry)
  554. def test_handle_processing_task_resulted_in_exception_no_tries_left(self):
  555. '''Task failing with a generic exception result in failure when
  556. the entry refuses to schedule a new try'''
  557. task_result = self._get_fake_task_result(side_effect=Exception)
  558. self.entry.set_data('task_result', task_result)
  559. self.patch_methods(self.entry, move_to_subfolder=None,
  560. schedule_next_try=False)
  561. self.entry.handle_processing_task_result()
  562. self.entry.move_to_subfolder.assert_called_with('broken')
  563. self.assertNotInQueue(self.entry)
  564. def test_handle_processing_task_resulted_in_exception_tries_left(self):
  565. '''Task failing with a generic exception result in a new try when
  566. allowed'''
  567. task_result = self._get_fake_task_result(side_effect=Exception)
  568. self.entry.set_data('task_result', task_result)
  569. self.patch_methods(self.entry, move_to_subfolder=None,
  570. schedule_next_try=True)
  571. self.entry.handle_processing_task_result()
  572. self.assertFalse(self.entry.move_to_subfolder.called)
  573. self.assertInQueue(self.entry)
  574. def test_schedule_next_try_returns_true_a_few_times(self):
  575. '''Accept to schedule a new try a few times'''
  576. for i in range(4):
  577. self.assertTrue(self.entry.schedule_next_try())
  578. def test_schedule_next_try_eventually_returns_false(self):
  579. '''Eventually decide that enough is enough'''
  580. count = 0
  581. while self.entry.schedule_next_try():
  582. count += 1
  583. if count > 50:
  584. self.fail("schedule_next_try doesn't want to fail")
  585. def test_schedule_next_try_sets_log_failure_on_first_try(self):
  586. '''Should not log failure on first try'''
  587. self.entry.set_data('log_failure', True)
  588. self.entry.schedule_next_try()
  589. self.assertFalse(self.entry.get_data('log_failure'))
  590. def test_schedule_next_try_does_log_failure_on_last_try_only(self):
  591. '''Should log failure on last try'''
  592. while not self.entry.get_data('log_failure'):
  593. self.assertTrue(self.entry.schedule_next_try())
  594. self.assertFalse(self.entry.schedule_next_try())
  595. def test_schedule_next_try_sets_next_try_time(self):
  596. '''The scheduling is done via next_try_time data entry'''
  597. self.patch_now()
  598. for i in range(10):
  599. if not self.entry.schedule_next_try():
  600. break
  601. next_try = self.entry.get_data('next_try_time')
  602. self.assertGreater(next_try, self.current_datetime)
  603. self.curent_datetime = next_try
  604. def test_schedule_next_try_reset_started_flag(self):
  605. self.entry.start_processing_task()
  606. self.assertTrue(self.entry.processing_task_started())
  607. self.entry.schedule_next_try()
  608. self.assertFalse(self.entry.processing_task_started())
  609. class MailQueueWatcherTest(TestCase, QueueHelperMixin):
  610. def setUp(self):
  611. self.queue = MailQueue()
  612. self.watcher = MailQueueWatcher(self.queue)
  613. self.mkdir(self.queue._get_maildir())
  614. def test_process_events(self):
  615. self.watcher.start()
  616. self.assertQueueIsEmpty()
  617. self.create_mail('a')
  618. self.watcher.process_events()
  619. self.assertEqual(len(self.queue.queue), 1)
  620. self.assertEqual(self.queue.queue[0].identifier, 'a')
  621. def test_process_events_when_hardlinked_into_maildir(self):
  622. self.watcher.start()
  623. self.assertQueueIsEmpty()
  624. self.create_mail('a', hardlink_from_tmp=True)
  625. self.watcher.process_events()
  626. self.assertEqual(len(self.queue.queue), 1)
  627. self.assertEqual(self.queue.queue[0].identifier, 'a')
  628. def test_process_events_does_not_block(self):
  629. self.watcher.start()
  630. before = datetime.now()
  631. self.watcher.process_events()
  632. after = datetime.now()
  633. delta = after - before
  634. self.assertLess(delta.total_seconds(), 1)
  635. def test_start_fails_when_dir_does_not_exist(self):
  636. os.rmdir(self.queue._get_maildir())
  637. with self.assertRaises(pyinotify.WatchManagerError):
  638. self.watcher.start()