pape2.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. """An implementation of the OpenID Provider Authentication Policy
  2. Extension 1.0
  3. @see: http://openid.net/developers/specs/
  4. @since: 2.1.0
  5. """
# Public names of this module: the two extension classes, the extension
# namespace URI, and the authentication policy URI constants.
__all__ = [
    'Request',
    'Response',
    'ns_uri',
    'AUTH_PHISHING_RESISTANT',
    'AUTH_MULTI_FACTOR',
    'AUTH_MULTI_FACTOR_PHYSICAL',
    ]
  14. from openid.extension import Extension
  15. import re
  16. ns_uri = "http://specs.openid.net/extensions/pape/1.0"
  17. AUTH_MULTI_FACTOR_PHYSICAL = \
  18. 'http://schemas.openid.net/pape/policies/2007/06/multi-factor-physical'
  19. AUTH_MULTI_FACTOR = \
  20. 'http://schemas.openid.net/pape/policies/2007/06/multi-factor'
  21. AUTH_PHISHING_RESISTANT = \
  22. 'http://schemas.openid.net/pape/policies/2007/06/phishing-resistant'
  23. TIME_VALIDATOR = re.compile('^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ$')
  24. class Request(Extension):
  25. """A Provider Authentication Policy request, sent from a relying
  26. party to a provider
  27. @ivar preferred_auth_policies: The authentication policies that
  28. the relying party prefers
  29. @type preferred_auth_policies: [str]
  30. @ivar max_auth_age: The maximum time, in seconds, that the relying
  31. party wants to allow to have elapsed before the user must
  32. re-authenticate
  33. @type max_auth_age: int or NoneType
  34. """
  35. ns_alias = 'pape'
  36. def __init__(self, preferred_auth_policies=None, max_auth_age=None):
  37. super(Request, self).__init__()
  38. if not preferred_auth_policies:
  39. preferred_auth_policies = []
  40. self.preferred_auth_policies = preferred_auth_policies
  41. self.max_auth_age = max_auth_age
  42. def __nonzero__(self):
  43. return bool(self.preferred_auth_policies or
  44. self.max_auth_age is not None)
  45. def addPolicyURI(self, policy_uri):
  46. """Add an acceptable authentication policy URI to this request
  47. This method is intended to be used by the relying party to add
  48. acceptable authentication types to the request.
  49. @param policy_uri: The identifier for the preferred type of
  50. authentication.
  51. @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies
  52. """
  53. if policy_uri not in self.preferred_auth_policies:
  54. self.preferred_auth_policies.append(policy_uri)
  55. def getExtensionArgs(self):
  56. """@see: C{L{Extension.getExtensionArgs}}
  57. """
  58. ns_args = {
  59. 'preferred_auth_policies':' '.join(self.preferred_auth_policies)
  60. }
  61. if self.max_auth_age is not None:
  62. ns_args['max_auth_age'] = str(self.max_auth_age)
  63. return ns_args
  64. def fromOpenIDRequest(cls, request):
  65. """Instantiate a Request object from the arguments in a
  66. C{checkid_*} OpenID message
  67. """
  68. self = cls()
  69. args = request.message.getArgs(self.ns_uri)
  70. if args == {}:
  71. return None
  72. self.parseExtensionArgs(args)
  73. return self
  74. fromOpenIDRequest = classmethod(fromOpenIDRequest)
  75. def parseExtensionArgs(self, args):
  76. """Set the state of this request to be that expressed in these
  77. PAPE arguments
  78. @param args: The PAPE arguments without a namespace
  79. @rtype: None
  80. @raises ValueError: When the max_auth_age is not parseable as
  81. an integer
  82. """
  83. # preferred_auth_policies is a space-separated list of policy URIs
  84. self.preferred_auth_policies = []
  85. policies_str = args.get('preferred_auth_policies')
  86. if policies_str:
  87. for uri in policies_str.split(' '):
  88. if uri not in self.preferred_auth_policies:
  89. self.preferred_auth_policies.append(uri)
  90. # max_auth_age is base-10 integer number of seconds
  91. max_auth_age_str = args.get('max_auth_age')
  92. self.max_auth_age = None
  93. if max_auth_age_str:
  94. try:
  95. self.max_auth_age = int(max_auth_age_str)
  96. except ValueError:
  97. pass
  98. def preferredTypes(self, supported_types):
  99. """Given a list of authentication policy URIs that a provider
  100. supports, this method returns the subsequence of those types
  101. that are preferred by the relying party.
  102. @param supported_types: A sequence of authentication policy
  103. type URIs that are supported by a provider
  104. @returns: The sub-sequence of the supported types that are
  105. preferred by the relying party. This list will be ordered
  106. in the order that the types appear in the supported_types
  107. sequence, and may be empty if the provider does not prefer
  108. any of the supported authentication types.
  109. @returntype: [str]
  110. """
  111. return filter(self.preferred_auth_policies.__contains__,
  112. supported_types)
  113. Request.ns_uri = ns_uri
  114. class Response(Extension):
  115. """A Provider Authentication Policy response, sent from a provider
  116. to a relying party
  117. """
  118. ns_alias = 'pape'
  119. def __init__(self, auth_policies=None, auth_time=None,
  120. nist_auth_level=None):
  121. super(Response, self).__init__()
  122. if auth_policies:
  123. self.auth_policies = auth_policies
  124. else:
  125. self.auth_policies = []
  126. self.auth_time = auth_time
  127. self.nist_auth_level = nist_auth_level
  128. def addPolicyURI(self, policy_uri):
  129. """Add a authentication policy to this response
  130. This method is intended to be used by the provider to add a
  131. policy that the provider conformed to when authenticating the user.
  132. @param policy_uri: The identifier for the preferred type of
  133. authentication.
  134. @see: http://openid.net/specs/openid-provider-authentication-policy-extension-1_0-01.html#auth_policies
  135. """
  136. if policy_uri not in self.auth_policies:
  137. self.auth_policies.append(policy_uri)
  138. def fromSuccessResponse(cls, success_response):
  139. """Create a C{L{Response}} object from a successful OpenID
  140. library response
  141. (C{L{openid.consumer.consumer.SuccessResponse}}) response
  142. message
  143. @param success_response: A SuccessResponse from consumer.complete()
  144. @type success_response: C{L{openid.consumer.consumer.SuccessResponse}}
  145. @rtype: Response or None
  146. @returns: A provider authentication policy response from the
  147. data that was supplied with the C{id_res} response or None
  148. if the provider sent no signed PAPE response arguments.
  149. """
  150. self = cls()
  151. # PAPE requires that the args be signed.
  152. args = success_response.getSignedNS(self.ns_uri)
  153. # Only try to construct a PAPE response if the arguments were
  154. # signed in the OpenID response. If not, return None.
  155. if args is not None:
  156. self.parseExtensionArgs(args)
  157. return self
  158. else:
  159. return None
  160. def parseExtensionArgs(self, args, strict=False):
  161. """Parse the provider authentication policy arguments into the
  162. internal state of this object
  163. @param args: unqualified provider authentication policy
  164. arguments
  165. @param strict: Whether to raise an exception when bad data is
  166. encountered
  167. @returns: None. The data is parsed into the internal fields of
  168. this object.
  169. """
  170. policies_str = args.get('auth_policies')
  171. if policies_str and policies_str != 'none':
  172. self.auth_policies = policies_str.split(' ')
  173. nist_level_str = args.get('nist_auth_level')
  174. if nist_level_str:
  175. try:
  176. nist_level = int(nist_level_str)
  177. except ValueError:
  178. if strict:
  179. raise ValueError('nist_auth_level must be an integer between '
  180. 'zero and four, inclusive')
  181. else:
  182. self.nist_auth_level = None
  183. else:
  184. if 0 <= nist_level < 5:
  185. self.nist_auth_level = nist_level
  186. auth_time = args.get('auth_time')
  187. if auth_time:
  188. if TIME_VALIDATOR.match(auth_time):
  189. self.auth_time = auth_time
  190. elif strict:
  191. raise ValueError("auth_time must be in RFC3339 format")
  192. fromSuccessResponse = classmethod(fromSuccessResponse)
  193. def getExtensionArgs(self):
  194. """@see: C{L{Extension.getExtensionArgs}}
  195. """
  196. if len(self.auth_policies) == 0:
  197. ns_args = {
  198. 'auth_policies':'none',
  199. }
  200. else:
  201. ns_args = {
  202. 'auth_policies':' '.join(self.auth_policies),
  203. }
  204. if self.nist_auth_level is not None:
  205. if self.nist_auth_level not in range(0, 5):
  206. raise ValueError('nist_auth_level must be an integer between '
  207. 'zero and four, inclusive')
  208. ns_args['nist_auth_level'] = str(self.nist_auth_level)
  209. if self.auth_time is not None:
  210. if not TIME_VALIDATOR.match(self.auth_time):
  211. raise ValueError('auth_time must be in RFC3339 format')
  212. ns_args['auth_time'] = self.auth_time
  213. return ns_args
  214. Response.ns_uri = ns_uri