123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- # -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
- #
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Protocol agnostic implementations of SASL authentication mechanisms.
- """
- from __future__ import absolute_import, division
- import binascii, random, time, os
- from hashlib import md5
- from zope.interface import Interface, Attribute, implementer
- from twisted.python.compat import iteritems, networkString
- class ISASLMechanism(Interface):
- name = Attribute("""Common name for the SASL Mechanism.""")
- def getInitialResponse():
- """
- Get the initial client response, if defined for this mechanism.
- @return: initial client response string.
- @rtype: C{str}.
- """
- def getResponse(challenge):
- """
- Get the response to a server challenge.
- @param challenge: server challenge.
- @type challenge: C{str}.
- @return: client response.
- @rtype: C{str}.
- """
- @implementer(ISASLMechanism)
- class Anonymous(object):
- """
- Implements the ANONYMOUS SASL authentication mechanism.
- This mechanism is defined in RFC 2245.
- """
- name = 'ANONYMOUS'
- def getInitialResponse(self):
- return None
- @implementer(ISASLMechanism)
- class Plain(object):
- """
- Implements the PLAIN SASL authentication mechanism.
- The PLAIN SASL authentication mechanism is defined in RFC 2595.
- """
- name = 'PLAIN'
- def __init__(self, authzid, authcid, password):
- """
- @param authzid: The authorization identity.
- @type authzid: L{unicode}
- @param authcid: The authentication identity.
- @type authcid: L{unicode}
- @param password: The plain-text password.
- @type password: L{unicode}
- """
- self.authzid = authzid or u''
- self.authcid = authcid or u''
- self.password = password or u''
- def getInitialResponse(self):
- return (self.authzid.encode('utf-8') + b"\x00" +
- self.authcid.encode('utf-8') + b"\x00" +
- self.password.encode('utf-8'))
- @implementer(ISASLMechanism)
- class DigestMD5(object):
- """
- Implements the DIGEST-MD5 SASL authentication mechanism.
- The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
- """
- name = 'DIGEST-MD5'
- def __init__(self, serv_type, host, serv_name, username, password):
- """
- @param serv_type: An indication of what kind of server authentication
- is being attempted against. For example, C{u"xmpp"}.
- @type serv_type: C{unicode}
- @param host: The authentication hostname. Also known as the realm.
- This is used as a scope to help select the right credentials.
- @type host: C{unicode}
- @param serv_name: An additional identifier for the server.
- @type serv_name: C{unicode}
- @param username: The authentication username to use to respond to a
- challenge.
- @type username: C{unicode}
- @param username: The authentication password to use to respond to a
- challenge.
- @type password: C{unicode}
- """
- self.username = username
- self.password = password
- self.defaultRealm = host
- self.digest_uri = u'%s/%s' % (serv_type, host)
- if serv_name is not None:
- self.digest_uri += u'/%s' % (serv_name,)
- def getInitialResponse(self):
- return None
- def getResponse(self, challenge):
- directives = self._parse(challenge)
- # Compat for implementations that do not send this along with
- # a successful authentication.
- if b'rspauth' in directives:
- return b''
- charset = directives[b'charset'].decode('ascii')
- try:
- realm = directives[b'realm']
- except KeyError:
- realm = self.defaultRealm.encode(charset)
- return self._genResponse(charset,
- realm,
- directives[b'nonce'])
- def _parse(self, challenge):
- """
- Parses the server challenge.
- Splits the challenge into a dictionary of directives with values.
- @return: challenge directives and their values.
- @rtype: C{dict} of C{str} to C{str}.
- """
- s = challenge
- paramDict = {}
- cur = 0
- remainingParams = True
- while remainingParams:
- # Parse a param. We can't just split on commas, because there can
- # be some commas inside (quoted) param values, e.g.:
- # qop="auth,auth-int"
- middle = s.index(b"=", cur)
- name = s[cur:middle].lstrip()
- middle += 1
- if s[middle:middle+1] == b'"':
- middle += 1
- end = s.index(b'"', middle)
- value = s[middle:end]
- cur = s.find(b',', end) + 1
- if cur == 0:
- remainingParams = False
- else:
- end = s.find(b',', middle)
- if end == -1:
- value = s[middle:].rstrip()
- remainingParams = False
- else:
- value = s[middle:end].rstrip()
- cur = end + 1
- paramDict[name] = value
- for param in (b'qop', b'cipher'):
- if param in paramDict:
- paramDict[param] = paramDict[param].split(b',')
- return paramDict
- def _unparse(self, directives):
- """
- Create message string from directives.
- @param directives: dictionary of directives (names to their values).
- For certain directives, extra quotes are added, as
- needed.
- @type directives: C{dict} of C{str} to C{str}
- @return: message string.
- @rtype: C{str}.
- """
- directive_list = []
- for name, value in iteritems(directives):
- if name in (b'username', b'realm', b'cnonce',
- b'nonce', b'digest-uri', b'authzid', b'cipher'):
- directive = name + b'=' + value
- else:
- directive = name + b'=' + value
- directive_list.append(directive)
- return b','.join(directive_list)
- def _calculateResponse(self, cnonce, nc, nonce,
- username, password, realm, uri):
- """
- Calculates response with given encoded parameters.
- @return: The I{response} field of a response to a Digest-MD5 challenge
- of the given parameters.
- @rtype: L{bytes}
- """
- def H(s):
- return md5(s).digest()
- def HEX(n):
- return binascii.b2a_hex(n)
- def KD(k, s):
- return H(k + b':' + s)
- a1 = (H(username + b":" + realm + b":" + password) + b":" +
- nonce + b":" +
- cnonce)
- a2 = b"AUTHENTICATE:" + uri
- response = HEX(KD(HEX(H(a1)),
- nonce + b":" + nc + b":" + cnonce + b":" +
- b"auth" + b":" + HEX(H(a2))))
- return response
- def _genResponse(self, charset, realm, nonce):
- """
- Generate response-value.
- Creates a response to a challenge according to section 2.1.2.1 of
- RFC 2831 using the C{charset}, C{realm} and C{nonce} directives
- from the challenge.
- """
- try:
- username = self.username.encode(charset)
- password = self.password.encode(charset)
- digest_uri = self.digest_uri.encode(charset)
- except UnicodeError:
- # TODO - add error checking
- raise
- nc = networkString('%08x' % (1,)) # TODO: support subsequent auth.
- cnonce = self._gen_nonce()
- qop = b'auth'
- # TODO - add support for authzid
- response = self._calculateResponse(cnonce, nc, nonce,
- username, password, realm,
- digest_uri)
- directives = {b'username': username,
- b'realm' : realm,
- b'nonce' : nonce,
- b'cnonce' : cnonce,
- b'nc' : nc,
- b'qop' : qop,
- b'digest-uri': digest_uri,
- b'response': response,
- b'charset': charset.encode('ascii')}
- return self._unparse(directives)
- def _gen_nonce(self):
- nonceString = "%f:%f:%d" % (random.random(), time.time(), os.getpid())
- nonceBytes = networkString(nonceString)
- return md5(nonceBytes).hexdigest().encode('ascii')
|