tasks.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. # Copyright 2013-2014 The Distro Tracker Developers
  2. # See the COPYRIGHT file at the top-level directory of this distribution and
  3. # at http://deb.li/DTAuthors
  4. #
  5. # This file is part of Distro Tracker. It is subject to the license terms
  6. # in the LICENSE file found in the top-level directory of this
  7. # distribution and at http://deb.li/DTLicense. No part of Distro Tracker,
  8. # including this file, may be copied, modified, propagated, or distributed
  9. # except according to the terms contained in the LICENSE file.
  10. """
  11. Implements a framework for implementing interdependent tasks.
  12. It provides a way to run all tasks dependent on the original task
  13. automatically.
  14. """
  15. from __future__ import unicode_literals
  16. from distro_tracker.core.utils.plugins import PluginRegistry
  17. from distro_tracker.core.utils.datastructures import DAG
  18. from distro_tracker.core.models import RunningJob
  19. from django.utils import six
  20. from django.conf import settings
  21. from collections import defaultdict
  22. import importlib
  23. import logging
  24. import sys
  25. logger = logging.getLogger('distro_tracker.tasks')
  26. class BaseTask(six.with_metaclass(PluginRegistry)):
  27. """
  28. A class representing the base class for all data processing tasks of
  29. Distro Tracker.
  30. Each task can produce or depend on certain events.
  31. The list :attr:`DEPENDS_ON_EVENTS` gives a list of events that, if raised
  32. during the processing of another task, cause this task to run as well.
  33. Events defined in the :attr:`PRODUCES_EVENTS` list are the ones this task
  34. is allowed to produce. Other tasks which depend on those events can then
  35. be registered.
  36. It is possible that the task does not produce all events given in this list
  37. in which case only tasks depending on the events which *were* raised are
  38. initiated afterwards.
  39. ..note::
  40. Subclasses of this class are automatically registered when created which
  41. allows the :class:`BaseTask` to have the full picture of all tasks and
  42. their mutual dependencies. However, to make sure the subclass is always
  43. loaded, make sure to place it in a ``tracker_tasks`` module at the top
  44. level of a Django app.
  45. """
  46. DEPENDS_ON_EVENTS = ()
  47. PRODUCES_EVENTS = ()
  48. @classmethod
  49. def task_name(cls):
  50. """
  51. The classmethod should return the name of the task.
  52. It can be given as a ``NAME`` class-level attribute or by overriding
  53. this classmethod.
  54. If none of those is done, the default value is the name of the class,
  55. i.e. the ``__name__`` attribute of the class.
  56. """
  57. if hasattr(cls, 'NAME'):
  58. return cls.NAME
  59. else:
  60. return cls.__name__
  61. def __init__(self, job=None):
  62. #: A flag signalling whether the task has received any events.
  63. #: A task with no received events does not need to run.
  64. self.event_received = False
  65. self._raised_events = []
  66. #: A reference to the job to which this task belongs, if any
  67. self.job = job
  68. def is_initial_task(self):
  69. """
  70. :returns True: If the task is the first task in a job (or if it's not
  71. part of a job).
  72. :returns False: If the task is not the first task in a job.
  73. """
  74. if self.job is None:
  75. return True
  76. return len(self.job.job_state.processed_tasks) == 0
  77. def execute(self):
  78. """
  79. Performs the actual processing of the task.
  80. This method must raise appropriate events by using the
  81. :meth:`raise_event` method during the processing so that tasks which are
  82. dependent on those events can be notified.
  83. """
  84. pass
  85. @property
  86. def raised_events(self):
  87. """
  88. :returns: Events which the task raised during its execution
  89. :rtype: ``iterable`` of :class:`Event`
  90. """
  91. return self._raised_events
  92. def raise_event(self, event_name, arguments=None):
  93. """
  94. Helper method which should be used by subclasses to signal that an
  95. event has been triggered.
  96. :param event_name: The name of the event to be raised.
  97. :type event_name: string
  98. :param arguments: Passed on to to the :class:`Event` instance's
  99. :attr:`arguments <Event.arguments>`. It becomes available to any
  100. tasks which receive the raised event.
  101. :type arguments: dict
  102. """
  103. self._raised_events.append(Event(event_name, arguments))
  104. def clear_events(self):
  105. """
  106. Clears all events the task raised.
  107. """
  108. self._raised_events = []
  109. def get_all_events(self):
  110. """
  111. Returns all events raised during the processing of a job which are
  112. relevant for this task.
  113. If the task is running independently of a job, an empty list is
  114. returned.
  115. """
  116. if self.job:
  117. return self.job.job_state.events_for_task(self)
  118. else:
  119. return []
  120. def set_parameters(self, parameters):
  121. """
  122. Allows clients to set additional task-specific parameters once a task
  123. is already created.
  124. :param parameters: The extra parameters.
  125. :type parameters: dict
  126. """
  127. pass
  128. @classmethod
  129. def get_task_class_by_name(cls, task_name):
  130. """
  131. Returns a :class:`BaseTask` subclass which has the given name, i.e. its
  132. :meth:`task_name` method returns the ``task_name`` given in the
  133. parameters.
  134. :param task_name: The name of the task which should be returned.
  135. """
  136. for task_class in cls.plugins:
  137. if task_class.task_name() == task_name:
  138. return task_class
  139. return None
  140. @classmethod
  141. def build_full_task_dag(cls):
  142. """
  143. A class method which builds a full :class:`TaskDAG` where only
  144. subclasses of ``cls`` are included in the DAG.
  145. If `cls` is :class:`BaseTask` then the DAG contains all tasks.
  146. The :class:`TaskDAG` instance represents the dependencies between
  147. :class:`BaseTask` subclasses based on the events they produce and
  148. depend on.
  149. :rtype: :class:`TaskDAG`
  150. """
  151. dag = TaskDAG()
  152. # Add all existing tasks to the dag.
  153. for task in BaseTask.plugins:
  154. if task is not cls and issubclass(task, cls):
  155. dag.add_task(task)
  156. # Create the edges of the graph by creating an edge between each pair of
  157. # tasks T1, T2 where T1 produces an event E and T2 depends on the event
  158. # E.
  159. from itertools import product as cross_product
  160. events = cls.build_task_event_dependency_graph()
  161. for event_producers, event_consumers in events.values():
  162. for task1, task2 in cross_product(event_producers, event_consumers):
  163. dag.add_dependency(task1, task2)
  164. return dag
  165. @classmethod
  166. def build_task_event_dependency_graph(cls):
  167. """
  168. Returns a dict mapping event names to a two-tuple of a list of task
  169. classes which produce the event and a list of task classes which depend
  170. on the event, respectively.
  171. Only tasks which are subclassed from `cls` are included.
  172. .. note::
  173. "Task classes" are all subclasses of :class:`BaseTask`
  174. """
  175. events = defaultdict(lambda: ([], []))
  176. for task in BaseTask.plugins:
  177. if task is cls or not issubclass(task, cls):
  178. continue
  179. for event in task.PRODUCES_EVENTS:
  180. events[event][0].append(task)
  181. for event in task.DEPENDS_ON_EVENTS:
  182. events[event][1].append(task)
  183. return events
  184. def log(self, message, *args, **kwargs):
  185. """Log a message about the progress of the task"""
  186. if 'level' in kwargs:
  187. level = kwargs['level']
  188. del kwargs['level']
  189. else:
  190. level = logging.INFO
  191. message = "{} {}".format(self.task_name(), message)
  192. logger.log(level, message, *args, **kwargs)
  193. class Event(object):
  194. """
  195. A class representing a particular event raised by a task.
  196. """
  197. def __init__(self, name, arguments=None):
  198. self.name = name
  199. self.arguments = arguments
  200. def __str__(self):
  201. return self.name
  202. def __repr__(self):
  203. return self.name
  204. class TaskDAG(DAG):
  205. """
  206. A :class:`DAG <distro_tracker.core.utils.datastructures.DAG>` subclass which
  207. exposes some methods specific for DAGs of dependent tasks.
  208. """
  209. @property
  210. def all_tasks(self):
  211. return self.all_nodes
  212. def all_dependent_tasks(self, task):
  213. """
  214. Returns all tasks that are dependent on the given ``task``.
  215. Effectively, this means all tasks reachable from this one in the DAG of
  216. tasks.
  217. :type task: :class:`BaseTask` subclass
  218. :rtype: ``list`` of :class:`BaseTask` subclasses
  219. """
  220. return self.nodes_reachable_from(task)
  221. def directly_dependent_tasks(self, task):
  222. """
  223. Returns only tasks which are directly dependent on the given ``task``
  224. This means all tasks to which this task has a direct edge
  225. (neighbour nodes).
  226. :type task: :class:`BaseTask` subclass
  227. :rtype: ``list`` of :class:`BaseTask` subclasses
  228. """
  229. return self.dependent_nodes(task)
  230. def remove_task(self, task):
  231. """
  232. Removes the given ``task`` from the DAG.
  233. :type task: :class:`BaseTask` subclass
  234. """
  235. return self.remove_node(task)
  236. def add_task(self, task):
  237. """
  238. Adds the given ``task`` to the DAG.
  239. :type task: :class:`BaseTask` subclass
  240. """
  241. return self.add_node(task)
  242. def add_dependency(self, task1, task2):
  243. """
  244. Registers the dependency between these two tasks.
  245. """
  246. return self.add_edge(task1, task2)
  247. class JobState(object):
  248. """
  249. Represents the current state of a running job.
  250. Provides a way to persist the state and reconstruct it in order to re-run
  251. failed tasks in a job.
  252. """
  253. def __init__(self, initial_task_name, additional_parameters=None):
  254. self.initial_task_name = initial_task_name
  255. self.additional_parameters = additional_parameters
  256. self.events = []
  257. self.processed_tasks = []
  258. self._running_job = None
  259. @classmethod
  260. def deserialize_running_job_state(cls, running_job):
  261. """
  262. Deserializes a :class:`RunningJob
  263. <distro_tracker.core.models.RunningJob>` instance and returns a matching
  264. :class:`JobState`.
  265. """
  266. instance = cls(running_job.initial_task_name)
  267. instance.additional_parameters = running_job.additional_parameters
  268. instance.events = [
  269. Event(name=event['name'], arguments=event.get('arguments', None))
  270. for event in running_job.state['events']
  271. ]
  272. instance.processed_tasks = running_job.state['processed_tasks']
  273. instance._running_job = running_job
  274. return instance
  275. def add_processed_task(self, task):
  276. """
  277. Marks a task as processed.
  278. :param task: The task which should be marked as processed
  279. :type task: :class:`BaseTask` subclass instance
  280. """
  281. self.events.extend(task.raised_events)
  282. self.processed_tasks.append(task.task_name())
  283. def save_state(self):
  284. """
  285. Saves the state to persistent storage.
  286. """
  287. state = {
  288. 'events': [
  289. {
  290. 'name': event.name,
  291. 'arguments': event.arguments,
  292. }
  293. for event in self.events
  294. ],
  295. 'processed_tasks': self.processed_tasks,
  296. }
  297. if not self._running_job:
  298. self._running_job = RunningJob(
  299. initial_task_name=self.initial_task_name,
  300. additional_parameters=self.additional_parameters)
  301. self._running_job.state = state
  302. self._running_job.save()
  303. def mark_as_complete(self):
  304. """
  305. Signals that the job is finished.
  306. """
  307. self._running_job.is_complete = True
  308. self.save_state()
  309. def events_for_task(self, task):
  310. """
  311. :param task: The task for which relevant :class:`Event` instances
  312. should be returned.
  313. :returns: Raised events which are relevant for the given ``task``
  314. :rtype: ``generator``
  315. """
  316. return (
  317. event
  318. for event in self.events
  319. if event.name in task.DEPENDS_ON_EVENTS
  320. )
  321. class Job(object):
  322. """
  323. A class used to initialize and run a set of interdependent tasks.
  324. """
  325. def __init__(self, initial_task, base_task_class=BaseTask):
  326. """
  327. Instantiates a new :class:`Job` instance based on the given
  328. ``initial_task``.
  329. The job constructs a :class:`TaskDAG` instance by using all
  330. possible dependencies between tasks.
  331. Tasks are run in toplogical sort order and it is left up to them to
  332. inspect the raised events and decide how to process them.
  333. .. note::
  334. "Task classes" are all subclasses of :class:`BaseTask`
  335. """
  336. # Build this job's DAG based on the full DAG of all tasks.
  337. self.job_dag = base_task_class.build_full_task_dag()
  338. # The full DAG contains dependencies between Task classes, but the job
  339. # needs to have Task instances, so it instantiates the Tasks dependent
  340. # on the initial task.
  341. reachable_tasks = self.job_dag.all_dependent_tasks(initial_task)
  342. for task_class in self.job_dag.all_tasks:
  343. if task_class is initial_task or task_class in reachable_tasks:
  344. task = task_class(job=self)
  345. if task_class is initial_task:
  346. # The initial task gets flagged with an event so that we
  347. # make sure that it is not skipped.
  348. task.event_received = True
  349. self.job_dag.replace_node(task_class, task)
  350. else:
  351. # Remove tasks which are not reachable from the initial task
  352. # from the job Tasks DAG, since those are in no way dependent
  353. # on it and will not need to run.
  354. self.job_dag.remove_task(task_class)
  355. self.job_state = JobState(initial_task.task_name())
  356. @classmethod
  357. def reconstruct_job_from_state(cls, job_state):
  358. """
  359. The method takes a :class:`JobState` and reconstructs a job if possible.
  360. :param job_state: The job state based on which the job should be
  361. reconstructed
  362. :type job_state: :class:`JobState`
  363. :returns: the reconstructed :class:`Job` instance.
  364. Calling the run method of this instance will continue execution of
  365. the job at the task following the last executed task in the job
  366. state.
  367. :rtype: :class:`Job`
  368. """
  369. job = cls(BaseTask.get_task_class_by_name(job_state.initial_task_name))
  370. job.job_state = job_state
  371. # Update the task instances event_received for all events which are
  372. # found in the job's state.
  373. raised_events_names = set(
  374. event.name
  375. for event in job_state.events
  376. )
  377. for task in job.job_dag.all_tasks:
  378. if task.event_received:
  379. continue
  380. for task_depends_event_name in task.DEPENDS_ON_EVENTS:
  381. if task_depends_event_name in raised_events_names:
  382. task.event_received = True
  383. break
  384. return job
  385. def _update_task_events(self, processed_task):
  386. """
  387. Performs an update of tasks in the job based on the events raised by
  388. ``processed_task``.
  389. Flags all tasks which are registered to depend on one of the raised
  390. events so that they are guaranteed to run.
  391. Tasks which are never flagged are skipped; there is no need to run them
  392. since no event they depend on was raised during the job's processing.
  393. :param processed_task: A finished task
  394. :type processed_task: :class:`BaseTask` subclass
  395. """
  396. event_names_raised = set(
  397. event.name
  398. for event in processed_task.raised_events
  399. )
  400. for dependent_task in \
  401. self.job_dag.directly_dependent_tasks(processed_task):
  402. if dependent_task.event_received:
  403. continue
  404. # Update this task's raised events.
  405. for event_name in event_names_raised:
  406. if event_name in dependent_task.DEPENDS_ON_EVENTS:
  407. dependent_task.event_received = True
  408. break
  409. def run(self, parameters=None):
  410. """
  411. Starts the Job processing.
  412. It runs all tasks which depend on the given initial task.
  413. :param parameters: Additional parameters which are given to each task
  414. before it is executed.
  415. """
  416. self.job_state.additional_parameters = parameters
  417. for task in self.job_dag.topsort_nodes():
  418. # This happens if the job was restarted. Skip such tasks since they
  419. # considered finish by this job. All its events will be propagated
  420. # to the following tasks correctly.
  421. if task.task_name() in self.job_state.processed_tasks:
  422. continue
  423. # A task does not need to run if none of the events it depends on
  424. # have been raised by this point.
  425. # If it's that task's turn in topological sort order when all
  426. # dependencies are used to construct the graph, it is guaranteed
  427. # that none of its dependencies will ever be raised since the tasks
  428. # which come afterwards do not raise any events which this task
  429. # depends on.
  430. # (Otherwise that task would have to be ahead of this one in the
  431. # topological sort order.)
  432. if task.event_received:
  433. # Run task
  434. try:
  435. # Inject additional parameters, if any
  436. if parameters:
  437. task.set_parameters(parameters)
  438. logger.info("Starting task {task}".format(
  439. task=task.task_name()))
  440. task.execute()
  441. logger.info("Successfully executed task {task}".format(
  442. task=task.task_name()))
  443. except Exception:
  444. logger.exception("Problem processing a task.")
  445. # Update dependent tasks based on events raised.
  446. # The update is performed regardless of a possible failure in
  447. # order not to miss some events.
  448. self._update_task_events(task)
  449. self.job_state.add_processed_task(task)
  450. self.job_state.save_state()
  451. self.job_state.mark_as_complete()
  452. logger.info("Finished all tasks")
  453. def clear_all_events_on_exception(func):
  454. """
  455. Decorator which makes sure that all events a task wanted to raise are
  456. cleared in case an exception is raised during its execution.
  457. This may not be what all tasks want so it is provided as a convenience
  458. decorator for those that do.
  459. """
  460. def wrapper(self):
  461. try:
  462. func(self)
  463. except Exception:
  464. self.clear_events()
  465. six.reraise(*sys.exc_info())
  466. return wrapper
  467. def import_all_tasks():
  468. """
  469. Imports tasks found in each installed app's ``tracker_tasks`` module.
  470. """
  471. for app in settings.INSTALLED_APPS:
  472. try:
  473. module_name = app + '.' + 'tracker_tasks'
  474. importlib.import_module(module_name)
  475. except ImportError:
  476. # The app does not implement Distro Tracker tasks.
  477. pass
  478. # This one is an exception, many core tasks are there
  479. import distro_tracker.core.retrieve_data # noqa
  480. def run_task(initial_task, parameters=None):
  481. """
  482. Receives a class of the task which should be executed and makes sure that
  483. all the tasks which have data dependencies on this task are ran after it.
  484. This is a convenience function which delegates this to a :class:`Job` class
  485. instance.
  486. :param initial_task: The task which should be run. Either the class object
  487. of the task or a string giving the task's name.
  488. :type initial_task: :class:`BaseTask` subclass or :class:`string`
  489. :param parameters: Additional parameters which are given to each task
  490. before it is executed.
  491. """
  492. # Import tasks implemented by all installed apps
  493. import_all_tasks()
  494. if isinstance(initial_task, six.text_type):
  495. task_name = initial_task
  496. initial_task = BaseTask.get_task_class_by_name(initial_task)
  497. if not initial_task:
  498. raise ValueError("Task '%s' doesn't exist." % task_name)
  499. job = Job(initial_task)
  500. return job.run(parameters)
  501. def run_all_tasks(parameters=None):
  502. """
  503. Runs all registered tasks which do not have any dependencies.
  504. :param parameters: Additional parameters which are given to each task
  505. before it is executed.
  506. """
  507. import_all_tasks()
  508. for task in BaseTask.plugins:
  509. if task is BaseTask:
  510. continue
  511. if not task.DEPENDS_ON_EVENTS:
  512. logger.info("Starting task %s", task.task_name())
  513. run_task(task)
  514. def continue_task_from_state(job_state):
  515. """
  516. Continues execution of a job from the last point in the given ``job_state``
  517. :param job_state: The state of the job from which it should be continued
  518. :type job_state: :class:`JobState`
  519. """
  520. job = Job.reconstruct_job_from_state(job_state)
  521. return job.run(job_state.additional_parameters)