__init__.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. """
  2. ForgeFed plugin for Pagure.
  3. Copyright (C) 2020-2021 zPlus <zplus@peers.community>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation; either version 2 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU General Public License for more details.
  12. You should have received a copy of the GNU General Public License along
  13. with this program; if not, see <https://www.gnu.org/licenses/>.
  14. SPDX-FileCopyrightText: 2020-2021 zPlus <zplus@peers.community>
  15. SPDX-License-Identifier: GPL-2.0-only
  16. """
  17. from .. import settings
  18. import celery
  19. import logging
  20. import pagure.config
  21. import pagure.lib.model_base
  22. # When the plugin is imported by Pagure, the logging module already has a handler
  23. # defined. When however the celery queue is started, it lives in its own process,
  24. # and there is not a handler. However, celery already has one handler set up
  25. # (see https://docs.celeryproject.org/en/latest/userguide/tasks.html#logging) so
  26. # we reuse that one for logging task execution.
  27. log = celery.utils.log.get_task_logger(__name__)
  28. log.setLevel(settings.LOG_LEVEL)
  29. broker_url = pagure.config.config.get('BROKER_URL', None)
  30. # Without a broker, forgefed cannot schedule/process any activity
  31. if not broker_url:
  32. log.critical('Broker not defined.')
  33. raise Exception('Broker not defined.')
  34. log.info('Using broker: ' + broker_url)
  35. # The Celery instance used to register forgefed tasks
  36. broker = celery.Celery('forgefed', broker=broker_url, backend=broker_url)
  37. broker.conf.update({
  38. **pagure.config.config["CELERY_CONFIG"],
  39. # These settings apply to all of our tasks by default.
  40. # See https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-settings
  41. # and https://docs.celeryproject.org/en/stable/userguide/tasks.html#retrying
  42. # for more info.
  43. # TODO move these settings to config file.
  44. 'task_annotations': {
  45. '*': { 'queue': 'forgefed',
  46. 'autoretry_for': [ Exception ],
  47. 'retry_kwargs': { 'max_retries': 20 },
  48. 'retry_backoff': True, # Exponential backoff
  49. 'retry_backoff_max': 60*60*24, # Do not delay for more than 24h
  50. 'retry_jitter': True,
  51. #'rate_limit': '10/s'
  52. }}
  53. })
  54. class database_session:
  55. """
  56. Because tasks are used from the independent Celery workers, these functions
  57. do not have access to the database sessions that are automatically created
  58. at the beginning of HTTP requests within Flask. So the job of this
  59. Context Manager is to create a new database session that tasks can use.
  60. This class can be used like this:
  61. with database_session() as (pagure_db, forgefed_db):
  62. pagure_db.query()...
  63. forgefed_db.query()...
  64. ...
  65. NOTE An alternative way of obtaining the same behavior would be with a
  66. decorator that contains a try...except...finally block.
  67. """
  68. def __init__(self):
  69. self.session = None
  70. def __enter__(self):
  71. self.session = pagure.lib.model_base.create_session(pagure.config.config['DB_URL'])
  72. return self.session
  73. def __exit__(self, exception_type, exception_value, exception_traceback):
  74. # If the task has raised an exception we need to rollback the session
  75. # first, in order not to leave uncommitted transactions hanging.
  76. if exception_type:
  77. self.session.rollback()
  78. else:
  79. self.session.commit()
  80. # Close the database sessions
  81. self.session.remove()
  82. # Optionally we can return True to prevent the exception from bubbling up.
  83. # However, because these are Celery tasks, Celery will automatically
  84. # retry the task after some time so it's OK to pass the exception up
  85. # to Celery.
  86. # return True
  87. from . import activities, delivery, notification