123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- import celery
- import datetime
- import json
- import rdflib
- import requests
- import requests_http_signature
- from .. import APP_URL
- from .. import activitypub
- from .. import model
- from .. import settings
- from . import activities
- from . import broker_url
- from . import broker
- from . import database_session
# Celery task logger shared by every task in this module; its verbosity is
# taken from the application's LOG_LEVEL setting.
log = celery.utils.log.get_task_logger(__name__)
log.setLevel(level=settings.LOG_LEVEL)
@broker.task
def distribute(activity):
    """
    Prepare an outgoing Activity for delivery and schedule one delivery task
    per recipient.

    The Activity is completed with a JSON-LD context (when missing), a newly
    generated ``id`` (any existing one is replaced) and a ``published``
    timestamp. It is then saved in the database and added to the Actor's
    OUTBOX. Finally, a ``post`` task is scheduled for every unique recipient
    listed in ``to``, ``cc``, ``bto`` and ``bcc``.

    Activities without an ``actor``, or whose actor cannot be resolved
    locally, are logged and ignored.

    :param activity: The Activity to be sent. It is modified in place.
    :type activity: dict
    """

    # Outgoing Activities MUST have an Actor
    if 'actor' not in activity:
        log.debug('Activity does not have an Actor. Ignoring Activity.\n{}'.format(activity))
        return

    # Set the JSON-LD context
    if '@context' not in activity:
        activity['@context'] = activitypub.jsonld_context

    # Assign an ID to the Activity
    activity['id'] = '{}/federation/activity/{}'.format(APP_URL, activitypub.new_activity_id())

    # Add publishing datetime in UTC. Formatting is delegated to
    # activitypub.format_datetime(); the intent is a second-precision,
    # timezone-aware ISO 8601 string.
    # NOTE .astimezone() is not used here: utcnow() returns a *naive* value,
    # and astimezone() interprets naive datetimes as local time, which gives
    # wrong results on hosts that are not running in UTC.
    activity['published'] = activitypub.format_datetime(datetime.datetime.utcnow())

    with database_session() as database:

        actor_uri = activity['actor']
        actor = model.from_uri(database, actor_uri)

        # Without a local Actor there is no followers collection, OUTBOX or
        # key to use, so the Activity cannot be distributed.
        if not actor:
            log.warning('Actor {} doesn\'t exist. Ignoring outgoing Activity.'.format(actor_uri))
            return

        # By default, the Activity is sent to the Actor followers collection.
        # https://www.w3.org/TR/activitypub/#delivery
        if 'to' not in activity:
            activity['to'] = [ actor.followers_uri ]

        # Collect all addressees. Each field may hold a single URI or a list.
        recipients = []
        for field in ('to', 'cc', 'bto', 'bcc'):
            if field not in activity:
                continue
            if isinstance(activity[field], str):
                recipients.append(activity[field])
            else:
                recipients.extend(activity[field])

        # Remove duplicates so that only one task is scheduled per recipient.
        # dict.fromkeys() keeps the first occurrence of every recipient in its
        # original order.
        recipients = list(dict.fromkeys(recipients))

        # Save a copy of the Activity in the database
        database.add(model.Resource(
            uri = activity['id'],
            document = json.dumps(activity)))

        # Add the Activity to the Actor's OUTBOX before sending it
        database.add(model.Collection(
            uri = actor.outbox_uri,
            item = activity['id']))

        # Commit before scheduling, so that delivery tasks see the stored data
        database.commit()

        # Before sending, remove bto and bcc according to spec.
        # https://www.w3.org/TR/activitypub/#client-to-server-interactions
        activity.pop('bto', None)
        activity.pop('bcc', None)

        # Stop here if there are no recipients.
        # https://www.w3.org/TR/activitypub/#h-note-8
        if not recipients:
            log.debug('No recipients. Activity will not be sent.')
            return

        # Make sure the local actor has a GPG key before POSTing anything. The
        # remote Actor will use the key for verifying the Activity.
        model.GpgKey.test_or_set(database, actor.local_uri, actor.publickey_uri)

        # One task per recipient: if one request fails we don't want to resend
        # the same Activity to *all* the recipients.
        for recipient in recipients:
            log.debug('Scheduling new activity: id={} recipient={}'.format(
                activity['id'], recipient))

            post.delay(activity = activity,
                       recipient_uri = recipient,
                       depth = settings.DELIVERY_DEPTH)
