"""
ForgeFed plugin for Pagure.
Copyright (C) 2020-2021 zPlus <zplus@peers.community>

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program; if not, see <https://www.gnu.org/licenses/>.

SPDX-FileCopyrightText: 2020-2021 zPlus <zplus@peers.community>
SPDX-License-Identifier: GPL-2.0-only
"""
- import celery
- import datetime
- import json
- import rdflib
- import requests
- import requests_http_signature
- import sqlalchemy
- from .. import APP_URL
- from .. import activitypub
- from .. import model
- from .. import settings
- from . import activities
- from . import broker_url
- from . import broker
- from . import database_session
# Module-wide logger obtained from Celery's task-logger factory; its level is
# taken from settings.LOG_LEVEL.
log = celery.utils.log.get_task_logger(__name__)
log.setLevel(settings.LOG_LEVEL)
@broker.task
def distribute(activity):
    """
    Prepare an outgoing Activity for delivery, then schedule one delivery
    task per recipient.

    The Activity gets a JSON-LD context (if it has none), a fresh ID and a
    publishing date. A copy is stored in the database and its ID is added to
    the Actor's OUTBOX before a post() task is queued for every recipient.
    Nothing is done if the Activity has no Actor, and nothing is queued if the
    list of recipients is empty.

    :param activity: The Activity to be sent. It is passed through
        activitypub.Activity.from_dict() before use.
    """

    activity = activitypub.Activity.from_dict(activity)

    # Outgoing Activities MUST have an Actor
    if 'actor' not in activity:
        log.debug('Activity does not have an Actor. Ignoring Activity.\n{}'.format(json.dumps(activity, indent=4, sort_keys=True)))
        return

    # Set the JSON-LD context
    if '@context' not in activity:
        activity['@context'] = activitypub.jsonld_context

    # Assign an ID to the Activity
    activity['id'] = '{}/federation/activity/{}'.format(APP_URL, activitypub.new_activity_id())

    # Add publishing datetime
    #     - use UTC
    #     - remove microseconds, use HH:MM:SS only
    #     - add timezone info. There is also .astimezone() but it seems to
    #       return the wrong value when used with .utcnow(). Bug?
    #     - convert to ISO 8601 format
    # datetime.utcnow() is deprecated since Python 3.12. The expression below
    # yields the very same naive UTC value, so format_datetime() receives
    # exactly what it received before.
    published = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    activity['published'] = activitypub.format_datetime(published)

    with database_session() as database:
        actor_uri = activity['actor']

        # NOTE(review): the result of this lookup is not used below. The call
        # is kept in case model.from_uri() has side effects - confirm and
        # remove if it does not.
        actor = model.from_uri(database, actor_uri)
        actor_json = activitypub.fetch(actor_uri)

        # By default, the Activity is sent to the Actor followers collection.
        # https://www.w3.org/TR/activitypub/#delivery
        if 'to' not in activity:
            activity['to'] = [actor_json['followers']]

        # Save a copy of the Activity in the database
        database.add(model.Resource(
            uri=activity['id'],
            document=json.dumps(activity)))

        # Add the Activity to the Actor's OUTBOX before sending it
        database.add(model.Collection(
            uri=actor_json['outbox'],
            item=activity['id']))

        # Now we are ready to POST to the remote actors

        # Create the list of recipients that we need to deliver to.
        # This must happen before bto/bcc are stripped below.
        recipients = activity.get_receivers_addresses()

        # Before sending, remove bto and bcc according to spec.
        # https://www.w3.org/TR/activitypub/#client-to-server-interactions
        activity.pop('bto', None)
        activity.pop('bcc', None)

        # Stop here if there are no recipients.
        # https://www.w3.org/TR/activitypub/#h-note-8
        if not recipients:
            log.debug('No recipients. Activity will not be sent.')
            return

        # Create a new Celery task for each recipient. Activities are POSTed
        # individually because if one request fails we don't want to resend
        # the same Activity to *all* the recipients.
        for recipient in recipients:
            log.debug('Deliverying {} to {}'.format(activity['id'], recipient))
            post.delay(activity, recipient)
