123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- # -*- test-case-name: yadis.test.test_etxrd -*-
- """
- ElementTree interface to an XRD document.
- """
# Names exported by a star-import of this module.
# NOTE(review): mkXRDSTag, getXRDExpiration, sortedURIs, getTypeURIs and
# the XRDSError / XRDSFraud exception classes are defined in this module
# but are not listed here -- confirm whether that omission is intended.
__all__ = [
    'nsTag',
    'mkXRDTag',
    'isXRDS',
    'parseXRDS',
    'getCanonicalID',
    'getYadisXRD',
    'getPriorityStrict',
    'getPriority',
    'prioSort',
    'iterServices',
    'expandService',
    'expandServices',
    ]
- import sys
- import random
- from datetime import datetime
- from time import strptime
- from openid.oidutil import importElementTree
- ElementTree = importElementTree()
# The different ElementTree implementations do not share a common
# exception model. We only need to catch the exceptions that signify
# malformed XML data and wrap them, so that the rest of the library
# does not have to know which XML library is in use.
try:
    # Feed the parser deliberately malformed input so that it raises,
    # letting us discover which exception class it uses.
    ElementTree.XML('> purposely malformed XML <')
except (SystemExit, MemoryError, AssertionError, ImportError):
    # These are not parse errors; let them propagate unchanged.
    raise
except:
    # Anything else raised here is taken to be the parser's error type.
    # NOTE(review): if the parser does not raise at all, XMLError is
    # never assigned and parseXRDS will fail with NameError.
    XMLError = sys.exc_info()[0]
- from openid.yadis import xri
class XRDSError(Exception):
    """An error with the XRDS document."""

    # The exception that triggered this exception. Defaults to None;
    # parseXRDS sets it to the underlying XML parser error when the
    # document cannot be parsed.
    reason = None
class XRDSFraud(XRDSError):
    """Raised when there's an assertion in the XRDS that it does not have
    the authority to make.

    getCanonicalID raises this when a CanonicalID cannot be traced back
    through its parent XRDs to the root authority of the XRI.
    """
def parseXRDS(text):
    """Parse the given text as an XRDS document.

    @param text: The serialized XML document to parse.

    @return: ElementTree containing an XRDS document

    @raises XRDSError: When there is a parse error or the document does
        not contain an XRDS. For parse errors, the C{reason} attribute of
        the raised exception holds the underlying parser exception.
    """
    try:
        element = ElementTree.XML(text)
    except XMLError as why:
        # Wrap the library-specific parser error so that callers only
        # ever have to deal with XRDSError.
        exc = XRDSError('Error parsing document as XML')
        exc.reason = why
        raise exc

    tree = ElementTree.ElementTree(element)
    if not isXRDS(tree):
        raise XRDSError('Not an XRDS document')

    return tree
# XML namespace used by mkXRDTag for elements inside an XRD
# (Service, Type, URI, Expires, CanonicalID, ...).
XRD_NS_2_0 = 'xri://$xrd*($v*2.0)'
# XML namespace used by mkXRDSTag for the XRDS root element.
XRDS_NS = 'xri://$xrds'
def nsTag(ns, t):
    """Build a namespace-qualified tag name of the form '{ns}t'.

    @param ns: The XML namespace URI.
    @param t: The local tag name.
    @return: The namespace in curly braces followed by the tag name.
    """
    qualified_name = '{%s}%s' % (ns, t)
    return qualified_name
def mkXRDTag(t):
    """basestring -> basestring

    Qualify the local tag name C{t} with the XRD 2.0 XML namespace,
    producing a name suitable for use with ElementTree.
    """
    xrd_qualified = nsTag(XRD_NS_2_0, t)
    return xrd_qualified
def mkXRDSTag(t):
    """basestring -> basestring

    Qualify the local tag name C{t} with the XRDS XML namespace,
    producing a name suitable for use with ElementTree.
    """
    xrds_qualified = nsTag(XRDS_NS, t)
    return xrds_qualified
