# __init__.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. (c) 2020 - Copyright ...
  4. Authors:
  5. zPlus <zplus@peers.community>
  6. """
  7. from .. import settings
  8. import celery
  9. import jinja2
  10. import logging
  11. import os
  12. import pagure.config
  13. import pagure.lib.model
  14. import pagure.lib.model_base
  15. # When the plugin is imported by Pagure, the logging module already has a handler
  16. # defined. When however the celery queue is started, it lives in its own process,
  17. # and there is not a handler. However, celery already has one handler set up
  18. # (see https://docs.celeryproject.org/en/latest/userguide/tasks.html#logging) so
  19. # we reuse that one for logging task execution.
  20. log = celery.utils.log.get_task_logger(__name__)
  21. log.setLevel(settings.LOG_LEVEL)
  22. broker_url = pagure.config.config.get('BROKER_URL', None)
  23. # Without a broker, forgefed cannot schedule/process any activity
  24. if not broker_url:
  25. log.critical('Broker not defined.')
  26. raise Exception('Broker not defined.')
  27. log.info('Using broker: ' + broker_url)
  28. # The Celery instance used to register forgefed tasks
  29. broker = celery.Celery('forgefed', broker=broker_url, backend=broker_url)
  30. broker.conf.update({
  31. **pagure.config.config["CELERY_CONFIG"],
  32. # These settings apply to all of our tasks by default.
  33. # See https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-settings
  34. # and https://docs.celeryproject.org/en/stable/userguide/tasks.html#retrying
  35. # for more info.
  36. # TODO move these settings to config file.
  37. 'task_annotations': {
  38. '*': { 'queue': 'forgefed',
  39. 'autoretry_for': [ Exception ],
  40. 'retry_kwargs': { 'max_retries': 20 },
  41. 'retry_backoff': True, # Exponential backoff
  42. 'retry_backoff_max': 60*60*24, # Do not delay for more than 24h
  43. 'retry_jitter': True,
  44. #'rate_limit': '10/s'
  45. }}
  46. })
  47. class database_session:
  48. """
  49. Because tasks are used from the independent Celery workers, these functions
  50. do not have access to the database sessions that are automatically created
  51. at the beginning of HTTP requests within Flask. So the job of this
  52. Context Manager is to create a new database session that tasks can use.
  53. This class can be used like this:
  54. with database_session() as (pagure_db, forgefed_db):
  55. pagure_db.query()...
  56. forgefed_db.query()...
  57. ...
  58. NOTE An alternative way of obtaining the same behavior would be with a
  59. decorator that contains a try...except...finally block.
  60. """
  61. def __init__(self):
  62. self.pagure = None
  63. #self.forgefed = None
  64. def __enter__(self):
  65. self.pagure = pagure.lib.model_base.create_session(pagure.config.config['DB_URL'])
  66. #self.forgefed = database.start_database_session()
  67. #return (self.pagure, self.forgefed)
  68. return self.pagure
  69. def __exit__(self, exception_type, exception_value, exception_traceback):
  70. # If the task has raised an exception we need to rollback the session
  71. # first, in order not to leave uncommitted transactions hanging.
  72. if exception_type:
  73. self.pagure.rollback()
  74. #self.forgefed.rollback()
  75. else:
  76. self.pagure.commit()
  77. #self.forgefed.commit()
  78. # Close the database sessions
  79. self.pagure.remove()
  80. #self.forgefed.remove()
  81. # Optionally we can return True to prevent the exception from bubbling up.
  82. # However, because these are Celery tasks, Celery will automatically
  83. # retry the task after some time so it's OK to pass the exception up
  84. # to Celery.
  85. # return True
  86. from . import activities, delivery, notification