@broker.task
def forward(activity):
    """
    Relay an Activity that was received in an INBOX (this task is scheduled
    by validate_incoming_activity()). One post() task is queued for every
    forwarding address of the Activity; if there are none, nothing is queued.

    https://www.w3.org/TR/activitypub/#inbox-forwarding

    :param activity: The Activity to forward. It is passed through
        activitypub.Activity.from_dict() before use.
    """

    forwarded = activitypub.Activity.from_dict(activity)

    with database_session() as database:
        # Addresses this Activity has to be relayed to
        targets = forwarded.get_forwarding_addresses()
        target_count = len(targets)

        if target_count == 0:
            nothing_to_do = 'No recipients. Activity {} will not be forwarded to anybody.'.format(forwarded['id'])
            log.debug(nothing_to_do)
            return

        # One Celery task per target: a failed request must not cause the
        # same Activity to be resent to *all* the other targets.
        for target in targets:
            progress = 'Forwarding {} to {}'.format(forwarded['id'], target)
            log.debug(progress)
            post.delay(forwarded, target)
# Identifiers under which the special "public" collection may show up,
# depending on how the JSON-LD document was compacted.
# https://www.w3.org/TR/activitypub/#public-addressing
_PUBLIC_ADDRESSES = (
    'Public',
    'as:Public',
    'https://www.w3.org/ns/activitystreams#Public',
)

# Object types that are expanded into their members by post().
_COLLECTION_TYPES = (
    'Collection',
    'OrderedCollection',
    'CollectionPage',
    'OrderedCollectionPage',
)


def _collection_members(remote_object):
    """
    Return (items, pages) found in a Collection document: "items" holds the
    values of "items"/"orderedItems", "pages" the values of "first"/"next".
    """

    items = []
    pages = []

    for key in ('items', 'orderedItems'):
        if key in remote_object:
            value = remote_object[key]

            # A single member may be given as a bare string instead of a list
            if isinstance(value, str):
                items.append(value)
            else:
                items.extend(value)

    for key in ('first', 'next'):
        if key in remote_object:
            pages.append(remote_object[key])

    return items, pages


