optimize.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # This Source Code Form is subject to the terms of the Mozilla Public
  2. # License, v. 2.0. If a copy of the MPL was not distributed with this
  3. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
  4. from __future__ import absolute_import, print_function, unicode_literals
  5. import logging
  6. import re
  7. from .graph import Graph
  8. from .taskgraph import TaskGraph
  9. from slugid import nice as slugid
  10. logger = logging.getLogger(__name__)
  11. TASK_REFERENCE_PATTERN = re.compile('<([^>]+)>')
  12. def optimize_task_graph(target_task_graph, params, do_not_optimize, existing_tasks=None):
  13. """
  14. Perform task optimization, without optimizing tasks named in
  15. do_not_optimize.
  16. """
  17. named_links_dict = target_task_graph.graph.named_links_dict()
  18. label_to_taskid = {}
  19. # This proceeds in two phases. First, mark all optimized tasks (those
  20. # which will be removed from the graph) as such, including a replacement
  21. # taskId where applicable. Second, generate a new task graph containing
  22. # only the non-optimized tasks, with all task labels resolved to taskIds
  23. # and with task['dependencies'] populated.
  24. annotate_task_graph(target_task_graph=target_task_graph,
  25. params=params,
  26. do_not_optimize=do_not_optimize,
  27. named_links_dict=named_links_dict,
  28. label_to_taskid=label_to_taskid,
  29. existing_tasks=existing_tasks)
  30. return get_subgraph(target_task_graph, named_links_dict, label_to_taskid), label_to_taskid
  31. def resolve_task_references(label, task_def, taskid_for_edge_name):
  32. def repl(match):
  33. key = match.group(1)
  34. try:
  35. return taskid_for_edge_name[key]
  36. except KeyError:
  37. # handle escaping '<'
  38. if key == '<':
  39. return key
  40. raise KeyError("task '{}' has no dependency named '{}'".format(label, key))
  41. def recurse(val):
  42. if isinstance(val, list):
  43. return [recurse(v) for v in val]
  44. elif isinstance(val, dict):
  45. if val.keys() == ['task-reference']:
  46. return TASK_REFERENCE_PATTERN.sub(repl, val['task-reference'])
  47. else:
  48. return {k: recurse(v) for k, v in val.iteritems()}
  49. else:
  50. return val
  51. return recurse(task_def)
  52. def annotate_task_graph(target_task_graph, params, do_not_optimize,
  53. named_links_dict, label_to_taskid, existing_tasks):
  54. """
  55. Annotate each task in the graph with .optimized (boolean) and .task_id
  56. (possibly None), following the rules for optimization and calling the task
  57. kinds' `optimize_task` method.
  58. As a side effect, label_to_taskid is updated with labels for all optimized
  59. tasks that are replaced with existing tasks.
  60. """
  61. # set .optimized for all tasks, and .task_id for optimized tasks
  62. # with replacements
  63. for label in target_task_graph.graph.visit_postorder():
  64. task = target_task_graph.tasks[label]
  65. named_task_dependencies = named_links_dict.get(label, {})
  66. # check whether any dependencies have been optimized away
  67. dependencies = [target_task_graph.tasks[l] for l in named_task_dependencies.itervalues()]
  68. for t in dependencies:
  69. if t.optimized and not t.task_id:
  70. raise Exception(
  71. "task {} was optimized away, but {} depends on it".format(
  72. t.label, label))
  73. # if this task is blacklisted, don't even consider optimizing
  74. replacement_task_id = None
  75. if label in do_not_optimize:
  76. optimized = False
  77. # Let's check whether this task has been created before
  78. elif existing_tasks is not None and label in existing_tasks:
  79. optimized = True
  80. replacement_task_id = existing_tasks[label]
  81. # otherwise, examine the task itself (which may be an expensive operation)
  82. else:
  83. optimized, replacement_task_id = task.optimize(params)
  84. task.optimized = optimized
  85. task.task_id = replacement_task_id
  86. if replacement_task_id:
  87. label_to_taskid[label] = replacement_task_id
  88. if optimized:
  89. if replacement_task_id:
  90. logger.debug("optimizing `{}`, replacing with task `{}`"
  91. .format(label, replacement_task_id))
  92. else:
  93. logger.debug("optimizing `{}` away".format(label))
  94. # note: any dependent tasks will fail when they see this
  95. else:
  96. if replacement_task_id:
  97. raise Exception("{}: optimize_task returned False with a taskId".format(label))
  98. def get_subgraph(annotated_task_graph, named_links_dict, label_to_taskid):
  99. """
  100. Return the subgraph of annotated_task_graph consisting only of
  101. non-optimized tasks and edges between them.
  102. To avoid losing track of taskIds for tasks optimized away, this method
  103. simultaneously substitutes real taskIds for task labels in the graph, and
  104. populates each task definition's `dependencies` key with the appropriate
  105. taskIds. Task references are resolved in the process.
  106. """
  107. # resolve labels to taskIds and populate task['dependencies']
  108. tasks_by_taskid = {}
  109. for label in annotated_task_graph.graph.visit_postorder():
  110. task = annotated_task_graph.tasks[label]
  111. if task.optimized:
  112. continue
  113. task.task_id = label_to_taskid[label] = slugid()
  114. named_task_dependencies = {
  115. name: label_to_taskid[label]
  116. for name, label in named_links_dict.get(label, {}).iteritems()}
  117. task.task = resolve_task_references(task.label, task.task, named_task_dependencies)
  118. task.task.setdefault('dependencies', []).extend(named_task_dependencies.itervalues())
  119. tasks_by_taskid[task.task_id] = task
  120. # resolve edges to taskIds
  121. edges_by_taskid = (
  122. (label_to_taskid.get(left), label_to_taskid.get(right), name)
  123. for (left, right, name) in annotated_task_graph.graph.edges
  124. )
  125. # ..and drop edges that are no longer in the task graph
  126. edges_by_taskid = set(
  127. (left, right, name)
  128. for (left, right, name) in edges_by_taskid
  129. if left in tasks_by_taskid and right in tasks_by_taskid
  130. )
  131. return TaskGraph(
  132. tasks_by_taskid,
  133. Graph(set(tasks_by_taskid), edges_by_taskid))