123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
import base64
import logging
from datetime import datetime

import dateutil.parser

try:
    from lxml import etree as ET
    logging.debug("running with lxml.etree")
except ImportError as e:
    try:
        import elementtree.ElementTree as ET
        logging.debug("running with ElementTree")
    except ImportError as e:
        logging.error("Failed to import ElementTree from any known place")
        raise e
# XML namespace URIs used by this module, keyed by a short prefix.
NS = dict(
    atom='http://www.w3.org/2005/Atom',
    xhtml='http://www.w3.org/1999/xhtml',
)
-
def ns_tag(ns, tag):
    """Return ``tag`` qualified with namespace ``ns`` as ``{ns}tag``.

    Both arguments are converted with ``str()`` before being joined.
    """
    pieces = ('{', str(ns), '}', str(tag))
    return ''.join(pieces)
def atom_tag(tag):
    """Qualify ``tag`` with the Atom namespace URI stored in ``NS``."""
    atom_ns = NS['atom']
    return ns_tag(atom_ns, tag)
-
def xhtml_tag(tag):
    """Qualify ``tag`` with the XHTML namespace URI stored in ``NS``."""
    xhtml_ns = NS['xhtml']
    return ns_tag(xhtml_ns, tag)
-
def element_factory(cls, el, **kwargs):
    """Instantiate ``cls`` around ``el``, forwarding any keyword arguments.

    Raises:
        KeyError: if ``el`` is None, i.e. there is nothing to wrap.
    """
    if el is not None:
        return cls(el, **kwargs)
    raise KeyError('Element not found')
class AtomBaseElement():
    """Base wrapper around a single XML element of an Atom document.

    The wrapped element is kept in ``root`` and the tree it belongs to in
    ``tree``. Child elements in the Atom namespace can be read and written
    through ``get_atom_property`` / ``set_atom_property``, which wrap the
    child in the class registered for its tag in ``cls_map``.
    """

    # Element tree that ``root`` belongs to.
    tree = None
    # The wrapped XML element.
    root = None

    def __init__(self, data, **kwargs):
        """Wrap existing XML or build a new element.

        Args:
            data: One of
                * a tag name (str) when keyword arguments are given; a new
                  element with that tag is created,
                * an already parsed element (``ET._Element``),
                * an XML document as str or bytes starting with ``<?xml``,
                * anything else, which is handed to ``ET.parse``
                  (presumably a file name or file object -- verify against
                  callers).
            **kwargs: Atom property names mapped to the values to set on the
                newly created element.

        Raises:
            TypeError: if keyword arguments are given and ``data`` is not a str.
            ValueError: if no XML tree could be obtained from ``data``.
        """
        if kwargs:
            if not isinstance(data, str):
                raise TypeError('If you provide keyword arguments, the data parameter must be a tag name as a str')
            self.root = ET.Element(data)
            # Give freshly built elements a tree as well, so that the sanity
            # check at the end of this method holds for every branch.
            self.tree = ET.ElementTree(self.root)
            for key, value in kwargs.items():
                self.set_atom_property(key, value)

        elif isinstance(data, ET._Element):
            self.root = data
            self.tree = self.root.getroottree()
        elif isinstance(data, (str, bytes)) and data[:5] in ('<?xml', b'<?xml'):
            # str input is encoded to bytes before being parsed.
            raw = data.encode() if isinstance(data, str) else data
            self.root = ET.fromstring(raw)
            self.tree = self.root.getroottree()
        else:
            self.tree = ET.parse(data)
            self.root = self.tree.getroot()

        if self.tree is None:
            raise ValueError('Could not get an XML tree from provided data')

    def validate(self):
        """Hook for subclasses to check the wrapped element; no-op here."""

    def get_atom_property(self, tag, cls=None, ns=NS['atom']):
        """Return the child element ``tag`` wrapped in ``cls``, or None.

        Args:
            tag: Local (un-namespaced) tag name of the child.
            cls: Wrapper class; looked up in ``cls_map`` when omitted.
            ns: Namespace URI of the child, the Atom namespace by default.

        Raises:
            TypeError: if ``tag`` is not a str.
            KeyError: if ``cls`` is omitted and the tag is not in ``cls_map``.
        """
        if not isinstance(tag, str):
            raise TypeError('tag must be string')

        if cls is None:
            cls = cls_map[ns_tag(ns, tag)]

        try:
            return element_factory(cls, self.find(tag, ns=ns))
        except KeyError:
            # element_factory signals a missing child with KeyError.
            return None

    def set_atom_property(self, tag, value):
        """Set, create or delete the Atom child element ``tag``.

        Args:
            tag: Local (un-namespaced) tag name of the child.
            value: New value, passed to the wrapper's ``set_value``. ``None``
                removes the child if it exists.

        Raises:
            TypeError: if ``tag`` is not a str.
            KeyError: if the tag is not registered in ``cls_map``.
        """
        if not isinstance(tag, str):
            raise TypeError('tag must be string')

        qualified = atom_tag(tag)
        cls = cls_map[qualified]

        # Search with the already-qualified name directly on the element:
        # self.find() would add the namespace a second time and never match.
        el = self.root.find(qualified)
        if value is None:
            if el is not None:
                # el was found as a direct child of root, so root is its parent.
                self.root.remove(el)
            return
        if el is None:
            el = ET.Element(qualified)
            self.root.append(el)

        # Create the wrapper for the child and set the value through it.
        element_factory(cls, el).set_value(value)

    @property
    def tag(self):
        """Tag of the wrapped element."""
        return self.root.tag

    def find(self, tag, ns=NS['atom']):
        """Return the first direct child ``tag`` in namespace ``ns``, or None."""
        return self.root.find(ns_tag(ns, tag))

    def findall(self, tag, ns=NS['atom']):
        """Return all direct children ``tag`` in namespace ``ns``."""
        return self.root.findall(ns_tag(ns, tag))

    def get(self, attr):
        """Return attribute ``attr`` of the wrapped element."""
        return self.root.get(attr)

    def set(self, attr, value):
        """Set attribute ``attr`` of the wrapped element to ``value``."""
        return self.root.set(attr, value)
