services.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # Copyright (C) 2014 Andrey Antukh <niwi@niwi.be>
  2. # Copyright (C) 2014 Jesús Espino <jespinog@gmail.com>
  3. # Copyright (C) 2014 David Barragán <bameda@dbarragan.com>
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as
  6. # published by the Free Software Foundation, either version 3 of the
  7. # License, or (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  16. import io
  17. import csv
  18. from taiga.base.utils import db, text
  19. from taiga.projects.history.services import take_snapshot
  20. from taiga.projects.tasks.apps import (
  21. connect_tasks_signals,
  22. disconnect_tasks_signals)
  23. from taiga.events import events
  24. from taiga.projects.votes import services as votes_services
  25. from . import models
  26. def get_tasks_from_bulk(bulk_data, **additional_fields):
  27. """Convert `bulk_data` into a list of tasks.
  28. :param bulk_data: List of tasks in bulk format.
  29. :param additional_fields: Additional fields when instantiating each task.
  30. :return: List of `Task` instances.
  31. """
  32. return [models.Task(subject=line, **additional_fields)
  33. for line in text.split_in_lines(bulk_data)]
  34. def create_tasks_in_bulk(bulk_data, callback=None, precall=None, **additional_fields):
  35. """Create tasks from `bulk_data`.
  36. :param bulk_data: List of tasks in bulk format.
  37. :param callback: Callback to execute after each task save.
  38. :param additional_fields: Additional fields when instantiating each task.
  39. :return: List of created `Task` instances.
  40. """
  41. tasks = get_tasks_from_bulk(bulk_data, **additional_fields)
  42. disconnect_tasks_signals()
  43. try:
  44. db.save_in_bulk(tasks, callback, precall)
  45. finally:
  46. connect_tasks_signals()
  47. return tasks
  48. def update_tasks_order_in_bulk(bulk_data:list, field:str, project:object):
  49. """
  50. Update the order of some tasks.
  51. `bulk_data` should be a list of tuples with the following format:
  52. [(<task id>, {<field>: <value>, ...}), ...]
  53. """
  54. task_ids = []
  55. new_order_values = []
  56. for task_data in bulk_data:
  57. task_ids.append(task_data["task_id"])
  58. new_order_values.append({field: task_data["order"]})
  59. events.emit_event_for_ids(ids=task_ids,
  60. content_type="tasks.task",
  61. projectid=project.pk)
  62. db.update_in_bulk_with_ids(task_ids, new_order_values, model=models.Task)
  63. def snapshot_tasks_in_bulk(bulk_data, user):
  64. task_ids = []
  65. for task_data in bulk_data:
  66. try:
  67. task = models.Task.objects.get(pk=task_data['task_id'])
  68. take_snapshot(task, user=user)
  69. except models.UserStory.DoesNotExist:
  70. pass
  71. def tasks_to_csv(project, queryset):
  72. csv_data = io.StringIO()
  73. fieldnames = ["ref", "subject", "description", "user_story", "milestone", "owner",
  74. "owner_full_name", "assigned_to", "assigned_to_full_name",
  75. "status", "is_iocaine", "is_closed", "us_order",
  76. "taskboard_order", "attachments", "external_reference", "tags",
  77. "watchers", "voters"]
  78. for custom_attr in project.taskcustomattributes.all():
  79. fieldnames.append(custom_attr.name)
  80. writer = csv.DictWriter(csv_data, fieldnames=fieldnames)
  81. writer.writeheader()
  82. for task in queryset:
  83. task_data = {
  84. "ref": task.ref,
  85. "subject": task.subject,
  86. "description": task.description,
  87. "user_story": task.user_story.ref if task.user_story else None,
  88. "milestone": task.milestone.name if task.milestone else None,
  89. "owner": task.owner.username,
  90. "owner_full_name": task.owner.get_full_name(),
  91. "assigned_to": task.assigned_to.username if task.assigned_to else None,
  92. "assigned_to_full_name": task.assigned_to.get_full_name() if task.assigned_to else None,
  93. "status": task.status.name,
  94. "is_iocaine": task.is_iocaine,
  95. "is_closed": task.status.is_closed,
  96. "us_order": task.us_order,
  97. "taskboard_order": task.taskboard_order,
  98. "attachments": task.attachments.count(),
  99. "external_reference": task.external_reference,
  100. "tags": ",".join(task.tags or []),
  101. "watchers": [u.id for u in task.get_watchers()],
  102. "voters": votes_services.get_voters(task).count(),
  103. }
  104. for custom_attr in project.taskcustomattributes.all():
  105. value = task.custom_attributes_values.attributes_values.get(str(custom_attr.id), None)
  106. task_data[custom_attr.name] = value
  107. writer.writerow(task_data)
  108. return csv_data