12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.words.protocols.irc}.
- """
- import errno
- import operator
- import time
- from twisted.internet import protocol, task
- from twisted.python.filepath import FilePath
- from twisted.python.compat import unicode
- from twisted.test.proto_helpers import StringTransport, StringIOWithoutClosing
- from twisted.trial.unittest import TestCase
- from twisted.words.protocols import irc
- from twisted.words.protocols.irc import IRCClient, attributes as A
- class IRCTestCase(TestCase):
- def assertEqualBufferValue(self, buf, val):
- """
- A buffer is always bytes, but sometimes
- we need to compare it to a utf-8 unicode string
- @param buf: the buffer
- @type buf: L{bytes} or L{unicode} or L{list}
- @param val: the value to compare
- @type val: L{bytes} or L{unicode} or L{list}
- """
- bufferValue = buf
- if isinstance(val, unicode):
- bufferValue = bufferValue.decode("utf-8")
- if isinstance(bufferValue, list):
- if isinstance(val[0], unicode):
- bufferValue = [b.decode("utf8") for b in bufferValue]
- self.assertEqual(bufferValue, val)
- class ModeParsingTests(IRCTestCase):
- """
- Tests for L{twisted.words.protocols.irc.parseModes}.
- """
- paramModes = ('klb', 'b')
- def test_emptyModes(self):
- """
- Parsing an empty mode string raises L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '', [])
- def test_emptyModeSequence(self):
- """
- Parsing a mode string that contains an empty sequence (either a C{+} or
- C{-} followed directly by another C{+} or C{-}, or not followed by
- anything at all) raises L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '++k', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '-+k', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '+', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '-', [])
- def test_malformedModes(self):
- """
- Parsing a mode string that does not start with C{+} or C{-} raises
- L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, 'foo', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '%', [])
- def test_nullModes(self):
- """
- Parsing a mode string that contains no mode characters raises
- L{irc.IRCBadModes}.
- """
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '+', [])
- self.assertRaises(irc.IRCBadModes, irc.parseModes, '-', [])
- def test_singleMode(self):
- """
- Parsing a single mode setting with no parameters results in that mode,
- with no parameters, in the "added" direction and no modes in the
- "removed" direction.
- """
- added, removed = irc.parseModes('+s', [])
- self.assertEqual(added, [('s', None)])
- self.assertEqual(removed, [])
- added, removed = irc.parseModes('-s', [])
- self.assertEqual(added, [])
- self.assertEqual(removed, [('s', None)])
- def test_singleDirection(self):
- """
- Parsing a single-direction mode setting with multiple modes and no
- parameters, results in all modes falling into the same direction group.
- """
- added, removed = irc.parseModes('+stn', [])
- self.assertEqual(added, [('s', None),
- ('t', None),
- ('n', None)])
- self.assertEqual(removed, [])
- added, removed = irc.parseModes('-nt', [])
- self.assertEqual(added, [])
- self.assertEqual(removed, [('n', None),
- ('t', None)])
- def test_multiDirection(self):
- """
- Parsing a multi-direction mode setting with no parameters.
- """
- added, removed = irc.parseModes('+s-n+ti', [])
- self.assertEqual(added, [('s', None),
- ('t', None),
- ('i', None)])
- self.assertEqual(removed, [('n', None)])
- def test_consecutiveDirection(self):
- """
- Parsing a multi-direction mode setting containing two consecutive mode
- sequences with the same direction results in the same result as if
- there were only one mode sequence in the same direction.
- """
- added, removed = irc.parseModes('+sn+ti', [])
- self.assertEqual(added, [('s', None),
- ('n', None),
- ('t', None),
- ('i', None)])
- self.assertEqual(removed, [])
- def test_mismatchedParams(self):
- """
- If the number of mode parameters does not match the number of modes
- expecting parameters, L{irc.IRCBadModes} is raised.
- """
- self.assertRaises(irc.IRCBadModes,
- irc.parseModes,
- '+k', [],
- self.paramModes)
- self.assertRaises(irc.IRCBadModes,
- irc.parseModes,
- '+kl', ['foo', '10', 'lulz_extra_param'],
- self.paramModes)
- def test_parameters(self):
- """
- Modes which require parameters are parsed and paired with their relevant
- parameter, modes which do not require parameters do not consume any of
- the parameters.
- """
- added, removed = irc.parseModes(
- '+klbb',
- ['somekey', '42', 'nick!user@host', 'other!*@*'],
- self.paramModes)
- self.assertEqual(added, [('k', 'somekey'),
- ('l', '42'),
- ('b', 'nick!user@host'),
- ('b', 'other!*@*')])
- self.assertEqual(removed, [])
- added, removed = irc.parseModes(
- '-klbb',
- ['nick!user@host', 'other!*@*'],
- self.paramModes)
- self.assertEqual(added, [])
- self.assertEqual(removed, [('k', None),
- ('l', None),
- ('b', 'nick!user@host'),
- ('b', 'other!*@*')])
- # Mix a no-argument mode in with argument modes.
- added, removed = irc.parseModes(
- '+knbb',
- ['somekey', 'nick!user@host', 'other!*@*'],
- self.paramModes)
- self.assertEqual(added, [('k', 'somekey'),
- ('n', None),
- ('b', 'nick!user@host'),
- ('b', 'other!*@*')])
- self.assertEqual(removed, [])
- class MiscTests(IRCTestCase):
- """
- Tests for miscellaneous functions.
- """
- def test_foldr(self):
- """
- Apply a function of two arguments cumulatively to the items of
- a sequence, from right to left, so as to reduce the sequence to
- a single value.
- """
- self.assertEqual(
- irc._foldr(operator.sub, 0, [1, 2, 3, 4]),
- -2)
- def insertTop(l, x):
- l.insert(0, x)
- return l
- self.assertEqual(
- irc._foldr(insertTop, [], [[1], [2], [3], [4]]),
- [[[[[], 4], 3], 2], 1])
- class FormattedTextTests(IRCTestCase):
- """
- Tests for parsing and assembling formatted IRC text.
- """
- def assertAssembledEqually(self, text, expectedFormatted):
- """
- Assert that C{text} is parsed and assembled to the same value as what
- C{expectedFormatted} is assembled to. This provides a way to ignore
- meaningless differences in the formatting structure that would be
- difficult to detect without rendering the structures.
- """
- formatted = irc.parseFormattedText(text)
- self.assertAssemblesTo(formatted, expectedFormatted)
- def assertAssemblesTo(self, formatted, expectedFormatted):
- """
- Assert that C{formatted} and C{expectedFormatted} assemble to the same
- value.
- """
- text = irc.assembleFormattedText(formatted)
- expectedText = irc.assembleFormattedText(expectedFormatted)
- self.assertEqual(
- irc.assembleFormattedText(formatted),
- expectedText,
- '%r (%r) is not equivalent to %r (%r)' % (
- text, formatted, expectedText, expectedFormatted))
- def test_parseEmpty(self):
- """
- An empty string parses to a I{normal} attribute with no text.
- """
- self.assertAssembledEqually('', A.normal)
- def test_assembleEmpty(self):
- """
- An attribute with no text assembles to the empty string. An attribute
- whose text is the empty string assembles to two control codes: C{off}
- and that of the attribute.
- """
- self.assertEqual(
- irc.assembleFormattedText(A.normal),
- '')
- # Attempting to apply an attribute to the empty string should still
- # produce two control codes.
- self.assertEqual(
- irc.assembleFormattedText(
- A.bold['']),
- '\x0f\x02')
- def test_assembleNormal(self):
- """
- A I{normal} string assembles to a string prefixed with the I{off}
- control code.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.normal['hello']),
- '\x0fhello')
- def test_assembleBold(self):
- """
- A I{bold} string assembles to a string prefixed with the I{off} and
- I{bold} control codes.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.bold['hello']),
- '\x0f\x02hello')
- def test_assembleUnderline(self):
- """
- An I{underline} string assembles to a string prefixed with the I{off}
- and I{underline} control codes.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.underline['hello']),
- '\x0f\x1fhello')
- def test_assembleReverseVideo(self):
- """
- A I{reverse video} string assembles to a string prefixed with the I{off}
- and I{reverse video} control codes.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.reverseVideo['hello']),
- '\x0f\x16hello')
- def test_assembleForegroundColor(self):
- """
- A I{foreground color} string assembles to a string prefixed with the
- I{off} and I{color} (followed by the relevant foreground color code)
- control codes.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.fg.blue['hello']),
- '\x0f\x0302hello')
- def test_assembleBackgroundColor(self):
- """
- A I{background color} string assembles to a string prefixed with the
- I{off} and I{color} (followed by a I{,} to indicate the absence of a
- foreground color, followed by the relevant background color code)
- control codes.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.bg.blue['hello']),
- '\x0f\x03,02hello')
- def test_assembleColor(self):
- """
- A I{foreground} and I{background} color string assembles to a string
- prefixed with the I{off} and I{color} (followed by the relevant
- foreground color, I{,} and the relevant background color code) control
- codes.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.fg.red[A.bg.blue['hello']]),
- '\x0f\x0305,02hello')
- def test_assembleNested(self):
- """
- Nested attributes retain the attributes of their parents.
- """
- self.assertEqual(
- irc.assembleFormattedText(
- A.bold['hello', A.underline[' world']]),
- '\x0f\x02hello\x0f\x02\x1f world')
- self.assertEqual(
- irc.assembleFormattedText(
- A.normal[
- A.fg.red[A.bg.green['hello'], ' world'],
- A.reverseVideo[' yay']]),
- '\x0f\x0305,03hello\x0f\x0305 world\x0f\x16 yay')
- def test_parseUnformattedText(self):
- """
- Parsing unformatted text results in text with attributes that
- constitute a no-op.
- """
- self.assertEqual(
- irc.parseFormattedText('hello'),
- A.normal['hello'])
- def test_colorFormatting(self):
- """
- Correctly formatted text with colors uses 2 digits to specify
- foreground and (optionally) background.
- """
- self.assertEqual(
- irc.parseFormattedText('\x0301yay\x03'),
- A.fg.black['yay'])
- self.assertEqual(
- irc.parseFormattedText('\x0301,02yay\x03'),
- A.fg.black[A.bg.blue['yay']])
- self.assertEqual(
- irc.parseFormattedText('\x0301yay\x0302yipee\x03'),
- A.fg.black['yay', A.fg.blue['yipee']])
- def test_weirdColorFormatting(self):
- """
- Formatted text with colors can use 1 digit for both foreground and
- background, as long as the text part does not begin with a digit.
- Foreground and background colors are only processed to a maximum of 2
- digits per component, anything else is treated as text. Color sequences
- must begin with a digit, otherwise processing falls back to unformatted
- text.
- """
- self.assertAssembledEqually(
- '\x031kinda valid',
- A.fg.black['kinda valid'])
- self.assertAssembledEqually(
- '\x03999,999kinda valid',
- A.fg.green['9,999kinda valid'])
- self.assertAssembledEqually(
- '\x031,2kinda valid',
- A.fg.black[A.bg.blue['kinda valid']])
- self.assertAssembledEqually(
- '\x031,999kinda valid',
- A.fg.black[A.bg.green['9kinda valid']])
- self.assertAssembledEqually(
- '\x031,242 is a special number',
- A.fg.black[A.bg.yellow['2 is a special number']])
- self.assertAssembledEqually(
- '\x03,02oops\x03',
- A.normal[',02oops'])
- self.assertAssembledEqually(
- '\x03wrong',
- A.normal['wrong'])
- self.assertAssembledEqually(
- '\x031,hello',
- A.fg.black['hello'])
- self.assertAssembledEqually(
- '\x03\x03',
- A.normal)
- def test_clearColorFormatting(self):
- """
- An empty color format specifier clears foreground and background
- colors.
- """
- self.assertAssembledEqually(
- '\x0301yay\x03reset',
- A.normal[A.fg.black['yay'], 'reset'])
- self.assertAssembledEqually(
- '\x0301,02yay\x03reset',
- A.normal[A.fg.black[A.bg.blue['yay']], 'reset'])
- def test_resetFormatting(self):
- """
- A reset format specifier clears all formatting attributes.
- """
- self.assertAssembledEqually(
- '\x02\x1fyay\x0freset',
- A.normal[A.bold[A.underline['yay']], 'reset'])
- self.assertAssembledEqually(
- '\x0301yay\x0freset',
- A.normal[A.fg.black['yay'], 'reset'])
- self.assertAssembledEqually(
- '\x0301,02yay\x0freset',
- A.normal[A.fg.black[A.bg.blue['yay']], 'reset'])
- def test_stripFormatting(self):
- """
- Strip formatting codes from formatted text, leaving only the text parts.
- """
- self.assertEqual(
- irc.stripFormatting(
- irc.assembleFormattedText(
- A.bold[
- A.underline[
- A.reverseVideo[A.fg.red[A.bg.green['hello']]],
- ' world']])),
- 'hello world')
- class FormattingStateAttributeTests(IRCTestCase):
- """
- Tests for L{twisted.words.protocols.irc._FormattingState}.
- """
- def test_equality(self):
- """
- L{irc._FormattingState}s must have matching character attribute
- values (bold, underline, etc) with the same values to be considered
- equal.
- """
- self.assertEqual(
- irc._FormattingState(),
- irc._FormattingState())
- self.assertEqual(
- irc._FormattingState(),
- irc._FormattingState(off=False))
- self.assertEqual(
- irc._FormattingState(
- bold=True, underline=True, off=False, reverseVideo=True,
- foreground=irc._IRC_COLORS['blue']),
- irc._FormattingState(
- bold=True, underline=True, off=False, reverseVideo=True,
- foreground=irc._IRC_COLORS['blue']))
- self.assertNotEqual(
- irc._FormattingState(bold=True),
- irc._FormattingState(bold=False))
- stringSubjects = [
- "Hello, this is a nice string with no complications.",
- "xargs%(NUL)smight%(NUL)slike%(NUL)sthis" % {'NUL': irc.NUL },
- "embedded%(CR)snewline%(CR)s%(NL)sFUN%(NL)s" % {'CR': irc.CR,
- 'NL': irc.NL},
- "escape!%(X)s escape!%(M)s %(X)s%(X)sa %(M)s0" % {'X': irc.X_QUOTE,
- 'M': irc.M_QUOTE}
- ]
- class QuotingTests(IRCTestCase):
- def test_lowquoteSanity(self):
- """
- Testing client-server level quote/dequote.
- """
- for s in stringSubjects:
- self.assertEqual(s, irc.lowDequote(irc.lowQuote(s)))
- def test_ctcpquoteSanity(self):
- """
- Testing CTCP message level quote/dequote.
- """
- for s in stringSubjects:
- self.assertEqual(s, irc.ctcpDequote(irc.ctcpQuote(s)))
- class Dispatcher(irc._CommandDispatcherMixin):
- """
- A dispatcher that exposes one known command and handles unknown commands.
- """
- prefix = 'disp'
- def disp_working(self, a, b):
- """
- A known command that returns its input.
- """
- return a, b
- def disp_unknown(self, name, a, b):
- """
- Handle unknown commands by returning their name and inputs.
- """
- return name, a, b
- class DispatcherTests(IRCTestCase):
- """
- Tests for L{irc._CommandDispatcherMixin}.
- """
- def test_dispatch(self):
- """
- Dispatching a command invokes the correct handler.
- """
- disp = Dispatcher()
- args = (1, 2)
- res = disp.dispatch('working', *args)
- self.assertEqual(res, args)
- def test_dispatchUnknown(self):
- """
- Dispatching an unknown command invokes the default handler.
- """
- disp = Dispatcher()
- name = 'missing'
- args = (1, 2)
- res = disp.dispatch(name, *args)
- self.assertEqual(res, (name,) + args)
- def test_dispatchMissingUnknown(self):
- """
- Dispatching an unknown command, when no default handler is present,
- results in an exception being raised.
- """
- disp = Dispatcher()
- disp.disp_unknown = None
- self.assertRaises(irc.UnhandledCommand, disp.dispatch, 'bar')
- class ServerSupportedFeatureTests(IRCTestCase):
- """
- Tests for L{ServerSupportedFeatures} and related functions.
- """
- def test_intOrDefault(self):
- """
- L{_intOrDefault} converts values to C{int} if possible, otherwise
- returns a default value.
- """
- self.assertEqual(irc._intOrDefault(None), None)
- self.assertEqual(irc._intOrDefault([]), None)
- self.assertEqual(irc._intOrDefault(''), None)
- self.assertEqual(irc._intOrDefault('hello', 5), 5)
- self.assertEqual(irc._intOrDefault('123'), 123)
- self.assertEqual(irc._intOrDefault(123), 123)
- def test_splitParam(self):
- """
- L{ServerSupportedFeatures._splitParam} splits ISUPPORT parameters
- into key and values. Parameters without a separator are split into a
- key and a list containing only the empty string. Escaped parameters
- are unescaped.
- """
- params = [('FOO', ('FOO', [''])),
- ('FOO=', ('FOO', [''])),
- ('FOO=1', ('FOO', ['1'])),
- ('FOO=1,2,3', ('FOO', ['1', '2', '3'])),
- ('FOO=A\\x20B', ('FOO', ['A B'])),
- ('FOO=\\x5Cx', ('FOO', ['\\x'])),
- ('FOO=\\', ('FOO', ['\\'])),
- ('FOO=\\n', ('FOO', ['\\n']))]
- _splitParam = irc.ServerSupportedFeatures._splitParam
- for param, expected in params:
- res = _splitParam(param)
- self.assertEqual(res, expected)
- self.assertRaises(ValueError, _splitParam, 'FOO=\\x')
- self.assertRaises(ValueError, _splitParam, 'FOO=\\xNN')
- self.assertRaises(ValueError, _splitParam, 'FOO=\\xN')
- self.assertRaises(ValueError, _splitParam, 'FOO=\\x20\\x')
- def test_splitParamArgs(self):
- """
- L{ServerSupportedFeatures._splitParamArgs} splits ISUPPORT parameter
- arguments into key and value. Arguments without a separator are
- split into a key and an empty string.
- """
- res = irc.ServerSupportedFeatures._splitParamArgs(['A:1', 'B:2', 'C:', 'D'])
- self.assertEqual(res, [('A', '1'),
- ('B', '2'),
- ('C', ''),
- ('D', '')])
- def test_splitParamArgsProcessor(self):
- """
- L{ServerSupportedFeatures._splitParamArgs} uses the argument processor
- passed to convert ISUPPORT argument values to some more suitable
- form.
- """
- res = irc.ServerSupportedFeatures._splitParamArgs(['A:1', 'B:2', 'C'],
- irc._intOrDefault)
- self.assertEqual(res, [('A', 1),
- ('B', 2),
- ('C', None)])
- def test_parsePrefixParam(self):
- """
- L{ServerSupportedFeatures._parsePrefixParam} parses the ISUPPORT PREFIX
- parameter into a mapping from modes to prefix symbols, returns
- L{None} if there is no parseable prefix parameter or raises
- C{ValueError} if the prefix parameter is malformed.
- """
- _parsePrefixParam = irc.ServerSupportedFeatures._parsePrefixParam
- self.assertEqual(_parsePrefixParam(''), None)
- self.assertRaises(ValueError, _parsePrefixParam, 'hello')
- self.assertEqual(_parsePrefixParam('(ov)@+'),
- {'o': ('@', 0),
- 'v': ('+', 1)})
- def test_parseChanModesParam(self):
- """
- L{ServerSupportedFeatures._parseChanModesParam} parses the ISUPPORT
- CHANMODES parameter into a mapping from mode categories to mode
- characters. Passing fewer than 4 parameters results in the empty string
- for the relevant categories. Passing more than 4 parameters raises
- C{ValueError}.
- """
- _parseChanModesParam = irc.ServerSupportedFeatures._parseChanModesParam
- self.assertEqual(
- _parseChanModesParam(['', '', '', '']),
- {'addressModes': '',
- 'param': '',
- 'setParam': '',
- 'noParam': ''})
- self.assertEqual(
- _parseChanModesParam(['b', 'k', 'l', 'imnpst']),
- {'addressModes': 'b',
- 'param': 'k',
- 'setParam': 'l',
- 'noParam': 'imnpst'})
- self.assertEqual(
- _parseChanModesParam(['b', 'k', 'l', '']),
- {'addressModes': 'b',
- 'param': 'k',
- 'setParam': 'l',
- 'noParam': ''})
- self.assertRaises(
- ValueError,
- _parseChanModesParam, ['a', 'b', 'c', 'd', 'e'])
- def test_parse(self):
- """
- L{ServerSupportedFeatures.parse} changes the internal state of the
- instance to reflect the features indicated by the parsed ISUPPORT
- parameters, including unknown parameters and unsetting previously set
- parameters.
- """
- supported = irc.ServerSupportedFeatures()
- supported.parse(['MODES=4',
- 'CHANLIMIT=#:20,&:10',
- 'INVEX',
- 'EXCEPTS=Z',
- 'UNKNOWN=A,B,C'])
- self.assertEqual(supported.getFeature('MODES'), 4)
- self.assertEqual(supported.getFeature('CHANLIMIT'),
- [('#', 20),
- ('&', 10)])
- self.assertEqual(supported.getFeature('INVEX'), 'I')
- self.assertEqual(supported.getFeature('EXCEPTS'), 'Z')
- self.assertEqual(supported.getFeature('UNKNOWN'), ('A', 'B', 'C'))
- self.assertTrue(supported.hasFeature('INVEX'))
- supported.parse(['-INVEX'])
- self.assertFalse(supported.hasFeature('INVEX'))
- # Unsetting a previously unset parameter should not be a problem.
- supported.parse(['-INVEX'])
- def _parse(self, features):
- """
- Parse all specified features according to the ISUPPORT specifications.
- @type features: C{list} of C{(featureName, value)}
- @param features: Feature names and values to parse
- @rtype: L{irc.ServerSupportedFeatures}
- """
- supported = irc.ServerSupportedFeatures()
- features = ['%s=%s' % (name, value or '')
- for name, value in features]
- supported.parse(features)
- return supported
- def _parseFeature(self, name, value=None):
- """
- Parse a feature, with the given name and value, according to the
- ISUPPORT specifications and return the parsed value.
- """
- supported = self._parse([(name, value)])
- return supported.getFeature(name)
- def _testIntOrDefaultFeature(self, name, default=None):
- """
- Perform some common tests on a feature known to use L{_intOrDefault}.
- """
- self.assertEqual(
- self._parseFeature(name, None),
- default)
- self.assertEqual(
- self._parseFeature(name, 'notanint'),
- default)
- self.assertEqual(
- self._parseFeature(name, '42'),
- 42)
- def _testFeatureDefault(self, name, features=None):
- """
- Features known to have default values are reported as being present by
- L{irc.ServerSupportedFeatures.hasFeature}, and their value defaults
- correctly, when they don't appear in an ISUPPORT message.
- """
- default = irc.ServerSupportedFeatures()._features[name]
- if features is None:
- features = [('DEFINITELY_NOT', 'a_feature')]
- supported = self._parse(features)
- self.assertTrue(supported.hasFeature(name))
- self.assertEqual(supported.getFeature(name), default)
- def test_support_CHANMODES(self):
- """
- The CHANMODES ISUPPORT parameter is parsed into a C{dict} giving the
- four mode categories, C{'addressModes'}, C{'param'}, C{'setParam'}, and
- C{'noParam'}.
- """
- self._testFeatureDefault('CHANMODES')
- self._testFeatureDefault('CHANMODES', [('CHANMODES', 'b,,lk,')])
- self._testFeatureDefault('CHANMODES', [('CHANMODES', 'b,,lk,ha,ha')])
- self.assertEqual(
- self._parseFeature('CHANMODES', ',,,'),
- {'addressModes': '',
- 'param': '',
- 'setParam': '',
- 'noParam': ''})
- self.assertEqual(
- self._parseFeature('CHANMODES', ',A,,'),
- {'addressModes': '',
- 'param': 'A',
- 'setParam': '',
- 'noParam': ''})
- self.assertEqual(
- self._parseFeature('CHANMODES', 'A,Bc,Def,Ghij'),
- {'addressModes': 'A',
- 'param': 'Bc',
- 'setParam': 'Def',
- 'noParam': 'Ghij'})
- def test_support_IDCHAN(self):
- """
- The IDCHAN support parameter is parsed into a sequence of two-tuples
- giving channel prefix and ID length pairs.
- """
- self.assertEqual(
- self._parseFeature('IDCHAN', '!:5'),
- [('!', '5')])
- def test_support_MAXLIST(self):
- """
- The MAXLIST support parameter is parsed into a sequence of two-tuples
- giving modes and their limits.
- """
- self.assertEqual(
- self._parseFeature('MAXLIST', 'b:25,eI:50'),
- [('b', 25), ('eI', 50)])
- # A non-integer parameter argument results in None.
- self.assertEqual(
- self._parseFeature('MAXLIST', 'b:25,eI:50,a:3.1415'),
- [('b', 25), ('eI', 50), ('a', None)])
- self.assertEqual(
- self._parseFeature('MAXLIST', 'b:25,eI:50,a:notanint'),
- [('b', 25), ('eI', 50), ('a', None)])
- def test_support_NETWORK(self):
- """
- The NETWORK support parameter is parsed as the network name, as
- specified by the server.
- """
- self.assertEqual(
- self._parseFeature('NETWORK', 'IRCNet'),
- 'IRCNet')
- def test_support_SAFELIST(self):
- """
- The SAFELIST support parameter is parsed into a boolean indicating
- whether the safe "list" command is supported or not.
- """
- self.assertEqual(
- self._parseFeature('SAFELIST'),
- True)
- def test_support_STATUSMSG(self):
- """
- The STATUSMSG support parameter is parsed into a string of channel
- status that support the exclusive channel notice method.
- """
- self.assertEqual(
- self._parseFeature('STATUSMSG', '@+'),
- '@+')
- def test_support_TARGMAX(self):
- """
- The TARGMAX support parameter is parsed into a dictionary, mapping
- strings to integers, of the maximum number of targets for a particular
- command.
- """
- self.assertEqual(
- self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3'),
- {'PRIVMSG': 4,
- 'NOTICE': 3})
- # A non-integer parameter argument results in None.
- self.assertEqual(
- self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3,KICK:3.1415'),
- {'PRIVMSG': 4,
- 'NOTICE': 3,
- 'KICK': None})
- self.assertEqual(
- self._parseFeature('TARGMAX', 'PRIVMSG:4,NOTICE:3,KICK:notanint'),
- {'PRIVMSG': 4,
- 'NOTICE': 3,
- 'KICK': None})
- def test_support_NICKLEN(self):
- """
- The NICKLEN support parameter is parsed into an integer value
- indicating the maximum length of a nickname the client may use,
- otherwise, if the parameter is missing or invalid, the default value
- (as specified by RFC 1459) is used.
- """
- default = irc.ServerSupportedFeatures()._features['NICKLEN']
- self._testIntOrDefaultFeature('NICKLEN', default)
- def test_support_CHANNELLEN(self):
- """
- The CHANNELLEN support parameter is parsed into an integer value
- indicating the maximum channel name length, otherwise, if the
- parameter is missing or invalid, the default value (as specified by
- RFC 1459) is used.
- """
- default = irc.ServerSupportedFeatures()._features['CHANNELLEN']
- self._testIntOrDefaultFeature('CHANNELLEN', default)
- def test_support_CHANTYPES(self):
- """
- The CHANTYPES support parameter is parsed into a tuple of
- valid channel prefix characters.
- """
- self._testFeatureDefault('CHANTYPES')
- self.assertEqual(
- self._parseFeature('CHANTYPES', '#&%'),
- ('#', '&', '%'))
- def test_support_KICKLEN(self):
- """
- The KICKLEN support parameter is parsed into an integer value
- indicating the maximum length of a kick message a client may use.
- """
- self._testIntOrDefaultFeature('KICKLEN')
- def test_support_PREFIX(self):
- """
- The PREFIX support parameter is parsed into a dictionary mapping
- modes to two-tuples of status symbol and priority.
- """
- self._testFeatureDefault('PREFIX')
- self._testFeatureDefault('PREFIX', [('PREFIX', 'hello')])
- self.assertEqual(
- self._parseFeature('PREFIX', None),
- None)
- self.assertEqual(
- self._parseFeature('PREFIX', '(ohv)@%+'),
- {'o': ('@', 0),
- 'h': ('%', 1),
- 'v': ('+', 2)})
- self.assertEqual(
- self._parseFeature('PREFIX', '(hov)@%+'),
- {'o': ('%', 1),
- 'h': ('@', 0),
- 'v': ('+', 2)})
- def test_support_TOPICLEN(self):
- """
- The TOPICLEN support parameter is parsed into an integer value
- indicating the maximum length of a topic a client may set.
- """
- self._testIntOrDefaultFeature('TOPICLEN')
- def test_support_MODES(self):
- """
- The MODES support parameter is parsed into an integer value
- indicating the maximum number of "variable" modes (defined as being
- modes from C{addressModes}, C{param} or C{setParam} categories for
- the C{CHANMODES} ISUPPORT parameter) which may by set on a channel
- by a single MODE command from a client.
- """
- self._testIntOrDefaultFeature('MODES')
- def test_support_EXCEPTS(self):
- """
- The EXCEPTS support parameter is parsed into the mode character
- to be used for "ban exception" modes. If no parameter is specified
- then the character C{e} is assumed.
- """
- self.assertEqual(
- self._parseFeature('EXCEPTS', 'Z'),
- 'Z')
- self.assertEqual(
- self._parseFeature('EXCEPTS'),
- 'e')
- def test_support_INVEX(self):
- """
- The INVEX support parameter is parsed into the mode character to be
- used for "invite exception" modes. If no parameter is specified then
- the character C{I} is assumed.
- """
- self.assertEqual(
- self._parseFeature('INVEX', 'Z'),
- 'Z')
- self.assertEqual(
- self._parseFeature('INVEX'),
- 'I')
- class IRCClientWithoutLogin(irc.IRCClient):
- performLogin = 0
- class CTCPTests(IRCTestCase):
- """
- Tests for L{twisted.words.protocols.irc.IRCClient} CTCP handling.
- """
- def setUp(self):
- self.file = StringIOWithoutClosing()
- self.transport = protocol.FileWrapper(self.file)
- self.client = IRCClientWithoutLogin()
- self.client.makeConnection(self.transport)
- self.addCleanup(self.transport.loseConnection)
- self.addCleanup(self.client.connectionLost, None)
- def test_ERRMSG(self):
- """Testing CTCP query ERRMSG.
- Not because this is this is an especially important case in the
- field, but it does go through the entire dispatch/decode/encode
- process.
- """
- errQuery = (":nick!guy@over.there PRIVMSG #theChan :"
- "%(X)cERRMSG t%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF})
- errReply = ("NOTICE nick :%(X)cERRMSG t :"
- "No error has occurred.%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF})
- self.client.dataReceived(errQuery)
- reply = self.file.getvalue()
- self.assertEqualBufferValue(reply, errReply)
- def test_noNumbersVERSION(self):
- """
- If attributes for version information on L{IRCClient} are set to
- L{None}, the parts of the CTCP VERSION response they correspond to
- are omitted.
- """
- self.client.versionName = "FrobozzIRC"
- self.client.ctcpQuery_VERSION("nick!guy@over.there", "#theChan", None)
- versionReply = ("NOTICE nick :%(X)cVERSION %(vname)s::"
- "%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF,
- 'vname': self.client.versionName})
- reply = self.file.getvalue()
- self.assertEqualBufferValue(reply, versionReply)
- def test_fullVERSION(self):
- """
- The response to a CTCP VERSION query includes the version number and
- environment information, as specified by L{IRCClient.versionNum} and
- L{IRCClient.versionEnv}.
- """
- self.client.versionName = "FrobozzIRC"
- self.client.versionNum = "1.2g"
- self.client.versionEnv = "ZorkOS"
- self.client.ctcpQuery_VERSION("nick!guy@over.there", "#theChan", None)
- versionReply = ("NOTICE nick :%(X)cVERSION %(vname)s:%(vnum)s:%(venv)s"
- "%(X)c%(EOL)s"
- % {'X': irc.X_DELIM,
- 'EOL': irc.CR + irc.LF,
- 'vname': self.client.versionName,
- 'vnum': self.client.versionNum,
- 'venv': self.client.versionEnv})
- reply = self.file.getvalue()
- self.assertEqualBufferValue(reply, versionReply)
- def test_noDuplicateCTCPDispatch(self):
- """
- Duplicated CTCP messages are ignored and no reply is made.
- """
- def testCTCP(user, channel, data):
- self.called += 1
- self.called = 0
- self.client.ctcpQuery_TESTTHIS = testCTCP
- self.client.irc_PRIVMSG(
- 'foo!bar@baz.quux', [
- '#chan',
- '%(X)sTESTTHIS%(X)sfoo%(X)sTESTTHIS%(X)s' % {'X': irc.X_DELIM}])
- self.assertEqualBufferValue(self.file.getvalue(), '')
- self.assertEqual(self.called, 1)
- def test_noDefaultDispatch(self):
- """
- The fallback handler is invoked for unrecognized CTCP messages.
- """
- def unknownQuery(user, channel, tag, data):
- self.calledWith = (user, channel, tag, data)
- self.called += 1
- self.called = 0
- self.patch(self.client, 'ctcpUnknownQuery', unknownQuery)
- self.client.irc_PRIVMSG(
- 'foo!bar@baz.quux', [
- '#chan',
- '%(X)sNOTREAL%(X)s' % {'X': irc.X_DELIM}])
- self.assertEqualBufferValue(self.file.getvalue(), '')
- self.assertEqual(
- self.calledWith,
- ('foo!bar@baz.quux', '#chan', 'NOTREAL', None))
- self.assertEqual(self.called, 1)
- # The fallback handler is not invoked for duplicate unknown CTCP
- # messages.
- self.client.irc_PRIVMSG(
- 'foo!bar@baz.quux', [
- '#chan',
- '%(X)sNOTREAL%(X)sfoo%(X)sNOTREAL%(X)s' % {'X': irc.X_DELIM}])
- self.assertEqual(self.called, 2)
- class NoticingClient(IRCClientWithoutLogin, object):
- methods = {
- 'created': ('when',),
- 'yourHost': ('info',),
- 'myInfo': ('servername', 'version', 'umodes', 'cmodes'),
- 'luserClient': ('info',),
- 'bounce': ('info',),
- 'isupport': ('options',),
- 'luserChannels': ('channels',),
- 'luserOp': ('ops',),
- 'luserMe': ('info',),
- 'receivedMOTD': ('motd',),
- 'privmsg': ('user', 'channel', 'message'),
- 'joined': ('channel',),
- 'left': ('channel',),
- 'noticed': ('user', 'channel', 'message'),
- 'modeChanged': ('user', 'channel', 'set', 'modes', 'args'),
- 'pong': ('user', 'secs'),
- 'signedOn': (),
- 'kickedFrom': ('channel', 'kicker', 'message'),
- 'nickChanged': ('nick',),
- 'userJoined': ('user', 'channel'),
- 'userLeft': ('user', 'channel'),
- 'userKicked': ('user', 'channel', 'kicker', 'message'),
- 'action': ('user', 'channel', 'data'),
- 'topicUpdated': ('user', 'channel', 'newTopic'),
- 'userRenamed': ('oldname', 'newname')}
- def __init__(self, *a, **kw):
- # It is important that IRCClient.__init__ is not called since
- # traditionally it did not exist, so it is important that nothing is
- # initialised there that would prevent subclasses that did not (or
- # could not) invoke the base implementation. Any protocol
- # initialisation should happen in connectionMode.
- self.calls = []
- def __getattribute__(self, name):
- if name.startswith('__') and name.endswith('__'):
- return super(NoticingClient, self).__getattribute__(name)
- try:
- args = super(NoticingClient, self).__getattribute__('methods')[name]
- except KeyError:
- return super(NoticingClient, self).__getattribute__(name)
- else:
- return self.makeMethod(name, args)
- def makeMethod(self, fname, args):
- def method(*a, **kw):
- if len(a) > len(args):
- raise TypeError("TypeError: %s() takes %d arguments "
- "(%d given)" % (fname, len(args), len(a)))
- for (name, value) in zip(args, a):
- if name in kw:
- raise TypeError("TypeError: %s() got multiple values "
- "for keyword argument '%s'" % (fname, name))
- else:
- kw[name] = value
- if len(kw) != len(args):
- raise TypeError("TypeError: %s() takes %d arguments "
- "(%d given)" % (fname, len(args), len(a)))
- self.calls.append((fname, kw))
- return method
- def pop(dict, key, default):
- try:
- value = dict[key]
- except KeyError:
- return default
- else:
- del dict[key]
- return value
- class ClientImplementationTests(IRCTestCase):
- def setUp(self):
- self.transport = StringTransport()
- self.client = NoticingClient()
- self.client.makeConnection(self.transport)
- self.addCleanup(self.transport.loseConnection)
- self.addCleanup(self.client.connectionLost, None)
- def _serverTestImpl(self, code, msg, func, **kw):
- host = pop(kw, 'host', 'server.host')
- nick = pop(kw, 'nick', 'nickname')
- args = pop(kw, 'args', '')
- message = (":" +
- host + " " +
- code + " " +
- nick + " " +
- args + " :" +
- msg + "\r\n")
- self.client.dataReceived(message)
- self.assertEqual(
- self.client.calls,
- [(func, kw)])
- def testYourHost(self):
- msg = "Your host is some.host[blah.blah/6667], running version server-version-3"
- self._serverTestImpl("002", msg, "yourHost", info=msg)
- def testCreated(self):
- msg = "This server was cobbled together Fri Aug 13 18:00:25 UTC 2004"
- self._serverTestImpl("003", msg, "created", when=msg)
- def testMyInfo(self):
- msg = "server.host server-version abcDEF bcdEHI"
- self._serverTestImpl("004", msg, "myInfo",
- servername="server.host",
- version="server-version",
- umodes="abcDEF",
- cmodes="bcdEHI")
- def testLuserClient(self):
- msg = "There are 9227 victims and 9542 hiding on 24 servers"
- self._serverTestImpl("251", msg, "luserClient",
- info=msg)
- def _sendISUPPORT(self):
- args = ("MODES=4 CHANLIMIT=#:20 NICKLEN=16 USERLEN=10 HOSTLEN=63 "
- "TOPICLEN=450 KICKLEN=450 CHANNELLEN=30 KEYLEN=23 CHANTYPES=# "
- "PREFIX=(ov)@+ CASEMAPPING=ascii CAPAB IRCD=dancer")
- msg = "are available on this server"
- self._serverTestImpl("005", msg, "isupport", args=args,
- options=['MODES=4',
- 'CHANLIMIT=#:20',
- 'NICKLEN=16',
- 'USERLEN=10',
- 'HOSTLEN=63',
- 'TOPICLEN=450',
- 'KICKLEN=450',
- 'CHANNELLEN=30',
- 'KEYLEN=23',
- 'CHANTYPES=#',
- 'PREFIX=(ov)@+',
- 'CASEMAPPING=ascii',
- 'CAPAB',
- 'IRCD=dancer'])
- def test_ISUPPORT(self):
- """
- The client parses ISUPPORT messages sent by the server and calls
- L{IRCClient.isupport}.
- """
- self._sendISUPPORT()
- def testBounce(self):
- msg = "Try server some.host, port 321"
- self._serverTestImpl("010", msg, "bounce",
- info=msg)
- def testLuserChannels(self):
- args = "7116"
- msg = "channels formed"
- self._serverTestImpl("254", msg, "luserChannels", args=args,
- channels=int(args))
- def testLuserOp(self):
- args = "34"
- msg = "flagged staff members"
- self._serverTestImpl("252", msg, "luserOp", args=args,
- ops=int(args))
- def testLuserMe(self):
- msg = "I have 1937 clients and 0 servers"
- self._serverTestImpl("255", msg, "luserMe",
- info=msg)
- def test_receivedMOTD(self):
- """
- Lines received in I{RPL_MOTDSTART} and I{RPL_MOTD} are delivered to
- L{IRCClient.receivedMOTD} when I{RPL_ENDOFMOTD} is received.
- """
- lines = [
- ":host.name 375 nickname :- host.name Message of the Day -",
- ":host.name 372 nickname :- Welcome to host.name",
- ":host.name 376 nickname :End of /MOTD command."]
- for L in lines:
- self.assertEqual(self.client.calls, [])
- self.client.dataReceived(L + '\r\n')
- self.assertEqual(
- self.client.calls,
- [("receivedMOTD", {"motd": ["host.name Message of the Day -", "Welcome to host.name"]})])
- # After the motd is delivered, the tracking variable should be
- # reset.
- self.assertIdentical(self.client.motd, None)
- def test_withoutMOTDSTART(self):
- """
- If L{IRCClient} receives I{RPL_MOTD} and I{RPL_ENDOFMOTD} without
- receiving I{RPL_MOTDSTART}, L{IRCClient.receivedMOTD} is still
- called with a list of MOTD lines.
- """
- lines = [
- ":host.name 372 nickname :- Welcome to host.name",
- ":host.name 376 nickname :End of /MOTD command."]
- for L in lines:
- self.client.dataReceived(L + '\r\n')
- self.assertEqual(
- self.client.calls,
- [("receivedMOTD", {"motd": ["Welcome to host.name"]})])
- def _clientTestImpl(self, sender, group, type, msg, func, **kw):
- ident = pop(kw, 'ident', 'ident')
- host = pop(kw, 'host', 'host')
- wholeUser = sender + '!' + ident + '@' + host
- message = (":" +
- wholeUser + " " +
- type + " " +
- group + " :" +
- msg + "\r\n")
- self.client.dataReceived(message)
- self.assertEqual(
- self.client.calls,
- [(func, kw)])
- self.client.calls = []
- def testPrivmsg(self):
- msg = "Tooty toot toot."
- self._clientTestImpl("sender", "#group", "PRIVMSG", msg, "privmsg",
- ident="ident", host="host",
- # Expected results below
- user="sender!ident@host",
- channel="#group",
- message=msg)
- self._clientTestImpl("sender", "recipient", "PRIVMSG", msg, "privmsg",
- ident="ident", host="host",
- # Expected results below
- user="sender!ident@host",
- channel="recipient",
- message=msg)
- def test_getChannelModeParams(self):
- """
- L{IRCClient.getChannelModeParams} uses ISUPPORT information, either
- given by the server or defaults, to determine which channel modes
- require arguments when being added or removed.
- """
- add, remove = map(sorted, self.client.getChannelModeParams())
- self.assertEqual(add, ['b', 'h', 'k', 'l', 'o', 'v'])
- self.assertEqual(remove, ['b', 'h', 'o', 'v'])
- def removeFeature(name):
- name = '-' + name
- msg = "are available on this server"
- self._serverTestImpl(
- '005', msg, 'isupport', args=name, options=[name])
- self.assertIdentical(
- self.client.supported.getFeature(name), None)
- self.client.calls = []
- # Remove CHANMODES feature, causing getFeature('CHANMODES') to return
- # None.
- removeFeature('CHANMODES')
- add, remove = map(sorted, self.client.getChannelModeParams())
- self.assertEqual(add, ['h', 'o', 'v'])
- self.assertEqual(remove, ['h', 'o', 'v'])
- # Remove PREFIX feature, causing getFeature('PREFIX') to return None.
- removeFeature('PREFIX')
- add, remove = map(sorted, self.client.getChannelModeParams())
- self.assertEqual(add, [])
- self.assertEqual(remove, [])
- # Restore ISUPPORT features.
- self._sendISUPPORT()
- self.assertNotIdentical(
- self.client.supported.getFeature('PREFIX'), None)
- def test_getUserModeParams(self):
- """
- L{IRCClient.getUserModeParams} returns a list of user modes (modes that
- the user sets on themself, outside of channel modes) that require
- parameters when added and removed, respectively.
- """
- add, remove = map(sorted, self.client.getUserModeParams())
- self.assertEqual(add, [])
- self.assertEqual(remove, [])
- def _sendModeChange(self, msg, args='', target=None):
- """
- Build a MODE string and send it to the client.
- """
- if target is None:
- target = '#chan'
- message = ":Wolf!~wolf@yok.utu.fi MODE %s %s %s\r\n" % (
- target, msg, args)
- self.client.dataReceived(message)
- def _parseModeChange(self, results, target=None):
- """
- Parse the results, do some test and return the data to check.
- """
- if target is None:
- target = '#chan'
- for n, result in enumerate(results):
- method, data = result
- self.assertEqual(method, 'modeChanged')
- self.assertEqual(data['user'], 'Wolf!~wolf@yok.utu.fi')
- self.assertEqual(data['channel'], target)
- results[n] = tuple([data[key] for key in ('set', 'modes', 'args')])
- return results
- def _checkModeChange(self, expected, target=None):
- """
- Compare the expected result with the one returned by the client.
- """
- result = self._parseModeChange(self.client.calls, target)
- self.assertEqual(result, expected)
- self.client.calls = []
- def test_modeMissingDirection(self):
- """
- Mode strings that do not begin with a directional character, C{'+'} or
- C{'-'}, have C{'+'} automatically prepended.
- """
- self._sendModeChange('s')
- self._checkModeChange([(True, 's', (None,))])
- def test_noModeParameters(self):
- """
- No parameters are passed to L{IRCClient.modeChanged} for modes that
- don't take any parameters.
- """
- self._sendModeChange('-s')
- self._checkModeChange([(False, 's', (None,))])
- self._sendModeChange('+n')
- self._checkModeChange([(True, 'n', (None,))])
- def test_oneModeParameter(self):
- """
- Parameters are passed to L{IRCClient.modeChanged} for modes that take
- parameters.
- """
- self._sendModeChange('+o', 'a_user')
- self._checkModeChange([(True, 'o', ('a_user',))])
- self._sendModeChange('-o', 'a_user')
- self._checkModeChange([(False, 'o', ('a_user',))])
- def test_mixedModes(self):
- """
- Mixing adding and removing modes that do and don't take parameters
- invokes L{IRCClient.modeChanged} with mode characters and parameters
- that match up.
- """
- self._sendModeChange('+osv', 'a_user another_user')
- self._checkModeChange([(True, 'osv', ('a_user', None, 'another_user'))])
- self._sendModeChange('+v-os', 'a_user another_user')
- self._checkModeChange([(True, 'v', ('a_user',)),
- (False, 'os', ('another_user', None))])
- def test_tooManyModeParameters(self):
- """
- Passing an argument to modes that take no parameters results in
- L{IRCClient.modeChanged} not being called and an error being logged.
- """
- self._sendModeChange('+s', 'wrong')
- self._checkModeChange([])
- errors = self.flushLoggedErrors(irc.IRCBadModes)
- self.assertEqual(len(errors), 1)
- self.assertSubstring(
- 'Too many parameters', errors[0].getErrorMessage())
- def test_tooFewModeParameters(self):
- """
- Passing no arguments to modes that do take parameters results in
- L{IRCClient.modeChange} not being called and an error being logged.
- """
- self._sendModeChange('+o')
- self._checkModeChange([])
- errors = self.flushLoggedErrors(irc.IRCBadModes)
- self.assertEqual(len(errors), 1)
- self.assertSubstring(
- 'Not enough parameters', errors[0].getErrorMessage())
- def test_userMode(self):
- """
- A C{MODE} message whose target is our user (the nickname of our user,
- to be precise), as opposed to a channel, will be parsed according to
- the modes specified by L{IRCClient.getUserModeParams}.
- """
- target = self.client.nickname
- # Mode "o" on channels is supposed to take a parameter, but since this
- # is not a channel this will not cause an exception.
- self._sendModeChange('+o', target=target)
- self._checkModeChange([(True, 'o', (None,))], target=target)
- def getUserModeParams():
- return ['Z', '']
- # Introduce our own user mode that takes an argument.
- self.patch(self.client, 'getUserModeParams', getUserModeParams)
- self._sendModeChange('+Z', 'an_arg', target=target)
- self._checkModeChange([(True, 'Z', ('an_arg',))], target=target)
- def test_heartbeat(self):
- """
- When the I{RPL_WELCOME} message is received a heartbeat is started that
- will send a I{PING} message to the IRC server every
- L{irc.IRCClient.heartbeatInterval} seconds. When the transport is
- closed the heartbeat looping call is stopped too.
- """
- def _createHeartbeat():
- heartbeat = self._originalCreateHeartbeat()
- heartbeat.clock = self.clock
- return heartbeat
- self.clock = task.Clock()
- self._originalCreateHeartbeat = self.client._createHeartbeat
- self.patch(self.client, '_createHeartbeat', _createHeartbeat)
- self.assertIdentical(self.client._heartbeat, None)
- self.client.irc_RPL_WELCOME('foo', [])
- self.assertNotIdentical(self.client._heartbeat, None)
- self.assertEqual(self.client.hostname, 'foo')
- # Pump the clock enough to trigger one LoopingCall.
- self.assertEqualBufferValue(self.transport.value(), '')
- self.clock.advance(self.client.heartbeatInterval)
- self.assertEqualBufferValue(self.transport.value(), 'PING foo\r\n')
- # When the connection is lost the heartbeat is stopped.
- self.transport.loseConnection()
- self.client.connectionLost(None)
- self.assertEqual(
- len(self.clock.getDelayedCalls()), 0)
- self.assertIdentical(self.client._heartbeat, None)
- def test_heartbeatDisabled(self):
- """
- If L{irc.IRCClient.heartbeatInterval} is set to L{None} then no
- heartbeat is created.
- """
- self.assertIdentical(self.client._heartbeat, None)
- self.client.heartbeatInterval = None
- self.client.irc_RPL_WELCOME('foo', [])
- self.assertIdentical(self.client._heartbeat, None)
- class BasicServerFunctionalityTests(IRCTestCase):
- def setUp(self):
- self.f = StringIOWithoutClosing()
- self.t = protocol.FileWrapper(self.f)
- self.p = irc.IRC()
- self.p.makeConnection(self.t)
- def check(self, s):
- """
- Make sure that the internal buffer equals a specified value.
- @param s: the value to compare against buffer
- @type s: L{bytes} or L{unicode}
- """
- bufferValue = self.f.getvalue()
- if isinstance(s, unicode):
- bufferValue = bufferValue.decode("utf-8")
- self.assertEqual(bufferValue, s)
- def test_sendMessage(self):
- """
- Passing a command and parameters to L{IRC.sendMessage} results in a
- query string that consists of the command and parameters, separated by
- a space, ending with '\r\n'.
- """
- self.p.sendMessage('CMD', 'param1', 'param2')
- self.check('CMD param1 param2\r\n')
- def test_sendCommand(self):
- """
- Passing a command and parameters to L{IRC.sendCommand} results in a
- query string that consists of the command and parameters, separated by
- a space, ending with '\r\n'.
- The format is described in more detail in
- U{RFC 1459 <https://tools.ietf.org/html/rfc1459.html#section-2.3>}.
- """
- self.p.sendCommand(u"CMD", (u"param1", u"param2"))
- self.check("CMD param1 param2\r\n")
- def test_sendUnicodeCommand(self):
- """
- Passing unicode parameters to L{IRC.sendCommand} encodes the parameters
- in UTF-8.
- """
- self.p.sendCommand(u"CMD", (u"param\u00b9", u"param\u00b2"))
- self.check(b"CMD param\xc2\xb9 param\xc2\xb2\r\n")
- def test_sendMessageNoCommand(self):
- """
- Passing L{None} as the command to L{IRC.sendMessage} raises a
- C{ValueError}.
- """
- error = self.assertRaises(ValueError, self.p.sendMessage, None,
- 'param1', 'param2')
- self.assertEqual(str(error), "IRC message requires a command.")
- def test_sendCommandNoCommand(self):
- """
- Passing L{None} as the command to L{IRC.sendCommand} raises a
- C{ValueError}.
- """
- error = self.assertRaises(ValueError, self.p.sendCommand, None,
- (u"param1", u"param2"))
- self.assertEqual(error.args[0], "IRC message requires a command.")
- def test_sendMessageInvalidCommand(self):
- """
- Passing an invalid string command to L{IRC.sendMessage} raises a
- C{ValueError}.
- """
- error = self.assertRaises(ValueError, self.p.sendMessage, ' ',
- 'param1', 'param2')
- self.assertEqual(str(error),
- "Somebody screwed up, 'cuz this doesn't look like a command to "
- "me: ")
- def test_sendCommandInvalidCommand(self):
- """
- Passing an invalid string command to L{IRC.sendCommand} raises a
- C{ValueError}.
- """
- error = self.assertRaises(ValueError, self.p.sendCommand, u" ",
- (u"param1", u"param2"))
- self.assertEqual(error.args[0], 'Invalid command: " "')
- def test_sendCommandWithPrefix(self):
- """
- Passing a command and parameters with a specified prefix to
- L{IRC.sendCommand} results in a proper query string including the
- specified line prefix.
- """
- self.p.sendCommand(u"CMD", (u"param1", u"param2"), u"irc.example.com")
- self.check(b":irc.example.com CMD param1 param2\r\n")
- def test_sendCommandWithTags(self):
- """
- Passing a command and parameters with a specified prefix and tags
- to L{IRC.sendCommand} results in a proper query string including the
- specified line prefix and appropriate tags syntax. The query string
- should be output as follows:
- @tags :prefix COMMAND param1 param2\r\n
- The tags are a string of IRCv3 tags, preceded by '@'. The rest
- of the string is as described in test_sendMessage. For more on
- the message tag format, see U{the IRCv3 specification
- <https://ircv3.net/specs/core/message-tags-3.2.html>}.
- """
- sendTags = {
- u"aaa": u"bbb",
- u"ccc": None,
- u"example.com/ddd": u"eee"
- }
- expectedTags = (b"aaa=bbb", b"ccc", b"example.com/ddd=eee")
- self.p.sendCommand(u"CMD", (u"param1", u"param2"), u"irc.example.com",
- sendTags)
- outMsg = self.f.getvalue()
- outTagStr, outLine = outMsg.split(b' ', 1)
- # We pull off the leading '@' sign so that the split tags can be
- # compared with what we expect.
- outTags = outTagStr[1:].split(b';')
- self.assertEqual(outLine, b":irc.example.com CMD param1 param2\r\n")
- self.assertEqual(sorted(expectedTags), sorted(outTags))
- def test_sendCommandValidateEmptyTags(self):
- """
- Passing empty tag names to L{IRC.sendCommand} raises a C{ValueError}.
- """
- sendTags = {
- u"aaa": u"bbb",
- u"ccc": None,
- u"": u""
- }
- error = self.assertRaises(ValueError, self.p.sendCommand, u"CMD",
- (u"param1", u"param2"), u"irc.example.com", sendTags)
- self.assertEqual(error.args[0], "A tag name is required.")
- def test_sendCommandValidateNoneTags(self):
- """
- Passing None as a tag name to L{IRC.sendCommand} raises a
- C{ValueError}.
- """
- sendTags = {
- u"aaa": u"bbb",
- u"ccc": None,
- None: u"beep"
- }
- error = self.assertRaises(ValueError, self.p.sendCommand, u"CMD",
- (u"param1", u"param2"), u"irc.example.com", sendTags)
- self.assertEqual(error.args[0], "A tag name is required.")
- def test_sendCommandValidateTagsWithSpaces(self):
- """
- Passing a tag name containing spaces to L{IRC.sendCommand} raises a
- C{ValueError}.
- """
- sendTags = {
- u"aaa bbb": u"ccc"
- }
- error = self.assertRaises(ValueError, self.p.sendCommand, u"CMD",
- (u"param1", u"param2"), u"irc.example.com", sendTags)
- self.assertEqual(error.args[0], "Tag contains invalid characters.")
- def test_sendCommandValidateTagsWithInvalidChars(self):
- """
- Passing a tag name containing invalid characters to L{IRC.sendCommand}
- raises a C{ValueError}.
- """
- sendTags = {
- u"aaa_b^@": u"ccc"
- }
- error = self.assertRaises(ValueError, self.p.sendCommand, u"CMD",
- (u"param1", u"param2"), u"irc.example.com", sendTags)
- self.assertEqual(error.args[0], "Tag contains invalid characters.")
- def test_sendCommandValidateTagValueEscaping(self):
- """
- Tags with values containing invalid characters passed to
- L{IRC.sendCommand} are escaped.
- """
- sendTags = {
- u"aaa": u"bbb",
- u"ccc": u"test\r\n \\;;"
- }
- expectedTags = (b"aaa=bbb", b"ccc=test\\r\\n\\s\\\\\\:\\:")
- self.p.sendCommand(u"CMD", (u"param1", u"param2"), u"irc.example.com",
- sendTags)
- outMsg = self.f.getvalue()
- outTagStr, outLine = outMsg.split(b" ", 1)
- # We pull off the leading '@' sign so that the split tags can be
- # compared with what we expect.
- outTags = outTagStr[1:].split(b";")
- self.assertEqual(sorted(outTags), sorted(expectedTags))
- def testPrivmsg(self):
- self.p.privmsg("this-is-sender", "this-is-recip", "this is message")
- self.check(":this-is-sender PRIVMSG this-is-recip :this is message\r\n")
- def testNotice(self):
- self.p.notice("this-is-sender", "this-is-recip", "this is notice")
- self.check(":this-is-sender NOTICE this-is-recip :this is notice\r\n")
- def testAction(self):
- self.p.action("this-is-sender", "this-is-recip", "this is action")
- self.check(":this-is-sender ACTION this-is-recip :this is action\r\n")
- def testJoin(self):
- self.p.join("this-person", "#this-channel")
- self.check(":this-person JOIN #this-channel\r\n")
- def testPart(self):
- self.p.part("this-person", "#that-channel")
- self.check(":this-person PART #that-channel\r\n")
- def testWhois(self):
- """
- Verify that a whois by the client receives the right protocol actions
- from the server.
- """
- timestamp = int(time.time()-100)
- hostname = self.p.hostname
- req = 'requesting-nick'
- targ = 'target-nick'
- self.p.whois(req, targ, 'target', 'host.com',
- 'Target User', 'irc.host.com', 'A fake server', False,
- 12, timestamp, ['#fakeusers', '#fakemisc'])
- expected = '\r\n'.join([
- ':%(hostname)s 311 %(req)s %(targ)s target host.com * :Target User',
- ':%(hostname)s 312 %(req)s %(targ)s irc.host.com :A fake server',
- ':%(hostname)s 317 %(req)s %(targ)s 12 %(timestamp)s :seconds idle, signon time',
- ':%(hostname)s 319 %(req)s %(targ)s :#fakeusers #fakemisc',
- ':%(hostname)s 318 %(req)s %(targ)s :End of WHOIS list.',
- '']) % dict(hostname=hostname, timestamp=timestamp, req=req, targ=targ)
- self.check(expected)
- class DummyClient(irc.IRCClient):
- """
- A L{twisted.words.protocols.irc.IRCClient} that stores sent lines in a
- C{list} rather than transmitting them.
- """
- def __init__(self):
- self.lines = []
- def connectionMade(self):
- irc.IRCClient.connectionMade(self)
- self.lines = []
- def _truncateLine(self, line):
- """
- Truncate an IRC line to the maximum allowed length.
- """
- return line[:irc.MAX_COMMAND_LENGTH - len(self.delimiter)]
- def lineReceived(self, line):
- # Emulate IRC servers throwing away our important data.
- line = self._truncateLine(line)
- return irc.IRCClient.lineReceived(self, line)
- def sendLine(self, m):
- self.lines.append(self._truncateLine(m))
- class ClientInviteTests(IRCTestCase):
- """
- Tests for L{IRCClient.invite}.
- """
- def setUp(self):
- """
- Create a L{DummyClient} to call C{invite} on in test methods.
- """
- self.client = DummyClient()
- def test_channelCorrection(self):
- """
- If the channel name passed to L{IRCClient.invite} does not begin with a
- channel prefix character, one is prepended to it.
- """
- self.client.invite('foo', 'bar')
- self.assertEqual(self.client.lines, ['INVITE foo #bar'])
- def test_invite(self):
- """
- L{IRCClient.invite} sends an I{INVITE} message with the specified
- username and a channel.
- """
- self.client.invite('foo', '#bar')
- self.assertEqual(self.client.lines, ['INVITE foo #bar'])
- class ClientMsgTests(IRCTestCase):
- """
- Tests for messages sent with L{twisted.words.protocols.irc.IRCClient}.
- """
- def setUp(self):
- self.client = DummyClient()
- self.client.connectionMade()
- def test_singleLine(self):
- """
- A message containing no newlines is sent in a single command.
- """
- self.client.msg('foo', 'bar')
- self.assertEqual(self.client.lines, ['PRIVMSG foo :bar'])
- def test_invalidMaxLength(self):
- """
- Specifying a C{length} value to L{IRCClient.msg} that is too short to
- contain the protocol command to send a message raises C{ValueError}.
- """
- self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 0)
- self.assertRaises(ValueError, self.client.msg, 'foo', 'bar', 3)
- def test_multipleLine(self):
- """
- Messages longer than the C{length} parameter to L{IRCClient.msg} will
- be split and sent in multiple commands.
- """
- maxLen = len('PRIVMSG foo :') + 3 + 2 # 2 for line endings
- self.client.msg('foo', 'barbazbo', maxLen)
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :bar',
- 'PRIVMSG foo :baz',
- 'PRIVMSG foo :bo'])
- def test_sufficientWidth(self):
- """
- Messages exactly equal in length to the C{length} parameter to
- L{IRCClient.msg} are sent in a single command.
- """
- msg = 'barbazbo'
- maxLen = len('PRIVMSG foo :%s' % (msg,)) + 2
- self.client.msg('foo', msg, maxLen)
- self.assertEqual(self.client.lines, ['PRIVMSG foo :%s' % (msg,)])
- self.client.lines = []
- self.client.msg('foo', msg, maxLen-1)
- self.assertEqual(2, len(self.client.lines))
- self.client.lines = []
- self.client.msg('foo', msg, maxLen+1)
- self.assertEqual(1, len(self.client.lines))
- def test_newlinesAtStart(self):
- """
- An LF at the beginning of the message is ignored.
- """
- self.client.lines = []
- self.client.msg('foo', '\nbar')
- self.assertEqual(self.client.lines, ['PRIVMSG foo :bar'])
- def test_newlinesAtEnd(self):
- """
- An LF at the end of the message is ignored.
- """
- self.client.lines = []
- self.client.msg('foo', 'bar\n')
- self.assertEqual(self.client.lines, ['PRIVMSG foo :bar'])
- def test_newlinesWithinMessage(self):
- """
- An LF within a message causes a new line.
- """
- self.client.lines = []
- self.client.msg('foo', 'bar\nbaz')
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :bar',
- 'PRIVMSG foo :baz'])
- def test_consecutiveNewlines(self):
- """
- Consecutive LFs do not cause a blank line.
- """
- self.client.lines = []
- self.client.msg('foo', 'bar\n\nbaz')
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :bar',
- 'PRIVMSG foo :baz'])
- def assertLongMessageSplitting(self, message, expectedNumCommands,
- length=None):
- """
- Assert that messages sent by L{IRCClient.msg} are split into an
- expected number of commands and the original message is transmitted in
- its entirety over those commands.
- """
- responsePrefix = ':%s!%s@%s ' % (
- self.client.nickname,
- self.client.realname,
- self.client.hostname)
- self.client.msg('foo', message, length=length)
- privmsg = []
- self.patch(self.client, 'privmsg', lambda *a: privmsg.append(a))
- # Deliver these to IRCClient via the normal mechanisms.
- for line in self.client.lines:
- self.client.lineReceived(responsePrefix + line)
- self.assertEqual(len(privmsg), expectedNumCommands)
- receivedMessage = ''.join(
- message for user, target, message in privmsg)
- # Did the long message we sent arrive as intended?
- self.assertEqual(message, receivedMessage)
- def test_splitLongMessagesWithDefault(self):
- """
- If a maximum message length is not provided to L{IRCClient.msg} a
- best-guess effort is made to determine a safe maximum, messages longer
- than this are split into multiple commands with the intent of
- delivering long messages without losing data due to message truncation
- when the server relays them.
- """
- message = 'o' * (irc.MAX_COMMAND_LENGTH - 2)
- self.assertLongMessageSplitting(message, 2)
- def test_splitLongMessagesWithOverride(self):
- """
- The maximum message length can be specified to L{IRCClient.msg},
- messages longer than this are split into multiple commands with the
- intent of delivering long messages without losing data due to message
- truncation when the server relays them.
- """
- message = 'o' * (irc.MAX_COMMAND_LENGTH - 2)
- self.assertLongMessageSplitting(
- message, 3, length=irc.MAX_COMMAND_LENGTH // 2)
- def test_newlinesBeforeLineBreaking(self):
- """
- IRCClient breaks on newlines before it breaks long lines.
- """
- # Because MAX_COMMAND_LENGTH includes framing characters, this long
- # line is slightly longer than half the permissible message size.
- longline = 'o' * (irc.MAX_COMMAND_LENGTH // 2)
- self.client.msg('foo', longline + '\n' + longline)
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :' + longline,
- 'PRIVMSG foo :' + longline])
- def test_lineBreakOnWordBoundaries(self):
- """
- IRCClient prefers to break long lines at word boundaries.
- """
- # Because MAX_COMMAND_LENGTH includes framing characters, this long
- # line is slightly longer than half the permissible message size.
- longline = 'o' * (irc.MAX_COMMAND_LENGTH // 2)
- self.client.msg('foo', longline + ' ' + longline)
- self.assertEqual(
- self.client.lines,
- ['PRIVMSG foo :' + longline,
- 'PRIVMSG foo :' + longline])
- def test_splitSanity(self):
- """
- L{twisted.words.protocols.irc.split} raises C{ValueError} if given a
- length less than or equal to C{0} and returns C{[]} when splitting
- C{''}.
- """
- # Whiteboxing
- self.assertRaises(ValueError, irc.split, 'foo', -1)
- self.assertRaises(ValueError, irc.split, 'foo', 0)
- self.assertEqual([], irc.split('', 1))
- self.assertEqual([], irc.split(''))
- def test_splitDelimiters(self):
- """
- L{twisted.words.protocols.irc.split} skips any delimiter (space or
- newline) that it finds at the very beginning of the string segment it
- is operating on. Nothing should be added to the output list because of
- it.
- """
- r = irc.split("xx yyz", 2)
- self.assertEqual(['xx', 'yy', 'z'], r)
- r = irc.split("xx\nyyz", 2)
- self.assertEqual(['xx', 'yy', 'z'], r)
- def test_splitValidatesLength(self):
- """
- L{twisted.words.protocols.irc.split} raises C{ValueError} if given a
- length less than or equal to C{0}.
- """
- self.assertRaises(ValueError, irc.split, "foo", 0)
- self.assertRaises(ValueError, irc.split, "foo", -1)
- def test_say(self):
- """
- L{IRCClient.say} prepends the channel prefix C{"#"} if necessary and
- then sends the message to the server for delivery to that channel.
- """
- self.client.say("thechannel", "the message")
- self.assertEqual(
- self.client.lines, ["PRIVMSG #thechannel :the message"])
- class ClientTests(IRCTestCase):
- """
- Tests for the protocol-level behavior of IRCClient methods intended to
- be called by application code.
- """
- def setUp(self):
- """
- Create and connect a new L{IRCClient} to a new L{StringTransport}.
- """
- self.transport = StringTransport()
- self.protocol = IRCClient()
- self.protocol.performLogin = False
- self.protocol.makeConnection(self.transport)
- # Sanity check - we don't want anything to have happened at this
- # point, since we're not in a test yet.
- self.assertEqualBufferValue(self.transport.value(), "")
- self.addCleanup(self.transport.loseConnection)
- self.addCleanup(self.protocol.connectionLost, None)
- def getLastLine(self, transport):
- """
- Return the last IRC message in the transport buffer.
- """
- line = transport.value()
- if bytes != str and isinstance(line, bytes):
- line = line.decode("utf-8")
- return line.split('\r\n')[-2]
- def test_away(self):
- """
- L{IRCClient.away} sends an AWAY command with the specified message.
- """
- message = "Sorry, I'm not here."
- self.protocol.away(message)
- expected = [
- 'AWAY :%s' % (message,),
- '',
- ]
- self.assertEqualBufferValue(self.transport.value().split(b'\r\n'), expected)
- def test_back(self):
- """
- L{IRCClient.back} sends an AWAY command with an empty message.
- """
- self.protocol.back()
- expected = [
- 'AWAY :',
- '',
- ]
- self.assertEqualBufferValue(self.transport.value().split(b'\r\n'), expected)
- def test_whois(self):
- """
- L{IRCClient.whois} sends a WHOIS message.
- """
- self.protocol.whois('alice')
- self.assertEqualBufferValue(
- self.transport.value().split(b'\r\n'),
- ['WHOIS alice', ''])
- def test_whoisWithServer(self):
- """
- L{IRCClient.whois} sends a WHOIS message with a server name if a
- value is passed for the C{server} parameter.
- """
- self.protocol.whois('alice', 'example.org')
- self.assertEqualBufferValue(
- self.transport.value().split(b'\r\n'),
- ['WHOIS example.org alice', ''])
- def test_register(self):
- """
- L{IRCClient.register} sends NICK and USER commands with the
- username, name, hostname, server name, and real name specified.
- """
- username = 'testuser'
- hostname = 'testhost'
- servername = 'testserver'
- self.protocol.realname = 'testname'
- self.protocol.password = None
- self.protocol.register(username, hostname, servername)
- expected = [
- 'NICK %s' % (username,),
- 'USER %s %s %s :%s' % (
- username, hostname, servername, self.protocol.realname),
- '']
- self.assertEqualBufferValue(self.transport.value().split(b'\r\n'), expected)
- def test_registerWithPassword(self):
- """
- If the C{password} attribute of L{IRCClient} is not L{None}, the
- C{register} method also sends a PASS command with it as the
- argument.
- """
- username = 'testuser'
- hostname = 'testhost'
- servername = 'testserver'
- self.protocol.realname = 'testname'
- self.protocol.password = 'testpass'
- self.protocol.register(username, hostname, servername)
- expected = [
- 'PASS %s' % (self.protocol.password,),
- 'NICK %s' % (username,),
- 'USER %s %s %s :%s' % (
- username, hostname, servername, self.protocol.realname),
- '']
- self.assertEqualBufferValue(self.transport.value().split(b'\r\n'), expected)
- def test_registerWithTakenNick(self):
- """
- Verify that the client repeats the L{IRCClient.setNick} method with a
- new value when presented with an C{ERR_NICKNAMEINUSE} while trying to
- register.
- """
- username = 'testuser'
- hostname = 'testhost'
- servername = 'testserver'
- self.protocol.realname = 'testname'
- self.protocol.password = 'testpass'
- self.protocol.register(username, hostname, servername)
- self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertNotEqual(lastLine, 'NICK %s' % (username,))
- # Keep chaining underscores for each collision
- self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(lastLine, 'NICK %s' % (username + '__',))
- def test_overrideAlterCollidedNick(self):
- """
- L{IRCClient.alterCollidedNick} determines how a nickname is altered upon
- collision while a user is trying to change to that nickname.
- """
- nick = 'foo'
- self.protocol.alterCollidedNick = lambda nick: nick + '***'
- self.protocol.register(nick)
- self.protocol.irc_ERR_NICKNAMEINUSE('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(
- lastLine, 'NICK %s' % (nick + '***',))
- def test_nickChange(self):
- """
- When a NICK command is sent after signon, C{IRCClient.nickname} is set
- to the new nickname I{after} the server sends an acknowledgement.
- """
- oldnick = 'foo'
- newnick = 'bar'
- self.protocol.register(oldnick)
- self.protocol.irc_RPL_WELCOME('prefix', ['param'])
- self.protocol.setNick(newnick)
- self.assertEqual(self.protocol.nickname, oldnick)
- self.protocol.irc_NICK('%s!quux@qux' % (oldnick,), [newnick])
- self.assertEqual(self.protocol.nickname, newnick)
- def test_erroneousNick(self):
- """
- Trying to register an illegal nickname results in the default legal
- nickname being set, and trying to change a nickname to an illegal
- nickname results in the old nickname being kept.
- """
- # Registration case: change illegal nickname to erroneousNickFallback
- badnick = 'foo'
- self.assertEqual(self.protocol._registered, False)
- self.protocol.register(badnick)
- self.protocol.irc_ERR_ERRONEUSNICKNAME('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(
- lastLine, 'NICK %s' % (self.protocol.erroneousNickFallback,))
- self.protocol.irc_RPL_WELCOME('prefix', ['param'])
- self.assertEqual(self.protocol._registered, True)
- self.protocol.setNick(self.protocol.erroneousNickFallback)
- self.assertEqual(
- self.protocol.nickname, self.protocol.erroneousNickFallback)
- # Illegal nick change attempt after registration. Fall back to the old
- # nickname instead of erroneousNickFallback.
- oldnick = self.protocol.nickname
- self.protocol.setNick(badnick)
- self.protocol.irc_ERR_ERRONEUSNICKNAME('prefix', ['param'])
- lastLine = self.getLastLine(self.transport)
- self.assertEqual(
- lastLine, 'NICK %s' % (badnick,))
- self.assertEqual(self.protocol.nickname, oldnick)
- def test_describe(self):
- """
- L{IRCClient.desrcibe} sends a CTCP ACTION message to the target
- specified.
- """
- target = 'foo'
- channel = '#bar'
- action = 'waves'
- self.protocol.describe(target, action)
- self.protocol.describe(channel, action)
- expected = [
- 'PRIVMSG %s :\01ACTION %s\01' % (target, action),
- 'PRIVMSG %s :\01ACTION %s\01' % (channel, action),
- '']
- self.assertEqualBufferValue(self.transport.value().split(b'\r\n'), expected)
- def test_noticedDoesntPrivmsg(self):
- """
- The default implementation of L{IRCClient.noticed} doesn't invoke
- C{privmsg()}
- """
- def privmsg(user, channel, message):
- self.fail("privmsg() should not have been called")
- self.protocol.privmsg = privmsg
- self.protocol.irc_NOTICE(
- 'spam', ['#greasyspooncafe', "I don't want any spam!"])
- def test_ping(self):
- """
- L{IRCClient.ping}
- """
- # Ping a user with no message
- self.protocol.ping("otheruser")
- self.assertTrue(
- self.transport.value().startswith(b'PRIVMSG otheruser :\x01PING'))
- self.transport.clear()
- # Ping a user with a message
- self.protocol.ping("otheruser", "are you there")
- self.assertEqual(self.transport.value(),
- b'PRIVMSG otheruser :\x01PING are you there\x01\r\n')
- self.transport.clear()
- # Create a lot of pings, more than MAX_PINGRING
- self.protocol._pings = {}
- for pingNum in range(self.protocol._MAX_PINGRING + 3):
- self.protocol._pings[("otheruser"), (str(pingNum))] = (
- time.time() + pingNum)
- self.assertEqual(len(self.protocol._pings), self.protocol._MAX_PINGRING + 3)
- # Ping a user
- self.protocol.ping("otheruser", "I sent a lot of pings")
- # The excess pings should have been purged
- self.assertEqual(len(self.protocol._pings), self.protocol._MAX_PINGRING)
- self.assertEqual(self.transport.value(),
- b'PRIVMSG otheruser :\x01PING I sent a lot of pings\x01\r\n')
- class CollectorClient(irc.IRCClient):
- """
- A client that saves in a list the names of the methods that got called.
- """
- def __init__(self, methodsList):
- """
- @param methodsList: list of methods' names that should be replaced.
- @type methodsList: C{list}
- """
- self.methods = []
- self.nickname = 'Wolf'
- for method in methodsList:
- def fake_method(method=method):
- """
- Collects C{method}s.
- """
- def inner(*args):
- self.methods.append((method, args))
- return inner
- setattr(self, method, fake_method())
- class DccTests(IRCTestCase):
- """
- Tests for C{dcc_*} methods.
- """
- def setUp(self):
- methods = ['dccDoSend', 'dccDoAcceptResume', 'dccDoResume',
- 'dccDoChat']
- self.user = 'Wolf!~wolf@yok.utu.fi'
- self.channel = '#twisted'
- self.client = CollectorClient(methods)
- def test_dccSend(self):
- """
- L{irc.IRCClient.dcc_SEND} invokes L{irc.IRCClient.dccDoSend}.
- """
- self.client.dcc_SEND(self.user, self.channel, 'foo.txt 127.0.0.1 1025')
- self.assertEqual(self.client.methods,
- [('dccDoSend', (self.user, '127.0.0.1', 1025, 'foo.txt', -1,
- ['foo.txt', '127.0.0.1', '1025']))])
- def test_dccSendNotImplemented(self):
- """
- L{irc.IRCClient.dccDoSend} is raises C{NotImplementedError}
- """
- client = irc.IRCClient()
- self.assertRaises(NotImplementedError,
- client.dccSend, 'username', None)
- def test_dccSendMalformedRequest(self):
- """
- L{irc.IRCClient.dcc_SEND} raises L{irc.IRCBadMessage} when it is passed
- a malformed query string.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_SEND,
- self.user, self.channel, 'foo')
- self.assertEqual(str(result), "malformed DCC SEND request: ['foo']")
- def test_dccSendIndecipherableAddress(self):
- """
- L{irc.IRCClient.dcc_SEND} raises L{irc.IRCBadMessage} when it is passed
- a query string that doesn't contain a valid address.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_SEND,
- self.user, self.channel, 'foo.txt #23 sd@d')
- self.assertEqual(str(result), "Indecipherable address '#23'")
- def test_dccSendIndecipherablePort(self):
- """
- L{irc.IRCClient.dcc_SEND} raises L{irc.IRCBadMessage} when it is passed
- a query string that doesn't contain a valid port number.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_SEND,
- self.user, self.channel, 'foo.txt 127.0.0.1 sd@d')
- self.assertEqual(str(result), "Indecipherable port 'sd@d'")
- def test_dccAccept(self):
- """
- L{irc.IRCClient.dcc_ACCEPT} invokes L{irc.IRCClient.dccDoAcceptResume}.
- """
- self.client.dcc_ACCEPT(self.user, self.channel, 'foo.txt 1025 2')
- self.assertEqual(self.client.methods,
- [('dccDoAcceptResume', (self.user, 'foo.txt', 1025, 2))])
- def test_dccAcceptMalformedRequest(self):
- """
- L{irc.IRCClient.dcc_ACCEPT} raises L{irc.IRCBadMessage} when it is
- passed a malformed query string.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_ACCEPT,
- self.user, self.channel, 'foo')
- self.assertEqual(str(result),
- "malformed DCC SEND ACCEPT request: ['foo']")
- def test_dccResume(self):
- """
- L{irc.IRCClient.dcc_RESUME} invokes L{irc.IRCClient.dccDoResume}.
- """
- self.client.dcc_RESUME(self.user, self.channel, 'foo.txt 1025 2')
- self.assertEqual(self.client.methods,
- [('dccDoResume', (self.user, 'foo.txt', 1025, 2))])
- def test_dccResumeMalformedRequest(self):
- """
- L{irc.IRCClient.dcc_RESUME} raises L{irc.IRCBadMessage} when it is
- passed a malformed query string.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_RESUME,
- self.user, self.channel, 'foo')
- self.assertEqual(str(result),
- "malformed DCC SEND RESUME request: ['foo']")
- def test_dccChat(self):
- """
- L{irc.IRCClient.dcc_CHAT} invokes L{irc.IRCClient.dccDoChat}.
- """
- self.client.dcc_CHAT(self.user, self.channel, 'foo.txt 127.0.0.1 1025')
- self.assertEqual(self.client.methods,
- [('dccDoChat', (self.user, self.channel, '127.0.0.1', 1025,
- ['foo.txt', '127.0.0.1', '1025']))])
- def test_dccChatMalformedRequest(self):
- """
- L{irc.IRCClient.dcc_CHAT} raises L{irc.IRCBadMessage} when it is
- passed a malformed query string.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_CHAT,
- self.user, self.channel, 'foo')
- self.assertEqual(str(result),
- "malformed DCC CHAT request: ['foo']")
- def test_dccChatIndecipherablePort(self):
- """
- L{irc.IRCClient.dcc_CHAT} raises L{irc.IRCBadMessage} when it is passed
- a query string that doesn't contain a valid port number.
- """
- result = self.assertRaises(irc.IRCBadMessage, self.client.dcc_CHAT,
- self.user, self.channel, 'foo.txt 127.0.0.1 sd@d')
- self.assertEqual(str(result), "Indecipherable port 'sd@d'")
- class ServerToClientTests(IRCTestCase):
- """
- Tests for the C{irc_*} methods sent from the server to the client.
- """
- def setUp(self):
- self.user = 'Wolf!~wolf@yok.utu.fi'
- self.channel = '#twisted'
- methods = ['joined', 'userJoined', 'left', 'userLeft', 'userQuit',
- 'noticed', 'kickedFrom', 'userKicked', 'topicUpdated']
- self.client = CollectorClient(methods)
- def test_irc_JOIN(self):
- """
- L{IRCClient.joined} is called when I join a channel;
- L{IRCClient.userJoined} is called when someone else joins.
- """
- self.client.irc_JOIN(self.user, [self.channel])
- self.client.irc_JOIN('Svadilfari!~svadi@yok.utu.fi', ['#python'])
- self.assertEqual(self.client.methods,
- [('joined', (self.channel,)),
- ('userJoined', ('Svadilfari', '#python'))])
- def test_irc_PART(self):
- """
- L{IRCClient.left} is called when I part the channel;
- L{IRCClient.userLeft} is called when someone else parts.
- """
- self.client.irc_PART(self.user, [self.channel])
- self.client.irc_PART('Svadilfari!~svadi@yok.utu.fi', ['#python'])
- self.assertEqual(self.client.methods,
- [('left', (self.channel,)),
- ('userLeft', ('Svadilfari', '#python'))])
- def test_irc_QUIT(self):
- """
- L{IRCClient.userQuit} is called whenever someone quits
- the channel (myself included).
- """
- self.client.irc_QUIT('Svadilfari!~svadi@yok.utu.fi', ['Adios.'])
- self.client.irc_QUIT(self.user, ['Farewell.'])
- self.assertEqual(self.client.methods,
- [('userQuit', ('Svadilfari', 'Adios.')),
- ('userQuit', ('Wolf', 'Farewell.'))])
- def test_irc_NOTICE(self):
- """
- L{IRCClient.noticed} is called when a notice is received.
- """
- msg = ('%(X)cextended%(X)cdata1%(X)cextended%(X)cdata2%(X)c%(EOL)s' %
- {'X': irc.X_DELIM, 'EOL': irc.CR + irc.LF})
- self.client.irc_NOTICE(self.user, [self.channel, msg])
- self.assertEqual(self.client.methods,
- [('noticed', (self.user, '#twisted', 'data1 data2'))])
- def test_irc_KICK(self):
- """
- L{IRCClient.kickedFrom} is called when I get kicked from the channel;
- L{IRCClient.userKicked} is called when someone else gets kicked.
- """
- # Fight!
- self.client.irc_KICK('Svadilfari!~svadi@yok.utu.fi',
- ['#python', 'WOLF', 'shoryuken!'])
- self.client.irc_KICK(self.user,
- [self.channel, 'Svadilfari', 'hadouken!'])
- self.assertEqual(self.client.methods,
- [('kickedFrom',
- ('#python', 'Svadilfari', 'shoryuken!')),
- ('userKicked',
- ('Svadilfari', self.channel, 'Wolf', 'hadouken!'))])
- def test_irc_TOPIC(self):
- """
- L{IRCClient.topicUpdated} is called when someone sets the topic.
- """
- self.client.irc_TOPIC(self.user,
- [self.channel, 'new topic is new'])
- self.assertEqual(self.client.methods,
- [('topicUpdated',
- ('Wolf', self.channel, 'new topic is new'))])
- def test_irc_RPL_TOPIC(self):
- """
- L{IRCClient.topicUpdated} is called when the topic is initially
- reported.
- """
- self.client.irc_RPL_TOPIC(self.user,
- ['?', self.channel, 'new topic is new'])
- self.assertEqual(self.client.methods,
- [('topicUpdated',
- ('Wolf', self.channel, 'new topic is new'))])
- def test_irc_RPL_NOTOPIC(self):
- """
- L{IRCClient.topicUpdated} is called when the topic is removed.
- """
- self.client.irc_RPL_NOTOPIC(self.user, ['?', self.channel])
- self.assertEqual(self.client.methods,
- [('topicUpdated', ('Wolf', self.channel, ''))])
- class CTCPQueryTests(IRCTestCase):
- """
- Tests for the C{ctcpQuery_*} methods.
- """
- def setUp(self):
- self.user = 'Wolf!~wolf@yok.utu.fi'
- self.channel = '#twisted'
- self.client = CollectorClient(['ctcpMakeReply'])
- def test_ctcpQuery_PING(self):
- """
- L{IRCClient.ctcpQuery_PING} calls L{IRCClient.ctcpMakeReply} with the
- correct args.
- """
- self.client.ctcpQuery_PING(self.user, self.channel, 'data')
- self.assertEqual(self.client.methods,
- [('ctcpMakeReply', ('Wolf', [('PING', 'data')]))])
- def test_ctcpQuery_FINGER(self):
- """
- L{IRCClient.ctcpQuery_FINGER} calls L{IRCClient.ctcpMakeReply} with the
- correct args.
- """
- self.client.fingerReply = 'reply'
- self.client.ctcpQuery_FINGER(self.user, self.channel, 'data')
- self.assertEqual(self.client.methods,
- [('ctcpMakeReply', ('Wolf', [('FINGER', 'reply')]))])
- def test_ctcpQuery_SOURCE(self):
- """
- L{IRCClient.ctcpQuery_SOURCE} calls L{IRCClient.ctcpMakeReply} with the
- correct args.
- """
- self.client.sourceURL = 'url'
- self.client.ctcpQuery_SOURCE(self.user, self.channel, 'data')
- self.assertEqual(self.client.methods,
- [('ctcpMakeReply', ('Wolf', [('SOURCE', 'url'),
- ('SOURCE', None)]))])
- def test_ctcpQuery_USERINFO(self):
- """
- L{IRCClient.ctcpQuery_USERINFO} calls L{IRCClient.ctcpMakeReply} with
- the correct args.
- """
- self.client.userinfo = 'info'
- self.client.ctcpQuery_USERINFO(self.user, self.channel, 'data')
- self.assertEqual(self.client.methods,
- [('ctcpMakeReply', ('Wolf', [('USERINFO', 'info')]))])
- def test_ctcpQuery_CLIENTINFO(self):
- """
- L{IRCClient.ctcpQuery_CLIENTINFO} calls L{IRCClient.ctcpMakeReply} with
- the correct args.
- """
- self.client.ctcpQuery_CLIENTINFO(self.user, self.channel, '')
- self.client.ctcpQuery_CLIENTINFO(self.user, self.channel, 'PING PONG')
- info = ('ACTION CLIENTINFO DCC ERRMSG FINGER PING SOURCE TIME '
- 'USERINFO VERSION')
- self.assertEqual(self.client.methods,
- [('ctcpMakeReply', ('Wolf', [('CLIENTINFO', info)])),
- ('ctcpMakeReply', ('Wolf', [('CLIENTINFO', None)]))])
- def test_ctcpQuery_TIME(self):
- """
- L{IRCClient.ctcpQuery_TIME} calls L{IRCClient.ctcpMakeReply} with the
- correct args.
- """
- self.client.ctcpQuery_TIME(self.user, self.channel, 'data')
- self.assertEqual(self.client.methods[0][1][0], 'Wolf')
- def test_ctcpQuery_DCC(self):
- """
- L{IRCClient.ctcpQuery_DCC} calls L{IRCClient.ctcpMakeReply} with the
- correct args.
- """
- self.client.ctcpQuery_DCC(self.user, self.channel, 'data')
- self.assertEqual(self.client.methods,
- [('ctcpMakeReply',
- ('Wolf', [('ERRMSG',
- "DCC data :Unknown DCC type 'DATA'")]))])
- class DccChatFactoryTests(IRCTestCase):
- """
- Tests for L{DccChatFactory}.
- """
- def test_buildProtocol(self):
- """
- An instance of the L{irc.DccChat} protocol is returned, which has the
- factory property set to the factory which created it.
- """
- queryData = ('fromUser', None, None)
- factory = irc.DccChatFactory(None, queryData)
- protocol = factory.buildProtocol('127.0.0.1')
- self.assertIsInstance(protocol, irc.DccChat)
- self.assertEqual(protocol.factory, factory)
- class DccDescribeTests(IRCTestCase):
- """
- Tests for L{dccDescribe}.
- """
- def test_address(self):
- """
- L{irc.dccDescribe} supports long IP addresses.
- """
- result = irc.dccDescribe('CHAT arg 3232235522 6666')
- self.assertEqual(result, "CHAT for host 192.168.0.2, port 6666")
- class DccFileReceiveTests(IRCTestCase):
- """
- Tests for L{DccFileReceive}.
- """
- def makeConnectedDccFileReceive(self, filename, resumeOffset=0,
- overwrite=None):
- """
- Factory helper that returns a L{DccFileReceive} instance
- for a specific test case.
- @param filename: Path to the local file where received data is stored.
- @type filename: L{str}
- @param resumeOffset: An integer representing the amount of bytes from
- where the transfer of data should be resumed.
- @type resumeOffset: L{int}
- @param overwrite: A boolean specifying whether the file to write to
- should be overwritten by calling L{DccFileReceive.set_overwrite}
- or not.
- @type overwrite: L{bool}
- @return: An instance of L{DccFileReceive}.
- @rtype: L{DccFileReceive}
- """
- protocol = irc.DccFileReceive(filename, resumeOffset=resumeOffset)
- if overwrite:
- protocol.set_overwrite(True)
- transport = StringTransport()
- protocol.makeConnection(transport)
- return protocol
- def allDataReceivedForProtocol(self, protocol, data):
- """
- Arrange the protocol so that it received all data.
- @param protocol: The protocol which will receive the data.
- @type: L{DccFileReceive}
- @param data: The received data.
- @type data: L{bytest}
- """
- protocol.dataReceived(data)
- protocol.connectionLost(None)
- def test_resumeFromResumeOffset(self):
- """
- If given a resumeOffset argument, L{DccFileReceive} will attempt to
- resume from that number of bytes if the file exists.
- """
- fp = FilePath(self.mktemp())
- fp.setContent(b'Twisted is awesome!')
- protocol = self.makeConnectedDccFileReceive(fp.path, resumeOffset=11)
- self.allDataReceivedForProtocol(protocol, b'amazing!')
- self.assertEqual(fp.getContent(), b'Twisted is amazing!')
- def test_resumeFromResumeOffsetInTheMiddleOfAlreadyWrittenData(self):
- """
- When resuming from an offset somewhere in the middle of the file,
- for example, if there are 50 bytes in a file, and L{DccFileReceive}
- is given a resumeOffset of 25, and after that 15 more bytes are
- written to the file, then the resultant file should have just 40
- bytes of data.
- """
- fp = FilePath(self.mktemp())
- fp.setContent(b'Twisted is amazing!')
- protocol = self.makeConnectedDccFileReceive(fp.path, resumeOffset=11)
- self.allDataReceivedForProtocol(protocol, b'cool!')
- self.assertEqual(fp.getContent(), b'Twisted is cool!')
- def test_setOverwrite(self):
- """
- When local file already exists it can be overwritten using the
- L{DccFileReceive.set_overwrite} method.
- """
- fp = FilePath(self.mktemp())
- fp.setContent(b'I love contributing to Twisted!')
- protocol = self.makeConnectedDccFileReceive(fp.path, overwrite=True)
- self.allDataReceivedForProtocol(protocol, b'Twisted rocks!')
- self.assertEqual(fp.getContent(), b'Twisted rocks!')
- def test_fileDoesNotExist(self):
- """
- If the file does not already exist, then L{DccFileReceive} will
- create one and write the data to it.
- """
- fp = FilePath(self.mktemp())
- protocol = self.makeConnectedDccFileReceive(fp.path)
- self.allDataReceivedForProtocol(protocol, b'I <3 Twisted')
- self.assertEqual(fp.getContent(), b'I <3 Twisted')
- def test_resumeWhenFileDoesNotExist(self):
- """
- If given a resumeOffset to resume writing to a file that does not
- exist, L{DccFileReceive} will raise L{OSError}.
- """
- fp = FilePath(self.mktemp())
- error = self.assertRaises(
- OSError,
- self.makeConnectedDccFileReceive, fp.path, resumeOffset=1)
- self.assertEqual(errno.ENOENT, error.errno)
- def test_fileAlreadyExistsNoOverwrite(self):
- """
- If the file already exists and overwrite action was not asked,
- L{OSError} is raised.
- """
- fp = FilePath(self.mktemp())
- fp.touch()
- self.assertRaises(OSError, self.makeConnectedDccFileReceive, fp.path)
- def test_failToOpenLocalFile(self):
- """
- L{IOError} is raised when failing to open the requested path.
- """
- fp = FilePath(self.mktemp()).child(u'child-with-no-existing-parent')
- self.assertRaises(IOError, self.makeConnectedDccFileReceive, fp.path)
|