class Construct(AtomBaseElement):
    """Element whose value is the text of the wrapped element."""

    def get_value(self):
        """Return the text of the wrapped element."""
        node = self.root
        return node.text

    def set_value(self, value):
        """Store ``value`` as the text of the wrapped element."""
        node = self.root
        node.text = value
class DateConstruct(Construct):
    """Element whose text is a timestamp."""

    def get_value(self):
        """Return the element text parsed by ``dateutil.parser.parse``."""
        return dateutil.parser.parse(self.root.text)

    def set_value(self, value):
        """Store ``value`` as its ISO 8601 representation.

        Raises:
            TypeError: if ``value`` is not a ``datetime``.
        """
        # ``datetime`` is provided by the module-level
        # ``from datetime import datetime`` import.
        if not isinstance(value, datetime):
            raise TypeError('dateconstruct set value must be datetime')
        self.root.text = value.isoformat()
class TextConstruct(Construct):
    """Construct used for plain text elements; adds nothing to Construct."""
class UriConstruct(Construct):
    """Construct used for URI-valued elements; adds nothing to Construct."""
class AtomLink(Construct):
    """Link element whose value lives in its ``href`` attribute."""

    def get_value(self):
        """Return the ``href`` attribute of the wrapped element."""
        link = self.root
        return link.get('href')

    def set_value(self, value):
        """Write ``value`` to the ``href`` attribute of the wrapped element."""
        link = self.root
        link.set('href', value)
-
class ContentData(TextConstruct):
    """Text construct that also records the media type of its content."""

    # Media type of the content; assigned per instance in __init__.
    mimetype = None

    def __init__(self, data, mimetype):
        """Wrap ``data`` via the base constructor and remember ``mimetype``."""
        TextConstruct.__init__(self, data)
        self.mimetype = mimetype