# Accepted spellings of the special "Public" collection. Compacting a document
# with the ActivityStreams JSON-LD context can shorten the full URI to
# "as:Public" or "Public", so all three forms must be recognized.
# https://www.w3.org/TR/activitypub/#public-addressing
_PUBLIC_COLLECTION = (
    'https://www.w3.org/ns/activitystreams#Public',
    'as:Public',
    'Public',
)

# ActivityStreams types whose items/pages are expanded into new deliveries.
_COLLECTION_TYPES = (
    'Collection', 'OrderedCollection',
    'CollectionPage', 'OrderedCollectionPage',
)

# Seconds to wait for a remote server when POSTing, so that an unresponsive
# INBOX cannot block a worker indefinitely.
_POST_TIMEOUT = 30


def _as_list(value):
    """Return *value* as a list: ``None`` gives ``[]``, a string or a single
    embedded object gives a one-element list, any other iterable is copied."""
    if value is None:
        return []
    if isinstance(value, (str, dict)):
        return [ value ]
    return list(value)


def _object_id(value):
    """Return the URI of *value*: embedded objects (dicts) are replaced by
    their ``id`` (``None`` when missing), anything else is returned as is."""
    if isinstance(value, dict):
        return value.get('id')
    return value


def _select_inbox(remote_object):
    """Return the INBOX URI of *remote_object*, preferring ``sharedInbox``.
    Return ``None`` if the object exposes no usable INBOX."""
    # https://www.w3.org/TR/activitypub/#sharedInbox
    # "endpoints" may also be given as a link (string), in which case it
    # cannot be inspected here and the plain "inbox" is used instead.
    endpoints = remote_object.get('endpoints')
    if isinstance(endpoints, dict) and endpoints.get('sharedInbox'):
        return endpoints['sharedInbox']
    return remote_object.get('inbox') or None


def _expand_collection(activity, remote_object, depth):
    """Schedule a new ``post`` task for every item and page of a Collection.

    Items are delivered with ``depth - 1``. Pages ("first"/"next") keep the
    same depth because they are part of the same Collection."""
    types = _as_list(remote_object.get('type'))
    if not any(type_ in _COLLECTION_TYPES for type_ in types):
        log.debug('Recipient is neither an Actor nor a Collection. '
                  'Activity will not be sent.')
        return

    items = [ _object_id(item)
              for field in ('items', 'orderedItems')
              for item in _as_list(remote_object.get(field)) ]
    items = [ uri for uri in items if uri ]

    pages = [ _object_id(remote_object[field])
              for field in ('first', 'next')
              if field in remote_object ]
    pages = [ uri for uri in pages if uri ]

    # Schedule a new delivery for every object found in the collection
    for item_uri in items:
        post.delay(activity, item_uri, depth - 1)

    # TODO If a page "next" links to a previous page (which should
    # not happen), this will not detect the loop.
    for page_uri in pages:
        post.delay(activity, page_uri, depth)

    if not items and not pages:
        log.debug('Collection found, but it contains no items. '
                  'Activity will not be sent.')