@broker.task
def post(activity, recipient_uri, depth=settings.DELIVERY_DEPTH, timeout=30):
    """
    POST an Activity to a remote Actor. If the remote object is a Collection,
    a new task is scheduled for every item of that collection.

    If an error occurs during the HTTP request, the task is automatically
    rescheduled a number of times depending on the Celery configuration.

    IMPORTANT: This task is only responsible for the actual delivery (HTTP POST)
               of the Activity to the remote Actor's INBOX. This task will *not* attempt
               to perform any sort of validation of the Activity being sent. In other
               words: whoever is using this task should validate its own Activities
               before sending them.

    :param activity: The Activity to be sent.
    :type activity: dict

    :param recipient_uri: The URI of a remote actor to send the Activity to.

    :param depth: Number of indirections to follow if recipient_uri is a
        Collection. See the settings file for more info about this option.

    :param timeout: Seconds to wait for the remote server during the HTTP
        POST before giving up. Without a timeout a stalled server would block
        the worker indefinitely.
        NOTE(review): 30 is a default picked during review; tune as needed.

    :raises AssertionError: If the remote object cannot be fetched, or if the
        remote INBOX answers with a non-2xx status code.
    """

    activity = activitypub.Activity.from_dict(activity)
    actor_uri = activity['actor']

    # Make sure the Actor is not sending the Activity to itself.
    # https://www.w3.org/TR/activitypub/#delivery
    if actor_uri == recipient_uri:
        log.debug('Activity actor and recipient are the same. Will not send to {}'.format(actor_uri))
        return

    # If the Activity is addressed to as:Public, the Activity is not POSTed
    # to anyone.
    # https://www.w3.org/TR/activitypub/#public-addressing
    if recipient_uri in _PUBLIC_ADDRESSES:
        log.debug('Not sending Activity to as:Public.')
        return

    # Retrieve remote object (actor)
    remote_object = activitypub.fetch(recipient_uri)

    # Raised explicitly (instead of using "assert") so that the check is not
    # stripped when Python runs with -O. The exception type is unchanged.
    if not remote_object:
        raise AssertionError('Could not fetch remote Actor {}'.format(recipient_uri))

    # Select the recipient INBOX. Use sharedInbox if there is one.
    # https://www.w3.org/TR/activitypub/#sharedInbox
    if 'endpoints' in remote_object and 'sharedInbox' in remote_object['endpoints']:
        recipient_inbox = remote_object['endpoints']['sharedInbox']
    elif 'inbox' in remote_object:
        recipient_inbox = remote_object['inbox']
    else:
        recipient_inbox = None

    # If the remote object does not have an INBOX, we check if it's a
    # Collection, in which case we retrieve all its items.
    if not recipient_inbox:
        log.debug('Recipient {} is not an Actor. I will now check if it is a Collection.'.format(recipient_uri))

        # Do not follow any more Collections.
        if depth < 1:
            log.debug('Max number of indirections reached. I will not '
                      'expand any more Collections.')
            return

        # "type" may be missing, a single string, or a list of strings.
        object_types = remote_object.get('type') or []
        if isinstance(object_types, str):
            object_types = [object_types]
        elif not isinstance(object_types, (list, tuple)):
            object_types = []

        if any(object_type in _COLLECTION_TYPES for object_type in object_types):
            items, pages = _collection_members(remote_object)

            # Schedule a new delivery for every object found in the collection
            for member_uri in items:
                post.delay(activity, member_uri, depth - 1, timeout)

            # Pages belong to the same Collection, so following them does not
            # consume one level of indirection.
            # TODO If a page "next" links to a previous page (which should
            #      not happen), this will not detect the loop.
            for page_uri in pages:
                post.delay(activity, page_uri, depth, timeout)

            if not items and not pages:
                log.debug('Collection found, but it contains no items. Activity will not be sent.')

        # Since this object does *not* have an INBOX, we stop here for this
        # task. If a Collection was found, a new post() has been scheduled.
        return

    with database_session() as database:

        # Check if this Activity was already sent (successfully) to this INBOX.
        # If it was, we do not resend the same Activity twice. This situation
        # could happen with a sharedInbox, or if for some reasons the same
        # Activity is sent twice (maybe the job queue didn't remove it?).
        # NOTE The collection "recipient_inbox" is fictitious because we
        #      cannot have actual access to a remote Actor's INBOX.
        if database.query(
            database.query(model.Collection)
                    .filter(model.Collection.uri == recipient_inbox)
                    .filter(model.Collection.item == activity['id'])
                    .exists()
        ).scalar():
            log.debug(
                'Activity {} was already delivered to INBOX {}. Not sending again.'.format(
                    activity['id'], recipient_inbox))
            return

        # If the Actor of the Activity is *not* a user of this instance, it means
        # that we're forwarding this Activity. In this case we cannot sign the
        # HTTP request (because we don't know the private key of the Actor). We
        # sign it as user forgefed@APP_URL instead.
        signer_uri = actor_uri
        if not actor_uri.startswith(APP_URL):
            signer_uri = model.test_or_set_forgefed_user(database).uri

        signer_jsonld = activitypub.fetch(signer_uri)

        # Make sure the Actor has a GPG key for signing the HTTP request
        key = model.GpgKey.test_or_set(database, signer_uri, signer_jsonld['publicKey'])

        if not key:
            log.debug('Actor {} does not have a key and cannot sign HTTP requests. Activity {} will not be delivered.'.format(
                signer_uri, activity['id']))
            return

        # This will add a "Signature:" header to the HTTP request.
        # The key ID should be the Actor URI, see: https://www.w3.org/wiki/SocialCG/ActivityPub/Authentication_Authorization#Signing_requests_using_HTTP_Signatures
        auth_method = requests_http_signature.HTTPSignatureHeaderAuth(
            key=key.private,
            key_id=signer_uri,
            algorithm='rsa-sha256',
            headers=['(request-target)', 'host', 'date', 'digest'])

        log.debug('Posting Activity {} to {}'.format(activity['id'], recipient_inbox))
        log.debug(json.dumps(activity, indent=4, sort_keys=True))

        # Finally! POST the Activity to the INBOX of the remote Actor
        response = requests.post(
            recipient_inbox,
            headers=activitypub.REQUEST_HEADERS,
            data=json.dumps(activity).encode('UTF-8'),
            auth=auth_method,
            timeout=timeout)

        log.debug('Activity {} POST return code: {}'.format(activity['id'], response.status_code))

        # ActivityPub instances could return "200 OK" or "202 Accepted" upon
        # receiving the Activity. Anything outside 2xx is a failed delivery:
        # raising here prevents the Activity from being recorded as delivered
        # below. Raised explicitly so the check survives "python -O".
        if not 200 <= response.status_code < 300:
            raise AssertionError('Error [HTTP {}] when POSTing Activity'.format(response.status_code))

        log.debug('Activity {} POSTed to {} without errors.'.format(
            activity['id'], recipient_inbox))

        # If we're sending this Activity to a remote instance, we remember that
        # this Activity was already delivered to them. This way it will not be
        # resent twice to the same INBOX (avoid spamming, just in case). For
        # Activities directed toward the local instance we don't need this because
        # we have access to Actors INBOXes so we already know if it was received.
        if not recipient_inbox.startswith(APP_URL):
            database.add(model.Collection(
                uri=recipient_inbox,
                item=activity['id']))