class ContentText(ContentData):
    """Content handled as ``text/plain``."""

    def __init__(self, data):
        """Wrap ``data`` with the media type fixed to ``text/plain``."""
        super().__init__(data, mimetype='text/plain')
-
class ContentHtml(ContentData):
    """Content handled as ``text/html``."""

    def __init__(self, data):
        """Wrap ``data`` with the media type fixed to ``text/html``."""
        super().__init__(data, mimetype='text/html')
class ContentXml(ContentData):
    """Content whose value is the wrapped XML element itself."""

    def get_value(self):
        """Return the wrapped element."""
        return self.root

    def set_value(self, value):
        """Validate ``value``; replacing the content is not implemented.

        Args:
            value: An XML string (parsed with ``ET.fromstring``) or an element.

        Raises:
            TypeError: if ``value`` is neither an XML string nor an element.
            IOError: always, once validation passed -- setting is not
                implemented yet.
        """
        if isinstance(value, str):
            value = ET.fromstring(value)
        if not isinstance(value, ET._Element):
            raise TypeError('You must provide an XML string or an Element here')

        raise IOError('Not implemented yet')

    def __str__(self):
        """Return the leading text followed by every serialized child."""
        # An element without leading text has ``text`` set to None, which
        # cannot be concatenated with a str.
        leading = self.root.text or ''
        children = ''.join(ET.tostring(el).decode() for el in self.root)
        return leading + children
class ContentXhtml(ContentXml):
    """XML content labelled with the media type ``text/xhtml``."""

    def __init__(self, data):
        """Wrap ``data`` with the media type fixed to ``text/xhtml``."""
        super().__init__(data, mimetype='text/xhtml')
class _ContentBase64(ContentData):
    """Content whose element text is Base64 encoded (private helper)."""

    def get_value(self):
        """Return the decoded bytes of the element text."""
        # An empty element has ``text`` set to None; decode it as no data.
        return base64.b64decode(self.root.text or '')


class ContentProcessingModel(Construct):
    """Element interpreted according to its ``type`` attribute.

    Implements the atom:content processing model:
    https://tools.ietf.org/html/rfc4287#section-4.1.3.3
    """

    @property
    def type(self):
        """Value of the ``type`` attribute.

        When the attribute is missing it is set to ``'text'`` on the wrapped
        element first, so reading this property can modify the element.
        """
        if self.get('type') is None:
            self.set('type', 'text')
        return self.get('type')

    @property
    def construct(self):
        """Return the content wrapper selected by the ``type`` attribute.

        Atom Documents MUST conform to the following rules. Atom Processors
        MUST interpret atom:content according to the first applicable rule.
        """
        content_type = self.type

        # 1. If the value of "type" is "text", the content of atom:content
        #    MUST NOT contain child elements.
        if content_type == 'text':
            return ContentText(self.root)

        # 2. If the value of "type" is "html", the content of atom:content
        #    MUST NOT contain child elements and SHOULD be suitable for
        #    handling as HTML [HTML].
        if content_type == 'html':
            return ContentHtml(self.root)

        # 3. If the value of "type" is "xhtml", the content of atom:content
        #    MUST be a single XHTML div element [XHTML] and SHOULD be suitable
        #    for handling as XHTML. The XHTML div element itself MUST NOT be
        #    considered part of the content.
        if content_type == 'xhtml':
            return ContentXhtml(self.root.find(ns_tag(NS['xhtml'], 'div')))

        lowered = content_type.lower()

        # 4. If the value of "type" is an XML media type [RFC3023] or ends
        #    with "+xml" or "/xml" (case insensitive), the content of
        #    atom:content MAY include child elements and SHOULD be suitable
        #    for handling as the indicated media type. If the "src" attribute
        #    is not provided, this would normally mean that the "atom:content"
        #    element would contain a single child element that would serve as
        #    the root element of the XML document of the indicated type.
        if lowered.endswith(('+xml', '/xml')):
            return ContentXml(self.root, content_type)

        # 5. If the value of "type" begins with "text/" (case insensitive),
        #    the content of atom:content MUST NOT contain child elements.
        if lowered.startswith('text/'):
            # The wrapper needs the element, not its text: the base
            # constructor would try to parse a bare string as a file.
            return ContentData(self.root, content_type)

        # 6. For all other values of "type", the content of atom:content MUST
        #    be a valid Base64 encoding, as described in [RFC3548], section 3.
        #    When decoded, it SHOULD be suitable for handling as the indicated
        #    media type.
        return _ContentBase64(self.root, content_type)

    def get_value(self):
        """Return the value of the wrapper selected by ``construct``."""
        return self.construct.get_value()

    def __str__(self):
        """Return ``str()`` of the wrapper selected by ``construct``."""
        return str(self.construct)