@broker.task
def post(activity, recipient_uri, depth):
    """
    POST an Activity to a remote Actor. If the remote object is a Collection,
    a new task is scheduled for every item of that collection.
    If an error occurs during the HTTP request, the task is automatically
    rescheduled a number of times depending on the Celery configuration.

    IMPORTANT: This task is only responsible for the actual delivery (HTTP POST)
    of the Activity to the remote Actor's INBOX. This task will *not* attempt
    to perform any sort of validation of the Activity being sent. In other
    words: whoever is using this task should validate its own Activities
    before sending them.

    :param activity: The Activity to be sent.
    :type activity: dict

    :param recipient_uri: The URI of a remote actor to send the Activity to.

    :param depth: Number of indirections to follow if recipient_uri is a
        Collection. See the settings file for more info about this option.

    :raises AssertionError: if the remote object cannot be fetched or the
        remote INBOX does not answer with a 2xx status code. These checks use
        an explicit ``raise`` rather than ``assert`` so they still apply under
        ``python -O``; the exception type is unchanged in case the task retry
        policy relies on it.
    """

    with database_session() as database:

        actor_uri = activity['actor']

        # Make sure the Actor is not sending the Activity to itself.
        # https://www.w3.org/TR/activitypub/#delivery
        if actor_uri == recipient_uri:
            log.debug('Activity actor and recipient are the same. '
                      'Refuting to send.')
            return

        # Activities addressed to as:Public are not POSTed to anyone.
        # A tuple is used so that unhashable values are compared with ==.
        if recipient_uri in _PUBLIC_COLLECTION:
            log.debug('Not sending to as:Public.')
            return

        # Retrieve remote object (actor)
        remote_object = activitypub.fetch(recipient_uri)

        # Make sure we got an object, or abort task
        if not remote_object:
            raise AssertionError('Could not fetch remote actor.')

        recipient_inbox = _select_inbox(remote_object)

        # If the remote object does not have an INBOX, we check if it's a
        # Collection, in which case we deliver to all its items.
        if not recipient_inbox:
            log.debug('Recipient is not an Actor. Checking if it\'s a '
                      'collection.')

            # Do not follow any more Collections.
            if depth < 1:
                log.debug('Max number of indirections reached. I will not '
                          'expand any more collections.')
                return

            _expand_collection(activity, remote_object, depth)

            # Since this object does *not* have an INBOX, we stop here for
            # this task.
            return

        # Check if this Activity was already sent (successfully) to this INBOX.
        # If it was, we do not resend the same Activity twice. This situation
        # could happen with a sharedInbox, or if for some reasons the same
        # Activity is sent twice (maybe the job queue didn't remove it?).
        # NOTE The collection "recipient_inbox" is fictitious because we
        # cannot have actual access to a remote Actor's INBOX.
        already_delivered = database.query(
            database.query(model.Collection)
                    .filter(model.Collection.uri == recipient_inbox)
                    .filter(model.Collection.item == activity['id'])
                    .exists()
        ).scalar()

        if already_delivered:
            log.warning(
                'Activity ' + activity['id'] + ' was already delivered to '
                'the INBOX ' + recipient_inbox + '. Not sending again.')
            return

        # Retrieve the private key of the local Actor for signing the HTTP request
        key = database.query(model.GpgKey) \
                      .filter(model.GpgKey.actor_uri == actor_uri) \
                      .one_or_none()

        if not key:
            log.debug('Actor {} does not have a key. Cannot sign HTTP request.'.format(actor_uri))
            return

        # This will add a "Signature:" header to the HTTP request.
        auth_method = requests_http_signature.HTTPSignatureHeaderAuth(
            key = key.private,
            key_id = key.uri,
            algorithm = 'rsa-sha256',
            headers = [ '(request-target)', 'host', 'date', 'digest' ])

        log.debug('Posting Activity ' + activity['id'] + ' to ' + recipient_inbox)
        log.debug(json.dumps(activity, indent=4, sort_keys=True))

        # Finally! Send out the Activity to the INBOX of the remote Actor
        response = requests.post(
            recipient_inbox,
            headers=activitypub.REQUEST_HEADERS,
            data=json.dumps(activity).encode('UTF-8'),
            auth=auth_method,
            timeout=_POST_TIMEOUT)

        log.debug('Activity ' + activity['id'] + ' POST return code: '
                  + str(response.status_code))

        # ActivityPub instances could return "200 OK" or "202 Accepted" upon
        # receiving the Activity.
        if not 200 <= response.status_code < 300:
            raise AssertionError(
                'Error [HTTP {}] when POSTing Activity'.format(response.status_code))

        log.debug('Activity posted without errors.')

        # Save in the database that this Activity was delivered successfully to
        # the remote Actor. This way it will not be resent twice (just in case).
        database.add(model.Collection(
            uri = recipient_inbox,
            item = activity['id']))