@broker.task
def validate_incoming_activity(actor_uri, activity, http_signature_verified):
    """
    This task is scheduled after receiving a new activity from a remote actor.
    It's called from the actor_receive() view in app.py.
    When a new Activity has been received the plugin will schedule a new task
    to validate the Activity, then return immediately.

    The Activity is first recorded in the INBOX of the receiving (local)
    Actor. If it has not been handled before and the HTTP signature was
    verified, a copy is stored (remote Activities only), then forward() and
    activities.perform() are scheduled for it.

    :param actor_uri: URI of the Actor that has received the Activity
    :type actor_uri: str

    :param activity: The incoming activity document.
    :type activity: dict

    :param http_signature_verified: Whether the HTTP Signature header was
        verified or not.
    :type http_signature_verified: bool

    :raises Exception: If the Activity cannot be added to the INBOX for a
        reason other than it being there already.
    """

    with database_session() as database:
        actor = model.from_uri(database, actor_uri)

        if not actor:
            log.debug('Received Activity {} for nonexistent Actor {}'.format(activity['id'], actor_uri))
            return

        # This is not a local Actor on this instance. It was likely sent to the wrong address.
        if actor.is_remote:
            log.debug('{} is a remote Actor. Ignoring incoming Activity.'.format(actor_uri))
            return

        # Get the JSONLD of the (local) Actor
        actor_jsonld = activitypub.fetch(actor_uri)

        if 'inbox' not in actor_jsonld:
            log.debug('{} does not seem to be a valid Actor. Ignoring incoming Activity {}'.format(actor_uri, activity['id']))
            return

        # Add the Activity to the Actor's INBOX
        # NOTE It's important that this line comes before checking if a copy of
        #      the Activity is already in the database. This way the Activity
        #      will be added to the Actor INBOX even if it won't be performed
        #      (because it was already performed in the past).
        try:
            database.add(model.Collection(uri=actor_jsonld['inbox'],
                                          item=activity['id']))

            # flush() are automatically called by SQLAlchemy *before* updating a
            # Session. So we manually trigger it now to catch the exception, rather
            # than waiting for the next update below.
            database.flush()

            log.debug('Actor {} has received a new Activity with id {}:\n{}'.format(
                actor_uri, activity['id'], json.dumps(activity, indent=4, sort_keys=True)))

        except sqlalchemy.exc.IntegrityError:
            # The (INBOX, Activity) pair is already stored: nothing else to do.
            log.debug('Activity {} already in INBOX {}'.format(activity['id'], actor_jsonld['inbox']))
            database.rollback()
            return

        except Exception as e:
            raise Exception('Activity {} could not be added to INBOX {}'.format(activity['id'], actor_jsonld['inbox'])) from e

        # Now we need to check if this Activity was already handled in the past.
        # If it was, we don't do anything since it was already processed in the
        # past, and new Activities should have a new ID.
        # To do so, we count how many times this Activity has been added to a
        # collection (either INBOX or OUTBOX). If it's a local Activity (created
        # by this instance) we would have the ID in 2 collections: the OUTBOX of
        # the Actor who sent it, and the INBOX of the Actor who received it. If
        # it's an Activity created by another instance, this would be the first
        # time seeing it so we only have it in a Actor INBOX. Any more than that
        # it means that this Activity has been processed already previously.
        activity_count = database \
            .query(sqlalchemy.func.count(model.Collection.item)) \
            .filter(model.Collection.item == activity['id']) \
            .scalar()

        # Sanity check: the row flushed above must be counted.
        assert activity_count > 0

        if (    activity['id'].startswith(APP_URL) and activity_count > 2) \
        or (not activity['id'].startswith(APP_URL) and activity_count > 1):
            log.debug('This Activity was already parsed in the past. Ignoring {}'.format(activity['id']))
            return

        # TODO We need to verify the JSON-LD Activity signature (LD-signatures)
        #      in order to make sure it wasn't forged.
        if not http_signature_verified:
            # The two literals are concatenated: the trailing space keeps the
            # sentences apart in the log output.
            log.debug('HTTP Signature was not verified successfully. '
                      'We need to check the LD-signature.')
            return

        # Let's save a copy of the Activity in the database (only if it's a
        # remote Activity, otherwise it's already been saved during OUTBOX
        # delivery).
        if activity_count == 1:
            database.add(model.Resource(
                uri=activity['id'],
                document=json.dumps(activity)))

        # Will forward the Activity if necessary
        forward.delay(activity)

        #######################################################################
        # Now, we could stop here after the Activity has been added to the
        # Actor's INBOX, but Pagure is not just a server it also works
        # as a user client with an interface that allows user interactions.
        # For this reason we go on and process the Activity, which will likely
        # result in changes to the Pagure database.
        #######################################################################

        #######################################################################
        # Because JSON-LD can represent the same graph in several different
        # ways, we should probably normalize the JSONLD object before passing
        # it to the actor for processing. This simplifies working with the object.
        # Normalization could mean "flattening" or "compaction" of the JSONLD
        # document.
        # However, this step is left out for now and not implemented unless
        # needed because the ActivityStream specs already specifies that
        # objects should be served in compact form:
        # https://www.w3.org/TR/social-web-protocols/#content-representation
        #
        # activity = normalize_activity(activity)
        #######################################################################

        # Schedule a task for updating our local instance with the content of
        # the Activity.
        # NOTE It's important to schedule a new task instead of executing the
        #      Activity from this task because if something fails during the
        #      execution of the Activity, only that part will be retried by the
        #      scheduler. The delivery to an actor INBOX has already happened
        #      (from this task) and won't be retried again.
        activities.perform.delay(activity)
