12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for large portions of L{twisted.mail}.
- """
- import os
- import errno
- import shutil
- import pickle
- import StringIO
- import email.message
- import email.parser
- import tempfile
- import signal
- import time
- from hashlib import md5
- from zope.interface.verify import verifyClass
- from zope.interface import Interface, implementer
- import twisted.cred.checkers
- import twisted.cred.credentials
- import twisted.cred.portal
- import twisted.mail.alias
- import twisted.mail.mail
- import twisted.mail.maildir
- import twisted.mail.protocols
- import twisted.mail.relay
- import twisted.mail.relaymanager
- from twisted import cred, mail
- from twisted.internet import (address, defer, interfaces, protocol,
- reactor, task)
- from twisted.internet.defer import Deferred
- from twisted.internet.error import (DNSLookupError, CannotListenError,
- ProcessDone, ProcessTerminated)
- from twisted.mail import pop3, smtp
- from twisted.mail.relaymanager import _AttemptManager
- from twisted.names import dns
- from twisted.names.dns import RRHeader, Record_CNAME, Record_MX
- from twisted.names.error import DNSNameError
- from twisted.python import failure, log
- from twisted.python.compat import range
- from twisted.python.filepath import FilePath
- from twisted.test.proto_helpers import (LineSendingProtocol,
- MemoryReactorClock, StringTransport)
- from twisted.trial import unittest
- class DomainWithDefaultsTests(unittest.TestCase):
- def testMethods(self):
- d = dict([(x, x + 10) for x in range(10)])
- d = mail.mail.DomainWithDefaultDict(d, 'Default')
- self.assertEqual(len(d), 10)
- self.assertEqual(list(iter(d)), list(range(10)))
- self.assertEqual(list(d.iterkeys()), list(iter(d)))
- items = list(d.iteritems())
- items.sort()
- self.assertEqual(items, [(x, x + 10) for x in range(10)])
- values = list(d.itervalues())
- values.sort()
- self.assertEqual(values, list(range(10, 20)))
- items = d.items()
- items.sort()
- self.assertEqual(items, [(x, x + 10) for x in range(10)])
- values = d.values()
- values.sort()
- self.assertEqual(values, list(range(10, 20)))
- for x in range(10):
- self.assertEqual(d[x], x + 10)
- self.assertEqual(d.get(x), x + 10)
- self.assertTrue(x in d)
- del d[2], d[4], d[6]
- self.assertEqual(len(d), 7)
- self.assertEqual(d[2], 'Default')
- self.assertEqual(d[4], 'Default')
- self.assertEqual(d[6], 'Default')
- d.update({'a': None, 'b': (), 'c': '*'})
- self.assertEqual(len(d), 10)
- self.assertEqual(d['a'], None)
- self.assertEqual(d['b'], ())
- self.assertEqual(d['c'], '*')
- d.clear()
- self.assertEqual(len(d), 0)
- self.assertEqual(d.setdefault('key', 'value'), 'value')
- self.assertEqual(d['key'], 'value')
- self.assertEqual(d.popitem(), ('key', 'value'))
- self.assertEqual(len(d), 0)
- dcopy = d.copy()
- self.assertEqual(d.domains, dcopy.domains)
- self.assertEqual(d.default, dcopy.default)
- def _stringificationTest(self, stringifier):
- """
- Assert that the class name of a L{mail.mail.DomainWithDefaultDict}
- instance and the string-formatted underlying domain dictionary both
- appear in the string produced by the given string-returning function.
- @type stringifier: one-argument callable
- @param stringifier: either C{str} or C{repr}, to be used to get a
- string to make assertions against.
- """
- domain = mail.mail.DomainWithDefaultDict({}, 'Default')
- self.assertIn(domain.__class__.__name__, stringifier(domain))
- domain['key'] = 'value'
- self.assertIn(str({'key': 'value'}), stringifier(domain))
- def test_str(self):
- """
- L{DomainWithDefaultDict.__str__} should return a string including
- the class name and the domain mapping held by the instance.
- """
- self._stringificationTest(str)
- def test_repr(self):
- """
- L{DomainWithDefaultDict.__repr__} should return a string including
- the class name and the domain mapping held by the instance.
- """
- self._stringificationTest(repr)
- def test_has_keyDeprecation(self):
- """
- has_key is now deprecated.
- """
- sut = mail.mail.DomainWithDefaultDict({}, 'Default')
- sut.has_key('anything')
- message = (
- 'twisted.mail.mail.DomainWithDefaultDict.has_key was deprecated '
- 'in Twisted 16.3.0. Use the `in` keyword instead.'
- )
- warnings = self.flushWarnings(
- [self.test_has_keyDeprecation])
- self.assertEqual(1, len(warnings))
- self.assertEqual(DeprecationWarning, warnings[0]['category'])
- self.assertEqual(message, warnings[0]['message'])
- class BounceTests(unittest.TestCase):
- def setUp(self):
- self.domain = mail.mail.BounceDomain()
- def testExists(self):
- self.assertRaises(smtp.AddressError, self.domain.exists, "any user")
- def testRelay(self):
- self.assertEqual(
- self.domain.willRelay("random q emailer", "protocol"),
- False
- )
- def testAddUser(self):
- self.domain.addUser("bob", "password")
- self.assertRaises(smtp.SMTPBadRcpt, self.domain.exists, "bob")
- class BounceWithSMTPServerTests(unittest.TestCase):
- """
- Tests for L{twisted.mail.mail.BounceDomain} with
- L{twisted.mail.smtp.SMTPServer}.
- """
- def test_rejected(self):
- """
- Incoming emails to a SMTP server with L{twisted.mail.mail.BounceDomain}
- are rejected.
- """
- service = mail.mail.MailService()
- domain = mail.mail.BounceDomain()
- service.addDomain(b'foo.com', domain)
- factory = mail.protocols.SMTPFactory(service)
- protocol = factory.buildProtocol(None)
- deliverer = mail.protocols.SMTPDomainDelivery(service, None, None)
- protocol.delivery = deliverer
- transport = StringTransport()
- protocol.makeConnection(transport)
- protocol.lineReceived(b'HELO baz.net')
- protocol.lineReceived(b'MAIL FROM:<a@baz.net>')
- protocol.lineReceived(b'RCPT TO:<any@foo.com>')
- protocol.lineReceived(b'QUIT')
- self.assertTrue(transport.disconnecting)
- protocol.connectionLost(None)
- self.assertEqual(transport.value().strip().split(b'\r\n')[-2],
- b'550 Cannot receive for specified address')
- class FileMessageTests(unittest.TestCase):
- def setUp(self):
- self.name = "fileMessage.testFile"
- self.final = "final.fileMessage.testFile"
- self.f = open(self.name, 'w')
- self.fp = mail.mail.FileMessage(self.f, self.name, self.final)
- def tearDown(self):
- try:
- self.f.close()
- except:
- pass
- try:
- os.remove(self.name)
- except:
- pass
- try:
- os.remove(self.final)
- except:
- pass
- def testFinalName(self):
- return self.fp.eomReceived().addCallback(self._cbFinalName)
- def _cbFinalName(self, result):
- self.assertEqual(result, self.final)
- self.assertTrue(self.f.closed)
- self.assertFalse(os.path.exists(self.name))
- def testContents(self):
- contents = "first line\nsecond line\nthird line\n"
- for line in contents.splitlines():
- self.fp.lineReceived(line)
- self.fp.eomReceived()
- with open(self.final) as f:
- self.assertEqual(f.read(), contents)
- def testInterrupted(self):
- contents = "first line\nsecond line\n"
- for line in contents.splitlines():
- self.fp.lineReceived(line)
- self.fp.connectionLost()
- self.assertFalse(os.path.exists(self.name))
- self.assertFalse(os.path.exists(self.final))
- class MailServiceTests(unittest.TestCase):
- def setUp(self):
- self.service = mail.mail.MailService()
- def testFactories(self):
- f = self.service.getPOP3Factory()
- self.assertTrue(isinstance(f, protocol.ServerFactory))
- self.assertTrue(f.buildProtocol(('127.0.0.1', 12345)), pop3.POP3)
- f = self.service.getSMTPFactory()
- self.assertTrue(isinstance(f, protocol.ServerFactory))
- self.assertTrue(f.buildProtocol(('127.0.0.1', 12345)), smtp.SMTP)
- f = self.service.getESMTPFactory()
- self.assertTrue(isinstance(f, protocol.ServerFactory))
- self.assertTrue(f.buildProtocol(('127.0.0.1', 12345)), smtp.ESMTP)
- def testPortals(self):
- o1 = object()
- o2 = object()
- self.service.portals['domain'] = o1
- self.service.portals[''] = o2
- self.assertTrue(self.service.lookupPortal('domain') is o1)
- self.assertTrue(self.service.defaultPortal() is o2)
- class StringListMailboxTests(unittest.TestCase):
- """
- Tests for L{StringListMailbox}, an in-memory only implementation of
- L{pop3.IMailbox}.
- """
- def test_listOneMessage(self):
- """
- L{StringListMailbox.listMessages} returns the length of the message at
- the offset into the mailbox passed to it.
- """
- mailbox = mail.maildir.StringListMailbox(["abc", "ab", "a"])
- self.assertEqual(mailbox.listMessages(0), 3)
- self.assertEqual(mailbox.listMessages(1), 2)
- self.assertEqual(mailbox.listMessages(2), 1)
- def test_listAllMessages(self):
- """
- L{StringListMailbox.listMessages} returns a list of the lengths of all
- messages if not passed an index.
- """
- mailbox = mail.maildir.StringListMailbox(["a", "abc", "ab"])
- self.assertEqual(mailbox.listMessages(), [1, 3, 2])
- def test_getMessage(self):
- """
- L{StringListMailbox.getMessage} returns a file-like object from which
- the contents of the message at the given offset into the mailbox can be
- read.
- """
- mailbox = mail.maildir.StringListMailbox(["foo", "real contents"])
- self.assertEqual(mailbox.getMessage(1).read(), "real contents")
- def test_getUidl(self):
- """
- L{StringListMailbox.getUidl} returns a unique identifier for the
- message at the given offset into the mailbox.
- """
- mailbox = mail.maildir.StringListMailbox(["foo", "bar"])
- self.assertNotEqual(mailbox.getUidl(0), mailbox.getUidl(1))
- def test_deleteMessage(self):
- """
- L{StringListMailbox.deleteMessage} marks a message for deletion causing
- further requests for its length to return 0.
- """
- mailbox = mail.maildir.StringListMailbox(["foo"])
- mailbox.deleteMessage(0)
- self.assertEqual(mailbox.listMessages(0), 0)
- self.assertEqual(mailbox.listMessages(), [0])
- def test_undeleteMessages(self):
- """
- L{StringListMailbox.undeleteMessages} causes any messages marked for
- deletion to be returned to their original state.
- """
- mailbox = mail.maildir.StringListMailbox(["foo"])
- mailbox.deleteMessage(0)
- mailbox.undeleteMessages()
- self.assertEqual(mailbox.listMessages(0), 3)
- self.assertEqual(mailbox.listMessages(), [3])
- def test_sync(self):
- """
- L{StringListMailbox.sync} causes any messages as marked for deletion to
- be permanently deleted.
- """
- mailbox = mail.maildir.StringListMailbox(["foo"])
- mailbox.deleteMessage(0)
- mailbox.sync()
- mailbox.undeleteMessages()
- self.assertEqual(mailbox.listMessages(0), 0)
- self.assertEqual(mailbox.listMessages(), [0])
- class FailingMaildirMailboxAppendMessageTask(mail.maildir._MaildirMailboxAppendMessageTask):
- _openstate = True
- _writestate = True
- _renamestate = True
- def osopen(self, fn, attr, mode):
- if self._openstate:
- return os.open(fn, attr, mode)
- else:
- raise OSError(errno.EPERM, "Faked Permission Problem")
- def oswrite(self, fh, data):
- if self._writestate:
- return os.write(fh, data)
- else:
- raise OSError(errno.ENOSPC, "Faked Space problem")
- def osrename(self, oldname, newname):
- if self._renamestate:
- return os.rename(oldname, newname)
- else:
- raise OSError(errno.EPERM, "Faked Permission Problem")
- class _AppendTestMixin(object):
- """
- Mixin for L{MaildirMailbox.appendMessage} test cases which defines a helper
- for serially appending multiple messages to a mailbox.
- """
- def _appendMessages(self, mbox, messages):
- """
- Deliver the given messages one at a time. Delivery is serialized to
- guarantee a predictable order in the mailbox (overlapped message deliver
- makes no guarantees about which message which appear first).
- """
- results = []
- def append():
- for m in messages:
- d = mbox.appendMessage(m)
- d.addCallback(results.append)
- yield d
- d = task.cooperate(append()).whenDone()
- d.addCallback(lambda ignored: results)
- return d
- class MaildirAppendStringTests(unittest.TestCase, _AppendTestMixin):
- """
- Tests for L{MaildirMailbox.appendMessage} when invoked with a C{str}.
- """
- def setUp(self):
- self.d = self.mktemp()
- mail.maildir.initializeMaildir(self.d)
- def _append(self, ignored, mbox):
- d = mbox.appendMessage('TEST')
- return self.assertFailure(d, Exception)
- def _setState(self, ignored, mbox, rename=None, write=None, open=None):
- """
- Change the behavior of future C{rename}, C{write}, or C{open} calls made
- by the mailbox C{mbox}.
- @param rename: If not L{None}, a new value for the C{_renamestate}
- attribute of the mailbox's append factory. The original value will
- be restored at the end of the test.
- @param write: Like C{rename}, but for the C{_writestate} attribute.
- @param open: Like C{rename}, but for the C{_openstate} attribute.
- """
- if rename is not None:
- self.addCleanup(
- setattr, mbox.AppendFactory, '_renamestate',
- mbox.AppendFactory._renamestate)
- mbox.AppendFactory._renamestate = rename
- if write is not None:
- self.addCleanup(
- setattr, mbox.AppendFactory, '_writestate',
- mbox.AppendFactory._writestate)
- mbox.AppendFactory._writestate = write
- if open is not None:
- self.addCleanup(
- setattr, mbox.AppendFactory, '_openstate',
- mbox.AppendFactory._openstate)
- mbox.AppendFactory._openstate = open
- def test_append(self):
- """
- L{MaildirMailbox.appendMessage} returns a L{Deferred} which fires when
- the message has been added to the end of the mailbox.
- """
- mbox = mail.maildir.MaildirMailbox(self.d)
- mbox.AppendFactory = FailingMaildirMailboxAppendMessageTask
- d = self._appendMessages(mbox, ["X" * i for i in range(1, 11)])
- d.addCallback(self.assertEqual, [None] * 10)
- d.addCallback(self._cbTestAppend, mbox)
- return d
- def _cbTestAppend(self, ignored, mbox):
- """
- Check that the mailbox has the expected number (ten) of messages in it,
- and that each has the expected contents, and that they are in the same
- order as that in which they were appended.
- """
- self.assertEqual(len(mbox.listMessages()), 10)
- self.assertEqual(
- [len(mbox.getMessage(i).read()) for i in range(10)],
- list(range(1, 11)))
- # test in the right order: last to first error location.
- self._setState(None, mbox, rename=False)
- d = self._append(None, mbox)
- d.addCallback(self._setState, mbox, rename=True, write=False)
- d.addCallback(self._append, mbox)
- d.addCallback(self._setState, mbox, write=True, open=False)
- d.addCallback(self._append, mbox)
- d.addCallback(self._setState, mbox, open=True)
- return d
- class MaildirAppendFileTests(unittest.TestCase, _AppendTestMixin):
- """
- Tests for L{MaildirMailbox.appendMessage} when invoked with a C{str}.
- """
- def setUp(self):
- self.d = self.mktemp()
- mail.maildir.initializeMaildir(self.d)
- def test_append(self):
- """
- L{MaildirMailbox.appendMessage} returns a L{Deferred} which fires when
- the message has been added to the end of the mailbox.
- """
- mbox = mail.maildir.MaildirMailbox(self.d)
- messages = []
- for i in range(1, 11):
- temp = tempfile.TemporaryFile()
- temp.write("X" * i)
- temp.seek(0, 0)
- messages.append(temp)
- self.addCleanup(temp.close)
- d = self._appendMessages(mbox, messages)
- d.addCallback(self._cbTestAppend, mbox)
- return d
- def _cbTestAppend(self, result, mbox):
- """
- Check that the mailbox has the expected number (ten) of messages in it,
- and that each has the expected contents, and that they are in the same
- order as that in which they were appended.
- """
- self.assertEqual(len(mbox.listMessages()), 10)
- self.assertEqual(
- [len(mbox.getMessage(i).read()) for i in range(10)],
- list(range(1, 11)))
- class MaildirTests(unittest.TestCase):
- def setUp(self):
- self.d = self.mktemp()
- mail.maildir.initializeMaildir(self.d)
- def tearDown(self):
- shutil.rmtree(self.d)
- def testInitializer(self):
- d = self.d
- trash = os.path.join(d, '.Trash')
- self.assertTrue(os.path.exists(d) and os.path.isdir(d))
- self.assertTrue(os.path.exists(os.path.join(d, 'new')))
- self.assertTrue(os.path.exists(os.path.join(d, 'cur')))
- self.assertTrue(os.path.exists(os.path.join(d, 'tmp')))
- self.assertTrue(os.path.isdir(os.path.join(d, 'new')))
- self.assertTrue(os.path.isdir(os.path.join(d, 'cur')))
- self.assertTrue(os.path.isdir(os.path.join(d, 'tmp')))
- self.assertTrue(os.path.exists(os.path.join(trash, 'new')))
- self.assertTrue(os.path.exists(os.path.join(trash, 'cur')))
- self.assertTrue(os.path.exists(os.path.join(trash, 'tmp')))
- self.assertTrue(os.path.isdir(os.path.join(trash, 'new')))
- self.assertTrue(os.path.isdir(os.path.join(trash, 'cur')))
- self.assertTrue(os.path.isdir(os.path.join(trash, 'tmp')))
- def test_nameGenerator(self):
- """
- Each call to L{_MaildirNameGenerator.generate} returns a unique
- string suitable for use as the basename of a new message file. The
- names are ordered such that those generated earlier sort less than
- those generated later.
- """
- clock = task.Clock()
- clock.advance(0.05)
- generator = mail.maildir._MaildirNameGenerator(clock)
- firstName = generator.generate()
- clock.advance(0.05)
- secondName = generator.generate()
- self.assertTrue(firstName < secondName)
- def test_mailbox(self):
- """
- Exercise the methods of L{IMailbox} as implemented by
- L{MaildirMailbox}.
- """
- j = os.path.join
- n = mail.maildir._generateMaildirName
- msgs = [j(b, n()) for b in ('cur', 'new') for x in range(5)]
- # Toss a few files into the mailbox
- i = 1
- for f in msgs:
- with open(j(self.d, f), 'w') as fObj:
- fObj.write('x' * i)
- i = i + 1
- mb = mail.maildir.MaildirMailbox(self.d)
- self.assertEqual(mb.listMessages(), list(range(1, 11)))
- self.assertEqual(mb.listMessages(1), 2)
- self.assertEqual(mb.listMessages(5), 6)
- self.assertEqual(mb.getMessage(6).read(), 'x' * 7)
- self.assertEqual(mb.getMessage(1).read(), 'x' * 2)
- d = {}
- for i in range(10):
- u = mb.getUidl(i)
- self.assertFalse(u in d)
- d[u] = None
- p, f = os.path.split(msgs[5])
- mb.deleteMessage(5)
- self.assertEqual(mb.listMessages(5), 0)
- self.assertTrue(os.path.exists(j(self.d, '.Trash', 'cur', f)))
- self.assertFalse(os.path.exists(j(self.d, msgs[5])))
- mb.undeleteMessages()
- self.assertEqual(mb.listMessages(5), 6)
- self.assertFalse(os.path.exists(j(self.d, '.Trash', 'cur', f)))
- self.assertTrue(os.path.exists(j(self.d, msgs[5])))
- class AbstractMaildirDomainTests(unittest.TestCase):
- """
- Tests for L{twisted.mail.maildir.AbstractMaildirDomain}.
- """
- def test_interface(self):
- """
- L{maildir.AbstractMaildirDomain} implements L{mail.IAliasableDomain}.
- """
- verifyClass(mail.mail.IAliasableDomain,
- mail.maildir.AbstractMaildirDomain)
- class MaildirDirdbmDomainTests(unittest.TestCase):
- """
- Tests for L{MaildirDirdbmDomain}.
- """
- def setUp(self):
- """
- Create a temporary L{MaildirDirdbmDomain} and parent
- L{MailService} before running each test.
- """
- self.P = self.mktemp()
- self.S = mail.mail.MailService()
- self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.P)
- def tearDown(self):
- """
- Remove the temporary C{maildir} directory when the test has
- finished.
- """
- shutil.rmtree(self.P)
- def test_addUser(self):
- """
- L{MaildirDirdbmDomain.addUser} accepts a user and password
- argument. It stores those in a C{dbm} dictionary
- attribute and creates a directory for each user.
- """
- toAdd = (('user1', 'pwd1'), ('user2', 'pwd2'), ('user3', 'pwd3'))
- for (u, p) in toAdd:
- self.D.addUser(u, p)
- for (u, p) in toAdd:
- self.assertTrue(u in self.D.dbm)
- self.assertEqual(self.D.dbm[u], p)
- self.assertTrue(os.path.exists(os.path.join(self.P, u)))
- def test_credentials(self):
- """
- L{MaildirDirdbmDomain.getCredentialsCheckers} initializes and
- returns one L{ICredentialsChecker} checker by default.
- """
- creds = self.D.getCredentialsCheckers()
- self.assertEqual(len(creds), 1)
- self.assertTrue(cred.checkers.ICredentialsChecker.providedBy(creds[0]))
- self.assertTrue(cred.credentials.IUsernamePassword in creds[0].credentialInterfaces)
- def test_requestAvatar(self):
- """
- L{MaildirDirdbmDomain.requestAvatar} raises L{NotImplementedError}
- unless it is supplied with an L{pop3.IMailbox} interface.
- When called with an L{pop3.IMailbox}, it returns a 3-tuple
- containing L{pop3.IMailbox}, an implementation of that interface
- and a NOOP callable.
- """
- class ISomething(Interface):
- pass
- self.D.addUser('user', 'password')
- self.assertRaises(
- NotImplementedError,
- self.D.requestAvatar, 'user', None, ISomething
- )
- t = self.D.requestAvatar('user', None, pop3.IMailbox)
- self.assertEqual(len(t), 3)
- self.assertTrue(t[0] is pop3.IMailbox)
- self.assertTrue(pop3.IMailbox.providedBy(t[1]))
- t[2]()
- def test_requestAvatarId(self):
- """
- L{DirdbmDatabase.requestAvatarId} raises L{UnauthorizedLogin} if
- supplied with invalid user credentials.
- When called with valid credentials, L{requestAvatarId} returns
- the username associated with the supplied credentials.
- """
- self.D.addUser('user', 'password')
- database = self.D.getCredentialsCheckers()[0]
- creds = cred.credentials.UsernamePassword('user', 'wrong password')
- self.assertRaises(
- cred.error.UnauthorizedLogin,
- database.requestAvatarId, creds
- )
- creds = cred.credentials.UsernamePassword('user', 'password')
- self.assertEqual(database.requestAvatarId(creds), 'user')
- def test_userDirectory(self):
- """
- L{MaildirDirdbmDomain.userDirectory} is supplied with a user name
- and returns the path to that user's maildir subdirectory.
- Calling L{MaildirDirdbmDomain.userDirectory} with a
- non-existent user returns the 'postmaster' directory if there
- is a postmaster or returns L{None} if there is no postmaster.
- """
- self.D.addUser('user', 'password')
- self.assertEqual(self.D.userDirectory('user'),
- os.path.join(self.D.root, 'user'))
- self.D.postmaster = False
- self.assertIdentical(self.D.userDirectory('nouser'), None)
- self.D.postmaster = True
- self.assertEqual(self.D.userDirectory('nouser'),
- os.path.join(self.D.root, 'postmaster'))
- @implementer(mail.mail.IAliasableDomain)
- class StubAliasableDomain(object):
- """
- Minimal testable implementation of IAliasableDomain.
- """
- def exists(self, user):
- """
- No test coverage for invocations of this method on domain objects,
- so we just won't implement it.
- """
- raise NotImplementedError()
- def addUser(self, user, password):
- """
- No test coverage for invocations of this method on domain objects,
- so we just won't implement it.
- """
- raise NotImplementedError()
- def getCredentialsCheckers(self):
- """
- This needs to succeed in order for other tests to complete
- successfully, but we don't actually assert anything about its
- behavior. Return an empty list. Sometime later we should return
- something else and assert that a portal got set up properly.
- """
- return []
- def setAliasGroup(self, aliases):
- """
- Just record the value so the test can check it later.
- """
- self.aliasGroup = aliases
- class ServiceDomainTests(unittest.TestCase):
- def setUp(self):
- self.S = mail.mail.MailService()
- self.D = mail.protocols.DomainDeliveryBase(self.S, None)
- self.D.service = self.S
- self.D.protocolName = 'TEST'
- self.D.host = 'hostname'
- self.tmpdir = self.mktemp()
- domain = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
- domain.addUser('user', 'password')
- self.S.addDomain('test.domain', domain)
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def testAddAliasableDomain(self):
- """
- Test that adding an IAliasableDomain to a mail service properly sets
- up alias group references and such.
- """
- aliases = object()
- domain = StubAliasableDomain()
- self.S.aliases = aliases
- self.S.addDomain('example.com', domain)
- self.assertIdentical(domain.aliasGroup, aliases)
- def testReceivedHeader(self):
- hdr = self.D.receivedHeader(
- ('remotehost', '123.232.101.234'),
- smtp.Address('<someguy@someplace>'),
- ['user@host.name']
- )
- fp = StringIO.StringIO(hdr)
- emailParser = email.parser.Parser()
- m = emailParser.parse(fp)
- self.assertEqual(len(m.items()), 1)
- self.assertIn('Received', m)
- def testValidateTo(self):
- user = smtp.User('user@test.domain', 'helo', None, 'wherever@whatever')
- return defer.maybeDeferred(self.D.validateTo, user
- ).addCallback(self._cbValidateTo
- )
- def _cbValidateTo(self, result):
- self.assertTrue(callable(result))
- def testValidateToBadUsername(self):
- user = smtp.User('resu@test.domain', 'helo', None, 'wherever@whatever')
- return self.assertFailure(
- defer.maybeDeferred(self.D.validateTo, user),
- smtp.SMTPBadRcpt)
- def testValidateToBadDomain(self):
- user = smtp.User('user@domain.test', 'helo', None, 'wherever@whatever')
- return self.assertFailure(
- defer.maybeDeferred(self.D.validateTo, user),
- smtp.SMTPBadRcpt)
- def testValidateFrom(self):
- helo = ('hostname', '127.0.0.1')
- origin = smtp.Address('<user@hostname>')
- self.assertTrue(self.D.validateFrom(helo, origin) is origin)
- helo = ('hostname', '1.2.3.4')
- origin = smtp.Address('<user@hostname>')
- self.assertTrue(self.D.validateFrom(helo, origin) is origin)
- helo = ('hostname', '1.2.3.4')
- origin = smtp.Address('<>')
- self.assertTrue(self.D.validateFrom(helo, origin) is origin)
- self.assertRaises(
- smtp.SMTPBadSender,
- self.D.validateFrom, None, origin
- )
- class VirtualPOP3Tests(unittest.TestCase):
- def setUp(self):
- self.tmpdir = self.mktemp()
- self.S = mail.mail.MailService()
- self.D = mail.maildir.MaildirDirdbmDomain(self.S, self.tmpdir)
- self.D.addUser('user', 'password')
- self.S.addDomain('test.domain', self.D)
- portal = cred.portal.Portal(self.D)
- map(portal.registerChecker, self.D.getCredentialsCheckers())
- self.S.portals[''] = self.S.portals['test.domain'] = portal
- self.P = mail.protocols.VirtualPOP3()
- self.P.service = self.S
- self.P.magic = '<unit test magic>'
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def testAuthenticateAPOP(self):
- resp = md5(self.P.magic + 'password').hexdigest()
- return self.P.authenticateUserAPOP('user', resp
- ).addCallback(self._cbAuthenticateAPOP
- )
- def _cbAuthenticateAPOP(self, result):
- self.assertEqual(len(result), 3)
- self.assertEqual(result[0], pop3.IMailbox)
- self.assertTrue(pop3.IMailbox.providedBy(result[1]))
- result[2]()
- def testAuthenticateIncorrectUserAPOP(self):
- resp = md5(self.P.magic + 'password').hexdigest()
- return self.assertFailure(
- self.P.authenticateUserAPOP('resu', resp),
- cred.error.UnauthorizedLogin)
- def testAuthenticateIncorrectResponseAPOP(self):
- resp = md5('wrong digest').hexdigest()
- return self.assertFailure(
- self.P.authenticateUserAPOP('user', resp),
- cred.error.UnauthorizedLogin)
- def testAuthenticatePASS(self):
- return self.P.authenticateUserPASS('user', 'password'
- ).addCallback(self._cbAuthenticatePASS
- )
- def _cbAuthenticatePASS(self, result):
- self.assertEqual(len(result), 3)
- self.assertEqual(result[0], pop3.IMailbox)
- self.assertTrue(pop3.IMailbox.providedBy(result[1]))
- result[2]()
- def testAuthenticateBadUserPASS(self):
- return self.assertFailure(
- self.P.authenticateUserPASS('resu', 'password'),
- cred.error.UnauthorizedLogin)
- def testAuthenticateBadPasswordPASS(self):
- return self.assertFailure(
- self.P.authenticateUserPASS('user', 'wrong password'),
- cred.error.UnauthorizedLogin)
- class empty(smtp.User):
- def __init__(self):
- pass
- class RelayTests(unittest.TestCase):
- def testExists(self):
- service = mail.mail.MailService()
- domain = mail.relay.DomainQueuer(service)
- doRelay = [
- address.UNIXAddress('/var/run/mail-relay'),
- address.IPv4Address('TCP', '127.0.0.1', 12345),
- ]
- dontRelay = [
- address.IPv4Address('TCP', '192.168.2.1', 62),
- address.IPv4Address('TCP', '1.2.3.4', 1943),
- ]
- for peer in doRelay:
- user = empty()
- user.orig = 'user@host'
- user.dest = 'tsoh@resu'
- user.protocol = empty()
- user.protocol.transport = empty()
- user.protocol.transport.getPeer = lambda: peer
- self.assertTrue(callable(domain.exists(user)))
- for peer in dontRelay:
- user = empty()
- user.orig = 'some@place'
- user.protocol = empty()
- user.protocol.transport = empty()
- user.protocol.transport.getPeer = lambda: peer
- user.dest = 'who@cares'
- self.assertRaises(smtp.SMTPBadRcpt, domain.exists, user)
- class RelayerTests(unittest.TestCase):
- def setUp(self):
- self.tmpdir = self.mktemp()
- os.mkdir(self.tmpdir)
- self.messageFiles = []
- for i in range(10):
- name = os.path.join(self.tmpdir, 'body-%d' % (i,))
- with open(name + '-H', 'w') as f:
- pickle.dump(['from-%d' % (i,), 'to-%d' % (i,)], f)
- f = open(name + '-D', 'w')
- f.write(name)
- f.seek(0, 0)
- self.messageFiles.append(name)
- self.R = mail.relay.RelayerMixin()
- self.R.loadMessages(self.messageFiles)
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def testMailFrom(self):
- for i in range(10):
- self.assertEqual(self.R.getMailFrom(), 'from-%d' % (i,))
- self.R.sentMail(250, None, None, None, None)
- self.assertEqual(self.R.getMailFrom(), None)
- def testMailTo(self):
- for i in range(10):
- self.assertEqual(self.R.getMailTo(), ['to-%d' % (i,)])
- self.R.sentMail(250, None, None, None, None)
- self.assertEqual(self.R.getMailTo(), None)
- def testMailData(self):
- for i in range(10):
- name = os.path.join(self.tmpdir, 'body-%d' % (i,))
- self.assertEqual(self.R.getMailData().read(), name)
- self.R.sentMail(250, None, None, None, None)
- self.assertEqual(self.R.getMailData(), None)
- class Manager:
- def __init__(self):
- self.success = []
- self.failure = []
- self.done = []
- def notifySuccess(self, factory, message):
- self.success.append((factory, message))
- def notifyFailure(self, factory, message):
- self.failure.append((factory, message))
- def notifyDone(self, factory):
- self.done.append(factory)
- class ManagedRelayerTests(unittest.TestCase):
- def setUp(self):
- self.manager = Manager()
- self.messages = list(range(0, 20, 2))
- self.factory = object()
- self.relay = mail.relaymanager.ManagedRelayerMixin(self.manager)
- self.relay.messages = self.messages[:]
- self.relay.names = self.messages[:]
- self.relay.factory = self.factory
- def testSuccessfulSentMail(self):
- for i in self.messages:
- self.relay.sentMail(250, None, None, None, None)
- self.assertEqual(
- self.manager.success,
- [(self.factory, m) for m in self.messages]
- )
- def testFailedSentMail(self):
- for i in self.messages:
- self.relay.sentMail(550, None, None, None, None)
- self.assertEqual(
- self.manager.failure,
- [(self.factory, m) for m in self.messages]
- )
- def testConnectionLost(self):
- self.relay.connectionLost(failure.Failure(Exception()))
- self.assertEqual(self.manager.done, [self.factory])
- class DirectoryQueueTests(unittest.TestCase):
- def setUp(self):
- # This is almost a test case itself.
- self.tmpdir = self.mktemp()
- os.mkdir(self.tmpdir)
- self.queue = mail.relaymanager.Queue(self.tmpdir)
- self.queue.noisy = False
- for m in range(25):
- hdrF, msgF = self.queue.createNewMessage()
- with hdrF:
- pickle.dump(['header', m], hdrF)
- msgF.lineReceived('body: %d' % (m,))
- msgF.eomReceived()
- self.queue.readDirectory()
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
- def testWaiting(self):
- self.assertTrue(self.queue.hasWaiting())
- self.assertEqual(len(self.queue.getWaiting()), 25)
- waiting = self.queue.getWaiting()
- self.queue.setRelaying(waiting[0])
- self.assertEqual(len(self.queue.getWaiting()), 24)
- self.queue.setWaiting(waiting[0])
- self.assertEqual(len(self.queue.getWaiting()), 25)
- def testRelaying(self):
- for m in self.queue.getWaiting():
- self.queue.setRelaying(m)
- self.assertEqual(
- len(self.queue.getRelayed()),
- 25 - len(self.queue.getWaiting())
- )
- self.assertFalse(self.queue.hasWaiting())
- relayed = self.queue.getRelayed()
- self.queue.setWaiting(relayed[0])
- self.assertEqual(len(self.queue.getWaiting()), 1)
- self.assertEqual(len(self.queue.getRelayed()), 24)
- def testDone(self):
- msg = self.queue.getWaiting()[0]
- self.queue.setRelaying(msg)
- self.queue.done(msg)
- self.assertEqual(len(self.queue.getWaiting()), 24)
- self.assertEqual(len(self.queue.getRelayed()), 0)
- self.assertFalse(msg in self.queue.getWaiting())
- self.assertFalse(msg in self.queue.getRelayed())
- def testEnvelope(self):
- envelopes = []
- for msg in self.queue.getWaiting():
- envelopes.append(self.queue.getEnvelope(msg))
- envelopes.sort()
- for i in range(25):
- self.assertEqual(
- envelopes.pop(0),
- ['header', i]
- )
- from twisted.names import server
- from twisted.names import client
- from twisted.names import common
- class TestAuthority(common.ResolverBase):
- def __init__(self):
- common.ResolverBase.__init__(self)
- self.addresses = {}
- def _lookup(self, name, cls, type, timeout = None):
- if name in self.addresses and type == dns.MX:
- results = []
- for a in self.addresses[name]:
- hdr = dns.RRHeader(
- name, dns.MX, dns.IN, 60, dns.Record_MX(0, a)
- )
- results.append(hdr)
- return defer.succeed((results, [], []))
- return defer.fail(failure.Failure(dns.DomainError(name)))
- def setUpDNS(self):
- self.auth = TestAuthority()
- factory = server.DNSServerFactory([self.auth])
- protocol = dns.DNSDatagramProtocol(factory)
- while 1:
- self.port = reactor.listenTCP(0, factory, interface='127.0.0.1')
- portNumber = self.port.getHost().port
- try:
- self.udpPort = reactor.listenUDP(portNumber, protocol, interface='127.0.0.1')
- except CannotListenError:
- self.port.stopListening()
- else:
- break
- self.resolver = client.Resolver(servers=[('127.0.0.1', portNumber)])
- def tearDownDNS(self):
- dl = []
- dl.append(defer.maybeDeferred(self.port.stopListening))
- dl.append(defer.maybeDeferred(self.udpPort.stopListening))
- try:
- self.resolver._parseCall.cancel()
- except:
- pass
- return defer.DeferredList(dl)
- class MXTests(unittest.TestCase):
- """
- Tests for L{mail.relaymanager.MXCalculator}.
- """
- def setUp(self):
- setUpDNS(self)
- self.clock = task.Clock()
- self.mx = mail.relaymanager.MXCalculator(self.resolver, self.clock)
- def tearDown(self):
- return tearDownDNS(self)
- def test_defaultClock(self):
- """
- L{MXCalculator}'s default clock is C{twisted.internet.reactor}.
- """
- self.assertIdentical(
- mail.relaymanager.MXCalculator(self.resolver).clock,
- reactor)
- def testSimpleSuccess(self):
- self.auth.addresses['test.domain'] = ['the.email.test.domain']
- return self.mx.getMX('test.domain').addCallback(self._cbSimpleSuccess)
- def _cbSimpleSuccess(self, mx):
- self.assertEqual(mx.preference, 0)
- self.assertEqual(str(mx.name), 'the.email.test.domain')
- def testSimpleFailure(self):
- self.mx.fallbackToDomain = False
- return self.assertFailure(self.mx.getMX('test.domain'), IOError)
- def testSimpleFailureWithFallback(self):
- return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
- def _exchangeTest(self, domain, records, correctMailExchange):
- """
- Issue an MX request for the given domain and arrange for it to be
- responded to with the given records. Verify that the resulting mail
- exchange is the indicated host.
- @type domain: C{str}
- @type records: C{list} of L{RRHeader}
- @type correctMailExchange: C{str}
- @rtype: L{Deferred}
- """
- class DummyResolver(object):
- def lookupMailExchange(self, name):
- if name == domain:
- return defer.succeed((
- records,
- [],
- []))
- return defer.fail(DNSNameError(domain))
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(domain)
- def gotMailExchange(record):
- self.assertEqual(str(record.name), correctMailExchange)
- d.addCallback(gotMailExchange)
- return d
- def test_mailExchangePreference(self):
- """
- The MX record with the lowest preference is returned by
- L{MXCalculator.getMX}.
- """
- domain = "example.com"
- good = "good.example.com"
- bad = "bad.example.com"
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, bad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, good)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(2, bad))]
- return self._exchangeTest(domain, records, good)
- def test_badExchangeExcluded(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- which is not also marked as bad.
- """
- domain = "example.com"
- good = "good.example.com"
- bad = "bad.example.com"
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, bad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, good))]
- self.mx.markBad(bad)
- return self._exchangeTest(domain, records, good)
- def test_fallbackForAllBadExchanges(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- if all the MX records in the response have been marked bad.
- """
- domain = "example.com"
- bad = "bad.example.com"
- worse = "worse.example.com"
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, bad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, worse))]
- self.mx.markBad(bad)
- self.mx.markBad(worse)
- return self._exchangeTest(domain, records, bad)
- def test_badExchangeExpires(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- if it was last marked bad longer than L{MXCalculator.timeOutBadMX}
- seconds ago.
- """
- domain = "example.com"
- good = "good.example.com"
- previouslyBad = "bad.example.com"
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, previouslyBad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, good))]
- self.mx.markBad(previouslyBad)
- self.clock.advance(self.mx.timeOutBadMX)
- return self._exchangeTest(domain, records, previouslyBad)
- def test_goodExchangeUsed(self):
- """
- L{MXCalculator.getMX} returns the MX record with the lowest preference
- if it was marked good after it was marked bad.
- """
- domain = "example.com"
- good = "good.example.com"
- previouslyBad = "bad.example.com"
- records = [
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, previouslyBad)),
- RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(1, good))]
- self.mx.markBad(previouslyBad)
- self.mx.markGood(previouslyBad)
- self.clock.advance(self.mx.timeOutBadMX)
- return self._exchangeTest(domain, records, previouslyBad)
- def test_successWithoutResults(self):
- """
- If an MX lookup succeeds but the result set is empty,
- L{MXCalculator.getMX} should try to look up an I{A} record for the
- requested name and call back its returned Deferred with that
- address.
- """
- ip = '1.2.3.4'
- domain = 'example.org'
- class DummyResolver(object):
- """
- Fake resolver which will respond to an MX lookup with an empty
- result set.
- @ivar mx: A dictionary mapping hostnames to three-tuples of
- results to be returned from I{MX} lookups.
- @ivar a: A dictionary mapping hostnames to addresses to be
- returned from I{A} lookups.
- """
- mx = {domain: ([], [], [])}
- a = {domain: ip}
- def lookupMailExchange(self, domain):
- return defer.succeed(self.mx[domain])
- def getHostByName(self, domain):
- return defer.succeed(self.a[domain])
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(domain)
- d.addCallback(self.assertEqual, Record_MX(name=ip))
- return d
- def test_failureWithSuccessfulFallback(self):
- """
- Test that if the MX record lookup fails, fallback is enabled, and an A
- record is available for the name, then the Deferred returned by
- L{MXCalculator.getMX} ultimately fires with a Record_MX instance which
- gives the address in the A record for the name.
- """
- class DummyResolver(object):
- """
- Fake resolver which will fail an MX lookup but then succeed a
- getHostByName call.
- """
- def lookupMailExchange(self, domain):
- return defer.fail(DNSNameError())
- def getHostByName(self, domain):
- return defer.succeed("1.2.3.4")
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX("domain")
- d.addCallback(self.assertEqual, Record_MX(name="1.2.3.4"))
- return d
- def test_cnameWithoutGlueRecords(self):
- """
- If an MX lookup returns a single CNAME record as a result, MXCalculator
- will perform an MX lookup for the canonical name indicated and return
- the MX record which results.
- """
- alias = "alias.example.com"
- canonical = "canonical.example.com"
- exchange = "mail.example.com"
- class DummyResolver(object):
- """
- Fake resolver which will return a CNAME for an MX lookup of a name
- which is an alias and an MX for an MX lookup of the canonical name.
- """
- def lookupMailExchange(self, domain):
- if domain == alias:
- return defer.succeed((
- [RRHeader(name=domain,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(canonical))],
- [], []))
- elif domain == canonical:
- return defer.succeed((
- [RRHeader(name=domain,
- type=Record_MX.TYPE,
- payload=Record_MX(0, exchange))],
- [], []))
- else:
- return defer.fail(DNSNameError(domain))
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(alias)
- d.addCallback(self.assertEqual, Record_MX(name=exchange))
- return d
- def test_cnameChain(self):
- """
- If L{MXCalculator.getMX} encounters a CNAME chain which is longer than
- the length specified, the returned L{Deferred} should errback with
- L{CanonicalNameChainTooLong}.
- """
- class DummyResolver(object):
- """
- Fake resolver which generates a CNAME chain of infinite length in
- response to MX lookups.
- """
- chainCounter = 0
- def lookupMailExchange(self, domain):
- self.chainCounter += 1
- name = 'x-%d.example.com' % (self.chainCounter,)
- return defer.succeed((
- [RRHeader(name=domain,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(name))],
- [], []))
- cnameLimit = 3
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX("mail.example.com", cnameLimit)
- self.assertFailure(
- d, twisted.mail.relaymanager.CanonicalNameChainTooLong)
- def cbChainTooLong(error):
- self.assertEqual(error.args[0], Record_CNAME("x-%d.example.com" % (cnameLimit + 1,)))
- self.assertEqual(self.mx.resolver.chainCounter, cnameLimit + 1)
- d.addCallback(cbChainTooLong)
- return d
- def test_cnameWithGlueRecords(self):
- """
- If an MX lookup returns a CNAME and the MX record for the CNAME, the
- L{Deferred} returned by L{MXCalculator.getMX} should be called back
- with the name from the MX record without further lookups being
- attempted.
- """
- lookedUp = []
- alias = "alias.example.com"
- canonical = "canonical.example.com"
- exchange = "mail.example.com"
- class DummyResolver(object):
- def lookupMailExchange(self, domain):
- if domain != alias or lookedUp:
- # Don't give back any results for anything except the alias
- # or on any request after the first.
- return ([], [], [])
- return defer.succeed((
- [RRHeader(name=alias,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(canonical)),
- RRHeader(name=canonical,
- type=Record_MX.TYPE,
- payload=Record_MX(name=exchange))],
- [], []))
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(alias)
- d.addCallback(self.assertEqual, Record_MX(name=exchange))
- return d
- def test_cnameLoopWithGlueRecords(self):
- """
- If an MX lookup returns two CNAME records which point to each other,
- the loop should be detected and the L{Deferred} returned by
- L{MXCalculator.getMX} should be errbacked with L{CanonicalNameLoop}.
- """
- firstAlias = "cname1.example.com"
- secondAlias = "cname2.example.com"
- class DummyResolver(object):
- def lookupMailExchange(self, domain):
- return defer.succeed((
- [RRHeader(name=firstAlias,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(secondAlias)),
- RRHeader(name=secondAlias,
- type=Record_CNAME.TYPE,
- payload=Record_CNAME(firstAlias))],
- [], []))
- self.mx.resolver = DummyResolver()
- d = self.mx.getMX(firstAlias)
- self.assertFailure(d, twisted.mail.relaymanager.CanonicalNameLoop)
- return d
- def testManyRecords(self):
- self.auth.addresses['test.domain'] = [
- 'mx1.test.domain', 'mx2.test.domain', 'mx3.test.domain'
- ]
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsSuccessfulLookup
- )
- def _cbManyRecordsSuccessfulLookup(self, mx):
- self.assertTrue(str(mx.name).split('.', 1)[0] in ('mx1', 'mx2', 'mx3'))
- self.mx.markBad(str(mx.name))
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsDifferentResult, mx
- )
- def _cbManyRecordsDifferentResult(self, nextMX, mx):
- self.assertNotEqual(str(mx.name), str(nextMX.name))
- self.mx.markBad(str(nextMX.name))
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsLastResult, mx, nextMX
- )
- def _cbManyRecordsLastResult(self, lastMX, mx, nextMX):
- self.assertNotEqual(str(mx.name), str(lastMX.name))
- self.assertNotEqual(str(nextMX.name), str(lastMX.name))
- self.mx.markBad(str(lastMX.name))
- self.mx.markGood(str(nextMX.name))
- return self.mx.getMX('test.domain'
- ).addCallback(self._cbManyRecordsRepeatSpecificResult, nextMX
- )
- def _cbManyRecordsRepeatSpecificResult(self, againMX, nextMX):
- self.assertEqual(str(againMX.name), str(nextMX.name))
- class LiveFireExerciseTests(unittest.TestCase):
- if interfaces.IReactorUDP(reactor, None) is None:
- skip = "UDP support is required to determining MX records"
- def setUp(self):
- setUpDNS(self)
- self.tmpdirs = [
- 'domainDir', 'insertionDomain', 'insertionQueue',
- 'destinationDomain', 'destinationQueue'
- ]
- def tearDown(self):
- for d in self.tmpdirs:
- if os.path.exists(d):
- shutil.rmtree(d)
- return tearDownDNS(self)
- def testLocalDelivery(self):
- service = mail.mail.MailService()
- service.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
- domain = mail.maildir.MaildirDirdbmDomain(service, 'domainDir')
- domain.addUser('user', 'password')
- service.addDomain('test.domain', domain)
- service.portals[''] = service.portals['test.domain']
- map(service.portals[''].registerChecker, domain.getCredentialsCheckers())
- service.setQueue(mail.relay.DomainQueuer(service))
- f = service.getSMTPFactory()
- self.smtpServer = reactor.listenTCP(0, f, interface='127.0.0.1')
- client = LineSendingProtocol([
- 'HELO meson',
- 'MAIL FROM: <user@hostname>',
- 'RCPT TO: <user@test.domain>',
- 'DATA',
- 'This is the message',
- '.',
- 'QUIT'
- ])
- done = Deferred()
- f = protocol.ClientFactory()
- f.protocol = lambda: client
- f.clientConnectionLost = lambda *args: done.callback(None)
- reactor.connectTCP('127.0.0.1', self.smtpServer.getHost().port, f)
- def finished(ign):
- mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
- msg = mbox.getMessage(0).read()
- self.assertNotEqual(msg.find('This is the message'), -1)
- return self.smtpServer.stopListening()
- done.addCallback(finished)
- return done
- def testRelayDelivery(self):
- # Here is the service we will connect to and send mail from
- insServ = mail.mail.MailService()
- insServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
- domain = mail.maildir.MaildirDirdbmDomain(insServ, 'insertionDomain')
- insServ.addDomain('insertion.domain', domain)
- os.mkdir('insertionQueue')
- insServ.setQueue(mail.relaymanager.Queue('insertionQueue'))
- insServ.domains.setDefaultDomain(mail.relay.DomainQueuer(insServ))
- manager = mail.relaymanager.SmartHostSMTPRelayingManager(insServ.queue)
- manager.fArgs += ('test.identity.hostname',)
- helper = mail.relaymanager.RelayStateHelper(manager, 1)
- # Yoink! Now the internet obeys OUR every whim!
- manager.mxcalc = mail.relaymanager.MXCalculator(self.resolver)
- # And this is our whim.
- self.auth.addresses['destination.domain'] = ['127.0.0.1']
- f = insServ.getSMTPFactory()
- self.insServer = reactor.listenTCP(0, f, interface='127.0.0.1')
- # Here is the service the previous one will connect to for final
- # delivery
- destServ = mail.mail.MailService()
- destServ.smtpPortal.registerChecker(cred.checkers.AllowAnonymousAccess())
- domain = mail.maildir.MaildirDirdbmDomain(destServ, 'destinationDomain')
- domain.addUser('user', 'password')
- destServ.addDomain('destination.domain', domain)
- os.mkdir('destinationQueue')
- destServ.setQueue(mail.relaymanager.Queue('destinationQueue'))
- helper = mail.relaymanager.RelayStateHelper(manager, 1)
- helper.startService()
- f = destServ.getSMTPFactory()
- self.destServer = reactor.listenTCP(0, f, interface='127.0.0.1')
- # Update the port number the *first* relay will connect to, because we can't use
- # port 25
- manager.PORT = self.destServer.getHost().port
- client = LineSendingProtocol([
- 'HELO meson',
- 'MAIL FROM: <user@wherever>',
- 'RCPT TO: <user@destination.domain>',
- 'DATA',
- 'This is the message',
- '.',
- 'QUIT'
- ])
- done = Deferred()
- f = protocol.ClientFactory()
- f.protocol = lambda: client
- f.clientConnectionLost = lambda *args: done.callback(None)
- reactor.connectTCP('127.0.0.1', self.insServer.getHost().port, f)
- def finished(ign):
- # First part of the delivery is done. Poke the queue manually now
- # so we don't have to wait for the queue to be flushed.
- delivery = manager.checkState()
- def delivered(ign):
- mbox = domain.requestAvatar('user', None, pop3.IMailbox)[1]
- msg = mbox.getMessage(0).read()
- self.assertNotEqual(msg.find('This is the message'), -1)
- self.insServer.stopListening()
- self.destServer.stopListening()
- helper.stopService()
- delivery.addCallback(delivered)
- return delivery
- done.addCallback(finished)
- return done
- aliasFile = StringIO.StringIO("""\
- # Here's a comment
- # woop another one
- testuser: address1,address2, address3,
- continuation@address, |/bin/process/this
- usertwo:thisaddress,thataddress, lastaddress
- lastuser: :/includable, /filename, |/program, address
- """)
- class LineBufferMessage:
- def __init__(self):
- self.lines = []
- self.eom = False
- self.lost = False
- def lineReceived(self, line):
- self.lines.append(line)
- def eomReceived(self):
- self.eom = True
- return defer.succeed('<Whatever>')
- def connectionLost(self):
- self.lost = True
- class AliasTests(unittest.TestCase):
- lines = [
- 'First line',
- 'Next line',
- '',
- 'After a blank line',
- 'Last line'
- ]
- def setUp(self):
- aliasFile.seek(0)
- def testHandle(self):
- result = {}
- lines = [
- 'user: another@host\n',
- 'nextuser: |/bin/program\n',
- 'user: me@again\n',
- 'moreusers: :/etc/include/filename\n',
- 'multiuser: first@host, second@host,last@anotherhost',
- ]
- for l in lines:
- mail.alias.handle(result, l, 'TestCase', None)
- self.assertEqual(result['user'], ['another@host', 'me@again'])
- self.assertEqual(result['nextuser'], ['|/bin/program'])
- self.assertEqual(result['moreusers'], [':/etc/include/filename'])
- self.assertEqual(result['multiuser'], ['first@host', 'second@host', 'last@anotherhost'])
- def testFileLoader(self):
- domains = {'': object()}
- result = mail.alias.loadAliasFile(domains, fp=aliasFile)
- self.assertEqual(len(result), 3)
- group = result['testuser']
- s = str(group)
- for a in ('address1', 'address2', 'address3', 'continuation@address', '/bin/process/this'):
- self.assertNotEqual(s.find(a), -1)
- self.assertEqual(len(group), 5)
- group = result['usertwo']
- s = str(group)
- for a in ('thisaddress', 'thataddress', 'lastaddress'):
- self.assertNotEqual(s.find(a), -1)
- self.assertEqual(len(group), 3)
- group = result['lastuser']
- s = str(group)
- self.assertEqual(s.find('/includable'), -1)
- for a in ('/filename', 'program', 'address'):
- self.assertNotEqual(s.find(a), -1, '%s not found' % a)
- self.assertEqual(len(group), 3)
- def testMultiWrapper(self):
- msgs = LineBufferMessage(), LineBufferMessage(), LineBufferMessage()
- msg = mail.alias.MultiWrapper(msgs)
- for L in self.lines:
- msg.lineReceived(L)
- return msg.eomReceived().addCallback(self._cbMultiWrapper, msgs)
- def _cbMultiWrapper(self, ignored, msgs):
- for m in msgs:
- self.assertTrue(m.eom)
- self.assertFalse(m.lost)
- self.assertEqual(self.lines, m.lines)
- def testFileAlias(self):
- tmpfile = self.mktemp()
- a = mail.alias.FileAlias(tmpfile, None, None)
- m = a.createMessageReceiver()
- for l in self.lines:
- m.lineReceived(l)
- return m.eomReceived().addCallback(self._cbTestFileAlias, tmpfile)
- def _cbTestFileAlias(self, ignored, tmpfile):
- with open(tmpfile) as f:
- lines = f.readlines()
- self.assertEqual([L[:-1] for L in lines], self.lines)
- class DummyDomain(object):
- """
- Test domain for L{AddressAliasTests}.
- """
- def __init__(self, address):
- self.address = address
- def exists(self, user, memo=None):
- """
- @returns: When a C{memo} is passed in this will raise a
- L{smtp.SMTPBadRcpt} exception, otherwise a boolean
- indicating if the C{user} and string version of
- L{self.address} are equal or not.
- @rtype: C{bool}
- """
- if memo:
- raise mail.smtp.SMTPBadRcpt('ham')
- return lambda: user == str(self.address)
- class AddressAliasTests(unittest.TestCase):
- """
- Tests for L{twisted.mail.alias.AddressAlias}.
- """
- def setUp(self):
- """
- Setup an L{AddressAlias}.
- """
- self.address = mail.smtp.Address('foo@bar')
- domains = {self.address.domain: DummyDomain(self.address)}
- self.alias = mail.alias.AddressAlias(self.address, domains,
- self.address)
- def test_createMessageReceiver(self):
- """
- L{createMessageReceiever} calls C{exists()} on the domain object
- which key matches the C{alias} passed to L{AddressAlias}.
- """
- self.assertTrue(self.alias.createMessageReceiver())
- def test_str(self):
- """
- The string presentation of L{AddressAlias} includes the alias.
- """
- self.assertEqual(str(self.alias), '<Address foo@bar>')
- def test_resolve(self):
- """
- L{resolve} will look for additional aliases when an C{aliasmap}
- dictionary is passed, and returns L{None} if none were found.
- """
- self.assertEqual(self.alias.resolve({self.address: 'bar'}), None)
- def test_resolveWithoutAliasmap(self):
- """
- L{resolve} returns L{None} when the alias could not be found in the
- C{aliasmap} and no L{mail.smtp.User} with this alias exists either.
- """
- self.assertEqual(self.alias.resolve({}), None)
- class DummyProcess(object):
- __slots__ = ['onEnd']
- class MockProcessAlias(mail.alias.ProcessAlias):
- """
- An alias processor that doesn't actually launch processes.
- """
- def spawnProcess(self, proto, program, path):
- """
- Don't spawn a process.
- """
- class MockAliasGroup(mail.alias.AliasGroup):
- """
- An alias group using C{MockProcessAlias}.
- """
- processAliasFactory = MockProcessAlias
- class StubProcess(object):
- """
- Fake implementation of L{IProcessTransport}.
- @ivar signals: A list of all the signals which have been sent to this fake
- process.
- """
- def __init__(self):
- self.signals = []
- def loseConnection(self):
- """
- No-op implementation of disconnection.
- """
- def signalProcess(self, signal):
- """
- Record a signal sent to this process for later inspection.
- """
- self.signals.append(signal)
- class ProcessAliasTests(unittest.TestCase):
- """
- Tests for alias resolution.
- """
- if interfaces.IReactorProcess(reactor, None) is None:
- skip = "IReactorProcess not supported"
- lines = [
- 'First line',
- 'Next line',
- '',
- 'After a blank line',
- 'Last line'
- ]
- def exitStatus(self, code):
- """
- Construct a status from the given exit code.
- @type code: L{int} between 0 and 255 inclusive.
- @param code: The exit status which the code will represent.
- @rtype: L{int}
- @return: A status integer for the given exit code.
- """
- # /* Macros for constructing status values. */
- # #define __W_EXITCODE(ret, sig) ((ret) << 8 | (sig))
- status = (code << 8) | 0
- # Sanity check
- self.assertTrue(os.WIFEXITED(status))
- self.assertEqual(os.WEXITSTATUS(status), code)
- self.assertFalse(os.WIFSIGNALED(status))
- return status
- def signalStatus(self, signal):
- """
- Construct a status from the given signal.
- @type signal: L{int} between 0 and 255 inclusive.
- @param signal: The signal number which the status will represent.
- @rtype: L{int}
- @return: A status integer for the given signal.
- """
- # /* If WIFSIGNALED(STATUS), the terminating signal. */
- # #define __WTERMSIG(status) ((status) & 0x7f)
- # /* Nonzero if STATUS indicates termination by a signal. */
- # #define __WIFSIGNALED(status) \
- # (((signed char) (((status) & 0x7f) + 1) >> 1) > 0)
- status = signal
- # Sanity check
- self.assertTrue(os.WIFSIGNALED(status))
- self.assertEqual(os.WTERMSIG(status), signal)
- self.assertFalse(os.WIFEXITED(status))
- return status
- def setUp(self):
- """
- Replace L{smtp.DNSNAME} with a well-known value.
- """
- self.DNSNAME = smtp.DNSNAME
- smtp.DNSNAME = ''
- def tearDown(self):
- """
- Restore the original value of L{smtp.DNSNAME}.
- """
- smtp.DNSNAME = self.DNSNAME
- def test_processAlias(self):
- """
- Standard call to C{mail.alias.ProcessAlias}: check that the specified
- script is called, and that the input is correctly transferred to it.
- """
- sh = FilePath(self.mktemp())
- sh.setContent("""\
- #!/bin/sh
- rm -f process.alias.out
- while read i; do
- echo $i >> process.alias.out
- done""")
- os.chmod(sh.path, 0o700)
- a = mail.alias.ProcessAlias(sh.path, None, None)
- m = a.createMessageReceiver()
- for l in self.lines:
- m.lineReceived(l)
- def _cbProcessAlias(ignored):
- with open('process.alias.out') as f:
- lines = f.readlines()
- self.assertEqual([L[:-1] for L in lines], self.lines)
- return m.eomReceived().addCallback(_cbProcessAlias)
- def test_processAliasTimeout(self):
- """
- If the alias child process does not exit within a particular period of
- time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should
- fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the
- child process..
- """
- reactor = task.Clock()
- transport = StubProcess()
- proto = mail.alias.ProcessAliasProtocol()
- proto.makeConnection(transport)
- receiver = mail.alias.MessageWrapper(proto, None, reactor)
- d = receiver.eomReceived()
- reactor.advance(receiver.completionTimeout)
- def timedOut(ignored):
- self.assertEqual(transport.signals, ['KILL'])
- # Now that it has been killed, disconnect the protocol associated
- # with it.
- proto.processEnded(
- ProcessTerminated(self.signalStatus(signal.SIGKILL)))
- self.assertFailure(d, mail.alias.ProcessAliasTimeout)
- d.addCallback(timedOut)
- return d
- def test_earlyProcessTermination(self):
- """
- If the process associated with an L{mail.alias.MessageWrapper} exits
- before I{eomReceived} is called, the L{Deferred} returned by
- I{eomReceived} should fail.
- """
- transport = StubProcess()
- protocol = mail.alias.ProcessAliasProtocol()
- protocol.makeConnection(transport)
- receiver = mail.alias.MessageWrapper(protocol, None, None)
- protocol.processEnded(failure.Failure(ProcessDone(0)))
- return self.assertFailure(receiver.eomReceived(), ProcessDone)
- def _terminationTest(self, status):
- """
- Verify that if the process associated with an
- L{mail.alias.MessageWrapper} exits with the given status, the
- L{Deferred} returned by I{eomReceived} fails with L{ProcessTerminated}.
- """
- transport = StubProcess()
- protocol = mail.alias.ProcessAliasProtocol()
- protocol.makeConnection(transport)
- receiver = mail.alias.MessageWrapper(protocol, None, None)
- protocol.processEnded(
- failure.Failure(ProcessTerminated(status)))
- return self.assertFailure(receiver.eomReceived(), ProcessTerminated)
- def test_errorProcessTermination(self):
- """
- If the process associated with an L{mail.alias.MessageWrapper} exits
- with a non-zero exit code, the L{Deferred} returned by I{eomReceived}
- should fail.
- """
- return self._terminationTest(self.exitStatus(1))
- def test_signalProcessTermination(self):
- """
- If the process associated with an L{mail.alias.MessageWrapper} exits
- because it received a signal, the L{Deferred} returned by
- I{eomReceived} should fail.
- """
- return self._terminationTest(self.signalStatus(signal.SIGHUP))
- def test_aliasResolution(self):
- """
- Check that the C{resolve} method of alias processors produce the correct
- set of objects:
- - direct alias with L{mail.alias.AddressAlias} if a simple input is passed
- - aliases in a file with L{mail.alias.FileWrapper} if an input in the format
- '/file' is given
- - aliases resulting of a process call wrapped by L{mail.alias.MessageWrapper}
- if the format is '|process'
- """
- aliases = {}
- domain = {'': TestDomain(aliases, ['user1', 'user2', 'user3'])}
- A1 = MockAliasGroup(['user1', '|echo', '/file'], domain, 'alias1')
- A2 = MockAliasGroup(['user2', 'user3'], domain, 'alias2')
- A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
- aliases.update({
- 'alias1': A1,
- 'alias2': A2,
- 'alias3': A3,
- })
- res1 = A1.resolve(aliases)
- r1 = map(str, res1.objs)
- r1.sort()
- expected = map(str, [
- mail.alias.AddressAlias('user1', None, None),
- mail.alias.MessageWrapper(DummyProcess(), 'echo'),
- mail.alias.FileWrapper('/file'),
- ])
- expected.sort()
- self.assertEqual(r1, expected)
- res2 = A2.resolve(aliases)
- r2 = map(str, res2.objs)
- r2.sort()
- expected = map(str, [
- mail.alias.AddressAlias('user2', None, None),
- mail.alias.AddressAlias('user3', None, None)
- ])
- expected.sort()
- self.assertEqual(r2, expected)
- res3 = A3.resolve(aliases)
- r3 = map(str, res3.objs)
- r3.sort()
- expected = map(str, [
- mail.alias.AddressAlias('user1', None, None),
- mail.alias.MessageWrapper(DummyProcess(), 'echo'),
- mail.alias.FileWrapper('/file'),
- ])
- expected.sort()
- self.assertEqual(r3, expected)
- def test_cyclicAlias(self):
- """
- Check that a cycle in alias resolution is correctly handled.
- """
- aliases = {}
- domain = {'': TestDomain(aliases, [])}
- A1 = mail.alias.AddressAlias('alias2', domain, 'alias1')
- A2 = mail.alias.AddressAlias('alias3', domain, 'alias2')
- A3 = mail.alias.AddressAlias('alias1', domain, 'alias3')
- aliases.update({
- 'alias1': A1,
- 'alias2': A2,
- 'alias3': A3
- })
- self.assertEqual(aliases['alias1'].resolve(aliases), None)
- self.assertEqual(aliases['alias2'].resolve(aliases), None)
- self.assertEqual(aliases['alias3'].resolve(aliases), None)
- A4 = MockAliasGroup(['|echo', 'alias1'], domain, 'alias4')
- aliases['alias4'] = A4
- res = A4.resolve(aliases)
- r = map(str, res.objs)
- r.sort()
- expected = map(str, [
- mail.alias.MessageWrapper(DummyProcess(), 'echo')
- ])
- expected.sort()
- self.assertEqual(r, expected)
- class TestDomain:
- def __init__(self, aliases, users):
- self.aliases = aliases
- self.users = users
- def exists(self, user, memo=None):
- user = user.dest.local
- if user in self.users:
- return lambda: mail.alias.AddressAlias(user, None, None)
- try:
- a = self.aliases[user]
- except:
- raise smtp.SMTPBadRcpt(user)
- else:
- aliases = a.resolve(self.aliases, memo)
- if aliases:
- return lambda: aliases
- raise smtp.SMTPBadRcpt(user)
- class DummyQueue(object):
- """
- A fake relay queue to use for testing.
- This queue doesn't keep track of which messages are waiting to be relayed
- or are in the process of being relayed.
- @ivar directory: See L{__init__}.
- """
- def __init__(self, directory):
- """
- @type directory: L{bytes}
- @param directory: The pathname of the directory holding messages in the
- queue.
- """
- self.directory = directory
- def done(self, message):
- """
- Remove a message from the queue.
- @type message: L{bytes}
- @param message: The base filename of a message.
- """
- message = os.path.basename(message)
- os.remove(self.getPath(message) + '-D')
- os.remove(self.getPath(message) + '-H')
- def getEnvelopeFile(self, message):
- """
- Get the envelope file for a message in the queue.
- @type message: L{bytes}
- @param message: The base filename of a message.
- @rtype: L{file}
- @return: The envelope file for the message.
- """
- return open(os.path.join(self.directory, message+'-H'), 'rb')
- def getPath(self, message):
- """
- Return the full base pathname of a message in the queue.
- @type message: L{bytes}
- @param message: The base filename of a message.
- @rtype: L{bytes}
- @return: The full base pathname of the message.
- """
- return os.path.join(self.directory, message)
- def createNewMessage(self):
- """
- Create a new message in the queue.
- @rtype: 2-L{tuple} of (E{1}) L{file}, (E{2}) L{FileMessage}
- @return: The envelope file and a message receiver for a new message in
- the queue.
- """
- fname = "%s_%s" % (time.time(), id(self))
- headerFile = open(os.path.join(self.directory, fname+'-H'), 'wb')
- tempFilename = os.path.join(self.directory, fname+'-C')
- finalFilename = os.path.join(self.directory, fname+'-D')
- messageFile = open(tempFilename, 'wb')
- return headerFile, mail.mail.FileMessage(messageFile, tempFilename,
- finalFilename)
- def setWaiting(self, message):
- """
- Ignore the request to mark a message as waiting to be relayed.
- @type message: L{bytes}
- @param message: The base filename of a message.
- """
- pass
- class DummySmartHostSMTPRelayingManager(object):
- """
- A fake smart host to use for testing.
- @type managed: L{dict} of L{bytes} -> L{list} of
- L{list} of L{bytes}
- @ivar managed: A mapping of a string identifying a managed relayer to
- filenames of messages the managed relayer is responsible for.
- @ivar queue: See L{__init__}.
- """
- def __init__(self, queue):
- """
- Initialize the minimum necessary members of a smart host.
- @type queue: L{DummyQueue}
- @param queue: A queue that can be used for testing purposes.
- """
- self.managed = {}
- self.queue = queue
- class _AttemptManagerTests(unittest.TestCase):
- """
- Test the behavior of L{_AttemptManager}.
- @type tmpdir: L{bytes}
- @ivar tmpdir: The path to a temporary directory holding the message files.
- @type reactor: L{MemoryReactorClock}
- @ivar reactor: The reactor used for test purposes.
- @type eventLog: L{None} or L{dict} of L{bytes} -> L{object}
- @ivar eventLog: Information about the last informational log message
- generated or none if no log message has been generated.
- @type noisyAttemptMgr: L{_AttemptManager}
- @ivar noisyAttemptMgr: An attempt manager which generates informational
- log messages.
- @type quietAttemptMgr: L{_AttemptManager}
- @ivar quietAttemptMgr: An attempt manager which does not generate
- informational log messages.
- @type noisyMessage: L{bytes}
- @ivar noisyMessage: The full base pathname of the message to be used with
- the noisy attempt manager.
- @type quietMessage: L{bytes}
- @ivar quietMessage: The full base pathname of the message to be used with
- the quiet.
- """
- def setUp(self):
- """
- Set up a temporary directory for the queue, attempt managers with the
- noisy flag on and off, message files for use with each attempt manager,
- and a reactor. Also, register to be notified when log messages are
- generated.
- """
- self.tmpdir = self.mktemp()
- os.mkdir(self.tmpdir)
- self.reactor = MemoryReactorClock()
- self.eventLog = None
- log.addObserver(self._logObserver)
- self.noisyAttemptMgr = _AttemptManager(
- DummySmartHostSMTPRelayingManager(DummyQueue(self.tmpdir)),
- True, self.reactor)
- self.quietAttemptMgr = _AttemptManager(
- DummySmartHostSMTPRelayingManager(DummyQueue(self.tmpdir)),
- False, self.reactor)
- noisyBaseName = "noisyMessage"
- quietBaseName = "quietMessage"
- self.noisyMessage = os.path.join(self.tmpdir, noisyBaseName)
- self.quietMessage = os.path.join(self.tmpdir, quietBaseName)
- open(self.noisyMessage+'-D', "w").close()
- open(self.quietMessage+'-D', "w").close()
- self.noisyAttemptMgr.manager.managed['noisyRelayer'] = [
- noisyBaseName]
- self.quietAttemptMgr.manager.managed['quietRelayer'] = [
- quietBaseName]
- with open(self.noisyMessage+'-H', 'w') as envelope:
- pickle.dump(['from-noisy@domain', 'to-noisy@domain'], envelope)
- with open(self.quietMessage+'-H', 'w') as envelope:
- pickle.dump(['from-quiet@domain', 'to-quiet@domain'], envelope)
- def tearDown(self):
- """
- Unregister for log events and remove the temporary directory.
- """
- log.removeObserver(self._logObserver)
- shutil.rmtree(self.tmpdir)
- def _logObserver(self, eventDict):
- """
- A log observer.
- @type eventDict: L{dict} of L{bytes} -> L{object}
- @param eventDict: Information about the last informational log message
- generated.
- """
- self.eventLog = eventDict
- def test_initNoisyDefault(self):
- """
- When an attempt manager is created without the noisy parameter, the
- noisy instance variable should default to true.
- """
- am = _AttemptManager(DummySmartHostSMTPRelayingManager(
- DummyQueue(self.tmpdir)))
- self.assertTrue(am.noisy)
- def test_initNoisy(self):
- """
- When an attempt manager is created with the noisy parameter set to
- true, the noisy instance variable should be set to true.
- """
- self.assertTrue(self.noisyAttemptMgr.noisy)
- def test_initQuiet(self):
- """
- When an attempt manager is created with the noisy parameter set to
- false, the noisy instance variable should be set to false.
- """
- self.assertFalse(self.quietAttemptMgr.noisy)
- def test_initReactorDefault(self):
- """
- When an attempt manager is created without the reactor parameter, the
- reactor instance variable should default to the global reactor.
- """
- am = _AttemptManager(DummySmartHostSMTPRelayingManager(
- DummyQueue(self.tmpdir)))
- self.assertEqual(am.reactor, reactor)
- def test_initReactor(self):
- """
- When an attempt manager is created with a reactor provided, the
- reactor instance variable should default to that reactor.
- """
- self.assertEqual(self.noisyAttemptMgr.reactor, self.reactor)
- def test_notifySuccessNoisy(self):
- """
- For an attempt manager with the noisy flag set, notifySuccess should
- result in a log message.
- """
- self.noisyAttemptMgr.notifySuccess('noisyRelayer', self.noisyMessage)
- self.assertTrue(self.eventLog)
- def test_notifySuccessQuiet(self):
- """
- For an attempt manager with the noisy flag not set, notifySuccess
- should result in no log message.
- """
- self.quietAttemptMgr.notifySuccess('quietRelayer', self.quietMessage)
- self.assertFalse(self.eventLog)
- def test_notifyFailureNoisy(self):
- """
- For an attempt manager with the noisy flag set, notifyFailure should
- result in a log message.
- """
- self.noisyAttemptMgr.notifyFailure('noisyRelayer', self.noisyMessage)
- self.assertTrue(self.eventLog)
- def test_notifyFailureQuiet(self):
- """
- For an attempt manager with the noisy flag not set, notifyFailure
- should result in no log message.
- """
- self.quietAttemptMgr.notifyFailure('quietRelayer', self.quietMessage)
- self.assertFalse(self.eventLog)
- def test_notifyDoneNoisy(self):
- """
- For an attempt manager with the noisy flag set, notifyDone should
- result in a log message.
- """
- self.noisyAttemptMgr.notifyDone('noisyRelayer')
- self.assertTrue(self.eventLog)
- def test_notifyDoneQuiet(self):
- """
- For an attempt manager with the noisy flag not set, notifyDone
- should result in no log message.
- """
- self.quietAttemptMgr.notifyDone('quietRelayer')
- self.assertFalse(self.eventLog)
- def test_notifyNoConnectionNoisy(self):
- """
- For an attempt manager with the noisy flag set, notifyNoConnection
- should result in a log message.
- """
- self.noisyAttemptMgr.notifyNoConnection('noisyRelayer')
- self.assertTrue(self.eventLog)
- self.reactor.advance(60)
- def test_notifyNoConnectionQuiet(self):
- """
- For an attempt manager with the noisy flag not set, notifyNoConnection
- should result in no log message.
- """
- self.quietAttemptMgr.notifyNoConnection('quietRelayer')
- self.assertFalse(self.eventLog)
- self.reactor.advance(60)
- from twisted.python.runtime import platformType
- import types
- if platformType != "posix":
- for o in locals().values():
- if isinstance(o, (types.ClassType, type)) and issubclass(o, unittest.TestCase):
- o.skip = "twisted.mail only works on posix"
|