@broker.task
def validate_incoming_activity(actor_uri, activity):
    """
    Store an Activity received by a local Actor and schedule its processing.

    This task is scheduled after receiving a new activity from a remote actor.
    It's called from the actor_receive() view in app.py.
    When a new Activity has been received the plugin will schedule a new task
    to validate the Activity, then return immediately.

    Steps performed:
    - ignore the task if the Actor URI, the Activity or its ``id`` is missing;
    - ignore the Activity if the local Actor does not exist, or if the
      Activity is already in that Actor's INBOX;
    - add the Activity to the Actor's INBOX;
    - if the Activity was not stored before, save a copy of it and schedule
      ``activities.perform`` for it.

    :param actor_uri: URI of the Actor that has received the Activity
    :type actor_uri: str

    :param activity: The incoming activity document.
    :type activity: dict
    """

    if not actor_uri:
        log.info('Missing Actor. Ignoring task.')
        return

    if not activity:
        log.info('Missing Activity. Ignoring task.')
        return

    # The Activity comes from a remote server, so its "id" cannot be assumed
    # to be present. Every step below relies on it.
    activity_id = activity.get('id') if isinstance(activity, dict) else None

    if not activity_id:
        log.info('Activity has no id. Ignoring task.')
        return

    log.debug('Actor {} has received a new Activity with id {}:\n{}'.format(
        actor_uri, activity_id, json.dumps(activity, indent=4, sort_keys=True)))

    with database_session() as database:

        # Recreate the actor class from its URI
        actor = model.from_uri(database, actor_uri)

        if not actor:
            log.debug('Actor {} doesn\'t exist. Ignoring incoming Activity.'.format(actor_uri))
            return

        # Check if this Activity was already delivered to this Actor. If it was,
        # we don't do anything since it was already processed in the past. New
        # activities should not have the same ID of older ones.
        in_inbox = database.query(
            database.query(model.Collection)
                    .filter(model.Collection.uri == actor.inbox_uri)
                    .filter(model.Collection.item == activity_id)
                    .exists()
        ).scalar()

        if in_inbox:
            log.debug('Activity already delivered to this actor.')
            return

        # Add the Activity to the Actor's INBOX
        database.add(model.Collection(uri = actor.inbox_uri, item = activity_id))

        #######################################################################
        # We could stop here after the Activity has been added to the Actor's
        # INBOX, but Pagure is not just a server: it also works as a user
        # client with an interface that allows user interactions. For this
        # reason we go on and process the Activity, which will likely result
        # in changes to the Pagure database.
        # The INBOX can also be read by other clients (eg. desktop apps).
        #######################################################################

        #######################################################################
        # Because JSON-LD can represent the same graph in several different
        # ways, the Activity could be normalized ("flattened" or "compacted")
        # before processing. This is not implemented for now, because the
        # ActivityStreams specs already specify that objects should be served
        # in compact form:
        # https://www.w3.org/TR/social-web-protocols/#content-representation
        #
        # activity = normalize_activity(activity)
        #######################################################################

        # Check if this Activity was already received (possibly by another
        # local Actor). Only the first reception schedules its processing.
        already_stored = database.query(
            database.query(model.Resource)
                    .filter(model.Resource.uri == activity_id)
                    .exists()
        ).scalar()

        if already_stored:
            log.debug('This Activity was already received and parsed in the past. Ignoring {}'.format(activity_id))
            return

        # Let's save a copy of the Activity in the database
        database.add(model.Resource(
            uri = activity_id,
            document = json.dumps(activity)))

        # Schedule a task for updating our local instance with the content of
        # the Activity.
        # NOTE It's important to schedule a new task instead of executing the
        # Activity from this task because if something fails during the
        # execution of the Activity, only that part will be retried by the
        # scheduler. The delivery to an actor INBOX has already happened
        # (from this task) and won't be retried again.
        # NOTE(review): distribute() calls database.commit() before scheduling
        # its tasks. Here the INBOX/Resource rows are presumably committed only
        # when database_session() exits, i.e. possibly after
        # activities.perform has started. Confirm that perform does not depend
        # on these rows.
        activities.perform.delay(actor_uri, activity)
|