-
class PersonConstruct(Construct):
    """Person element exposing its ``name``, ``uri`` and ``email`` children."""

    def get_value(self):
        """Return this wrapper itself as the value."""
        return self

    def _child_property(tag):
        """Build a property that reads and writes the Atom child ``tag``."""
        def read(self):
            return self.get_atom_property(tag)

        def write(self, value):
            self.set_atom_property(tag, value)

        return property(read, write)

    name = _child_property('name')
    uri = _child_property('uri')
    email = _child_property('email')

    # The factory is only needed while the class body executes.
    del _child_property
class AtomAuthor(PersonConstruct):
    """Wrapper for an ``atom:author`` person element."""

    def __init__(self, *args, **kwargs):
        """Wrap the given data, defaulting the tag name to ``atom:author``."""
        if not args:
            args = (atom_tag('author'),)
        super().__init__(*args, **kwargs)

    def set_value(self, value):
        """Copy name, uri and email from ``value``.

        ``value`` is either an object of exactly this class, whose three
        person properties are copied (missing ones become None), or a
        mapping whose keys are assigned as attributes on this object.
        """
        if type(value) is not type(self):
            for field in value:
                setattr(self, field, value[field])
            return

        for field in ('name', 'uri', 'email'):
            try:
                copied = getattr(value, field).get_value()
            except AttributeError:
                # The property returned None: the source has no such child.
                copied = None
            setattr(self, field, copied)

    def validate(self):
        """Raise ValueError unless the wrapped element is ``atom:author``."""
        if self.tag == atom_tag('author'):
            return
        raise ValueError('Not an atom:author tag')
class AtomContent(ContentProcessingModel):
    """Wrapper registered for ``atom:content``; adds nothing to its base."""
-
class AtomSummary(ContentProcessingModel):
    """Wrapper registered for ``atom:summary``; adds nothing to its base."""
class AtomSubtitle(ContentProcessingModel):
    """Wrapper registered for ``atom:subtitle``; adds nothing to its base."""
class AtomTitle(TextConstruct):
    """Wrapper registered for ``atom:title``; adds nothing to its base."""
