| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- from __future__ import absolute_import, division
- from zope.interface import implementer
- from twisted.internet import defer
- from twisted.python.compat import unicode
- from twisted.trial import unittest
- from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream, jid
- from twisted.words.xish import domish
- NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
@implementer(sasl_mechanisms.ISASLMechanism)
class DummySASLMechanism(object):
    """
    Dummy SASL mechanism.

    This just returns the initialResponse passed on creation, stores any
    challenges and replies with the value of C{response}.

    @ivar challenge: Last received challenge, or L{None} when no challenge
        has been handed to L{getResponse} yet.
    @type challenge: C{bytes}

    @ivar initialResponse: Initial response to be returned when requested
        via C{getInitialResponse} or L{None}.
    @type initialResponse: C{bytes}

    @ivar name: Name of this mechanism.
    @type name: C{unicode}

    @ivar response: Value returned for every challenge; tests may override
        it on the instance.
    @type response: C{bytes}
    """

    challenge = None
    name = u"DUMMY"
    response = b""

    def __init__(self, initialResponse):
        """
        @param initialResponse: The value L{getInitialResponse} will return,
            or L{None}.
        @type initialResponse: C{bytes}
        """
        self.initialResponse = initialResponse


    def getInitialResponse(self):
        """
        Return the initial response given at construction time, unchanged.
        """
        return self.initialResponse


    def getResponse(self, challenge):
        """
        Record C{challenge} so tests can inspect it and reply with the
        current value of C{response}.

        @param challenge: The challenge to remember.
        @return: The value of C{self.response}.
        """
        self.challenge = challenge
        return self.response
class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer):
    """
    Dummy SASL Initializer for initiating entities.

    This hardwires the SASL mechanism to L{DummySASLMechanism}, that is
    instantiated with the value of C{initialResponse}.

    @ivar initialResponse: The initial response to be returned by the
        dummy SASL mechanism or L{None}.
    @type initialResponse: C{bytes}
    """

    initialResponse = None

    def setMechanism(self):
        """
        Select L{DummySASLMechanism} unconditionally, seeding it with the
        current value of C{initialResponse}.
        """
        self.mechanism = DummySASLMechanism(self.initialResponse)