@broker.task
def validate_outgoing_activity(actor_uri, activity):
    """
    This task is scheduled after receiving a new activity from a local actor.
    It's called from the actor_send() view in app.py.

    If the Actor is a valid local Actor and the Activity is not already in
    its OUTBOX, activities.perform() is scheduled for the Activity and the
    Activity is distributed.

    :param actor_uri: URI of the Actor that is sending the Activity
    :type actor_uri: str

    :param activity: The outgoing activity document.
    :type activity: dict
    """

    activity = activitypub.Activity.from_dict(activity)

    # Get the JSONLD of the (local) Actor
    actor_jsonld = activitypub.fetch(actor_uri)

    # A usable Actor document must have been fetched, and it must expose an
    # INBOX as well as the OUTBOX that is queried below. Checking the OUTBOX
    # here avoids a KeyError further down.
    if not actor_jsonld \
    or 'inbox' not in actor_jsonld \
    or 'outbox' not in actor_jsonld:
        log.debug('{} does not seem to be a valid Actor. Ignoring Activity {}'.format(actor_uri, activity['id']))
        return

    with database_session() as database:
        actor = model.from_uri(database, actor_uri)

        if not actor:
            log.debug('Activity {} has nonexistent Actor {}'.format(activity['id'], actor_uri))
            return

        # Only Actors of this instance are allowed to send from here
        if actor.is_remote:
            log.debug('{} is a remote Actor. Ignoring Activity.'.format(actor_uri))
            return

        # Now we need to check if this Activity was already handled in the past.
        # If it was, we don't do anything since it was already processed in the
        # past, and new Activities should have a new ID.
        activity_count = database \
            .query(sqlalchemy.func.count(model.Collection.item)) \
            .filter(model.Collection.uri == actor_jsonld['outbox'],
                    model.Collection.item == activity['id']) \
            .scalar()

        if activity_count > 0:
            log.debug('This Activity was already parsed in the past. Ignoring {}'.format(activity['id']))
            return

        # Update
        activities.perform.delay(activity)

        # And send it
        activity.distribute()
|