123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676 |
- # -*- test-case-name: twisted.mail.test.test_pop3client -*-
- # Copyright (c) 2001-2004 Divmod Inc.
- # See LICENSE for details.
- import sys
- import inspect
- from zope.interface import directlyProvides
- from twisted.mail.pop3 import AdvancedPOP3Client as POP3Client
- from twisted.mail.pop3 import InsecureAuthenticationDisallowed
- from twisted.mail.pop3 import ServerErrorResponse
- from twisted.protocols import loopback
- from twisted.internet import reactor, defer, error, protocol, interfaces
- from twisted.python import log
- from twisted.trial import unittest
- from twisted.test.proto_helpers import StringTransport
- from twisted.protocols import basic
- from twisted.mail.test import pop3testserver
- try:
- from twisted.test.ssl_helpers import ClientTLSContext, ServerTLSContext
- except ImportError:
- ClientTLSContext = ServerTLSContext = None
- class StringTransportWithConnectionLosing(StringTransport):
- def loseConnection(self):
- self.protocol.connectionLost(error.ConnectionDone())
- capCache = {"TOP": None, "LOGIN-DELAY": "180", "UIDL": None, \
- "STLS": None, "USER": None, "SASL": "LOGIN"}
- def setUp(greet=True):
- p = POP3Client()
- # Skip the CAPA login will issue if it doesn't already have a
- # capability cache
- p._capCache = capCache
- t = StringTransportWithConnectionLosing()
- t.protocol = p
- p.makeConnection(t)
- if greet:
- p.dataReceived('+OK Hello!\r\n')
- return p, t
- def strip(f):
- return lambda result, f=f: f()
- class POP3ClientLoginTests(unittest.TestCase):
- def testNegativeGreeting(self):
- p, t = setUp(greet=False)
- p.allowInsecureLogin = True
- d = p.login("username", "password")
- p.dataReceived('-ERR Offline for maintenance\r\n')
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "Offline for maintenance"))
- def testOkUser(self):
- p, t = setUp()
- d = p.user("username")
- self.assertEqual(t.value(), "USER username\r\n")
- p.dataReceived("+OK send password\r\n")
- return d.addCallback(self.assertEqual, "send password")
- def testBadUser(self):
- p, t = setUp()
- d = p.user("username")
- self.assertEqual(t.value(), "USER username\r\n")
- p.dataReceived("-ERR account suspended\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "account suspended"))
- def testOkPass(self):
- p, t = setUp()
- d = p.password("password")
- self.assertEqual(t.value(), "PASS password\r\n")
- p.dataReceived("+OK you're in!\r\n")
- return d.addCallback(self.assertEqual, "you're in!")
- def testBadPass(self):
- p, t = setUp()
- d = p.password("password")
- self.assertEqual(t.value(), "PASS password\r\n")
- p.dataReceived("-ERR go away\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "go away"))
- def testOkLogin(self):
- p, t = setUp()
- p.allowInsecureLogin = True
- d = p.login("username", "password")
- self.assertEqual(t.value(), "USER username\r\n")
- p.dataReceived("+OK go ahead\r\n")
- self.assertEqual(t.value(), "USER username\r\nPASS password\r\n")
- p.dataReceived("+OK password accepted\r\n")
- return d.addCallback(self.assertEqual, "password accepted")
- def testBadPasswordLogin(self):
- p, t = setUp()
- p.allowInsecureLogin = True
- d = p.login("username", "password")
- self.assertEqual(t.value(), "USER username\r\n")
- p.dataReceived("+OK waiting on you\r\n")
- self.assertEqual(t.value(), "USER username\r\nPASS password\r\n")
- p.dataReceived("-ERR bogus login\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "bogus login"))
- def testBadUsernameLogin(self):
- p, t = setUp()
- p.allowInsecureLogin = True
- d = p.login("username", "password")
- self.assertEqual(t.value(), "USER username\r\n")
- p.dataReceived("-ERR bogus login\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "bogus login"))
- def testServerGreeting(self):
- p, t = setUp(greet=False)
- p.dataReceived("+OK lalala this has no challenge\r\n")
- self.assertEqual(p.serverChallenge, None)
- def testServerGreetingWithChallenge(self):
- p, t = setUp(greet=False)
- p.dataReceived("+OK <here is the challenge>\r\n")
- self.assertEqual(p.serverChallenge, "<here is the challenge>")
- def testAPOP(self):
- p, t = setUp(greet=False)
- p.dataReceived("+OK <challenge string goes here>\r\n")
- d = p.login("username", "password")
- self.assertEqual(t.value(), "APOP username f34f1e464d0d7927607753129cabe39a\r\n")
- p.dataReceived("+OK Welcome!\r\n")
- return d.addCallback(self.assertEqual, "Welcome!")
- def testInsecureLoginRaisesException(self):
- p, t = setUp(greet=False)
- p.dataReceived("+OK Howdy\r\n")
- d = p.login("username", "password")
- self.assertFalse(t.value())
- return self.assertFailure(
- d, InsecureAuthenticationDisallowed)
- def testSSLTransportConsideredSecure(self):
- """
- If a server doesn't offer APOP but the transport is secured using
- SSL or TLS, a plaintext login should be allowed, not rejected with
- an InsecureAuthenticationDisallowed exception.
- """
- p, t = setUp(greet=False)
- directlyProvides(t, interfaces.ISSLTransport)
- p.dataReceived("+OK Howdy\r\n")
- d = p.login("username", "password")
- self.assertEqual(t.value(), "USER username\r\n")
- t.clear()
- p.dataReceived("+OK\r\n")
- self.assertEqual(t.value(), "PASS password\r\n")
- p.dataReceived("+OK\r\n")
- return d
- class ListConsumer:
- def __init__(self):
- self.data = {}
- def consume(self, result):
- (item, value) = result
- self.data.setdefault(item, []).append(value)
- class MessageConsumer:
- def __init__(self):
- self.data = []
- def consume(self, line):
- self.data.append(line)
- class POP3ClientListTests(unittest.TestCase):
- def testListSize(self):
- p, t = setUp()
- d = p.listSize()
- self.assertEqual(t.value(), "LIST\r\n")
- p.dataReceived("+OK Here it comes\r\n")
- p.dataReceived("1 3\r\n2 2\r\n3 1\r\n.\r\n")
- return d.addCallback(self.assertEqual, [3, 2, 1])
- def testListSizeWithConsumer(self):
- p, t = setUp()
- c = ListConsumer()
- f = c.consume
- d = p.listSize(f)
- self.assertEqual(t.value(), "LIST\r\n")
- p.dataReceived("+OK Here it comes\r\n")
- p.dataReceived("1 3\r\n2 2\r\n3 1\r\n")
- self.assertEqual(c.data, {0: [3], 1: [2], 2: [1]})
- p.dataReceived("5 3\r\n6 2\r\n7 1\r\n")
- self.assertEqual(c.data, {0: [3], 1: [2], 2: [1], 4: [3], 5: [2], 6: [1]})
- p.dataReceived(".\r\n")
- return d.addCallback(self.assertIdentical, f)
- def testFailedListSize(self):
- p, t = setUp()
- d = p.listSize()
- self.assertEqual(t.value(), "LIST\r\n")
- p.dataReceived("-ERR Fatal doom server exploded\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "Fatal doom server exploded"))
- def testListUID(self):
- p, t = setUp()
- d = p.listUID()
- self.assertEqual(t.value(), "UIDL\r\n")
- p.dataReceived("+OK Here it comes\r\n")
- p.dataReceived("1 abc\r\n2 def\r\n3 ghi\r\n.\r\n")
- return d.addCallback(self.assertEqual, ["abc", "def", "ghi"])
- def testListUIDWithConsumer(self):
- p, t = setUp()
- c = ListConsumer()
- f = c.consume
- d = p.listUID(f)
- self.assertEqual(t.value(), "UIDL\r\n")
- p.dataReceived("+OK Here it comes\r\n")
- p.dataReceived("1 xyz\r\n2 abc\r\n5 mno\r\n")
- self.assertEqual(c.data, {0: ["xyz"], 1: ["abc"], 4: ["mno"]})
- p.dataReceived(".\r\n")
- return d.addCallback(self.assertIdentical, f)
- def testFailedListUID(self):
- p, t = setUp()
- d = p.listUID()
- self.assertEqual(t.value(), "UIDL\r\n")
- p.dataReceived("-ERR Fatal doom server exploded\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "Fatal doom server exploded"))
- class POP3ClientMessageTests(unittest.TestCase):
- def testRetrieve(self):
- p, t = setUp()
- d = p.retrieve(7)
- self.assertEqual(t.value(), "RETR 8\r\n")
- p.dataReceived("+OK Message incoming\r\n")
- p.dataReceived("La la la here is message text\r\n")
- p.dataReceived("..Further message text tra la la\r\n")
- p.dataReceived(".\r\n")
- return d.addCallback(
- self.assertEqual,
- ["La la la here is message text",
- ".Further message text tra la la"])
- def testRetrieveWithConsumer(self):
- p, t = setUp()
- c = MessageConsumer()
- f = c.consume
- d = p.retrieve(7, f)
- self.assertEqual(t.value(), "RETR 8\r\n")
- p.dataReceived("+OK Message incoming\r\n")
- p.dataReceived("La la la here is message text\r\n")
- p.dataReceived("..Further message text\r\n.\r\n")
- return d.addCallback(self._cbTestRetrieveWithConsumer, f, c)
- def _cbTestRetrieveWithConsumer(self, result, f, c):
- self.assertIdentical(result, f)
- self.assertEqual(c.data, ["La la la here is message text",
- ".Further message text"])
- def testPartialRetrieve(self):
- p, t = setUp()
- d = p.retrieve(7, lines=2)
- self.assertEqual(t.value(), "TOP 8 2\r\n")
- p.dataReceived("+OK 2 lines on the way\r\n")
- p.dataReceived("Line the first! Woop\r\n")
- p.dataReceived("Line the last! Bye\r\n")
- p.dataReceived(".\r\n")
- return d.addCallback(
- self.assertEqual,
- ["Line the first! Woop",
- "Line the last! Bye"])
- def testPartialRetrieveWithConsumer(self):
- p, t = setUp()
- c = MessageConsumer()
- f = c.consume
- d = p.retrieve(7, f, lines=2)
- self.assertEqual(t.value(), "TOP 8 2\r\n")
- p.dataReceived("+OK 2 lines on the way\r\n")
- p.dataReceived("Line the first! Woop\r\n")
- p.dataReceived("Line the last! Bye\r\n")
- p.dataReceived(".\r\n")
- return d.addCallback(self._cbTestPartialRetrieveWithConsumer, f, c)
- def _cbTestPartialRetrieveWithConsumer(self, result, f, c):
- self.assertIdentical(result, f)
- self.assertEqual(c.data, ["Line the first! Woop",
- "Line the last! Bye"])
- def testFailedRetrieve(self):
- p, t = setUp()
- d = p.retrieve(0)
- self.assertEqual(t.value(), "RETR 1\r\n")
- p.dataReceived("-ERR Fatal doom server exploded\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "Fatal doom server exploded"))
- def test_concurrentRetrieves(self):
- """
- Issue three retrieve calls immediately without waiting for any to
- succeed and make sure they all do succeed eventually.
- """
- p, t = setUp()
- messages = [
- p.retrieve(i).addCallback(
- self.assertEqual,
- ["First line of %d." % (i + 1,),
- "Second line of %d." % (i + 1,)])
- for i
- in range(3)]
- for i in range(1, 4):
- self.assertEqual(t.value(), "RETR %d\r\n" % (i,))
- t.clear()
- p.dataReceived("+OK 2 lines on the way\r\n")
- p.dataReceived("First line of %d.\r\n" % (i,))
- p.dataReceived("Second line of %d.\r\n" % (i,))
- self.assertEqual(t.value(), "")
- p.dataReceived(".\r\n")
- return defer.DeferredList(messages, fireOnOneErrback=True)
- class POP3ClientMiscTests(unittest.TestCase):
- def testCapability(self):
- p, t = setUp()
- d = p.capabilities(useCache=0)
- self.assertEqual(t.value(), "CAPA\r\n")
- p.dataReceived("+OK Capabilities on the way\r\n")
- p.dataReceived("X\r\nY\r\nZ\r\nA 1 2 3\r\nB 1 2\r\nC 1\r\n.\r\n")
- return d.addCallback(
- self.assertEqual,
- {"X": None, "Y": None, "Z": None,
- "A": ["1", "2", "3"],
- "B": ["1", "2"],
- "C": ["1"]})
- def testCapabilityError(self):
- p, t = setUp()
- d = p.capabilities(useCache=0)
- self.assertEqual(t.value(), "CAPA\r\n")
- p.dataReceived("-ERR This server is lame!\r\n")
- return d.addCallback(self.assertEqual, {})
- def testStat(self):
- p, t = setUp()
- d = p.stat()
- self.assertEqual(t.value(), "STAT\r\n")
- p.dataReceived("+OK 1 1212\r\n")
- return d.addCallback(self.assertEqual, (1, 1212))
- def testStatError(self):
- p, t = setUp()
- d = p.stat()
- self.assertEqual(t.value(), "STAT\r\n")
- p.dataReceived("-ERR This server is lame!\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "This server is lame!"))
- def testNoop(self):
- p, t = setUp()
- d = p.noop()
- self.assertEqual(t.value(), "NOOP\r\n")
- p.dataReceived("+OK No-op to you too!\r\n")
- return d.addCallback(self.assertEqual, "No-op to you too!")
- def testNoopError(self):
- p, t = setUp()
- d = p.noop()
- self.assertEqual(t.value(), "NOOP\r\n")
- p.dataReceived("-ERR This server is lame!\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "This server is lame!"))
- def testRset(self):
- p, t = setUp()
- d = p.reset()
- self.assertEqual(t.value(), "RSET\r\n")
- p.dataReceived("+OK Reset state\r\n")
- return d.addCallback(self.assertEqual, "Reset state")
- def testRsetError(self):
- p, t = setUp()
- d = p.reset()
- self.assertEqual(t.value(), "RSET\r\n")
- p.dataReceived("-ERR This server is lame!\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "This server is lame!"))
- def testDelete(self):
- p, t = setUp()
- d = p.delete(3)
- self.assertEqual(t.value(), "DELE 4\r\n")
- p.dataReceived("+OK Hasta la vista\r\n")
- return d.addCallback(self.assertEqual, "Hasta la vista")
- def testDeleteError(self):
- p, t = setUp()
- d = p.delete(3)
- self.assertEqual(t.value(), "DELE 4\r\n")
- p.dataReceived("-ERR Winner is not you.\r\n")
- return self.assertFailure(
- d, ServerErrorResponse).addCallback(
- lambda exc: self.assertEqual(exc.args[0], "Winner is not you."))
- class SimpleClient(POP3Client):
- def __init__(self, deferred, contextFactory = None):
- self.deferred = deferred
- self.allowInsecureLogin = True
- def serverGreeting(self, challenge):
- self.deferred.callback(None)
- class POP3HelperMixin:
- serverCTX = None
- clientCTX = None
- def setUp(self):
- d = defer.Deferred()
- self.server = pop3testserver.POP3TestServer(contextFactory=self.serverCTX)
- self.client = SimpleClient(d, contextFactory=self.clientCTX)
- self.client.timeout = 30
- self.connected = d
- def tearDown(self):
- del self.server
- del self.client
- del self.connected
- def _cbStopClient(self, ignore):
- self.client.transport.loseConnection()
- def _ebGeneral(self, failure):
- self.client.transport.loseConnection()
- self.server.transport.loseConnection()
- return failure
- def loopback(self):
- return loopback.loopbackTCP(self.server, self.client, noisy=False)
- class TLSServerFactory(protocol.ServerFactory):
- class protocol(basic.LineReceiver):
- context = None
- output = []
- def connectionMade(self):
- self.factory.input = []
- self.output = self.output[:]
- map(self.sendLine, self.output.pop(0))
- def lineReceived(self, line):
- self.factory.input.append(line)
- map(self.sendLine, self.output.pop(0))
- if line == 'STLS':
- self.transport.startTLS(self.context)
- class POP3TLSTests(unittest.TestCase):
- """
- Tests for POP3Client's support for TLS connections.
- """
- def test_startTLS(self):
- """
- POP3Client.startTLS starts a TLS session over its existing TCP
- connection.
- """
- sf = TLSServerFactory()
- sf.protocol.output = [
- ['+OK'], # Server greeting
- ['+OK', 'STLS', '.'], # CAPA response
- ['+OK'], # STLS response
- ['+OK', '.'], # Second CAPA response
- ['+OK'] # QUIT response
- ]
- sf.protocol.context = ServerTLSContext()
- port = reactor.listenTCP(0, sf, interface='127.0.0.1')
- self.addCleanup(port.stopListening)
- H = port.getHost().host
- P = port.getHost().port
- connLostDeferred = defer.Deferred()
- cp = SimpleClient(defer.Deferred(), ClientTLSContext())
- def connectionLost(reason):
- SimpleClient.connectionLost(cp, reason)
- connLostDeferred.callback(None)
- cp.connectionLost = connectionLost
- cf = protocol.ClientFactory()
- cf.protocol = lambda: cp
- conn = reactor.connectTCP(H, P, cf)
- def cbConnected(ignored):
- log.msg("Connected to server; starting TLS")
- return cp.startTLS()
- def cbStartedTLS(ignored):
- log.msg("Started TLS; disconnecting")
- return cp.quit()
- def cbDisconnected(ign):
- log.msg("Disconnected; asserting correct input received")
- self.assertEqual(
- sf.input,
- ['CAPA', 'STLS', 'CAPA', 'QUIT'])
- def cleanup(result):
- log.msg("Asserted correct input; disconnecting client and shutting down server")
- conn.disconnect()
- return connLostDeferred
- cp.deferred.addCallback(cbConnected)
- cp.deferred.addCallback(cbStartedTLS)
- cp.deferred.addCallback(cbDisconnected)
- cp.deferred.addBoth(cleanup)
- return cp.deferred
- class POP3TimeoutTests(POP3HelperMixin, unittest.TestCase):
- def testTimeout(self):
- def login():
- d = self.client.login('test', 'twisted')
- d.addCallback(loggedIn)
- d.addErrback(timedOut)
- return d
- def loggedIn(result):
- self.fail("Successfully logged in!? Impossible!")
- def timedOut(failure):
- failure.trap(error.TimeoutError)
- self._cbStopClient(None)
- def quit():
- return self.client.quit()
- self.client.timeout = 0.01
- # Tell the server to not return a response to client. This
- # will trigger a timeout.
- pop3testserver.TIMEOUT_RESPONSE = True
- methods = [login, quit]
- map(self.connected.addCallback, map(strip, methods))
- self.connected.addCallback(self._cbStopClient)
- self.connected.addErrback(self._ebGeneral)
- return self.loopback()
- if ClientTLSContext is None:
- for case in (POP3TLSTests,):
- case.skip = "OpenSSL not present"
- elif interfaces.IReactorSSL(reactor, None) is None:
- for case in (POP3TLSTests,):
- case.skip = "Reactor doesn't support SSL"
- import twisted.mail.pop3client
- class POP3ClientModuleStructureTests(unittest.TestCase):
- """
- Miscellaneous tests more to do with module/package structure than
- anything to do with the POP3 client.
- """
- def test_all(self):
- """
- twisted.mail.pop3client.__all__ should be empty because all classes
- should be imported through twisted.mail.pop3.
- """
- self.assertEqual(twisted.mail.pop3client.__all__, [])
- def test_import(self):
- """
- Every public class in twisted.mail.pop3client should be available as a
- member of twisted.mail.pop3 with the exception of
- twisted.mail.pop3client.POP3Client which should be available as
- twisted.mail.pop3.AdvancedClient.
- """
- publicClasses = [c[0] for c in inspect.getmembers(
- sys.modules['twisted.mail.pop3client'],
- inspect.isclass)
- if not c[0][0] == '_']
- for pc in publicClasses:
- if not pc == 'POP3Client':
- self.assertTrue(hasattr(twisted.mail.pop3, pc))
- else:
- self.assertTrue(hasattr(twisted.mail.pop3,
- 'AdvancedPOP3Client'))
|