# Namespace-qualified tag names that are used in Yadis documents.
# root_tag lives in the XRDS namespace; all the others are in the
# XRD 2.0 namespace.
root_tag = mkXRDSTag('XRDS')
service_tag = mkXRDTag('Service')
xrd_tag = mkXRDTag('XRD')
type_tag = mkXRDTag('Type')
uri_tag = mkXRDTag('URI')
expires_tag = mkXRDTag('Expires')

# Other XRD tags
canonicalID_tag = mkXRDTag('CanonicalID')
def isXRDS(xrd_tree):
    """Report whether the root element of this tree is an XRDS element.

    @param xrd_tree: A parsed document exposing C{getroot()}.
    @return: True if the root element's tag equals the XRDS root tag.
    """
    return xrd_tree.getroot().tag == root_tag
def getYadisXRD(xrd_tree):
    """Return the XRD element that should contain the Yadis services.

    This is the last XRD element found in the tree.

    @raises XRDSError: If the tree contains no XRD element.
    """
    xrd_elements = list(xrd_tree.findall(xrd_tag))
    if not xrd_elements:
        raise XRDSError('No XRD present in tree')

    # The final XRD is the one that describes the resolved identifier.
    return xrd_elements[-1]
def getXRDExpiration(xrd_element, default=None):
    """Return the expiration date of this XRD element, or C{default} if
    no expiration was specified.

    @type xrd_element: ElementTree node

    @param default: The value to use as the expiration if no
        expiration was specified in the XRD.

    @rtype: datetime.datetime (or the type of C{default} when the XRD
        has no xrd:Expires element)

    @raises ValueError: If the xrd:Expires element is present, but its
        contents are empty or not formatted according to the
        specification.
    """
    expires_element = xrd_element.find(expires_tag)
    if expires_element is None:
        return default

    expires_string = expires_element.text
    if expires_string is None:
        # An empty <Expires/> element has no text. Passing None on to
        # strptime would raise TypeError rather than the documented
        # ValueError, so reject it explicitly.
        raise ValueError('Expires element is present but empty')

    # Will raise ValueError if the string is not the expected format
    expires_time = strptime(expires_string, "%Y-%m-%dT%H:%M:%SZ")

    # Only the first six fields (year .. second) are needed.
    return datetime(*expires_time[0:6])
def getCanonicalID(iname, xrd_tree):
    """Return the CanonicalID from this XRDS document.

    The CanonicalID of the last XRD is checked against each preceding
    XRD in turn: every parent's CanonicalID must equal the child's ID
    with its final '!'-delimited segment removed. The ID left at the top
    of the chain must be one the root authority of C{iname} is
    authoritative for.

    @param iname: the XRI being resolved.
    @type iname: unicode

    @param xrd_tree: The XRDS output from the resolver.
    @type xrd_tree: ElementTree

    @returns: The XRI CanonicalID, or None if there is no XRD or the
        last XRD has no CanonicalID element.
    @returntype: unicode or None

    @raises XRDSFraud: If a CanonicalID is not backed by its parent XRD
        or by the root authority.
    """
    # Reverse so that index 0 is the final XRD and the rest are its
    # ancestors, nearest first.
    xrds_leaf_first = xrd_tree.findall(xrd_tag)
    xrds_leaf_first.reverse()

    try:
        leaf_xrd = xrds_leaf_first[0]
        leaf_id_elements = leaf_xrd.findall(canonicalID_tag)
        canonicalID = xri.XRI(leaf_id_elements[0].text)
    except IndexError:
        return None

    childID = canonicalID.lower()

    for ancestor_xrd in xrds_leaf_first[1:]:
        # Drop the last '!' segment to get the ID the parent must claim.
        # rindex raises ValueError if childID contains no '!'.
        last_bang = childID.rindex('!')
        parent_sought = childID[:last_bang]
        parent = xri.XRI(ancestor_xrd.findtext(canonicalID_tag))
        if parent_sought != parent.lower():
            raise XRDSFraud("%r can not come from %s" % (childID, parent))

        childID = parent_sought

    root = xri.rootAuthority(iname)
    if xri.providerIsAuthoritative(root, childID):
        return canonicalID

    raise XRDSFraud("%r can not come from root %r" % (childID, root))
