# task.py -- MediaGoblin media-processing celery tasks
# GNU MediaGoblin -- federated, autonomous media hosting
# Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import logging

from six.moves.urllib import request, parse

import celery
from celery.registry import tasks

from mediagoblin import mg_globals as mgg
from . import mark_entry_failed, BaseProcessingFail
from mediagoblin.tools.processing import json_processing_callback
from mediagoblin.processing import get_entry_and_processing_manager

_log = logging.getLogger(__name__)
# Make sure log output appears even when the surrounding app has not
# configured a handler; DEBUG so processing steps are traceable.
logging.basicConfig()
_log.setLevel(logging.DEBUG)
  27. @celery.task(default_retry_delay=2 * 60)
  28. def handle_push_urls(feed_url):
  29. """Subtask, notifying the PuSH servers of new content
  30. Retry 3 times every 2 minutes if run in separate process before failing."""
  31. if not mgg.app_config["push_urls"]:
  32. return # Nothing to do
  33. _log.debug('Notifying Push servers for feed {0}'.format(feed_url))
  34. hubparameters = {
  35. 'hub.mode': 'publish',
  36. 'hub.url': feed_url}
  37. hubdata = parse.urlencode(hubparameters)
  38. hubheaders = {
  39. "Content-type": "application/x-www-form-urlencoded",
  40. "Connection": "close"}
  41. for huburl in mgg.app_config["push_urls"]:
  42. hubrequest = request.Request(huburl, hubdata, hubheaders)
  43. try:
  44. hubresponse = request.urlopen(hubrequest)
  45. except (request.HTTPError, request.URLError) as exc:
  46. # We retry by default 3 times before failing
  47. _log.info("PuSH url %r gave error %r", huburl, exc)
  48. try:
  49. return handle_push_urls.retry(exc=exc, throw=False)
  50. except Exception as e:
  51. # All retries failed, Failure is no tragedy here, probably.
  52. _log.warn('Failed to notify PuSH server for feed {0}. '
  53. 'Giving up.'.format(feed_url))
  54. return False
################################
# Media processing initial steps
################################
class ProcessMedia(celery.Task):
    """
    Celery task that passes a media entry off for processing.

    Also handles PuSH notification on success and failure bookkeeping
    (``mark_entry_failed`` + ``json_processing_callback``) on error.
    """
    def run(self, media_id, feed_url, reprocess_action, reprocess_info=None):
        """
        Pass the media entry off to the appropriate processing function
        (for now just process_image...)

        :param media_id: MediaEntry().id
        :param feed_url: The feed URL that the PuSH server needs to be
            updated for.
        :param reprocess_action: What particular action should be run. For
            example, 'initial'.
        :param reprocess_info: A dict containing all of the necessary
            reprocessing info for the media_type (keyword arguments passed
            straight through to ``processor.process``).
        """
        reprocess_info = reprocess_info or {}
        entry, manager = get_entry_and_processing_manager(media_id)

        # Try to process, and handle expected errors.
        try:
            processor_class = manager.get_processor(reprocess_action, entry)

            with processor_class(manager, entry) as processor:
                # Initial state change has to be here because
                # the entry.state gets recorded on processor_class init
                entry.state = u'processing'
                entry.save()

                _log.debug('Processing {0}'.format(entry))

                try:
                    processor.process(**reprocess_info)
                except Exception as exc:
                    # If the entry was already processed before this run,
                    # swallow the error so the entry can be restored to its
                    # previous good state instead of being marked failed.
                    if processor.entry_orig_state == 'processed':
                        _log.error(
                            'Entry {0} failed to process due to the following'
                            ' error: {1}'.format(entry.id, exc))
                        _log.info(
                            'Setting entry.state back to "processed"')
                        pass
                    else:
                        raise

            # We set the state to processed and save the entry here so there's
            # no need to save at the end of the processing stage, probably ;)
            entry.state = u'processed'
            entry.save()

            # Notify the PuSH servers as async task
            if mgg.app_config["push_urls"] and feed_url:
                handle_push_urls.subtask().delay(feed_url)

            json_processing_callback(entry)
        except BaseProcessingFail as exc:
            # Expected, user-visible processing failure: record it and stop.
            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            return
        except ImportError as exc:
            # Likely a missing media-type plugin dependency; mark failed
            # but do not re-raise (unlike the generic handler below).
            _log.error(
                'Entry {0} failed to process due to an import error: {1}'\
                    .format(
                    entry.title,
                    exc))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
        except Exception as exc:
            # Unexpected error: record the failure, then re-raise so celery
            # also sees it (and triggers on_failure).
            _log.error('An unhandled exception was raised while'
                       + ' processing {0}'.format(
                           entry))

            mark_entry_failed(entry.id, exc)
            json_processing_callback(entry)
            raise

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        If the processing failed we should mark that in the database.

        Assuming that the exception raised is a subclass of
        BaseProcessingFail, we can use that to get more information
        about the failure and store that for conveying information to
        users about the failure, etc.
        """
        # args[0] is media_id as passed to run().
        entry_id = args[0]
        mark_entry_failed(entry_id, exc)

        entry = mgg.database.MediaEntry.query.filter_by(id=entry_id).first()
        json_processing_callback(entry)
        mgg.database.reset_after_request()

    def after_return(self, *args, **kwargs):
        """
        This is called after the task has returned, we should clean up.

        We need to rollback the database to prevent ProgrammingError exceptions
        from being raised.
        """
        # In eager mode we get DetachedInstanceError, we do rollback on_failure
        # to deal with that case though when in eager mode.
        if not celery.app.default_app.conf['CELERY_ALWAYS_EAGER']:
            mgg.database.reset_after_request()
  147. tasks.register(ProcessMedia)