# Maps namespace-qualified Atom tags to the wrapper class used for them by
# get_atom_property / set_atom_property.
cls_map = {
    atom_tag(local_name): wrapper
    for local_name, wrapper in (
        ('id', UriConstruct),
        ('published', DateConstruct),
        ('updated', DateConstruct),
        ('link', AtomLink),

        ('author', AtomAuthor),
        ('name', TextConstruct),
        ('email', TextConstruct),
        ('uri', UriConstruct),

        ('title', AtomTitle),
        ('content', AtomContent),
        ('subtitle', AtomSubtitle),
        ('summary', AtomSummary),
    )
}
-
class AtomFeed(AtomBaseElement):
    """Wrapper for an ``atom:feed`` element."""

    def __init__(self, *args, **kwargs):
        """Wrap the given data, defaulting the tag name to ``atom:feed``."""
        if not args:
            args = (atom_tag('feed'),)
        super().__init__(*args, **kwargs)

    def _set_child_text(self, tag, text):
        """Set the text of the Atom child ``tag``, creating it if missing."""
        el = self.find(tag)
        if el is None:
            el = ET.Element(atom_tag(tag))
            self.root.append(el)
        el.text = text

    def find_link(self, rels, attr=None, mimetype=None, ns=NS['atom']):
        """Return the first link child matching ``rels`` (and ``mimetype``).

        Args:
            rels: A single ``rel`` value or a list/tuple of accepted values.
            attr: When given, return this attribute of the link instead of
                a wrapper object.
            mimetype: When given, the link's ``type`` attribute must equal it.
            ns: Namespace URI of the link elements, Atom by default.

        Returns:
            The attribute value, the wrapped link, or None if nothing matches.
        """
        if not isinstance(rels, (list, tuple)):
            rels = (rels,)
        for el in self.root.findall("./{}".format(ns_tag(ns, 'link'))):
            if el.get('rel') not in rels:
                continue
            if mimetype and el.get('type') != mimetype:
                continue
            if attr:
                return el.get(attr)
            return element_factory(cls_map[ns_tag(ns, 'link')], el)
        return None

    @property
    def author(self):
        """Wrapped ``author`` child, or None."""
        return self.get_atom_property('author')

    @author.setter
    def author(self, value):
        return self.set_atom_property('author', value)

    @property
    def updated(self):
        """Wrapped ``updated`` child, or None."""
        return self.get_atom_property('updated')

    @updated.setter
    def updated(self, value):
        # ``datetime`` is provided by the module-level
        # ``from datetime import datetime`` import.
        if not isinstance(value, datetime):
            raise TypeError('You must provide a datetime')
        self._set_child_text('updated', value.isoformat())

    @property
    def id(self):
        """Wrapped ``id`` child, or None."""
        return self.get_atom_property('id')

    @property
    def next(self):
        """Link with rel ``next`` and type ``application/atom+xml``, or None."""
        return self.find_link('next', mimetype='application/atom+xml')

    @property
    def prev(self):
        """Link with rel ``prev`` and type ``application/atom+xml``, or None."""
        return self.find_link('prev', mimetype='application/atom+xml')

    @property
    def title(self):
        """Wrapped ``title`` child, or None."""
        return self.get_atom_property('title')

    @title.setter
    def title(self, value):
        self._set_child_text('title', value)

    @property
    def subtitle(self):
        """Wrapped ``subtitle`` child, or None."""
        return self.get_atom_property('subtitle')

    @subtitle.setter
    def subtitle(self, value):
        self._set_child_text('subtitle', value)

    @property
    def entries(self):
        """The feed's ``entry`` children wrapped in an ``AtomEntries`` list."""
        return AtomEntries(self.findall('entry'), feed=self)

    @property
    def categories(self):
        """The feed's ``category`` children as an ``AtomCategories`` list."""
        return AtomCategories(self.findall('category'), feed=self)

    def validate(self):
        """Raise TypeError unless the wrapped element is ``atom:feed``."""
        if self.tag != atom_tag('feed'):
            raise TypeError('You did not supply an atom:feed')
class AtomList(list):
    """List that additionally exposes its size as a ``length`` property."""

    @property
    def length(self):
        """Number of items in the list."""
        return len(self)
-
class AtomCategories(AtomList):
    """List of ``AtomCategory`` wrappers built from raw elements."""

    def __init__(self, args, **kwargs):
        """Wrap every item of ``args``, passing ``kwargs`` to each wrapper."""
        wrapped = [AtomCategory(element, **kwargs) for element in args]
        super().__init__(wrapped)
class AtomEntries(AtomList):
    """List of ``AtomEntry`` wrappers that belongs to a feed."""

    # Feed object the entries belong to; assigned per instance in __init__.
    feed = None

    def __init__(self, args, feed):
        """Wrap every item of ``args`` as an entry of ``feed``."""
        self.feed = feed
        wrapped = [AtomEntry(element, feed=feed) for element in args]
        super().__init__(wrapped)

    def append(self, value):
        """Add ``value`` to the list and its element to the feed's element."""
        feed_root = self.feed.root
        feed_root.append(value.root)
        super().append(value)
