delivery.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. """
  2. ForgeFed plugin for Pagure.
  3. Copyright (C) 2020-2021 zPlus <zplus@peers.community>
  4. This program is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation; either version 2 of the License, or
  7. (at your option) any later version.
  8. This program is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU General Public License for more details.
  12. You should have received a copy of the GNU General Public License along
  13. with this program; if not, see <https://www.gnu.org/licenses/>.
  14. SPDX-FileCopyrightText: 2020-2021 zPlus <zplus@peers.community>
  15. SPDX-License-Identifier: GPL-2.0-only
  16. """
  17. import celery
  18. import datetime
  19. import json
  20. import rdflib
  21. import requests
  22. import requests_http_signature
  23. import sqlalchemy
  24. from .. import APP_URL
  25. from .. import activitypub
  26. from .. import model
  27. from .. import settings
  28. from . import activities
  29. from . import broker_url
  30. from . import broker
  31. from . import database_session
# Celery task logger used by every task in this module. Its verbosity is taken
# from the plugin settings (settings.LOG_LEVEL).
log = celery.utils.log.get_task_logger(__name__)
log.setLevel(settings.LOG_LEVEL)
  34. @broker.task
  35. def distribute(activity):
  36. """
  37. This function prepares an Activity for delivery and then schedules new
  38. delivery tasks for each recipient.
  39. :param activity: The activities.Activity to be send.
  40. """
  41. activity = activitypub.Activity.from_dict(activity)
  42. # Outgoing Activities MUST have an Actor
  43. if 'actor' not in activity:
  44. log.debug('Activity does not have an Actor. Ignoring Activity.\n{}'.format(json.dumps(activity, indent=4, sort_keys=True)))
  45. return
  46. # Set the JSON-LD context
  47. if '@context' not in activity:
  48. activity['@context'] = activitypub.jsonld_context
  49. # Assign an ID to the Activity
  50. activity['id'] = '{}/federation/activity/{}'.format(APP_URL, activitypub.new_activity_id())
  51. # Add publishing datetime
  52. # - use UTC
  53. # - remove microseconds, use HH:MM:SS only
  54. # - add timezone info. There is also .astimezone() but it seems to
  55. # return the wrong value when used with .utcnow(). Bug?
  56. # - convert to ISO 8601 format
  57. activity['published'] = activitypub.format_datetime(datetime.datetime.utcnow())
  58. with database_session() as database:
  59. actor_uri = activity['actor']
  60. actor = model.from_uri(database, actor_uri)
  61. actor_json = activitypub.fetch(actor_uri)
  62. # By default, the Activity is sent to the Actor followers collection.
  63. # https://www.w3.org/TR/activitypub/#delivery
  64. if 'to' not in activity:
  65. activity['to'] = [ actor_json['followers'] ]
  66. # Save a copy of the Activity in the database
  67. database.add(model.Resource(
  68. uri = activity['id'],
  69. document = json.dumps(activity)))
  70. # Add the Activity to the Actor's OUTBOX before sending it
  71. database.add(model.Collection(
  72. uri = actor_json['outbox'],
  73. item = activity['id']))
  74. # Now we are ready to POST to the remote actors
  75. # Create the list of recipients that we need to deliver to
  76. recipients = activity.get_receivers_addresses()
  77. # Before sending, remove bto and bcc according to spec.
  78. # https://www.w3.org/TR/activitypub/#client-to-server-interactions
  79. activity.pop('bto', None)
  80. activity.pop('bcc', None)
  81. # Stop here if there are no recipients.
  82. # https://www.w3.org/TR/activitypub/#h-note-8
  83. if len(recipients) == 0:
  84. log.debug('No recipients. Activity will not be sent.')
  85. return
  86. # Create a new Celery task for each recipient. Activities are POSTed
  87. # individually because if one request fails we don't want to resend
  88. # the same Activity to *all* the recipients.
  89. for recipient in recipients:
  90. log.debug('Deliverying {} to {}'.format(activity['id'], recipient))
  91. post.delay(activity, recipient)
  92. @broker.task
  93. def forward(activity):
  94. """
  95. Forward Activity. This task is scheduled when a new Activity has been received
  96. in a INBOX (see validate_incoming_activity()).
  97. https://www.w3.org/TR/activitypub/#inbox-forwarding
  98. """
  99. activity = activitypub.Activity.from_dict(activity)
  100. with database_session() as database:
  101. # Create the list of recipients that we need to deliver to
  102. recipients = activity.get_forwarding_addresses()
  103. if len(recipients) == 0:
  104. log.debug('No recipients. Activity {} will not be forwarded to anybody.'.format(activity['id']))
  105. return
  106. # Create a new Celery task for each recipient. Activities are POSTed
  107. # individually because if one request fails we don't want to resend
  108. # the same Activity to *all* the recipients.
  109. for recipient in recipients:
  110. log.debug('Forwarding {} to {}'.format(activity['id'], recipient))
  111. post.delay(activity, recipient)
  112. @broker.task
  113. def post(activity, recipient_uri, depth=settings.DELIVERY_DEPTH):
  114. """
  115. POST an Activity to a remote Actor. If the remote object is a Collection,
  116. a new task is scheduled for every item of that collection.
  117. If an error occurs during the HTTP request, the task is automatically
  118. rescheduled a number of times depending on the Celery configuration.
  119. IMPORTANT: This task is only responsible for the actual delivery (HTTP POST)
  120. of the Activity to the remote Actor's INBOX. This task will *not* attempt
  121. to perform any sort of validation of the Activity being sent. In other
  122. words: whoever is using this task should validate its own Activities
  123. before sending them.
  124. :param activity: The Activity to be sent.
  125. :type activity: dict
  126. :param recipient_uri: The URI of a remote actor to send the Activity to.
  127. :param depth: Number of indirections to follow if recipient_uri is a
  128. Collection. See the settings file for more info about this option.
  129. """
  130. activity = activitypub.Activity.from_dict(activity)
  131. actor_uri = activity['actor']
  132. # Make sure the Actor is not sending the Activity to itself.
  133. # https://www.w3.org/TR/activitypub/#delivery
  134. if actor_uri == recipient_uri:
  135. log.debug('Activity actor and recipient are the same. Will not send to {}'.format(actor_uri))
  136. return
  137. # If the Activity is addressed to as:Public, the Activity is not POSTed
  138. # to anyone.
  139. # https://www.w3.org/TR/activitypub/#public-addressing
  140. if recipient_uri == 'Public':
  141. log.debug('Not sending Activity to as:Public.')
  142. return
  143. # Retrieve remote object (actor)
  144. remote_object = activitypub.fetch(recipient_uri)
  145. assert remote_object, 'Could not fetch remote Actor {}'.format(recipient_uri)
  146. # Select the recipient INBOX. Use sharedInbox if there is one.
  147. # https://www.w3.org/TR/activitypub/#sharedInbox
  148. if 'endpoints' in remote_object and 'sharedInbox' in remote_object['endpoints']:
  149. recipient_inbox = remote_object['endpoints']['sharedInbox']
  150. elif 'inbox' in remote_object:
  151. recipient_inbox = remote_object['inbox']
  152. else:
  153. recipient_inbox = None
  154. # If the remote object does not have an INBOX, we check if it's a
  155. # Collection, in which case we retrieve all its items.
  156. if not recipient_inbox:
  157. log.debug('Recipient {} is not an Actor. I will now check if it is a Collection.'.format(recipient_uri))
  158. # Do not follow any more Collections.
  159. if depth < 1:
  160. log.debug('Max number of indirections reached. I will not '
  161. 'expand any more Collections.')
  162. return
  163. if any(collection == remote_object['type'] for collection in
  164. [ 'Collection', 'OrderedCollection',
  165. 'CollectionPage', 'OrderedCollectionPage' ]):
  166. items = []
  167. page = []
  168. if 'items' in remote_object:
  169. if isinstance(remote_object['items'], str):
  170. items.append(remote_object['items'])
  171. else:
  172. items.extend(remote_object['items'])
  173. if 'orderedItems' in remote_object:
  174. if isinstance(remote_object['orderedItems'], str):
  175. items.append(remote_object['orderedItems'])
  176. else:
  177. items.extend(remote_object['orderedItems'])
  178. if 'first' in remote_object:
  179. page.append(remote_object['first'])
  180. if 'next' in remote_object:
  181. page.append(remote_object['next'])
  182. # Schedule a new delivery for every object found in the collection
  183. for recipient_uri in items:
  184. post.delay(activity, recipient_uri, depth - 1)
  185. # TODO If a page "next" links to a previous page (which should
  186. # not happen), this will not detect the loop.
  187. for recipient_uri in page:
  188. post.delay(activity, recipient_uri, depth)
  189. if len(items) == 0 and len(page) == 0:
  190. log.debug('Collection found, but it contains no items. Activity will not be sent.')
  191. # Since this object does *not* have an INBOX, we stop here for this
  192. # task. If a Collection was found, a new post() has been scheduled.
  193. return
  194. with database_session() as database:
  195. # Check if this Activity was already sent (successfully) to this INBOX.
  196. # If it was, we do not resend the same Activity twice. This situation
  197. # could happen with a sharedInbox, or if for some reasons the same
  198. # Activity is sent twice (maybe the job queue didn't remove it?).
  199. # NOTE The collection "recipient_inbox" is fictitious because we
  200. # cannot have actual access to a remote Actor's INBOX.
  201. if database.query(
  202. database.query(model.Collection)
  203. .filter(model.Collection.uri == recipient_inbox)
  204. .filter(model.Collection.item == activity['id'])
  205. .exists()
  206. ).scalar():
  207. log.debug(
  208. 'Activity {} was already delivered to INBOX {}. Not sending again.'.format(
  209. activity['id'], recipient_inbox))
  210. return
  211. # If the Actor of the Activity is *not* a user of this instance, it means
  212. # that we're forwarding this Activity. In this case we cannot sign the
  213. # HTTP request (because we don't know the private key of the Actor). We
  214. # sign it as user forgefed@APP_URL instead.
  215. signer_uri = actor_uri
  216. if not actor_uri.startswith(APP_URL):
  217. signer_uri = model.test_or_set_forgefed_user(database).uri
  218. signer_jsonld = activitypub.fetch(signer_uri)
  219. # Make sure the Actor has a GPG key for signing the HTTP request
  220. key = model.GpgKey.test_or_set(database, signer_uri, signer_jsonld['publicKey'])
  221. if not key:
  222. log.debug('Actor {} does not have a key and cannot sign HTTP requests. Activity {} will not be delivered.'.format(
  223. signer_uri, activity['id']))
  224. return
  225. # This will add a "Signature:" header to the HTTP request.
  226. # The key ID should be the Actor URI, see: https://www.w3.org/wiki/SocialCG/ActivityPub/Authentication_Authorization#Signing_requests_using_HTTP_Signatures
  227. auth_method = requests_http_signature.HTTPSignatureHeaderAuth(
  228. key = key.private,
  229. key_id = signer_uri,
  230. algorithm = 'rsa-sha256',
  231. headers = [ '(request-target)', 'host', 'date', 'digest' ])
  232. # passphrase = None,
  233. # expires_in = None)
  234. log.debug('Posting Activity {} to {}'.format(activity['id'], recipient_inbox))
  235. log.debug(json.dumps(activity, indent=4, sort_keys=True))
  236. # Finally! POST the Activity to the INBOX of the remote Actor
  237. response = requests.post(
  238. recipient_inbox,
  239. headers=activitypub.REQUEST_HEADERS,
  240. data=json.dumps(activity).encode('UTF-8'),
  241. auth=auth_method)
  242. log.debug('Activity {} POST return code: {}'.format(activity['id'], response.status_code))
  243. # ActivityPub instances could return "200 OK" or "202 Accepted" upon
  244. # receiving the Activity.
  245. assert 200 <= response.status_code < 300, 'Error [HTTP {}] when POSTing Activity'.format(response.status_code)
  246. log.debug('Activity {} POSTed to {} without errors.'.format(
  247. activity['id'], recipient_inbox))
  248. # If we're sending this Activity to a remote instance, we remember that
  249. # this Activity was already delivered to them. This way it will not be
  250. # resent twice to the same INBOX (avoid spamming, just in case). For
  251. # Activities directed toward the local instance we don't need this because
  252. # we have access to Actors INBOXes so we already know if it was received.
  253. if not recipient_inbox.startswith(APP_URL):
  254. database.add(model.Collection(
  255. uri = recipient_inbox,
  256. item = activity['id']))
  257. @broker.task
  258. def validate_incoming_activity(actor_uri, activity, http_signature_verified):
  259. """
  260. This task is scheduled after receiving a new activity from a remote actor.
  261. It's called from the actor_receive() view in app.py.
  262. When a new Activity has been received the plugin will schedule a new task
  263. to validate the Activity, then return immediately.
  264. :param actor_uri: URI of the Actor that has received the Activity
  265. :type actor_uri: str
  266. :param activity: The incoming activity document.
  267. :type activity: dict
  268. :param http_signature_verified: Whether if the HTTP Signature header was
  269. verified or not.
  270. :type http_signature_verified: Bool
  271. """
  272. with database_session() as database:
  273. actor = model.from_uri(database, actor_uri)
  274. if not actor:
  275. log.debug('Received Activity {} for nonexistent Actor {}'.format(activity['id'], actor_uri))
  276. return
  277. # This is not a local Actor on this instance. It was likely sent to the wrong address.
  278. if actor.is_remote:
  279. log.debug('{} is a remote Actor. Ignoring incoming Activity.'.format(actor_uri))
  280. return
  281. # Get the JSONLD of the (local) Actor
  282. actor_jsonld = activitypub.fetch(actor_uri)
  283. if 'inbox' not in actor_jsonld:
  284. log.debug('{} does not seem to be a valid Actor. Ignoring incoming Activity {}'.format(actor_uri, activity['id']))
  285. return
  286. # Add the Activity to the Actor's INBOX
  287. # NOTE It's important that this line comes before checking if a copy of
  288. # the Activity is already in the database. This way the Activity
  289. # will be added to the Actor INBOX even if it won't be performed
  290. # (because it was already performed in the past).
  291. try:
  292. database.add(model.Collection(uri = actor_jsonld['inbox'],
  293. item = activity['id']))
  294. # flush() are automatically called by SQLAlchemy *before* updating a
  295. # Session. So we manually trigger it now to catch the exception, rather
  296. # than waiting for the next update below.
  297. database.flush()
  298. log.debug('Actor {} has received a new Activity with id {}:\n{}'.format(
  299. actor_uri, activity['id'], json.dumps(activity, indent=4, sort_keys=True)))
  300. except sqlalchemy.exc.IntegrityError:
  301. log.debug('Activity {} already in INBOX {}'.format(activity['id'], actor_jsonld['inbox']))
  302. database.rollback()
  303. return
  304. except Exception as e:
  305. raise Exception('Activity {} could not be added to INBOX {}'.format(activity['id'], actor_jsonld['inbox'])) from e
  306. # Now we need to check if this Activity was already handled in the past.
  307. # If it was, we don't do anything since it was already processed in the
  308. # past, and new Activities should have a new ID.
  309. # To do so, we count how many times this Activity has been added to a
  310. # collection (either INBOX or OUTBOX). If it's a local Activity (created
  311. # by this instance) we would have the ID in 2 collections: the OUTBOX of
  312. # the Actor who sent it, and the INBOX of the Actor who received it. If
  313. # it's an Activity created by another instance, this would be the first
  314. # time seeing it so we only have it in a Actor INBOX. Any more than that
  315. # it means that this Activity has been processed already previously.
  316. activity_count = database \
  317. .query(sqlalchemy.func.count(model.Collection.item)) \
  318. .filter(model.Collection.item == activity['id']) \
  319. .scalar()
  320. assert activity_count > 0
  321. if ( activity['id'].startswith(APP_URL) and activity_count > 2) \
  322. or (not activity['id'].startswith(APP_URL) and activity_count > 1):
  323. log.debug('This Activity was already parsed in the past. Ignoring {}'.format(activity['id']))
  324. return
  325. # TODO We need to verify the JSON-LD Activity signature (LD-signatures)
  326. # in order to make sure it wasn't forged.
  327. if not http_signature_verified:
  328. log.debug('HTTP Signature was not verified successfully.'
  329. 'We need to check the LD-signature.')
  330. return
  331. # Let's save a copy of the Activity in the database (only if it's a
  332. # remote Activity, otherwise it's already been saved during OUTBOX
  333. # delivery).
  334. if activity_count == 1:
  335. database.add(model.Resource(
  336. uri = activity['id'],
  337. document = json.dumps(activity)))
  338. # Will forward the Activity if necessary
  339. forward.delay(activity)
  340. #######################################################################
  341. # Now, we could stop here after the Activity has been added to the
  342. # Actor's INBOX, but Pagure is not just a server it also works
  343. # as a user client with an interface that allows user interactions.
  344. # For this reason we go on and process the Activity, which will likely
  345. # result in changes to the Pagure database.
  346. #######################################################################
  347. #######################################################################
  348. # Because JSON-LD can represent the same graph in several different
  349. # ways, we should probably normalize the JSONLD object before passing
  350. # it to the actor for processing. This simplifies working with the object.
  351. # Normalization could mean "flattening" or "compaction" of the JSONLD
  352. # document.
  353. # However, this step is left out for now and not implemented unless
  354. # needed because the ActivityStream specs already specifies that
  355. # objects should be served in compact form:
  356. # https://www.w3.org/TR/social-web-protocols/#content-representation
  357. #
  358. # activity = normalize_activity(activity)
  359. #######################################################################
  360. # Schedule a task for updating our local instance with the content of
  361. # the Activity.
  362. # NOTE It's important to schedule a new task instead of executing the
  363. # Activity from this task because if something fails during the
  364. # execution of the Activity, only that part will be retried by the
  365. # scheduler. The delivery to an actor INBOX has already happened
  366. # (from this task) and won't be retried again.
  367. activities.perform.delay(activity)
  368. @broker.task
  369. def validate_outgoing_activity(actor_uri, activity):
  370. """
  371. This task is scheduled after receiving a new activity from a local actor.
  372. It's called from the actor_send() view in app.py.
  373. :param actor_uri: URI of the Actor that has received the Activity
  374. :type actor_uri: str
  375. :param activity: The incoming activity document.
  376. :type activity: dict
  377. """
  378. activity = activitypub.Activity.from_dict(activity)
  379. # Get the JSONLD of the (local) Actor
  380. actor_jsonld = activitypub.fetch(actor_uri)
  381. if 'inbox' not in actor_jsonld:
  382. log.debug('{} does not seem to be a valid Actor. Ignoring Activity {}'.format(actor_uri, activity['id']))
  383. return
  384. with database_session() as database:
  385. actor = model.from_uri(database, actor_uri)
  386. if not actor:
  387. log.debug('Activity {} has nonexistent Actor {}'.format(activity['id'], actor_uri))
  388. return
  389. if actor.is_remote:
  390. log.debug('{} is a remote Actor. Ignoring Activity.'.format(actor_uri))
  391. return
  392. # Now we need to check if this Activity was already handled in the past.
  393. # If it was, we don't do anything since it was already processed in the
  394. # past, and new Activities should have a new ID.
  395. activity_count = database \
  396. .query(sqlalchemy.func.count(model.Collection.item)) \
  397. .filter(model.Collection.uri == actor_jsonld['outbox'],
  398. model.Collection.item == activity['id']) \
  399. .scalar()
  400. if activity_count > 0:
  401. log.debug('This Activity was already parsed in the past. Ignoring {}'.format(activity['id']))
  402. return
  403. # Update
  404. activities.perform.delay(activity)
  405. # And send it
  406. activity.distribute()