delivery.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. import celery
  2. import datetime
  3. import json
  4. import rdflib
  5. import requests
  6. import requests_http_signature
  7. from .. import APP_URL
  8. from .. import activitypub
  9. from .. import model
  10. from .. import settings
  11. from . import activities
  12. from . import broker_url
  13. from . import broker
  14. from . import database_session
# Module-level logger shared by every task in this file. It is obtained from
# Celery's task-logger helper and its verbosity is taken from the project
# settings (LOG_LEVEL).
log = celery.utils.log.get_task_logger(__name__)
log.setLevel(settings.LOG_LEVEL)
  17. @broker.task
  18. def distribute(activity):
  19. """
  20. This function prepares an Activity for delivery and then schedules new
  21. delivery tasks for each recipient.
  22. :param activity: The activities.Activity to be send.
  23. """
  24. # Outgoing Activities MUST have an Actor
  25. if 'actor' not in activity:
  26. log.debug('Activity does not have an Actor. Ignoring Activity.\n{}'.format(activity))
  27. return
  28. # Set the JSON-LD context
  29. if '@context' not in activity:
  30. activity['@context'] = activitypub.jsonld_context
  31. # Assign an ID to the Activity
  32. activity['id'] = '{}/federation/activity/{}'.format(APP_URL, activitypub.new_activity_id())
  33. # Add publishing datetime
  34. # - use UTC
  35. # - remove microseconds, use HH:MM:SS only
  36. # - add timezone info. There is also .astimezone() but it seems to
  37. # return the wrong value when used with .utcnow(). Bug?
  38. # - convert to ISO 8601 format
  39. activity['published'] = activitypub.format_datetime(datetime.datetime.utcnow())
  40. with database_session() as database:
  41. actor_uri = activity['actor']
  42. actor = model.from_uri(database, actor_uri)
  43. # By default, the Activity is sent to the Actor followers collection.
  44. # https://www.w3.org/TR/activitypub/#delivery
  45. if 'to' not in activity:
  46. activity['to'] = [ actor.followers_uri ]
  47. # Create the list of recipients that we need to deliver to
  48. recipients = []
  49. for field in [ 'to', 'cc', 'bto', 'bcc' ]:
  50. if field in activity:
  51. if isinstance(activity[field], str):
  52. recipients.append(activity[field])
  53. else:
  54. recipients.extend(activity[field])
  55. # Remove duplicates in the list of recipients, so that we will schedule
  56. # only one task for the same recipient Actor.
  57. # NOTE list(set()) might change the order of the recipients because set()
  58. # does not preserve the order. However this is not a problem because
  59. # for each recipient a separate task is scheduled.
  60. recipients = list(set(recipients))
  61. # Save a copy of the Activity in the database
  62. database.add(model.Resource(
  63. uri = activity['id'],
  64. document = json.dumps(activity)))
  65. # Add the Activity to the Actor's OUTBOX before sending it
  66. database.add(model.Collection(
  67. uri = actor.outbox_uri,
  68. item = activity['id']))
  69. database.commit()
  70. # Now we are ready to POST to the remote actors
  71. # Before sending, remove bto and bcc according to spec.
  72. # https://www.w3.org/TR/activitypub/#client-to-server-interactions
  73. activity.pop('bto', None)
  74. activity.pop('bcc', None)
  75. # Stop here if there are no recipients.
  76. # https://www.w3.org/TR/activitypub/#h-note-8
  77. if len(recipients) == 0:
  78. log.debug('No recipients. Activity will not be sent.')
  79. return
  80. # Make sure the local actor has a GPG key before POSTing anything. The
  81. # remote Actor will use the key for verifying the Activity.
  82. model.GpgKey.test_or_set(database, actor.local_uri, actor.publickey_uri)
  83. # Create a new Celery task for each recipient. Activities are POSTed
  84. # individually because if one request fails we don't want to resend
  85. # the same Activity to *all* the recipients.
  86. for recipient in recipients:
  87. log.debug('Scheduling new activity: id={} recipient={}'.format(
  88. activity['id'], recipient))
  89. post.delay(activity = activity,
  90. recipient_uri = recipient,
  91. depth = settings.DELIVERY_DEPTH)
  92. @broker.task
  93. def post(activity, recipient_uri, depth):
  94. """
  95. POST an Activity to a remote Actor. If the remote object is a Collection,
  96. a new task is scheduled for every item of that collection.
  97. If an error occurs during the HTTP request, the task is automatically
  98. rescheduled a number of times depending on the Celery configuration.
  99. IMPORTANT: This task is only responsible for the actual delivery (HTTP POST)
  100. of the Activity to the remote Actor's INBOX. This task will *not* attempt
  101. to perform any sort of validation of the Activity being sent. In other
  102. words: whoever is using this task should validate its own Activities
  103. before sending them.
  104. :param activity: The Activity to be sent.
  105. :type activity: dict
  106. :param recipient_uri: The URI of a remote actor to send the Activity to.
  107. :param depth: Number of indirections to follow if recipient_uri is a
  108. Collection. See the settings file for more info about this option.
  109. """
  110. with database_session() as database:
  111. actor_uri = activity['actor']
  112. # Make sure the Actor is not sending the Activity to itself.
  113. # https://www.w3.org/TR/activitypub/#delivery
  114. if actor_uri == recipient_uri:
  115. log.debug('Activity actor and recipient are the same. '
  116. 'Refuting to send.')
  117. return
  118. # If the Activity is addressed to as:Public, the Activity is not POSTed
  119. # to anyone.
  120. # https://www.w3.org/TR/activitypub/#public-addressing
  121. if recipient_uri == 'Public':
  122. log.debug('Not sending to as:Public.')
  123. return
  124. # Retrieve remote object (actor)
  125. remote_object = activitypub.fetch(recipient_uri)
  126. # Make sure we got an object, or abort task
  127. assert remote_object, 'Could not fetch remote actor.'
  128. # Select the Actor INBOX. Use sharedInbox if there is one.
  129. # https://www.w3.org/TR/activitypub/#sharedInbox
  130. if 'endpoints' in remote_object and 'sharedInbox' in remote_object['endpoints']:
  131. recipient_inbox = remote_object['endpoints']['sharedInbox']
  132. elif 'inbox' in remote_object:
  133. recipient_inbox = remote_object['inbox']
  134. else:
  135. recipient_inbox = None
  136. # If the remote object does not have an INBOX, we check if it's a
  137. # Collection, in which case we retrieve all its items.
  138. if not recipient_inbox:
  139. log.debug('Recipient is not an Actor. Checking if it\'s a '
  140. 'collection.')
  141. # Do not follow any more Collections.
  142. if depth < 1:
  143. log.debug('Max number of indirections reached. I will not '
  144. 'expand any more collections.')
  145. return
  146. if any(collection == remote_object['type'] for collection in
  147. [ 'Collection', 'OrderedCollection',
  148. 'CollectionPage', 'OrderedCollectionPage' ]):
  149. items = []
  150. page = []
  151. if 'items' in remote_object:
  152. if isinstance(remote_object['items'], str):
  153. items.append(remote_object['items'])
  154. else:
  155. items.extend(remote_object['items'])
  156. if 'orderedItems' in remote_object:
  157. if isinstance(remote_object['orderedItems'], str):
  158. items.append(remote_object['orderedItems'])
  159. else:
  160. items.extend(remote_object['orderedItems'])
  161. if 'first' in remote_object:
  162. page.append(remote_object['first'])
  163. if 'next' in remote_object:
  164. page.append(remote_object['next'])
  165. # Schedule a new delivery for every object found in the collection
  166. for recipient_uri in items:
  167. post.delay(activity, recipient_uri, depth - 1)
  168. # TODO If a page "next" links to a previous page (which should
  169. # not happen), this will not detect the loop.
  170. for recipient_uri in page:
  171. post.delay(activity, recipient_uri, depth)
  172. if len(items) == 0 and len(page) == 0:
  173. log.debug('Collection found, but it contains no items. '
  174. + 'Activity will not be sent.')
  175. # Since this object does *not* have an INBOX, we stop here for this
  176. # task.
  177. return
  178. # Check if this Activity was already sent (successfully) to this INBOX.
  179. # If it was, we do not resend the same Activity twice. This situation
  180. # could happen with a sharedInbox, or if for some reasons the same
  181. # Activity is sent twice (maybe the job queue didn't remove it?).
  182. # NOTE The collection "recipient_inbox" is fictitious because we
  183. # cannot have actual access to a remote Actor's INBOX.
  184. if database.query(
  185. database.query(model.Collection)
  186. .filter(model.Collection.uri == recipient_inbox)
  187. .filter(model.Collection.item == activity['id'])
  188. .exists()
  189. ).scalar():
  190. log.warning(
  191. 'Activity ' + activity['id'] + ' was already delivered to '
  192. 'the INBOX ' + recipient_inbox + '. Not sending again.')
  193. return
  194. # Retrieve the private key of the local Actor for signing the HTTP request
  195. key = database.query(model.GpgKey) \
  196. .filter(model.GpgKey.actor_uri == actor_uri) \
  197. .one_or_none()
  198. if not key:
  199. log.debug('Actor {} does not have a key. Cannot sign HTTP request.')
  200. return
  201. # This will add a "Signature:" header to the HTTP request.
  202. auth_method = requests_http_signature.HTTPSignatureHeaderAuth(
  203. key = key.private,
  204. key_id = key.uri,
  205. algorithm = 'rsa-sha256',
  206. headers = [ '(request-target)', 'host', 'date', 'digest' ])
  207. # passphrase = None,
  208. # expires_in = None)
  209. log.debug('Posting Activity ' + activity['id'] + ' to ' + recipient_inbox)
  210. log.debug(json.dumps(activity, indent=4, sort_keys=True))
  211. # Finally! Send out the Activity to the INBOX of the remote Actor
  212. response = requests.post(
  213. recipient_inbox,
  214. headers=activitypub.REQUEST_HEADERS,
  215. data=json.dumps(activity).encode('UTF-8'),
  216. auth=auth_method)
  217. log.debug('Activity ' + activity['id'] + ' POST return code: '
  218. + str(response.status_code))
  219. # ActivityPub instances could return "200 OK" or "202 Accepted" upon
  220. # receiving the Activity.
  221. assert 200 <= response.status_code < 300, 'Error [HTTP {}] when POSTing Activity'.format(response.status_code)
  222. log.debug('Activity posted without errors.')
  223. # Save in the database that this Activity was delivered successfully to
  224. # the remote Actor. This way it will not be resent twice (just in case).
  225. database.add(model.Collection(
  226. uri = recipient_inbox,
  227. item = activity['id']))
  228. @broker.task
  229. def validate_incoming_activity(actor_uri, activity):
  230. """
  231. This task is scheduled after receiving a new activity from a remote actor.
  232. It's called from the actor_receive() view in app.py.
  233. When a new Activity has been received the plugin will schedule a new task
  234. to validate the Activity, then return immediately.
  235. :param actor_uri: URI of the Actor that has received the Activity
  236. :type actor_uri: str
  237. :param activity: The incoming activity document.
  238. :type activity: dict
  239. """
  240. if not actor_uri:
  241. log.info('Missing Actor. Ignoring task.')
  242. return
  243. if not activity:
  244. log.info('Missing Activity. Ignoring task.')
  245. return
  246. log.debug('Actor {} has received a new Activity with id {}:\n{}'.format(
  247. actor_uri, activity['id'], json.dumps(activity, indent=4, sort_keys=True)))
  248. with database_session() as database:
  249. # Recreate the actor class from its URI
  250. actor = model.from_uri(database, actor_uri)
  251. if not actor:
  252. log.debug('Actor {} doesn\'t exist. Ignoring incoming Activity.'.format(actor_uri))
  253. return
  254. # Check if this Activity was already delivered to this Actor. If it was,
  255. # we don't do anything since it was already processed in the past. New
  256. # activities should not have the same ID of older ones.
  257. if database.query(database \
  258. .query(model.Collection) \
  259. .filter(model.Collection.uri == actor.inbox_uri) \
  260. .filter(model.Collection.item == activity['id']) \
  261. .exists() \
  262. ).scalar():
  263. log.debug('Activity already delivered to this actor.')
  264. return
  265. # Add the Activity to the Actor's INBOX
  266. database.add(model.Collection(uri = actor.inbox_uri, item = activity['id']))
  267. #######################################################################
  268. # Now, we could stop here after the Activity has been added to the
  269. # Actor's INBOX, but Pagure is not just a server it also works
  270. # as a user client with an interface that allows user interactions.
  271. # For this reason we go on and process the Activity, which will likely
  272. # result in changes to the Pagure database.
  273. # The INBOX can also be read by other clients (eg. desktop apps).
  274. #######################################################################
  275. #######################################################################
  276. # Because JSON-LD can represent the same graph in several different
  277. # ways, we should probably normalize the JSONLD object before passing
  278. # it to the actor for processing. This simplifies working with the object.
  279. # Normalization could mean "flattening" or "compaction" of the JSONLD
  280. # document.
  281. # However, this step is left out for now and not implemented unless
  282. # needed because the ActivityStream specs already specifies that
  283. # objects should be served in compact form:
  284. # https://www.w3.org/TR/social-web-protocols/#content-representation
  285. #
  286. # activity = normalize_activity(activity)
  287. #######################################################################
  288. # Check if this Activity was already received
  289. if database.query(database \
  290. .query(model.Resource) \
  291. .filter(model.Resource.uri == activity['id']) \
  292. .exists() \
  293. ).scalar():
  294. log.debug('This Activity was already received and parsed in the past. Ignoring {}'.format(activity['id']))
  295. return
  296. # Let's save a copy of the Activity in the database
  297. database.add(model.Resource(
  298. uri = activity['id'],
  299. document = json.dumps(activity)))
  300. # Schedule a task for updating our local instance with the content of
  301. # the Activity.
  302. # NOTE It's important to schedule a new task instead of executing the
  303. # Activity from this task because if something fails during the
  304. # execution of the Activity, only that part will be retried by the
  305. # scheduler. The delivery to an actor INBOX has already happened
  306. # (from this task) and won't be retried again.
  307. activities.perform.delay(actor_uri, activity)