tests_tasks.py 23 KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright 2013 The Distro Tracker Developers
  3. # See the COPYRIGHT file at the top-level directory of this distribution and
  4. # at http://deb.li/DTAuthors
  5. #
  6. # This file is part of Distro Tracker. It is subject to the license terms
  7. # in the LICENSE file found in the top-level directory of this
  8. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  9. # including this file, may be copied, modified, propagated, or distributed
  10. # except according to the terms contained in the LICENSE file.
  11. """
  12. Tests for the Distro Tracker core's tasks framework.
  13. """
  14. from __future__ import unicode_literals
  15. from distro_tracker.test import TestCase
  16. from django.utils.six.moves import mock
  17. from distro_tracker.core.models import RunningJob
  18. from distro_tracker.core.tasks import BaseTask
  19. from distro_tracker.core.tasks import Event
  20. from distro_tracker.core.tasks import JobState
  21. from distro_tracker.core.tasks import run_task, continue_task_from_state
  22. from distro_tracker.core.tasks import run_all_tasks
  23. import logging
  24. logging.disable(logging.CRITICAL)
  25. # Don't let any other module's tests be loaded.
  26. @mock.patch('distro_tracker.core.tasks.import_all_tasks')
  27. class JobTests(TestCase):
  28. def create_task_class(self, produces, depends_on, raises, fail=False):
  29. """
  30. Helper method creates and returns a new BaseTask subclass.
  31. """
  32. self._created_task_count += 1
  33. exec_list = self.execution_list
  34. class TestTask(BaseTask):
  35. PRODUCES_EVENTS = produces
  36. DEPENDS_ON_EVENTS = depends_on
  37. NAME = 'a' * self._created_task_count
  38. def execute(self):
  39. for event in raises:
  40. self.raise_event(event)
  41. exec_list.append(self.__class__)
  42. if fail:
  43. raise Exception("This task fails")
  44. return TestTask
  45. def assert_contains_all(self, items, container):
  46. """
  47. Asserts that all of the given items are found in the given container.
  48. """
  49. for item in items:
  50. self.assertIn(item, container)
  51. def setUp(self):
  52. #: Tasks which execute add themselves to this list.
  53. self._created_task_count = 0
  54. self.execution_list = []
  55. self.original_plugins = [
  56. plugin
  57. for plugin in BaseTask.plugins
  58. ]
  59. # Now ignore all original plugins.
  60. BaseTask.plugins = [BaseTask]
  61. def assert_executed_tasks_equal(self, expected_tasks):
  62. """
  63. Helper method which checks whether the given list of expected tasks
  64. matches the actual list of executed tasks.
  65. """
  66. self.assertEqual(len(expected_tasks), len(self.execution_list))
  67. self.assert_contains_all(expected_tasks, self.execution_list)
  68. def assert_task_dependency_preserved(self, task, dependent_tasks):
  69. """
  70. Helper method which cheks whether the given dependent tasks were
  71. executed after their dependency was satisifed.
  72. """
  73. task_index = self.execution_list.index(task)
  74. for task in dependent_tasks:
  75. self.assertTrue(self.execution_list.index(task) > task_index)
  76. def tearDown(self):
  77. # Remove any extra plugins which may have been created during a test run
  78. BaseTask.plugins = self.original_plugins
  79. def test_simple_dependency(self, *args, **kwargs):
  80. """
  81. Tests creating a DAG of task dependencies when there is only one event
  82. """
  83. A = self.create_task_class(('a',), (), ('a',))
  84. B = self.create_task_class((), ('a',), ())
  85. # Is the event dependency built correctly
  86. events = BaseTask.build_task_event_dependency_graph()
  87. self.assertEqual(len(events), 1)
  88. self.assertEqual(len(events['a'][0]), 1)
  89. self.assertIn(A, events['a'][0])
  90. self.assertEqual(len(events['a'][1]), 1)
  91. self.assertIn(B, events['a'][1])
  92. # Is the DAG built correctly
  93. g = BaseTask.build_full_task_dag()
  94. self.assertEqual(len(g.all_nodes), 2)
  95. self.assertIn(A, g.all_nodes)
  96. self.assertIn(B, g.all_nodes)
  97. # B depends on A
  98. self.assertIn(B, g.dependent_nodes(A))
  99. def test_multiple_dependency(self, *args, **kwargs):
  100. """
  101. Tests creating a DAG of tasks dependencies when there are multiple
  102. events.
  103. """
  104. T0 = self.create_task_class(('A', 'B'), (), ('A',))
  105. T1 = self.create_task_class(('D', 'D1'), ('A',), ('D'))
  106. T2 = self.create_task_class(('C',), ('A',), ('C',))
  107. T3 = self.create_task_class(('E',), ('B',), ('E',))
  108. T4 = self.create_task_class((), ('B',), ())
  109. T5 = self.create_task_class(('evt-5',), ('D',), ('evt-5',))
  110. T6 = self.create_task_class(('evt-6',), ('C'), ('evt-6',))
  111. T7 = self.create_task_class((), ('D1', 'A'), ())
  112. T8 = self.create_task_class((), ('evt-5', 'evt-6', 'E'), ())
  113. g = BaseTask.build_full_task_dag()
  114. self.assertEqual(len(g.dependent_nodes(T0)), 5)
  115. self.assert_contains_all([T1, T2, T3, T4, T7], g.dependent_nodes(T0))
  116. self.assertEqual(len(g.dependent_nodes(T1)), 2)
  117. self.assert_contains_all([T5, T7], g.dependent_nodes(T1))
  118. self.assertEqual(len(g.dependent_nodes(T2)), 1)
  119. self.assert_contains_all([T6], g.dependent_nodes(T2))
  120. self.assertEqual(len(g.dependent_nodes(T3)), 1)
  121. self.assert_contains_all([T8], g.dependent_nodes(T3))
  122. self.assertEqual(len(g.dependent_nodes(T4)), 0)
  123. self.assertEqual(len(g.dependent_nodes(T5)), 1)
  124. self.assert_contains_all([T8], g.dependent_nodes(T5))
  125. self.assertEqual(len(g.dependent_nodes(T6)), 1)
  126. self.assert_contains_all([T8], g.dependent_nodes(T6))
  127. self.assertEqual(len(g.dependent_nodes(T7)), 0)
  128. self.assertEqual(len(g.dependent_nodes(T8)), 0)
  129. def test_run_job_simple(self, *args, **kwargs):
  130. """
  131. Tests running a job consisting of a simple dependency.
  132. """
  133. A = self.create_task_class(('a',), (), ('a',))
  134. B = self.create_task_class((), ('a',), ())
  135. run_task(A)
  136. self.assert_executed_tasks_equal([A, B])
  137. self.assert_task_dependency_preserved(A, [B])
  138. def test_run_job_by_task_name(self, *args, **kwargs):
  139. """
  140. Tests that the :func:`distro_tracker.core.tasks.run_task` function
  141. correctly runs a task when given its name, not a task class object.
  142. """
  143. A = self.create_task_class(('a',), (), ('a',))
  144. B = self.create_task_class((), ('a',), ())
  145. run_task(A.task_name())
  146. self.assert_executed_tasks_equal([A, B])
  147. self.assert_task_dependency_preserved(A, [B])
  148. def test_run_job_no_dependency(self, *args, **kwargs):
  149. """
  150. Tests running a job consisting of no dependencies.
  151. """
  152. self.create_task_class(('a',), (), ('a',))
  153. B = self.create_task_class(('b',), (), ('b',))
  154. run_task(B)
  155. self.assert_executed_tasks_equal([B])
  156. def test_run_job_no_events_emitted(self, *args, **kwargs):
  157. """
  158. Tests running a job consisting of a simple dependency, but the event is
  159. not emitted during execution.
  160. """
  161. A = self.create_task_class(('a',), (), ())
  162. self.create_task_class((), ('a',), ())
  163. run_task(A)
  164. self.assert_executed_tasks_equal([A])
  165. def test_run_job_complex_1(self, *args, **kwargs):
  166. """
  167. Tests running a job consisting of complex dependencies.
  168. """
  169. T0 = self.create_task_class(('A', 'B'), (), ('A',))
  170. T1 = self.create_task_class(('D', 'D1'), ('A',), ('D'))
  171. T2 = self.create_task_class(('C',), ('A',), ('C',))
  172. self.create_task_class(('E',), ('B',), ('E',)) # T3
  173. self.create_task_class((), ('B',), ()) # T4
  174. T5 = self.create_task_class(('evt-5',), ('D',), ('evt-5',))
  175. T6 = self.create_task_class(('evt-6',), ('C'), ('evt-6',))
  176. T7 = self.create_task_class((), ('D1', 'A'), ())
  177. T8 = self.create_task_class((), ('evt-5', 'evt-6', 'E'), ())
  178. run_task(T0)
  179. # Make sure the tasks which didn't have the appropriate events raised
  180. # during execution were not executed. These are tasks T3 and T4 in this
  181. # instance.
  182. self.assert_executed_tasks_equal([T0, T1, T2, T5, T6, T7, T8])
  183. # Check execution order.
  184. self.assert_task_dependency_preserved(T0, [T1, T2, T7])
  185. # Even though task T1 does not emit the event D1, it still needs to
  186. # execute before task T7.
  187. self.assert_task_dependency_preserved(T1, [T5, T7])
  188. self.assert_task_dependency_preserved(T2, [T6])
  189. self.assert_task_dependency_preserved(T5, [T8])
  190. self.assert_task_dependency_preserved(T6, [T8])
  191. def test_run_job_complex_2(self, *args, **kwargs):
  192. """
  193. Tests running a job consisting of complex dependencies.
  194. """
  195. T0 = self.create_task_class(('A', 'B'), (), ('B',))
  196. self.create_task_class(('D', 'D1'), ('A',), ('D')) # T1
  197. self.create_task_class(('C',), ('A',), ('C',)) # T2
  198. T3 = self.create_task_class(('E',), ('B',), ('E',))
  199. T4 = self.create_task_class((), ('B',), ())
  200. self.create_task_class(('evt-5',), ('D',), ('evt-5',)) # T5
  201. self.create_task_class(('evt-6',), ('C'), ('evt-6',)) # T6
  202. self.create_task_class((), ('D1', 'A'), ()) # T7
  203. T8 = self.create_task_class((), ('evt-5', 'evt-6', 'E'), ())
  204. run_task(T0)
  205. # In this test case, unlike test_run_job_complex_1, T0 emits event B so
  206. # no tasks depending on event A need to run.
  207. self.assert_executed_tasks_equal([T0, T3, T4, T8])
  208. # Check execution order.
  209. self.assert_task_dependency_preserved(T0, [T3, T4])
  210. self.assert_task_dependency_preserved(T3, [T8])
  211. def test_run_job_complex_3(self, *args, **kwargs):
  212. """
  213. Tests running a job consisting of complex dependencies.
  214. """
  215. T0 = self.create_task_class(('A', 'B', 'B1'), (), ('B', 'B1'))
  216. self.create_task_class(('D', 'D1'), ('A',), ('D')) # T1
  217. self.create_task_class(('C',), ('A',), ('C',)) # T2
  218. T3 = self.create_task_class(('E',), ('B',), ('E',))
  219. T4 = self.create_task_class((), ('B',), ())
  220. self.create_task_class(('evt-5',), ('D',), ('evt-5',)) # T5
  221. self.create_task_class(('evt-6',), ('C'), ('evt-6',)) # T6
  222. T7 = self.create_task_class((), ('D1', 'A', 'B1'), ())
  223. T8 = self.create_task_class((), ('evt-5', 'evt-6', 'E'), ())
  224. run_task(T0)
  225. self.assert_executed_tasks_equal([T0, T3, T4, T7, T8])
  226. self.assert_task_dependency_preserved(T0, [T3, T4, T7])
  227. self.assert_task_dependency_preserved(T3, [T8])
  228. def test_run_job_complex_4(self, *args, **kwargs):
  229. """
  230. Tests running a job consisting of complex dependencies when the initial
  231. task is not the task which has 0 dependencies in the full tasks DAG.
  232. """
  233. self.create_task_class(('A', 'B'), (), ('B',)) # T0
  234. T1 = self.create_task_class(('D', 'D1'), ('A',), ('D'))
  235. self.create_task_class(('C',), ('A',), ('C',)) # T2
  236. self.create_task_class(('E',), ('B',), ('E',)) # T3
  237. self.create_task_class((), ('B',), ()) # T4
  238. T5 = self.create_task_class(('evt-5',), ('D',), ('evt-5',))
  239. self.create_task_class(('evt-6',), ('C'), ('evt-6',)) # T6
  240. self.create_task_class((), ('D1', 'A'), ()) # T7
  241. T8 = self.create_task_class((), ('evt-5', 'evt-6', 'E'), ())
  242. run_task(T1)
  243. self.assert_executed_tasks_equal([T1, T5, T8])
  244. def test_run_job_complex_5(self, *args, **kwargs):
  245. """
  246. Tests running a job consisting of complex dependencies when the initial
  247. task is not the task which has 0 dependencies in the full tasks DAG.
  248. """
  249. self.create_task_class(('A', 'B'), (), ('B',)) # T0
  250. T1 = self.create_task_class(('D', 'D1'), ('A',), ('D', 'D1'))
  251. self.create_task_class(('C',), ('A',), ('C',)) # T2
  252. self.create_task_class(('E',), ('B',), ('E',)) # T3
  253. self.create_task_class((), ('B',), ()) # T4
  254. T5 = self.create_task_class(('evt-5',), ('D',), ('evt-5',))
  255. self.create_task_class(('evt-6',), ('C'), ('evt-6',)) # T6
  256. T7 = self.create_task_class((), ('D1', 'A'), ())
  257. T8 = self.create_task_class((), ('evt-5', 'evt-6', 'E'), ())
  258. run_task(T1)
  259. self.assert_executed_tasks_equal([T1, T5, T8, T7])
  260. self.assert_task_dependency_preserved(T1, [T7, T5])
  261. self.assert_task_dependency_preserved(T5, [T8])
  262. def test_run_all_tasks(self, *args, **kwargs):
  263. """
  264. Tests that all tasks are ran by calling the
  265. :func:`distro_tracker.core.tasks.run_all_tasks` function.
  266. """
  267. dependent_tasks = [
  268. self.create_task_class((), ('A',), ()),
  269. self.create_task_class((), ('B',), ()),
  270. ]
  271. independent_tasks = [
  272. self.create_task_class(('A',), (), ('A',)),
  273. self.create_task_class(('B',), (), ()),
  274. ]
  275. run_all_tasks()
  276. # All independent tasks were ran, along with the task whose dependency
  277. # was satisfied.
  278. self.assert_executed_tasks_equal(
  279. independent_tasks + [dependent_tasks[0]])
  280. # Makes sure the depenent task was executed after the dependency...
  281. self.assert_task_dependency_preserved(
  282. independent_tasks[0],
  283. [dependent_tasks[0]])
  284. def test_run_job_with_fail_task(self, *args, **kwargs):
  285. """
  286. Tests that running a job where one task fails works as expected.
  287. """
  288. fail_task = self.create_task_class(('fail',), (), ('fail'), fail=True)
  289. run_task(fail_task)
  290. # The job has gracefully exited without raising an exception.
  291. self.assert_executed_tasks_equal([fail_task])
  292. def test_run_job_with_fail_task_dependency(self, *args, **kwargs):
  293. """
  294. Tests that even though a task has failed, any events it raised while
  295. running affect the rest of the tasks.
  296. """
  297. root_task = self.create_task_class(('A',), (), ('A',))
  298. fail_task = self.create_task_class(('fail',), ('A',), ('fail',), True)
  299. depends_on_fail = self.create_task_class((), ('fail',), ())
  300. do_run = self.create_task_class((), ('A',), ())
  301. run_task(root_task)
  302. self.assert_executed_tasks_equal(
  303. [root_task, fail_task, depends_on_fail, do_run]
  304. )
  305. class JobPersistenceTests(TestCase):
  306. def create_mock_event(self, event_name, event_arguments=None):
  307. mock_event = mock.create_autospec(Event)
  308. mock_event.name = event_name
  309. mock_event.arguments = event_arguments
  310. return mock_event
  311. def create_mock_task(self, task_name, events=()):
  312. mock_task = mock.create_autospec(BaseTask)
  313. mock_task.task_name.return_value = task_name
  314. mock_task.raised_events = [
  315. self.create_mock_event(event['name'], event.get('arguments', None))
  316. for event in events
  317. ]
  318. return mock_task
  319. def test_serialize_start(self):
  320. """
  321. Tests serializing a job's state to a RunningJob instance.
  322. """
  323. state = JobState('initial-task-name')
  324. state.save_state()
  325. # A running job was created.
  326. self.assertEqual(RunningJob.objects.count(), 1)
  327. job = RunningJob.objects.all()[0]
  328. self.assertEqual(job.initial_task_name, 'initial-task-name')
  329. self.assertIsNone(job.additional_parameters)
  330. self.assertFalse(job.is_complete)
  331. def test_serialize_after_processed_task(self):
  332. """
  333. Tests serializing a job's state to a RunningJob instance.
  334. """
  335. task_name = 'task-1'
  336. state = JobState(task_name)
  337. state.save_state()
  338. expected_events = [
  339. {
  340. 'name': 'event-1',
  341. 'arguments': ['a', 'b'],
  342. },
  343. {
  344. 'name': 'event-2',
  345. 'arguments': None,
  346. }
  347. ]
  348. mock_task = self.create_mock_task(task_name, expected_events)
  349. state.add_processed_task(mock_task)
  350. state.save_state()
  351. # Stil only one running job instance
  352. self.assertEqual(RunningJob.objects.count(), 1)
  353. job = RunningJob.objects.all()[0]
  354. self.assertSequenceEqual(job.state['events'], expected_events)
  355. self.assertSequenceEqual(job.state['processed_tasks'], [task_name])
  356. self.assertFalse(job.is_complete)
  357. def test_serialize_after_finish(self):
  358. """
  359. Tests serializing a job's state to a RunningJob instance.
  360. """
  361. task_name = 'task-1'
  362. state = JobState(task_name)
  363. state.save_state()
  364. expected_events = [
  365. {
  366. 'name': 'event-1',
  367. 'arguments': ['a', 'b'],
  368. },
  369. {
  370. 'name': 'event-2',
  371. 'arguments': None,
  372. }
  373. ]
  374. mock_task = self.create_mock_task(task_name, expected_events)
  375. state.add_processed_task(mock_task)
  376. state.save_state()
  377. state.mark_as_complete()
  378. # Stil only one running job instance
  379. self.assertEqual(RunningJob.objects.count(), 1)
  380. job = RunningJob.objects.all()[0]
  381. self.assertSequenceEqual(job.state['events'], expected_events)
  382. self.assertSequenceEqual(job.state['processed_tasks'], [task_name])
  383. self.assertTrue(job.is_complete)
  384. def test_serialize_after_update(self):
  385. """
  386. Tests serializing a job's state after multiple tasks have finished.
  387. """
  388. task_names = ['task-1', 'task-2']
  389. state = JobState(task_names[0])
  390. state.save_state()
  391. expected_events = [
  392. {
  393. 'name': 'event-1',
  394. 'arguments': {
  395. 'a': 1,
  396. 'b': '2'
  397. },
  398. },
  399. {
  400. 'name': 'event-2',
  401. 'arguments': None,
  402. }
  403. ]
  404. mock_task_1 = self.create_mock_task(task_names[0], [expected_events[0]])
  405. state.add_processed_task(mock_task_1)
  406. state.save_state()
  407. mock_task_2 = self.create_mock_task(task_names[1], [expected_events[1]])
  408. state.add_processed_task(mock_task_2)
  409. state.save_state()
  410. # Stil only one running job instance
  411. self.assertEqual(RunningJob.objects.count(), 1)
  412. job = RunningJob.objects.all()[0]
  413. # All events found now
  414. self.assertSequenceEqual(job.state['events'], expected_events)
  415. # Both tasks processed
  416. self.assertSequenceEqual(job.state['processed_tasks'], task_names)
  417. self.assertFalse(job.is_complete)
  418. def test_deserialize(self):
  419. """
  420. Tests deserializing a RunningJob instance to a JobState.
  421. """
  422. initial_task_name = 'initial-task'
  423. additional_parameters = {
  424. 'param1': 1
  425. }
  426. job = RunningJob.objects.create(
  427. initial_task_name=initial_task_name,
  428. additional_parameters=additional_parameters)
  429. processed_tasks = ['initial-task', 'task-1']
  430. job.state = {
  431. 'events': [
  432. {
  433. 'name': 'event-1',
  434. },
  435. {
  436. 'name': 'event-2',
  437. 'arguments': {
  438. 'a': 1,
  439. 'b': '2'
  440. }
  441. }
  442. ],
  443. 'processed_tasks': processed_tasks
  444. }
  445. job.save()
  446. state = JobState.deserialize_running_job_state(job)
  447. self.assertEqual(state.initial_task_name, 'initial-task')
  448. self.assertEqual(state.additional_parameters, additional_parameters)
  449. self.assertEqual(state.processed_tasks, processed_tasks)
  450. self.assertEqual(len(state.events), 2)
  451. self.assertEqual(state.events[0].name, 'event-1')
  452. self.assertIsNone(state.events[0].arguments)
  453. self.assertEqual(state.events[1].arguments, {
  454. 'a': 1,
  455. 'b': '2'
  456. })
  457. self.assertEqual(state._running_job, job)
  458. class ContinuePersistedJobsTest(TestCase):
  459. def setUp(self):
  460. #: Tasks which execute add themselves to this list.
  461. self._created_task_count = 0
  462. self.execution_list = []
  463. self.original_plugins = [
  464. plugin
  465. for plugin in BaseTask.plugins
  466. ]
  467. # Now ignore all original plugins.
  468. BaseTask.plugins = []
  469. def tearDown(self):
  470. # Remove any extra plugins which may have been created during a test run
  471. BaseTask.plugins = self.original_plugins
  472. def clear_executed_tasks_list(self):
  473. self.execution_list[:] = []
  474. def assert_task_ran(self, task):
  475. self.assertIn(task, self.execution_list)
  476. def create_task_class(self, produces, depends_on, raises, fail=False):
  477. """
  478. Helper method creates and returns a new BaseTask subclass.
  479. """
  480. self._created_task_count += 1
  481. exec_list = self.execution_list
  482. class TestTask(BaseTask):
  483. PRODUCES_EVENTS = produces
  484. DEPENDS_ON_EVENTS = depends_on
  485. NAME = 'a' * self._created_task_count
  486. def execute(self):
  487. for event in raises:
  488. self.raise_event(event)
  489. exec_list.append(self.__class__)
  490. if fail:
  491. raise Exception("This task fails")
  492. return TestTask
  493. def test_continue_job_no_start(self):
  494. """
  495. Tests continuing a job from a job state which is only at the beginning.
  496. """
  497. task1 = self.create_task_class(('a',), (), ('a',))
  498. job_state = JobState(task1.task_name())
  499. continue_task_from_state(job_state)
  500. self.assert_task_ran(task1)
  501. def test_continue_job_started(self):
  502. """
  503. Tests continuing a job from a job state which has only just started
  504. (no tasks complete yet).
  505. """
  506. task1 = self.create_task_class(('a',), (), ('a',))
  507. job_state = JobState(task1.task_name())
  508. continue_task_from_state(job_state)
  509. self.assert_task_ran(task1)
  510. def test_continue_job_some_finished(self):
  511. """
  512. Tests continuing a job from a job state where a task has finished.
  513. """
  514. task1 = self.create_task_class(('a',), (), ('a',))
  515. task2 = self.create_task_class((), ('a',), ())
  516. job_state = JobState(task1.task_name())
  517. task1_instance = task1()
  518. task1_instance.execute()
  519. job_state.add_processed_task(task1_instance)
  520. job_state.save_state()
  521. self.clear_executed_tasks_list()
  522. continue_task_from_state(job_state)
  523. # Only one task ran
  524. self.assertEqual(len(self.execution_list), 1)
  525. # It was the one that was not completed before the continue
  526. self.assert_task_ran(task2)
  527. def test_continue_job_finished(self):
  528. """
  529. Tests continuing a job from a job state where the job was finished.
  530. """
  531. task1 = self.create_task_class(('a',), (), ('a',))
  532. job_state = JobState(task1.task_name())
  533. task1_instance = task1()
  534. task1_instance.execute()
  535. job_state.add_processed_task(task1_instance)
  536. job_state.save_state()
  537. self.clear_executed_tasks_list()
  538. continue_task_from_state(job_state)
  539. # No tasks were ran from the continue
  540. self.assertEqual(len(self.execution_list), 0)