123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- """
- ForgeFed plugin for Pagure.
- Copyright (C) 2020-2021 zPlus <zplus@peers.community>
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License along
- with this program; if not, see <https://www.gnu.org/licenses/>.
- SPDX-FileCopyrightText: 2020-2021 zPlus <zplus@peers.community>
- SPDX-License-Identifier: GPL-2.0-only
- """
- from .. import settings
- import celery
- import logging
- import pagure.config
- import pagure.lib.model_base
- # When the plugin is imported by Pagure, the logging module already has a handler
- # defined. When however the celery queue is started, it lives in its own process,
- # and there is not a handler. However, celery already has one handler set up
- # (see https://docs.celeryproject.org/en/latest/userguide/tasks.html#logging) so
- # we reuse that one for logging task execution.
- log = celery.utils.log.get_task_logger(__name__)
- log.setLevel(settings.LOG_LEVEL)
- broker_url = pagure.config.config.get('BROKER_URL', None)
- # Without a broker, forgefed cannot schedule/process any activity
- if not broker_url:
- log.critical('Broker not defined.')
- raise Exception('Broker not defined.')
- log.info('Using broker: ' + broker_url)
- # The Celery instance used to register forgefed tasks
- broker = celery.Celery('forgefed', broker=broker_url, backend=broker_url)
- broker.conf.update({
- **pagure.config.config["CELERY_CONFIG"],
- # These settings apply to all of our tasks by default.
- # See https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-settings
- # and https://docs.celeryproject.org/en/stable/userguide/tasks.html#retrying
- # for more info.
- # TODO move these settings to config file.
- 'task_annotations': {
- '*': { 'queue': 'forgefed',
- 'autoretry_for': [ Exception ],
- 'retry_kwargs': { 'max_retries': 20 },
- 'retry_backoff': True, # Exponential backoff
- 'retry_backoff_max': 60*60*24, # Do not delay for more than 24h
- 'retry_jitter': True,
- #'rate_limit': '10/s'
- }}
- })
- class database_session:
- """
- Because tasks are used from the independent Celery workers, these functions
- do not have access to the database sessions that are automatically created
- at the beginning of HTTP requests within Flask. So the job of this
- Context Manager is to create a new database session that tasks can use.
- This class can be used like this:
- with database_session() as (pagure_db, forgefed_db):
- pagure_db.query()...
- forgefed_db.query()...
- ...
- NOTE An alternative way of obtaining the same behavior would be with a
- decorator that contains a try...except...finally block.
- """
- def __init__(self):
- self.session = None
- def __enter__(self):
- self.session = pagure.lib.model_base.create_session(pagure.config.config['DB_URL'])
- return self.session
- def __exit__(self, exception_type, exception_value, exception_traceback):
- # If the task has raised an exception we need to rollback the session
- # first, in order not to leave uncommitted transactions hanging.
- if exception_type:
- self.session.rollback()
- else:
- self.session.commit()
- # Close the database sessions
- self.session.remove()
- # Optionally we can return True to prevent the exception from bubbling up.
- # However, because these are Celery tasks, Celery will automatically
- # retry the task after some time so it's OK to pass the exception up
- # to Celery.
- # return True
- from . import activities, delivery, notification
|