123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.words.protocols.jabber.client}
- """
- from __future__ import absolute_import, division
- from hashlib import sha1
- from twisted.internet import defer
- from twisted.python.compat import unicode
- from twisted.trial import unittest
- from twisted.words.protocols.jabber import client, error, jid, xmlstream
- from twisted.words.protocols.jabber.sasl import SASLInitiatingInitializer
- from twisted.words.xish import utility
# XML namespaces of the resource binding and session establishment protocols.
NS_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
NS_SESSION = 'urn:ietf:params:xml:ns:xmpp-session'

# Event queries matching the iq requests the initializers under test send.
IQ_AUTH_GET = '/iq[@type="get"]/query[@xmlns="jabber:iq:auth"]'
IQ_AUTH_SET = '/iq[@type="set"]/query[@xmlns="jabber:iq:auth"]'
IQ_BIND_SET = '/iq[@type="set"]/bind[@xmlns="%s"]' % NS_BIND
IQ_SESSION_SET = '/iq[@type="set"]/session[@xmlns="%s"]' % NS_SESSION
class CheckVersionInitializerTests(unittest.TestCase):
    """
    Tests for L{client.CheckVersionInitializer}.
    """

    def setUp(self):
        """
        Create the initializer under test on top of a fresh XML stream.
        """
        stream = xmlstream.XmlStream(xmlstream.Authenticator())
        self.init = client.CheckVersionInitializer(stream)


    def testSupported(self):
        """
        Initializing with stream version 1.0 does not raise.
        """
        self.init.xmlstream.version = (1, 0)
        self.init.initialize()


    def testNotSupported(self):
        """
        Initializing with stream version 0.0 raises L{error.StreamError}
        with the C{unsupported-version} condition.
        """
        self.init.xmlstream.version = (0, 0)
        raised = self.assertRaises(error.StreamError, self.init.initialize)
        self.assertEqual('unsupported-version', raised.condition)
class InitiatingInitializerHarness(object):
    """
    Mix-in providing a test fixture for XML stream initializers.

    A L{utility.XmlPipe} serves as the communication channel between the
    initializer and the stubbed receiving entity. Its sink and source ends
    both act similarly to a real L{xmlstream.XmlStream}; the sink is given an
    authenticator to which initializers can be added.

    A helper for combining event observers with deferreds is provided as
    well.
    """

    def setUp(self):
        """
        Create the pipe and attach a connect authenticator to its sink.
        """
        self.output = []
        pipe = utility.XmlPipe()
        authenticator = xmlstream.ConnectAuthenticator('example.org')
        pipe.sink.authenticator = authenticator

        self.pipe = pipe
        self.xmlstream = pipe.sink
        self.authenticator = authenticator


    def waitFor(self, event, handler):
        """
        Observe an output event, returning a deferred.

        The deferred fires once the given event has been observed on the
        source end of the L{XmlPipe} tied to the protocol under test. The
        handler is installed as its first callback.

        @param event: The event to be observed. See
            L{utility.EventDispatcher.addOnetimeObserver}.
        @param handler: The handler to be called with the observed event
            object.
        @rtype: L{defer.Deferred}.
        """
        observed = defer.Deferred().addCallback(handler)
        self.pipe.source.addOnetimeObserver(event, observed.callback)
        return observed
class IQAuthInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.IQAuthInitializer}.

    Each test acts as the server on the source end of the pipe while the
    initializer under test runs against the sink end.
    """

    def setUp(self):
        """
        Create the initializer and give the authenticator credentials.
        """
        super(IQAuthInitializerTests, self).setUp()
        self.init = client.IQAuthInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')
        self.authenticator.password = u'secret'


    def _offerAuthFields(self, iq, fieldNames, onAuthSet):
        """
        Answer a query for authentication fields and await the next request.

        @param iq: The observed request for authentication fields.
        @param fieldNames: Names of the empty elements to add, in order, to
            the C{query} element of the result.
        @param onAuthSet: Handler to be called with the next request that
            matches L{IQ_AUTH_SET}.
        @return: The deferred returned by L{waitFor} for that next request.
        @rtype: L{defer.Deferred}
        """
        # Create server response
        response = xmlstream.toResponse(iq, 'result')
        response.addElement(('jabber:iq:auth', 'query'))
        for fieldName in fieldNames:
            response.query.addElement(fieldName)

        # Set up an observer for the next request we expect, then send the
        # server response.
        pending = self.waitFor(IQ_AUTH_SET, onAuthSet)
        self.pipe.source.send(response)
        return pending


    def testPlainText(self):
        """
        Test plain-text authentication.

        Act as a server supporting plain-text authentication and expect the
        C{password} field to be filled with the password. Then act as if
        authentication succeeds.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication
            methods.

            The response informs the client that plain-text authentication
            is supported.
            """
            return self._offerAuthFields(
                iq, ('username', 'password', 'resource'), onAuthSet)

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with an empty
            result signalling success.
            """
            query = iq.query
            self.assertEqual('user', unicode(query.username))
            self.assertEqual('secret', unicode(query.password))
            self.assertEqual('resource', unicode(query.resource))

            # Send server response
            self.pipe.source.send(xmlstream.toResponse(iq, 'result'))

        # Set up an observer for the request for authentication fields
        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        initialized = self.init.initialize()
        return defer.gatherResults([fieldsRequested, initialized])


    def testDigest(self):
        """
        Test digest authentication.

        Act as a server supporting digest authentication and expect the
        C{digest} field to be filled with a sha1 digest of the concatenated
        stream session identifier and password. Then act as if authentication
        succeeds.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication
            methods.

            The response informs the client that digest authentication is
            supported.
            """
            return self._offerAuthFields(
                iq, ('username', 'digest', 'resource'), onAuthSet)

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with an empty
            result signalling success.
            """
            query = iq.query
            self.assertEqual('user', unicode(query.username))
            self.assertEqual(sha1(b'12345secret').hexdigest(),
                             unicode(query.digest))
            self.assertEqual('resource', unicode(query.resource))

            # Send server response
            self.pipe.source.send(xmlstream.toResponse(iq, 'result'))

        # Digest authentication relies on the stream session identifier.
        self.xmlstream.sid = u'12345'

        # Set up an observer for the request for authentication fields
        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        initialized = self.init.initialize()
        return defer.gatherResults([fieldsRequested, initialized])


    def testFailRequestFields(self):
        """
        Test initializer failure of request for fields for authentication.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication
            methods.

            The server responds that the client is not authorized to
            authenticate.
            """
            denial = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(denial)

        # Set up an observer for the request for authentication fields
        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        initialized = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(initialized, error.StanzaError)
        return defer.gatherResults([fieldsRequested, initialized])


    def testFailAuth(self):
        """
        Test initializer failure to authenticate.
        """

        def onAuthGet(iq):
            """
            Called when the initializer sent a query for authentication
            methods.

            The response informs the client that plain-text authentication
            is supported.
            """
            return self._offerAuthFields(
                iq, ('username', 'password', 'resource'), onAuthSet)

        def onAuthSet(iq):
            """
            Called when the initializer sent the authentication request.

            The server checks the credentials and responds with a
            not-authorized stanza error.
            """
            denial = error.StanzaError('not-authorized').toResponse(iq)
            self.pipe.source.send(denial)

        # Set up an observer for the request for authentication fields
        fieldsRequested = self.waitFor(IQ_AUTH_GET, onAuthGet)

        # Start the initializer
        initialized = self.init.initialize()

        # The initializer should fail with a stanza error.
        self.assertFailure(initialized, error.StanzaError)
        return defer.gatherResults([fieldsRequested, initialized])
class BindInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.BindInitializer}.
    """

    def setUp(self):
        """
        Create the initializer and set the JID on the authenticator.
        """
        super(BindInitializerTests, self).setUp()
        self.init = client.BindInitializer(self.xmlstream)
        self.authenticator.jid = jid.JID('user@example.com/resource')


    def testBasic(self):
        """
        Set up a stream, and act as if resource binding succeeds.

        The result carries a JID that differs from the one set in C{setUp};
        afterwards the authenticator is expected to hold the JID from the
        result.
        """

        def onBind(iq):
            # Reply with a result holding a bind element with the new JID.
            reply = xmlstream.toResponse(iq, 'result')
            reply.addElement((NS_BIND, 'bind'))
            reply.bind.addElement(
                'jid', content=u'user@example.com/other resource')
            self.pipe.source.send(reply)

        def checkBoundJID(result):
            expected = jid.JID('user@example.com/other resource')
            self.assertEqual(expected, self.authenticator.jid)

        bindRequested = self.waitFor(IQ_BIND_SET, onBind)
        bound = self.init.start()
        bound.addCallback(checkBoundJID)
        return defer.gatherResults([bindRequested, bound])


    def testFailure(self):
        """
        Set up a stream, and act as if resource binding fails.
        """

        def onBind(iq):
            # Reply with a conflict stanza error.
            reply = error.StanzaError('conflict').toResponse(iq)
            self.pipe.source.send(reply)

        bindRequested = self.waitFor(IQ_BIND_SET, onBind)
        bound = self.init.start()
        self.assertFailure(bound, error.StanzaError)
        return defer.gatherResults([bindRequested, bound])
class SessionInitializerTests(InitiatingInitializerHarness, unittest.TestCase):
    """
    Tests for L{client.SessionInitializer}.
    """

    def setUp(self):
        """
        Create the initializer under test on the harness stream.
        """
        super(SessionInitializerTests, self).setUp()
        self.init = client.SessionInitializer(self.xmlstream)


    def testSuccess(self):
        """
        Set up a stream, and act as if session establishment succeeds.
        """

        def onSession(iq):
            # Reply with an empty result.
            self.pipe.source.send(xmlstream.toResponse(iq, 'result'))

        sessionRequested = self.waitFor(IQ_SESSION_SET, onSession)
        established = self.init.start()
        return defer.gatherResults([sessionRequested, established])


    def testFailure(self):
        """
        Set up a stream, and act as if session establishment fails.
        """

        def onSession(iq):
            # Reply with a forbidden stanza error.
            reply = error.StanzaError('forbidden').toResponse(iq)
            self.pipe.source.send(reply)

        sessionRequested = self.waitFor(IQ_SESSION_SET, onSession)
        established = self.init.start()
        self.assertFailure(established, error.StanzaError)
        return defer.gatherResults([sessionRequested, established])
class XMPPAuthenticatorTests(unittest.TestCase):
    """
    Test for both XMPPAuthenticator and XMPPClientFactory.
    """

    def testBasic(self):
        """
        Test basic operations.

        Setup an XMPPClientFactory, which sets up an XMPPAuthenticator, and let
        it produce a protocol instance. Then inspect the instance variables of
        the authenticator and XML stream objects.
        """
        self.client_jid = jid.JID('user@example.com/resource')

        # Get an XmlStream instance. Note that it gets initialized with the
        # XMPPAuthenticator (that has its associateWithXmlStream called) that
        # is in turn initialized with the arguments to the factory.
        xs = client.XMPPClientFactory(self.client_jid,
                                      'secret').buildProtocol(None)

        # test authenticator's instance variables
        self.assertEqual('example.com', xs.authenticator.otherHost)
        self.assertEqual(self.client_jid, xs.authenticator.jid)
        self.assertEqual('secret', xs.authenticator.password)

        # test list of initializers; the unpacking itself checks that there
        # are exactly five of them.
        version, tls, sasl, bind, session = xs.initializers

        # Check the type of every initializer, including the leading version
        # check, so that no unpacked initializer goes unverified.
        self.assertIsInstance(version, client.CheckVersionInitializer)
        self.assertIsInstance(tls, xmlstream.TLSInitiatingInitializer)
        self.assertIsInstance(sasl, SASLInitiatingInitializer)
        self.assertIsInstance(bind, client.BindInitializer)
        self.assertIsInstance(session, client.SessionInitializer)

        # Only SASL is expected to be marked as required.
        self.assertFalse(tls.required)
        self.assertTrue(sasl.required)
        self.assertFalse(bind.required)
        self.assertFalse(session.required)
|