class _Max(object):
    """Value that compares greater than any other value.

    Should only be used as a singleton. Implemented for use as a
    priority value for when a priority is not specified.

    Ordering is provided through the rich comparison methods so that it
    works on interpreters that ignore __cmp__ (Python 3); __cmp__ is
    retained for code that calls it directly.
    """

    def __cmp__(self, other):
        # Legacy three-way comparison: equal only to itself, otherwise
        # greater.
        if other is self:
            return 0

        return 1

    def __eq__(self, other):
        return other is self

    def __ne__(self, other):
        return other is not self

    def __lt__(self, other):
        # Never smaller than anything, including itself.
        return False

    def __le__(self, other):
        return other is self

    def __gt__(self, other):
        return other is not self

    def __ge__(self, other):
        return True

    # Defining __eq__ would otherwise make instances unhashable on
    # Python 3; keep the default identity-based hash.
    __hash__ = object.__hash__


Max = _Max()
def getPriorityStrict(element):
    """Get the priority of this element.

    @param element: An object whose C{get('priority')} yields the
        priority attribute, or None if it is absent.

    @return: The priority as a non-negative integer, or Max (a value
        that compares greater than any other) if no priority attribute
        is present.

    @raises ValueError: If the priority attribute is present but is not
        a non-negative integer.
    """
    raw_priority = element.get('priority')
    if raw_priority is None:
        # Only a missing attribute yields Max; a malformed one raises.
        return Max

    # int() raises ValueError for non-numeric text.
    priority = int(raw_priority)
    if priority < 0:
        raise ValueError('Priority values must be non-negative integers')

    return priority
def getPriority(element):
    """Get the priority of this element, tolerating invalid values.

    @return: The result of getPriorityStrict, or Max if that raises
        ValueError because the priority value is invalid. Max is also
        what getPriorityStrict returns when no priority is specified.
    """
    try:
        priority = getPriorityStrict(element)
    except ValueError:
        priority = Max

    return priority
- def prioSort(elements):
- """Sort a list of elements that have priority attributes"""
- # Randomize the services before sorting so that equal priority
- # elements are load-balanced.
- random.shuffle(elements)
- prio_elems = [(getPriority(e), e) for e in elements]
- prio_elems.sort()
- sorted_elems = [s for (_, s) in prio_elems]
- return sorted_elems
def iterServices(xrd_tree):
    """Return the Service elements of the Yadis XRD, sorted by priority.

    @param xrd_tree: The parsed XRDS document.
    @return: The result of prioSort applied to the Service elements
        found in the XRD selected by getYadisXRD.
    """
    yadis_xrd = getYadisXRD(xrd_tree)
    service_elements = yadis_xrd.findall(service_tag)
    return prioSort(service_elements)
def sortedURIs(service_element):
    """Given a Service element, return a list of the text of each of its
    URI elements, in priority order."""
    ordered_uri_elements = prioSort(service_element.findall(uri_tag))
    return [element.text for element in ordered_uri_elements]
def getTypeURIs(service_element):
    """Given a Service element, return a list of the text of each of its
    Type elements, in the order findall returns them."""
    type_elements = service_element.findall(type_tag)
    return [element.text for element in type_elements]
def expandService(service_element):
    """Take a service element and expand it into a list of:
    ([type_uri], uri, service_element)

    There is one triple per URI of the service, in priority order. A
    service without any URI yields a single triple whose uri is None.
    """
    # Fall back to a single None entry when the service has no URIs.
    uris = sortedURIs(service_element) or [None]

    # The type list is fetched anew for every triple, so the triples do
    # not share one list object.
    return [(getTypeURIs(service_element), uri, service_element)
            for uri in uris]
def expandServices(service_elements):
    """Take a sorted iterable of service elements and expand it into a
    list, in the same order, of:
    ([type_uri], uri, service_element)

    Each service element is expanded with expandService, so the result
    may hold more than one item per service element when it has more
    than one URI. A service element with no URI still contributes one
    item, whose uri is None.
    """
    return [triple
            for service_element in service_elements
            for triple in expandService(service_element)]
|