class SASLInitiatingInitializerTests(unittest.TestCase):
    """
    Tests for L{sasl.SASLInitiatingInitializer}
    """

    def setUp(self):
        """
        Build an XML stream whose outgoing elements are captured in
        C{self.output}, feed it a stream header and attach a
        L{DummySASLInitiatingInitializer} to it.
        """
        self.output = []

        self.authenticator = xmlstream.Authenticator()
        self.xmlstream = xmlstream.XmlStream(self.authenticator)
        # Capture everything that would be sent instead of writing it to a
        # transport.
        self.xmlstream.send = self.output.append
        self.xmlstream.connectionMade()
        self.xmlstream.dataReceived(
            b"<stream:stream xmlns='jabber:client' "
            b"xmlns:stream='http://etherx.jabber.org/streams' "
            b"from='example.com' id='12345' version='1.0'>")
        self.init = DummySASLInitiatingInitializer(self.xmlstream)


    def test_onFailure(self):
        """
        Test that the SASL error condition is correctly extracted.
        """
        # Use the shared namespace constant like every other test here.
        failure = domish.Element((NS_XMPP_SASL, 'failure'))
        failure.addElement('not-authorized')
        self.init._deferred = defer.Deferred()
        self.init.onFailure(failure)
        # assertFailure hands the caught exception to the next callback.
        d = self.assertFailure(self.init._deferred, sasl.SASLAuthError)
        d.addCallback(
            lambda e: self.assertEqual('not-authorized', e.condition))
        return d


    def test_sendAuthInitialResponse(self):
        """
        Test starting authentication with an initial response.
        """
        self.init.initialResponse = b"dummy"
        self.init.start()
        auth = self.output[0]
        self.assertEqual(NS_XMPP_SASL, auth.uri)
        self.assertEqual(u'auth', auth.name)
        self.assertEqual(u'DUMMY', auth['mechanism'])
        # 'ZHVtbXk=' is the base64 encoding of b"dummy".
        self.assertEqual(u'ZHVtbXk=', unicode(auth))


    def test_sendAuthNoInitialResponse(self):
        """
        Test starting authentication without an initial response.
        """
        self.init.initialResponse = None
        self.init.start()
        auth = self.output[0]
        # Serialize with unicode() like the sibling tests so the comparison
        # is text-to-text on every supported Python version.
        self.assertEqual(u'', unicode(auth))


    def test_sendAuthEmptyInitialResponse(self):
        """
        Test starting authentication where the initial response is empty.

        An empty (as opposed to absent) initial response is expected to be
        sent as a single equals sign.
        """
        self.init.initialResponse = b""
        self.init.start()
        auth = self.output[0]
        self.assertEqual(u'=', unicode(auth))


    def test_onChallenge(self):
        """
        Test receiving a challenge message.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        # Base64 encoding of b"my challenge".
        challenge.addContent(u'bXkgY2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertEqual(b'my challenge', self.init.mechanism.challenge)
        # Complete the exchange so the returned deferred fires.
        self.init.onSuccess(None)
        return d


    def test_onChallengeResponse(self):
        """
        A non-empty response gets encoded and included as character data.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        challenge.addContent(u'bXkgY2hhbGxlbmdl')
        self.init.mechanism.response = b"response"
        self.init.onChallenge(challenge)
        # output[0] is the initial <auth/>; the response follows it.
        response = self.output[1]
        # 'cmVzcG9uc2U=' is the base64 encoding of b"response".
        self.assertEqual(u'cmVzcG9uc2U=', unicode(response))
        self.init.onSuccess(None)
        return d


    def test_onChallengeEmpty(self):
        """
        Test receiving an empty challenge message.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        self.init.onChallenge(challenge)
        self.assertEqual(b'', self.init.mechanism.challenge)
        self.init.onSuccess(None)
        return d


    def test_onChallengeIllegalPadding(self):
        """
        Test receiving a challenge message with illegal padding.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        # The padding character appears in the middle of the data.
        challenge.addContent(u'bXkg=Y2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertFailure(d, sasl.SASLIncorrectEncodingError)
        return d


    def test_onChallengeIllegalCharacters(self):
        """
        Test receiving a challenge message with illegal characters.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        # '*' is not part of the base64 alphabet.
        challenge.addContent(u'bXkg*Y2hhbGxlbmdl')
        self.init.onChallenge(challenge)
        self.assertFailure(d, sasl.SASLIncorrectEncodingError)
        return d


    def test_onChallengeMalformed(self):
        """
        Test receiving a malformed challenge message.
        """
        d = self.init.start()
        challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
        # A single character is not a complete base64 group.
        challenge.addContent(u'a')
        self.init.onChallenge(challenge)
        self.assertFailure(d, sasl.SASLIncorrectEncodingError)
        return d
class SASLInitiatingInitializerSetMechanismTests(unittest.TestCase):
    """
    Test for L{sasl.SASLInitiatingInitializer.setMechanism}.
    """

    def setUp(self):
        """
        Build an XML stream whose outgoing elements are captured in
        C{self.output}, feed it a stream header and attach a real
        L{sasl.SASLInitiatingInitializer} to it.
        """
        self.output = []

        self.authenticator = xmlstream.Authenticator()
        self.xmlstream = xmlstream.XmlStream(self.authenticator)
        self.xmlstream.send = self.output.append
        self.xmlstream.connectionMade()
        # dataReceived is fed raw wire data, so the header must be bytes
        # rather than a native string.
        self.xmlstream.dataReceived(
            b"<stream:stream xmlns='jabber:client' "
            b"xmlns:stream='http://etherx.jabber.org/streams' "
            b"from='example.com' id='12345' version='1.0'>")

        self.init = sasl.SASLInitiatingInitializer(self.xmlstream)


    def _setMechanism(self, name):
        """
        Set up the XML Stream to have a SASL feature with the given mechanism.

        @param name: Name of the single mechanism the stream will advertise.
        @return: The name of the mechanism selected by C{setMechanism}.
        """
        feature = domish.Element((NS_XMPP_SASL, 'mechanisms'))
        feature.addElement('mechanism', content=name)
        self.xmlstream.features[(feature.uri, feature.name)] = feature

        self.init.setMechanism()
        return self.init.mechanism.name


    def test_anonymous(self):
        """
        Test setting ANONYMOUS as the authentication mechanism.
        """
        # A JID without a user part and no password.
        self.authenticator.jid = jid.JID('example.com')
        self.authenticator.password = None
        name = u"ANONYMOUS"

        self.assertEqual(name, self._setMechanism(name))


    def test_plain(self):
        """
        Test setting PLAIN as the authentication mechanism.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = u'secret'
        name = u"PLAIN"

        self.assertEqual(name, self._setMechanism(name))


    def test_digest(self):
        """
        Test setting DIGEST-MD5 as the authentication mechanism.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = u'secret'
        name = u"DIGEST-MD5"

        self.assertEqual(name, self._setMechanism(name))


    def test_notAcceptable(self):
        """
        Test using an unacceptable SASL authentication mechanism.
        """
        self.authenticator.jid = jid.JID('test@example.com')
        self.authenticator.password = u'secret'

        self.assertRaises(sasl.SASLNoAcceptableMechanism,
                          self._setMechanism, u'SOMETHING_UNACCEPTABLE')


    def test_notAcceptableWithoutUser(self):
        """
        Test using an unacceptable SASL authentication mechanism with no JID.
        """
        # The JID here has only a host part, no user.
        self.authenticator.jid = jid.JID('example.com')
        self.authenticator.password = u'secret'

        self.assertRaises(sasl.SASLNoAcceptableMechanism,
                          self._setMechanism, u'SOMETHING_UNACCEPTABLE')
|