test_jabbersasl.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. from __future__ import absolute_import, division
  4. from zope.interface import implementer
  5. from twisted.internet import defer
  6. from twisted.python.compat import unicode
  7. from twisted.trial import unittest
  8. from twisted.words.protocols.jabber import sasl, sasl_mechanisms, xmlstream, jid
  9. from twisted.words.xish import domish
  10. NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
  11. @implementer(sasl_mechanisms.ISASLMechanism)
  12. class DummySASLMechanism(object):
  13. """
  14. Dummy SASL mechanism.
  15. This just returns the initialResponse passed on creation, stores any
  16. challenges and replies with the value of C{response}.
  17. @ivar challenge: Last received challenge.
  18. @type challenge: C{unicode}.
  19. @ivar initialResponse: Initial response to be returned when requested
  20. via C{getInitialResponse} or L{None}.
  21. @type initialResponse: C{unicode}
  22. """
  23. challenge = None
  24. name = u"DUMMY"
  25. response = b""
  26. def __init__(self, initialResponse):
  27. self.initialResponse = initialResponse
  28. def getInitialResponse(self):
  29. return self.initialResponse
  30. def getResponse(self, challenge):
  31. self.challenge = challenge
  32. return self.response
  33. class DummySASLInitiatingInitializer(sasl.SASLInitiatingInitializer):
  34. """
  35. Dummy SASL Initializer for initiating entities.
  36. This hardwires the SASL mechanism to L{DummySASLMechanism}, that is
  37. instantiated with the value of C{initialResponse}.
  38. @ivar initialResponse: The initial response to be returned by the
  39. dummy SASL mechanism or L{None}.
  40. @type initialResponse: C{unicode}.
  41. """
  42. initialResponse = None
  43. def setMechanism(self):
  44. self.mechanism = DummySASLMechanism(self.initialResponse)
  45. class SASLInitiatingInitializerTests(unittest.TestCase):
  46. """
  47. Tests for L{sasl.SASLInitiatingInitializer}
  48. """
  49. def setUp(self):
  50. self.output = []
  51. self.authenticator = xmlstream.Authenticator()
  52. self.xmlstream = xmlstream.XmlStream(self.authenticator)
  53. self.xmlstream.send = self.output.append
  54. self.xmlstream.connectionMade()
  55. self.xmlstream.dataReceived(b"<stream:stream xmlns='jabber:client' "
  56. b"xmlns:stream='http://etherx.jabber.org/streams' "
  57. b"from='example.com' id='12345' version='1.0'>")
  58. self.init = DummySASLInitiatingInitializer(self.xmlstream)
  59. def test_onFailure(self):
  60. """
  61. Test that the SASL error condition is correctly extracted.
  62. """
  63. failure = domish.Element(('urn:ietf:params:xml:ns:xmpp-sasl',
  64. 'failure'))
  65. failure.addElement('not-authorized')
  66. self.init._deferred = defer.Deferred()
  67. self.init.onFailure(failure)
  68. self.assertFailure(self.init._deferred, sasl.SASLAuthError)
  69. self.init._deferred.addCallback(lambda e:
  70. self.assertEqual('not-authorized',
  71. e.condition))
  72. return self.init._deferred
  73. def test_sendAuthInitialResponse(self):
  74. """
  75. Test starting authentication with an initial response.
  76. """
  77. self.init.initialResponse = b"dummy"
  78. self.init.start()
  79. auth = self.output[0]
  80. self.assertEqual(NS_XMPP_SASL, auth.uri)
  81. self.assertEqual(u'auth', auth.name)
  82. self.assertEqual(u'DUMMY', auth['mechanism'])
  83. self.assertEqual(u'ZHVtbXk=', unicode(auth))
  84. def test_sendAuthNoInitialResponse(self):
  85. """
  86. Test starting authentication without an initial response.
  87. """
  88. self.init.initialResponse = None
  89. self.init.start()
  90. auth = self.output[0]
  91. self.assertEqual(u'', str(auth))
  92. def test_sendAuthEmptyInitialResponse(self):
  93. """
  94. Test starting authentication where the initial response is empty.
  95. """
  96. self.init.initialResponse = b""
  97. self.init.start()
  98. auth = self.output[0]
  99. self.assertEqual('=', unicode(auth))
  100. def test_onChallenge(self):
  101. """
  102. Test receiving a challenge message.
  103. """
  104. d = self.init.start()
  105. challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
  106. challenge.addContent(u'bXkgY2hhbGxlbmdl')
  107. self.init.onChallenge(challenge)
  108. self.assertEqual(b'my challenge', self.init.mechanism.challenge)
  109. self.init.onSuccess(None)
  110. return d
  111. def test_onChallengeResponse(self):
  112. """
  113. A non-empty response gets encoded and included as character data.
  114. """
  115. d = self.init.start()
  116. challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
  117. challenge.addContent(u'bXkgY2hhbGxlbmdl')
  118. self.init.mechanism.response = b"response"
  119. self.init.onChallenge(challenge)
  120. response = self.output[1]
  121. self.assertEqual(u'cmVzcG9uc2U=', unicode(response))
  122. self.init.onSuccess(None)
  123. return d
  124. def test_onChallengeEmpty(self):
  125. """
  126. Test receiving an empty challenge message.
  127. """
  128. d = self.init.start()
  129. challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
  130. self.init.onChallenge(challenge)
  131. self.assertEqual(b'', self.init.mechanism.challenge)
  132. self.init.onSuccess(None)
  133. return d
  134. def test_onChallengeIllegalPadding(self):
  135. """
  136. Test receiving a challenge message with illegal padding.
  137. """
  138. d = self.init.start()
  139. challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
  140. challenge.addContent(u'bXkg=Y2hhbGxlbmdl')
  141. self.init.onChallenge(challenge)
  142. self.assertFailure(d, sasl.SASLIncorrectEncodingError)
  143. return d
  144. def test_onChallengeIllegalCharacters(self):
  145. """
  146. Test receiving a challenge message with illegal characters.
  147. """
  148. d = self.init.start()
  149. challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
  150. challenge.addContent(u'bXkg*Y2hhbGxlbmdl')
  151. self.init.onChallenge(challenge)
  152. self.assertFailure(d, sasl.SASLIncorrectEncodingError)
  153. return d
  154. def test_onChallengeMalformed(self):
  155. """
  156. Test receiving a malformed challenge message.
  157. """
  158. d = self.init.start()
  159. challenge = domish.Element((NS_XMPP_SASL, 'challenge'))
  160. challenge.addContent(u'a')
  161. self.init.onChallenge(challenge)
  162. self.assertFailure(d, sasl.SASLIncorrectEncodingError)
  163. return d
  164. class SASLInitiatingInitializerSetMechanismTests(unittest.TestCase):
  165. """
  166. Test for L{sasl.SASLInitiatingInitializer.setMechanism}.
  167. """
  168. def setUp(self):
  169. self.output = []
  170. self.authenticator = xmlstream.Authenticator()
  171. self.xmlstream = xmlstream.XmlStream(self.authenticator)
  172. self.xmlstream.send = self.output.append
  173. self.xmlstream.connectionMade()
  174. self.xmlstream.dataReceived("<stream:stream xmlns='jabber:client' "
  175. "xmlns:stream='http://etherx.jabber.org/streams' "
  176. "from='example.com' id='12345' version='1.0'>")
  177. self.init = sasl.SASLInitiatingInitializer(self.xmlstream)
  178. def _setMechanism(self, name):
  179. """
  180. Set up the XML Stream to have a SASL feature with the given mechanism.
  181. """
  182. feature = domish.Element((NS_XMPP_SASL, 'mechanisms'))
  183. feature.addElement('mechanism', content=name)
  184. self.xmlstream.features[(feature.uri, feature.name)] = feature
  185. self.init.setMechanism()
  186. return self.init.mechanism.name
  187. def test_anonymous(self):
  188. """
  189. Test setting ANONYMOUS as the authentication mechanism.
  190. """
  191. self.authenticator.jid = jid.JID('example.com')
  192. self.authenticator.password = None
  193. name = u"ANONYMOUS"
  194. self.assertEqual(name, self._setMechanism(name))
  195. def test_plain(self):
  196. """
  197. Test setting PLAIN as the authentication mechanism.
  198. """
  199. self.authenticator.jid = jid.JID('test@example.com')
  200. self.authenticator.password = 'secret'
  201. name = u"PLAIN"
  202. self.assertEqual(name, self._setMechanism(name))
  203. def test_digest(self):
  204. """
  205. Test setting DIGEST-MD5 as the authentication mechanism.
  206. """
  207. self.authenticator.jid = jid.JID('test@example.com')
  208. self.authenticator.password = 'secret'
  209. name = u"DIGEST-MD5"
  210. self.assertEqual(name, self._setMechanism(name))
  211. def test_notAcceptable(self):
  212. """
  213. Test using an unacceptable SASL authentication mechanism.
  214. """
  215. self.authenticator.jid = jid.JID('test@example.com')
  216. self.authenticator.password = u'secret'
  217. self.assertRaises(sasl.SASLNoAcceptableMechanism,
  218. self._setMechanism, u'SOMETHING_UNACCEPTABLE')
  219. def test_notAcceptableWithoutUser(self):
  220. """
  221. Test using an unacceptable SASL authentication mechanism with no JID.
  222. """
  223. self.authenticator.jid = jid.JID('example.com')
  224. self.authenticator.password = u'secret'
  225. self.assertRaises(sasl.SASLNoAcceptableMechanism,
  226. self._setMechanism, u'SOMETHING_UNACCEPTABLE')