- # -*- coding: utf-8 -*-
- """
- (c) 2020 - Copyright ...
-
- Authors:
- zPlus <zplus@peers.community>
- """
- import json
- import logging
- import pyld
- import rdflib
- from Crypto.PublicKey import RSA
- from . import activitypub
- from . import settings
- # This constant is used for rdflib persistent storage, as we need an identifier
- # when creating a new graph.
- GRAPH_NAME = 'forgefed_graph'
- # Define some RDF namespaces to use with rdflib
- AS = rdflib.Namespace('https://www.w3.org/ns/activitystreams#')
- SEC = rdflib.Namespace('https://w3id.org/security#')
- FORGE = rdflib.Namespace('https://forgefed.peers.community/ns#')
- log = logging.getLogger(__name__)
- class Graph(rdflib.Graph):
- """
- A class representing the Forgefed Graph.
- """
- def __init__(self, connect=True, create=False):
- """
- :param connect: Automatically connect graph to storage.
- :param create: Automatically try to create persistent storage.
- """
- if settings.STORAGE == 'berkeleydb':
- super().__init__(store='Sleepycat', identifier=GRAPH_NAME)
- elif settings.STORAGE == 'sqlalchemy':
- super().__init__(
- store=rdflib.plugin.get('SQLAlchemy', rdflib.store.Store)(identifier=GRAPH_NAME),
- identifier=GRAPH_NAME)
- else:
- error_message = 'Unknown storage: ' + settings.STORAGE
- log.critical(error_message)
- raise Exception(error_message)
- if create:
- self.connect(create=create)
- self.disconnect()
- if connect:
- self.connect()
- def connect(self, create=False):
- self.open(settings.STORAGE_PATH, create=create)
- def disconnect(self):
- self.close()
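- # A minimal usage sketch (assumes the store configured in settings.STORAGE /
- # settings.STORAGE_PATH has already been created, e.g. by the module-level
- # Graph(connect=False, create=True) call at the bottom of this file):
- #
- #     g = Graph()              # opens the persistent store right away
- #     ...                      # query / update the graph
- #     g.disconnect()
- #
- #     g = Graph(connect=False) # or defer opening the store
- #     g.connect()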
-
- def subgraph(self, triple):
- """
- rdflib only seems to have a triples() function, which returns an iterator;
- there is no function that returns the matching triples as a rdflib.Graph
- object. This function retrieves a set of matching triples and returns them
- as a new graph.
-
- Since this function returns a Graph and not an iterator, do *not* use it
- to fetch a large number of triples, because they are all loaded into
- memory.
-
- :param triple: The triple to match. Every triple that matches "triple"
- will be added to the subgraph.
- Example: (subject, None, None)
- """
-
- triples = self.triples(triple)
- new_graph = rdflib.Graph()
- new_graph += triples
- return new_graph
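- # For example, a sketch of extracting a single Actor's triples into a
- # standalone graph (the URI is hypothetical):
- #
- #     g = Graph()
- #     actor = g.subgraph((rdflib.URIRef('https://example.org/users/alice'), None, None))
- #     print(actor.serialize(format='turtle'))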
- def json_subgraph(self, triple):
- """
- Like subgraph(), but return the triples as a compacted JSON-LD document.
- """
- return pyld.jsonld.compact(
- json.loads(self.subgraph(triple).serialize(format='json-ld')),
- activitypub.cached_jsonld_context)
-
- def get_node(self, node_uri):
- """
- A special case of subgraph() for returning a single node of the graph.
- """
-
- return self.subgraph((rdflib.URIRef(node_uri), None, None))
-
- def get_json_node(self, node_uri):
- """
- A special case of json_subgraph() for returning a single node of the graph.
- """
-
- return self.json_subgraph((rdflib.URIRef(node_uri), None, None))
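- # Sketch (hypothetical URI): get_json_node() returns a plain dict (compacted
- # JSON-LD), so normal dictionary access applies:
- #
- #     actor = g.get_json_node('https://example.org/users/alice')
- #     actor_type = actor.get('type')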
-
- # TODO can this be moved to the Actor class?
- def test_or_set_key(self, actor_id, key_id):
- """
- Test if an Actor already has a signing key, otherwise automatically
- generate a new RSA key pair and add it to the graph. These keys are used
- to sign HTTP requests.
- :param actor_id: ID (URL) of the ActivityPub Actor.
- :param key_id: ID (URL) of the key object.
- """
- # Check if we already have a node in our graph for the key
- if (rdflib.URIRef(key_id), None, None) in self:
- return
- # No triples found! Let's create a new RSA key pair for the Actor
- key = RSA.generate(settings.HTTP_SIGNATURE_KEY_BITS)
- # And create the graph node for the key
- node = {
- '@context': activitypub.cached_jsonld_context,
- 'id': key_id,
- 'type': 'CryptographicKey',
- 'owner': actor_id,
- #'created': None,
- #'expires': None,
- #'revoked': None,
- 'privateKeyPem': key.export_key('PEM').decode('UTF-8'),
- 'publicKeyPem': key.publickey().export_key('PEM').decode('UTF-8')
- }
- # Finally add the new node to the graph
- self.parse(data=json.dumps(node), format='json-ld')
- self.commit()
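- # Sketch of the intended call pattern (hypothetical URIs). The key node is
- # only created on the first call; later calls are no-ops:
- #
- #     g.test_or_set_key('https://example.org/users/alice',
- #                       'https://example.org/users/alice#main-key')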
-
- # TODO can this be moved to the Actor class?
- def is_valid_key(self, actor_uri, key_uri):
- """
- Check if the given key belongs to the given actor.
- """
-
- return (rdflib.URIRef(key_uri), rdflib.RDF.type, SEC.Key) in self \
- and (rdflib.URIRef(key_uri), SEC.owner, rdflib.URIRef(actor_uri)) in self
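- # Sketch (hypothetical URIs): assuming the cached JSON-LD context maps
- # CryptographicKey/owner to the sec: vocabulary, this returns True for the
- # key created by test_or_set_key() above and False for any other actor:
- #
- #     g.is_valid_key('https://example.org/users/alice',
- #                    'https://example.org/users/alice#main-key')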
-
- def test_or_set_ordered_collection(self, uri):
- """
- Test if an OrderedCollection already exists, otherwise automatically
- generate a new one and add it to the graph.
- :param uri: The URI of the object.
- """
- # Check if we already have a node in our graph for the collection
- if (rdflib.URIRef(uri), None, None) in self:
- return
-
- collection = {
- '@context': activitypub.cached_jsonld_context,
- 'type': 'OrderedCollection',
- 'id': uri,
- 'totalItems': 0,
- 'first': {
- 'id': uri + '/0',
- 'type': 'OrderedCollectionPage',
- 'partOf': uri,
- # 'orderedItems': []
- },
- 'last': uri + '/0',
- 'current': uri + '/0' }
-
- self.parse(data=json.dumps(collection), format='json-ld')
- self.commit()
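- # Sketch (hypothetical URI): the first call creates an empty paged collection
- # whose only page lives at uri + '/0'; later calls do nothing:
- #
- #     g.test_or_set_ordered_collection('https://example.org/users/alice/followers')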
-
- def get_collection(self, collection_uri):
- """
- Retrieve a collection as a compacted JSON-LD document, e.g. for display
- when "followers"/"following" objects are requested.
- """
-
- node = self.subgraph((rdflib.URIRef(collection_uri), None, None))
-
- # Retrieve all the @list nodes (rdf:rest)
- for a_object in self[ rdflib.URIRef(collection_uri)
- : (AS.items|AS.orderedItems)/(rdflib.RDF.rest*'*') ]:
-
- node += self.subgraph((rdflib.URIRef(a_object), None, None))
-
- return pyld.jsonld.compact(
- json.loads(node.serialize(format='json-ld')),
- activitypub.cached_jsonld_context)
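- # Sketch (hypothetical URI): the result is a compacted JSON-LD dict that can
- # be serialized and returned as the "followers" response body:
- #
- #     followers = g.get_collection('https://example.org/users/alice/followers')
- #     body = json.dumps(followers)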
-
- def remove_collection_page(self, page_uri):
- """
- Remove the as:items @list of the given collection page: delete its
- rdf:first/rdf:rest blank nodes, then unlink the list from the page node.
- """
-
- # Remove all the as:items @list nodes
- objs = self[ rdflib.URIRef(page_uri)
- : AS.items/(rdflib.RDF.rest*'*') ]
-
- for a_object in objs:
- self.remove((rdflib.URIRef(a_object), None, None))
-
- # Remove the page node
- self.remove((rdflib.URIRef(page_uri), AS.items, None))
- self.commit()
-
- def add_collection_item(self, collection_uri, object_uri):
- self.test_or_set_ordered_collection(collection_uri)
-
- # Get URI of the last page
- page = self.value(
- subject=rdflib.URIRef(collection_uri),
- predicate=AS.last)
-
- page = self.get_collection(page)
-
- # Append the new object to the list of ordered items
- # An OrderedCollection MUST be presented consistently in reverse chronological order.
- if 'orderedItems' not in page:
- page['orderedItems'] = [ object_uri ]
- else:
- page['orderedItems'].insert(0, object_uri)
-
- # Before updating the items list, we need to remove the old list. This
- # is required because a @list in RDF is the equivalent of a linked
- # list, where each item is a blank node with rdf:first and rdf:rest.
- # When we parse the JSON-LD document and insert the new ordered list,
- # the blank nodes are assigned new IDs (remember: blank node IDs are not
- # fixed like the proper IDs of non-blank nodes) and the old nodes would
- # be left unlinked even though they still exist in the database.
- self.remove_collection_page(page['id'])
-
- self.parse(data=json.dumps(page), format='json-ld')
- self.commit()
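- # Sketch (hypothetical URIs): newly added objects end up at the head of the
- # last page, keeping the reverse chronological order:
- #
- #     g.add_collection_item('https://example.org/users/alice/followers',
- #                           'https://example.org/users/bob')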
-
- def collection_contains(self, collection_uri, object_uri):
- # Graph slicing: https://rdflib.readthedocs.io/en/stable/utilities.html#slicing-graphs
- # Graph paths: https://rdflib.readthedocs.io/en/stable/apidocs/rdflib.html#module-rdflib.paths
- #
- # We only use paged collections. This means that a Collection will have
- # an as:first property pointing to the first CollectionPage; as:next
- # links are optional and only present if there are more pages. The
- # property path below checks exactly for this. It starts from a
- # Collection node, follows as:first, follows any optional as:next,
- # enters the (ordered) @list of as:items, walks it through rdf:rest,
- # and finally matches the rdf:first that links to the target object.
- # Remember: an RDF @list is basically a linked list consisting of blank
- # nodes with rdf:first and rdf:rest properties.
- return self[rdflib.URIRef(collection_uri)
- : AS.first/(AS.next*'*')/AS.items/(rdflib.RDF.rest*'*')/rdflib.RDF.first
- : rdflib.URIRef(object_uri) ]
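- # Sketch (hypothetical URIs): the slice evaluates truthy when the object is
- # anywhere in the paged collection:
- #
- #     if g.collection_contains('https://example.org/users/alice/followers',
- #                              'https://example.org/users/bob'):
- #         ...
-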
- # Create the graph storage during initialization
- Graph(connect=False, create=True)