class AtomCategory(AtomBaseElement):
    """Wrapper for a category element owned by either a feed or an entry."""

    # Owning feed, if any.
    feed = None
    # Owning entry, if any.
    entry = None

    def __init__(self, *args, feed=None, entry=None, **kwargs):
        """Wrap the given data and remember the owning feed or entry.

        Raises:
            ValueError: if both ``feed`` and ``entry`` are given.
        """
        if feed is not None and entry is not None:
            raise ValueError('An AtomCategory cannot have both a feed and entry as parent')
        self.feed = feed
        self.entry = entry

        super().__init__(*args, **kwargs)

    def __str__(self):
        """Return the ``term`` attribute, or '' when it is absent."""
        # __str__ must return a str; a missing attribute yields None.
        return self.get('term') or ''

    # TODO: Make the following act like dict keys.
    # (Disabled code kept for reference; the last setter was originally
    # named ``label`` although it is decorated with ``@scheme.setter``.)
    #
    # @property
    # def term(self):
    #     return self.get('term')
    # @term.setter
    # def term(self, value):
    #     return self.set('term', value)
    #
    # @property
    # def label(self):
    #     return self.get('label')
    # @label.setter
    # def label(self, value):
    #     return self.set('label', value)
    #
    # @property
    # def scheme(self):
    #     return self.get('scheme')
    # @scheme.setter
    # def scheme(self, value):
    #     return self.set('scheme', value)
class AtomEntry(AtomBaseElement):
    """Wrapper for an ``atom:entry`` element."""

    # Feed object this entry belongs to, if any.
    feed = None

    def __init__(self, *args, feed=None, **kwargs):
        """Wrap the given data, defaulting the tag name to ``atom:entry``.

        Args:
            feed: Feed object this entry belongs to, stored on ``self.feed``.
        """
        if not args:
            args = (atom_tag('entry'),)
        super().__init__(*args, **kwargs)
        self.feed = feed

    def _set_child_text(self, tag, text):
        """Set the text of the Atom child ``tag``, creating it if missing."""
        el = self.find(tag)
        if el is None:
            el = ET.Element(atom_tag(tag))
            self.root.append(el)
        el.text = text

    def _set_child_datetime(self, tag, value):
        """Store ``value`` in child ``tag`` as ISO 8601 text."""
        # ``datetime`` is provided by the module-level
        # ``from datetime import datetime`` import.
        if not isinstance(value, datetime):
            raise TypeError('You must provide a datetime')
        self._set_child_text(tag, value.isoformat())

    @property
    def published(self):
        """Wrapped ``published`` child, or None."""
        return self.get_atom_property('published')

    @published.setter
    def published(self, value):
        self._set_child_datetime('published', value)

    @property
    def updated(self):
        """Wrapped ``updated`` child, or None."""
        return self.get_atom_property('updated')

    @updated.setter
    def updated(self, value):
        self._set_child_datetime('updated', value)

    @property
    def id(self):
        """Wrapped ``id`` child, or None."""
        return self.get_atom_property('id')

    @id.setter
    def id(self, value):
        # Written directly so that a missing id child is created instead of
        # failing on the None returned by get_atom_property.
        self._set_child_text('id', value)

    @property
    def author(self):
        """Wrapped ``author`` child, or None."""
        return self.get_atom_property('author')

    @property
    def title(self):
        """Wrapped ``title`` child, or None."""
        return self.get_atom_property('title')

    @title.setter
    def title(self, value):
        if not isinstance(value, str):
            raise TypeError('Cannot set title to non-string value')
        self.set_atom_property('title', value)

    @property
    def categories(self):
        """The entry's ``category`` children as an ``AtomCategories`` list."""
        return AtomCategories(self.findall('category'), entry=self)

    @property
    def content(self):
        """Wrapped ``content`` child, or None."""
        return self.get_atom_property('content')

    @property
    def summary(self):
        """Wrapped ``summary`` child, or None."""
        return self.get_atom_property('summary')

    def validate(self):
        """Raise TypeError unless the wrapped element is ``atom:entry``."""
        if self.tag != atom_tag('entry'):
            raise TypeError('You did not supply an atom:entry')
|