test_tcp.py 83 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for implementations of L{IReactorTCP} and the TCP parts of
  5. L{IReactorSocket}.
  6. """
  7. from __future__ import division, absolute_import
  8. __metaclass__ = type
  9. import errno
  10. import socket
  11. from functools import wraps
  12. from zope.interface import implementer
  13. from zope.interface.verify import verifyClass
  14. from twisted.python.compat import long
  15. from twisted.python.runtime import platform
  16. from twisted.python.failure import Failure
  17. from twisted.python import log
  18. from twisted.trial.unittest import SkipTest, TestCase
  19. from twisted.internet.error import (
  20. ConnectionLost, UserError, ConnectionRefusedError, ConnectionDone,
  21. ConnectionAborted, DNSLookupError, NoProtocol)
  22. from twisted.internet.test.connectionmixins import (
  23. LogObserverMixin, ConnectionTestsMixin, StreamClientTestsMixin,
  24. findFreePort, ConnectableProtocol, EndpointCreator,
  25. runProtocolsWithReactor, Stop, BrokenContextFactory)
  26. from twisted.internet.test.reactormixins import (
  27. ReactorBuilder, needsRunningReactor, stopOnError)
  28. from twisted.internet.interfaces import (
  29. ILoggingContext, IConnector, IReactorFDSet, IReactorSocket, IReactorTCP,
  30. IResolverSimple, ITLSTransport)
  31. from twisted.internet.address import IPv4Address, IPv6Address
  32. from twisted.internet.defer import (
  33. Deferred, DeferredList, maybeDeferred, gatherResults, succeed, fail)
  34. from twisted.internet.endpoints import TCP4ServerEndpoint, TCP4ClientEndpoint
  35. from twisted.internet.protocol import ServerFactory, ClientFactory, Protocol
  36. from twisted.internet.interfaces import (
  37. IPushProducer, IPullProducer, IHalfCloseableProtocol)
  38. from twisted.internet.tcp import Connection, Server, _resolveIPv6
  39. from twisted.internet.test.test_core import ObjectModelIntegrationMixin
  40. from twisted.test.test_tcp import MyClientFactory, MyServerFactory
  41. from twisted.test.test_tcp import ClosingFactory, ClientStartStopFactory
  42. try:
  43. from OpenSSL import SSL
  44. except ImportError:
  45. useSSL = False
  46. else:
  47. from twisted.internet.ssl import ClientContextFactory
  48. useSSL = True
  49. s = None
  50. try:
  51. s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  52. s.bind(('::1', 0))
  53. except socket.error as e:
  54. ipv6Skip = str(e)
  55. else:
  56. ipv6Skip = None
  57. if s is not None:
  58. s.close()
  59. if platform.isWindows():
  60. from twisted.internet.test import _win32ifaces
  61. getLinkLocalIPv6Addresses = _win32ifaces.win32GetLinkLocalIPv6Addresses
  62. else:
  63. try:
  64. from twisted.internet.test import _posixifaces
  65. except ImportError:
  66. getLinkLocalIPv6Addresses = lambda: []
  67. else:
  68. getLinkLocalIPv6Addresses = _posixifaces.posixGetLinkLocalIPv6Addresses
  69. def getLinkLocalIPv6Address():
  70. """
  71. Find and return a configured link local IPv6 address including a scope
  72. identifier using the % separation syntax. If the system has no link local
  73. IPv6 addresses, raise L{SkipTest} instead.
  74. @raise SkipTest: if no link local address can be found or if the
  75. C{netifaces} module is not available.
  76. @return: a C{str} giving the address
  77. """
  78. addresses = getLinkLocalIPv6Addresses()
  79. if addresses:
  80. return addresses[0]
  81. raise SkipTest("Link local IPv6 address unavailable")
  82. def connect(client, destination):
  83. """
  84. Connect a socket to the given destination.
  85. @param client: A C{socket.socket}.
  86. @param destination: A tuple of (host, port). The host is a C{str}, the
  87. port a C{int}. If the C{host} is an IPv6 IP, the address is resolved
  88. using C{getaddrinfo} and the first version found is used.
  89. """
  90. (host, port) = destination
  91. if '%' in host or ':' in host:
  92. address = socket.getaddrinfo(host, port)[0][4]
  93. else:
  94. address = (host, port)
  95. client.connect(address)
  96. class FakeSocket(object):
  97. """
  98. A fake for L{socket.socket} objects.
  99. @ivar data: A C{str} giving the data which will be returned from
  100. L{FakeSocket.recv}.
  101. @ivar sendBuffer: A C{list} of the objects passed to L{FakeSocket.send}.
  102. """
  103. def __init__(self, data):
  104. self.data = data
  105. self.sendBuffer = []
  106. def setblocking(self, blocking):
  107. self.blocking = blocking
  108. def recv(self, size):
  109. return self.data
  110. def send(self, bytes):
  111. """
  112. I{Send} all of C{bytes} by accumulating it into C{self.sendBuffer}.
  113. @return: The length of C{bytes}, indicating all the data has been
  114. accepted.
  115. """
  116. self.sendBuffer.append(bytes)
  117. return len(bytes)
  118. def shutdown(self, how):
  119. """
  120. Shutdown is not implemented. The method is provided since real sockets
  121. have it and some code expects it. No behavior of L{FakeSocket} is
  122. affected by a call to it.
  123. """
  124. def close(self):
  125. """
  126. Close is not implemented. The method is provided since real sockets
  127. have it and some code expects it. No behavior of L{FakeSocket} is
  128. affected by a call to it.
  129. """
  130. def setsockopt(self, *args):
  131. """
  132. Setsockopt is not implemented. The method is provided since
  133. real sockets have it and some code expects it. No behavior of
  134. L{FakeSocket} is affected by a call to it.
  135. """
  136. def fileno(self):
  137. """
  138. Return a fake file descriptor. If actually used, this will have no
  139. connection to this L{FakeSocket} and will probably cause surprising
  140. results.
  141. """
  142. return 1
  143. class FakeSocketTests(TestCase):
  144. """
  145. Test that the FakeSocket can be used by the doRead method of L{Connection}
  146. """
  147. def test_blocking(self):
  148. skt = FakeSocket(b"someData")
  149. skt.setblocking(0)
  150. self.assertEqual(skt.blocking, 0)
  151. def test_recv(self):
  152. skt = FakeSocket(b"someData")
  153. self.assertEqual(skt.recv(10), b"someData")
  154. def test_send(self):
  155. """
  156. L{FakeSocket.send} accepts the entire string passed to it, adds it to
  157. its send buffer, and returns its length.
  158. """
  159. skt = FakeSocket(b"")
  160. count = skt.send(b"foo")
  161. self.assertEqual(count, 3)
  162. self.assertEqual(skt.sendBuffer, [b"foo"])
  163. class FakeProtocol(Protocol):
  164. """
  165. An L{IProtocol} that returns a value from its dataReceived method.
  166. """
  167. def dataReceived(self, data):
  168. """
  169. Return something other than L{None} to trigger a deprecation warning for
  170. that behavior.
  171. """
  172. return ()
  173. @implementer(IReactorFDSet)
  174. class _FakeFDSetReactor(object):
  175. """
  176. An in-memory implementation of L{IReactorFDSet}, which records the current
  177. sets of active L{IReadDescriptor} and L{IWriteDescriptor}s.
  178. @ivar _readers: The set of L{IReadDescriptor}s active on this
  179. L{_FakeFDSetReactor}
  180. @type _readers: L{set}
  181. @ivar _writers: The set of L{IWriteDescriptor}s active on this
  182. L{_FakeFDSetReactor}
  183. @ivar _writers: L{set}
  184. """
  185. def __init__(self):
  186. self._readers = set()
  187. self._writers = set()
  188. def addReader(self, reader):
  189. self._readers.add(reader)
  190. def removeReader(self, reader):
  191. if reader in self._readers:
  192. self._readers.remove(reader)
  193. def addWriter(self, writer):
  194. self._writers.add(writer)
  195. def removeWriter(self, writer):
  196. if writer in self._writers:
  197. self._writers.remove(writer)
  198. def removeAll(self):
  199. result = self.getReaders() + self.getWriters()
  200. self.__init__()
  201. return result
  202. def getReaders(self):
  203. return list(self._readers)
  204. def getWriters(self):
  205. return list(self._writers)
  206. verifyClass(IReactorFDSet, _FakeFDSetReactor)
  207. class TCPServerTests(TestCase):
  208. """
  209. Whitebox tests for L{twisted.internet.tcp.Server}.
  210. """
  211. def setUp(self):
  212. self.reactor = _FakeFDSetReactor()
  213. class FakePort(object):
  214. _realPortNumber = 3
  215. self.skt = FakeSocket(b"")
  216. self.protocol = Protocol()
  217. self.server = Server(
  218. self.skt, self.protocol, ("", 0), FakePort(), None, self.reactor)
  219. def test_writeAfterDisconnect(self):
  220. """
  221. L{Server.write} discards bytes passed to it if called after it has lost
  222. its connection.
  223. """
  224. self.server.connectionLost(
  225. Failure(Exception("Simulated lost connection")))
  226. self.server.write(b"hello world")
  227. self.assertEqual(self.skt.sendBuffer, [])
  228. def test_writeAfterDisconnectAfterTLS(self):
  229. """
  230. L{Server.write} discards bytes passed to it if called after it has lost
  231. its connection when the connection had started TLS.
  232. """
  233. self.server.TLS = True
  234. self.test_writeAfterDisconnect()
  235. def test_writeSequenceAfterDisconnect(self):
  236. """
  237. L{Server.writeSequence} discards bytes passed to it if called after it
  238. has lost its connection.
  239. """
  240. self.server.connectionLost(
  241. Failure(Exception("Simulated lost connection")))
  242. self.server.writeSequence([b"hello world"])
  243. self.assertEqual(self.skt.sendBuffer, [])
  244. def test_writeSequenceAfterDisconnectAfterTLS(self):
  245. """
  246. L{Server.writeSequence} discards bytes passed to it if called after it
  247. has lost its connection when the connection had started TLS.
  248. """
  249. self.server.TLS = True
  250. self.test_writeSequenceAfterDisconnect()
  251. class TCPConnectionTests(TestCase):
  252. """
  253. Whitebox tests for L{twisted.internet.tcp.Connection}.
  254. """
  255. def test_doReadWarningIsRaised(self):
  256. """
  257. When an L{IProtocol} implementation that returns a value from its
  258. C{dataReceived} method, a deprecated warning is emitted.
  259. """
  260. skt = FakeSocket(b"someData")
  261. protocol = FakeProtocol()
  262. conn = Connection(skt, protocol)
  263. conn.doRead()
  264. warnings = self.flushWarnings([FakeProtocol.dataReceived])
  265. self.assertEqual(warnings[0]['category'], DeprecationWarning)
  266. self.assertEqual(
  267. warnings[0]["message"],
  268. "Returning a value other than None from "
  269. "twisted.internet.test.test_tcp.FakeProtocol.dataReceived "
  270. "is deprecated since Twisted 11.0.0.")
  271. self.assertEqual(len(warnings), 1)
  272. def test_noTLSBeforeStartTLS(self):
  273. """
  274. The C{TLS} attribute of a L{Connection} instance is C{False} before
  275. L{Connection.startTLS} is called.
  276. """
  277. skt = FakeSocket(b"")
  278. protocol = FakeProtocol()
  279. conn = Connection(skt, protocol)
  280. self.assertFalse(conn.TLS)
  281. def test_tlsAfterStartTLS(self):
  282. """
  283. The C{TLS} attribute of a L{Connection} instance is C{True} after
  284. L{Connection.startTLS} is called.
  285. """
  286. skt = FakeSocket(b"")
  287. protocol = FakeProtocol()
  288. conn = Connection(skt, protocol, reactor=_FakeFDSetReactor())
  289. conn._tlsClientDefault = True
  290. conn.startTLS(ClientContextFactory(), True)
  291. self.assertTrue(conn.TLS)
  292. if not useSSL:
  293. test_tlsAfterStartTLS.skip = "No SSL support available"
  294. class TCPCreator(EndpointCreator):
  295. """
  296. Create IPv4 TCP endpoints for L{runProtocolsWithReactor}-based tests.
  297. """
  298. interface = "127.0.0.1"
  299. def server(self, reactor):
  300. """
  301. Create a server-side TCP endpoint.
  302. """
  303. return TCP4ServerEndpoint(reactor, 0, interface=self.interface)
  304. def client(self, reactor, serverAddress):
  305. """
  306. Create a client end point that will connect to the given address.
  307. @type serverAddress: L{IPv4Address}
  308. """
  309. return TCP4ClientEndpoint(reactor, self.interface, serverAddress.port)
  310. class TCP6Creator(TCPCreator):
  311. """
  312. Create IPv6 TCP endpoints for
  313. C{ReactorBuilder.runProtocolsWithReactor}-based tests.
  314. The endpoint types in question here are still the TCP4 variety, since
  315. these simply pass through IPv6 address literals to the reactor, and we are
  316. only testing address literals, not name resolution (as name resolution has
  317. not yet been implemented). See http://twistedmatrix.com/trac/ticket/4470
  318. for more specific information about new endpoint classes. The naming is
  319. slightly misleading, but presumably if you're passing an IPv6 literal, you
  320. know what you're asking for.
  321. """
  322. def __init__(self):
  323. self.interface = getLinkLocalIPv6Address()
  324. @implementer(IResolverSimple)
  325. class FakeResolver(object):
  326. """
  327. A resolver implementation based on a C{dict} mapping names to addresses.
  328. """
  329. def __init__(self, names):
  330. self.names = names
  331. def getHostByName(self, name, timeout):
  332. """
  333. Return the address mapped to C{name} if it exists, or raise a
  334. C{DNSLookupError}.
  335. @param name: The name to resolve.
  336. @param timeout: The lookup timeout, ignore here.
  337. """
  338. try:
  339. return succeed(self.names[name])
  340. except KeyError:
  341. return fail(DNSLookupError("FakeResolver couldn't find " + name))
  342. class TCPClientTestsBase(ReactorBuilder, ConnectionTestsMixin,
  343. StreamClientTestsMixin):
  344. """
  345. Base class for builders defining tests related to
  346. L{IReactorTCP.connectTCP}. Classes which uses this in must provide all of
  347. the documented instance variables in order to specify how the test works.
  348. These are documented as instance variables rather than declared as methods
  349. due to some peculiar inheritance ordering concerns, but they are
  350. effectively abstract methods.
  351. @ivar endpoints: A client/server endpoint creator appropriate to the
  352. address family being tested.
  353. @type endpoints: L{twisted.internet.test.connectionmixins.EndpointCreator}
  354. @ivar interface: An IP address literal to locally bind a socket to as well
  355. as to connect to. This can be any valid interface for the local host.
  356. @type interface: C{str}
  357. @ivar port: An unused local listening port to listen on and connect to.
  358. This will be used in conjunction with the C{interface}. (Depending on
  359. what they're testing, some tests will locate their own port with
  360. L{findFreePort} instead.)
  361. @type port: C{int}
  362. @ivar family: an address family constant, such as L{socket.AF_INET},
  363. L{socket.AF_INET6}, or L{socket.AF_UNIX}, which indicates the address
  364. family of the transport type under test.
  365. @type family: C{int}
  366. @ivar addressClass: the L{twisted.internet.interfaces.IAddress} implementor
  367. associated with the transport type under test. Must also be a
  368. 3-argument callable which produces an instance of same.
  369. @type addressClass: C{type}
  370. @ivar fakeDomainName: A fake domain name to use, to simulate hostname
  371. resolution and to distinguish between hostnames and IP addresses where
  372. necessary.
  373. @type fakeDomainName: C{str}
  374. """
  375. requiredInterfaces = (IReactorTCP,)
  376. _port = None
  377. @property
  378. def port(self):
  379. """
  380. Return the port number to connect to, using C{self._port} set up by
  381. C{listen} if available.
  382. @return: The port number to connect to.
  383. @rtype: C{int}
  384. """
  385. if self._port is not None:
  386. return self._port.getHost().port
  387. return findFreePort(self.interface, self.family)[1]
  388. @property
  389. def interface(self):
  390. """
  391. Return the interface attribute from the endpoints object.
  392. """
  393. return self.endpoints.interface
  394. def listen(self, reactor, factory):
  395. """
  396. Start a TCP server with the given C{factory}.
  397. @param reactor: The reactor to create the TCP port in.
  398. @param factory: The server factory.
  399. @return: A TCP port instance.
  400. """
  401. self._port = reactor.listenTCP(0, factory, interface=self.interface)
  402. return self._port
  403. def connect(self, reactor, factory):
  404. """
  405. Start a TCP client with the given C{factory}.
  406. @param reactor: The reactor to create the connection in.
  407. @param factory: The client factory.
  408. @return: A TCP connector instance.
  409. """
  410. return reactor.connectTCP(self.interface, self.port, factory)
  411. def test_buildProtocolReturnsNone(self):
  412. """
  413. When the factory's C{buildProtocol} returns L{None} the connection is
  414. gracefully closed.
  415. """
  416. connectionLost = Deferred()
  417. reactor = self.buildReactor()
  418. serverFactory = MyServerFactory()
  419. serverFactory.protocolConnectionLost = connectionLost
  420. # Make sure the test ends quickly.
  421. stopOnError(self, reactor)
  422. class NoneFactory(ServerFactory):
  423. def buildProtocol(self, address):
  424. return None
  425. listening = self.endpoints.server(reactor).listen(serverFactory)
  426. def listened(port):
  427. clientFactory = NoneFactory()
  428. endpoint = self.endpoints.client(reactor, port.getHost())
  429. return endpoint.connect(clientFactory)
  430. connecting = listening.addCallback(listened)
  431. def connectSucceeded(protocol):
  432. self.fail(
  433. "Stream client endpoint connect succeeded with %r, "
  434. "should have failed with NoProtocol." % (protocol,))
  435. def connectFailed(reason):
  436. reason.trap(NoProtocol)
  437. connecting.addCallbacks(connectSucceeded, connectFailed)
  438. def connected(ignored):
  439. # Now that the connection attempt has failed continue waiting for
  440. # the server-side connection to be lost. This is the behavior this
  441. # test is primarily concerned with.
  442. return connectionLost
  443. disconnecting = connecting.addCallback(connected)
  444. # Make sure any errors that happen in that process get logged quickly.
  445. disconnecting.addErrback(log.err)
  446. def disconnected(ignored):
  447. # The Deferred has to succeed at this point (because log.err always
  448. # returns None). If an error got logged it will fail the test.
  449. # Stop the reactor now so the test can complete one way or the
  450. # other now.
  451. reactor.stop()
  452. disconnecting.addCallback(disconnected)
  453. self.runReactor(reactor)
  454. def test_addresses(self):
  455. """
  456. A client's transport's C{getHost} and C{getPeer} return L{IPv4Address}
  457. instances which have the dotted-quad string form of the resolved
  458. address of the local and remote endpoints of the connection
  459. respectively as their C{host} attribute, not the hostname originally
  460. passed in to
  461. L{connectTCP<twisted.internet.interfaces.IReactorTCP.connectTCP>}, if a
  462. hostname was used.
  463. """
  464. host, port = findFreePort(self.interface, self.family)[:2]
  465. reactor = self.buildReactor()
  466. fakeDomain = self.fakeDomainName
  467. reactor.installResolver(FakeResolver({fakeDomain: self.interface}))
  468. server = reactor.listenTCP(
  469. 0, ServerFactory.forProtocol(Protocol), interface=host)
  470. serverAddress = server.getHost()
  471. transportData = {'host': None, 'peer': None, 'instance': None}
  472. class CheckAddress(Protocol):
  473. def makeConnection(self, transport):
  474. transportData['host'] = transport.getHost()
  475. transportData['peer'] = transport.getPeer()
  476. transportData['instance'] = transport
  477. reactor.stop()
  478. clientFactory = Stop(reactor)
  479. clientFactory.protocol = CheckAddress
  480. def connectMe():
  481. reactor.connectTCP(
  482. fakeDomain, server.getHost().port, clientFactory,
  483. bindAddress=(self.interface, port))
  484. needsRunningReactor(reactor, connectMe)
  485. self.runReactor(reactor)
  486. if clientFactory.failReason:
  487. self.fail(clientFactory.failReason.getTraceback())
  488. transportRepr = "<%s to %s at %x>" % (
  489. transportData['instance'].__class__,
  490. transportData['instance'].addr,
  491. id(transportData['instance']))
  492. self.assertEqual(
  493. transportData['host'],
  494. self.addressClass('TCP', self.interface, port))
  495. self.assertEqual(
  496. transportData['peer'],
  497. self.addressClass('TCP', self.interface, serverAddress.port))
  498. self.assertEqual(
  499. repr(transportData['instance']), transportRepr)
  500. def test_badContext(self):
  501. """
  502. If the context factory passed to L{ITCPTransport.startTLS} raises an
  503. exception from its C{getContext} method, that exception is raised by
  504. L{ITCPTransport.startTLS}.
  505. """
  506. reactor = self.buildReactor()
  507. brokenFactory = BrokenContextFactory()
  508. results = []
  509. serverFactory = ServerFactory.forProtocol(Protocol)
  510. port = reactor.listenTCP(0, serverFactory, interface=self.interface)
  511. endpoint = self.endpoints.client(reactor, port.getHost())
  512. clientFactory = ClientFactory()
  513. clientFactory.protocol = Protocol
  514. connectDeferred = endpoint.connect(clientFactory)
  515. def connected(protocol):
  516. if not ITLSTransport.providedBy(protocol.transport):
  517. results.append("skip")
  518. else:
  519. results.append(self.assertRaises(ValueError,
  520. protocol.transport.startTLS,
  521. brokenFactory))
  522. def connectFailed(failure):
  523. results.append(failure)
  524. def whenRun():
  525. connectDeferred.addCallback(connected)
  526. connectDeferred.addErrback(connectFailed)
  527. connectDeferred.addBoth(lambda ign: reactor.stop())
  528. needsRunningReactor(reactor, whenRun)
  529. self.runReactor(reactor)
  530. self.assertEqual(len(results), 1,
  531. "more than one callback result: %s" % (results,))
  532. if isinstance(results[0], Failure):
  533. # self.fail(Failure)
  534. results[0].raiseException()
  535. if results[0] == "skip":
  536. raise SkipTest("Reactor does not support ITLSTransport")
  537. self.assertEqual(BrokenContextFactory.message, str(results[0]))
  538. class TCP4ClientTestsBuilder(TCPClientTestsBase):
  539. """
  540. Builder configured with IPv4 parameters for tests related to
  541. L{IReactorTCP.connectTCP}.
  542. """
  543. fakeDomainName = 'some-fake.domain.example.com'
  544. family = socket.AF_INET
  545. addressClass = IPv4Address
  546. endpoints = TCPCreator()
  547. class TCP6ClientTestsBuilder(TCPClientTestsBase):
  548. """
  549. Builder configured with IPv6 parameters for tests related to
  550. L{IReactorTCP.connectTCP}.
  551. """
  552. if ipv6Skip:
  553. skip = ipv6Skip
  554. family = socket.AF_INET6
  555. addressClass = IPv6Address
  556. def setUp(self):
  557. # Only create this object here, so that it won't be created if tests
  558. # are being skipped:
  559. self.endpoints = TCP6Creator()
  560. # This is used by test_addresses to test the distinction between the
  561. # resolved name and the name on the socket itself. All the same
  562. # invariants should hold, but giving back an IPv6 address from a
  563. # resolver is not something the reactor can handle, so instead, we make
  564. # it so that the connect call for the IPv6 address test simply uses an
  565. # address literal.
  566. self.fakeDomainName = self.endpoints.interface
  567. class TCPConnectorTestsBuilder(ReactorBuilder):
  568. """
  569. Tests for the L{IConnector} provider returned by L{IReactorTCP.connectTCP}.
  570. """
  571. requiredInterfaces = (IReactorTCP,)
  572. def test_connectorIdentity(self):
  573. """
  574. L{IReactorTCP.connectTCP} returns an object which provides
  575. L{IConnector}. The destination of the connector is the address which
  576. was passed to C{connectTCP}. The same connector object is passed to
  577. the factory's C{startedConnecting} method as to the factory's
  578. C{clientConnectionLost} method.
  579. """
  580. serverFactory = ClosingFactory()
  581. reactor = self.buildReactor()
  582. tcpPort = reactor.listenTCP(0, serverFactory, interface=self.interface)
  583. serverFactory.port = tcpPort
  584. portNumber = tcpPort.getHost().port
  585. seenConnectors = []
  586. seenFailures = []
  587. clientFactory = ClientStartStopFactory()
  588. clientFactory.clientConnectionLost = (
  589. lambda connector, reason: (seenConnectors.append(connector),
  590. seenFailures.append(reason)))
  591. clientFactory.startedConnecting = seenConnectors.append
  592. connector = reactor.connectTCP(self.interface, portNumber,
  593. clientFactory)
  594. self.assertTrue(IConnector.providedBy(connector))
  595. dest = connector.getDestination()
  596. self.assertEqual(dest.type, "TCP")
  597. self.assertEqual(dest.host, self.interface)
  598. self.assertEqual(dest.port, portNumber)
  599. clientFactory.whenStopped.addBoth(lambda _: reactor.stop())
  600. self.runReactor(reactor)
  601. seenFailures[0].trap(ConnectionDone)
  602. self.assertEqual(seenConnectors, [connector, connector])
  603. def test_userFail(self):
  604. """
  605. Calling L{IConnector.stopConnecting} in C{Factory.startedConnecting}
  606. results in C{Factory.clientConnectionFailed} being called with
  607. L{error.UserError} as the reason.
  608. """
  609. serverFactory = MyServerFactory()
  610. reactor = self.buildReactor()
  611. tcpPort = reactor.listenTCP(0, serverFactory, interface=self.interface)
  612. portNumber = tcpPort.getHost().port
  613. fatalErrors = []
  614. def startedConnecting(connector):
  615. try:
  616. connector.stopConnecting()
  617. except Exception:
  618. fatalErrors.append(Failure())
  619. reactor.stop()
  620. clientFactory = ClientStartStopFactory()
  621. clientFactory.startedConnecting = startedConnecting
  622. clientFactory.whenStopped.addBoth(lambda _: reactor.stop())
  623. reactor.callWhenRunning(lambda: reactor.connectTCP(self.interface,
  624. portNumber,
  625. clientFactory))
  626. self.runReactor(reactor)
  627. if fatalErrors:
  628. self.fail(fatalErrors[0].getTraceback())
  629. clientFactory.reason.trap(UserError)
  630. self.assertEqual(clientFactory.failed, 1)
  631. def test_reconnect(self):
  632. """
  633. Calling L{IConnector.connect} in C{Factory.clientConnectionLost} causes
  634. a new connection attempt to be made.
  635. """
  636. serverFactory = ClosingFactory()
  637. reactor = self.buildReactor()
  638. tcpPort = reactor.listenTCP(0, serverFactory, interface=self.interface)
  639. serverFactory.port = tcpPort
  640. portNumber = tcpPort.getHost().port
  641. clientFactory = MyClientFactory()
  642. def clientConnectionLost(connector, reason):
  643. connector.connect()
  644. clientFactory.clientConnectionLost = clientConnectionLost
  645. reactor.connectTCP(self.interface, portNumber, clientFactory)
  646. protocolMadeAndClosed = []
  647. def reconnectFailed(ignored):
  648. p = clientFactory.protocol
  649. protocolMadeAndClosed.append((p.made, p.closed))
  650. reactor.stop()
  651. clientFactory.failDeferred.addCallback(reconnectFailed)
  652. self.runReactor(reactor)
  653. clientFactory.reason.trap(ConnectionRefusedError)
  654. self.assertEqual(protocolMadeAndClosed, [(1, 1)])
  655. class TCP4ConnectorTestsBuilder(TCPConnectorTestsBuilder):
  656. interface = '127.0.0.1'
  657. family = socket.AF_INET
  658. addressClass = IPv4Address
  659. class TCP6ConnectorTestsBuilder(TCPConnectorTestsBuilder):
  660. family = socket.AF_INET6
  661. addressClass = IPv6Address
  662. if ipv6Skip:
  663. skip = ipv6Skip
  664. def setUp(self):
  665. self.interface = getLinkLocalIPv6Address()
  666. def createTestSocket(test, addressFamily, socketType):
  667. """
  668. Create a socket for the duration of the given test.
  669. @param test: the test to add cleanup to.
  670. @param addressFamily: an C{AF_*} constant
  671. @param socketType: a C{SOCK_*} constant.
  672. @return: a socket object.
  673. """
  674. skt = socket.socket(addressFamily, socketType)
  675. test.addCleanup(skt.close)
  676. return skt
  677. class StreamTransportTestsMixin(LogObserverMixin):
  678. """
  679. Mixin defining tests which apply to any port/connection based transport.
  680. """
  681. def test_startedListeningLogMessage(self):
  682. """
  683. When a port starts, a message including a description of the associated
  684. factory is logged.
  685. """
  686. loggedMessages = self.observe()
  687. reactor = self.buildReactor()
  688. @implementer(ILoggingContext)
  689. class SomeFactory(ServerFactory):
  690. def logPrefix(self):
  691. return "Crazy Factory"
  692. factory = SomeFactory()
  693. p = self.getListeningPort(reactor, factory)
  694. expectedMessage = self.getExpectedStartListeningLogMessage(
  695. p, "Crazy Factory")
  696. self.assertEqual((expectedMessage,), loggedMessages[0]['message'])
  697. def test_connectionLostLogMsg(self):
  698. """
  699. When a connection is lost, an informative message should be logged
  700. (see L{getExpectedConnectionLostLogMsg}): an address identifying
  701. the port and the fact that it was closed.
  702. """
  703. loggedMessages = []
  704. def logConnectionLostMsg(eventDict):
  705. loggedMessages.append(log.textFromEventDict(eventDict))
  706. reactor = self.buildReactor()
  707. p = self.getListeningPort(reactor, ServerFactory())
  708. expectedMessage = self.getExpectedConnectionLostLogMsg(p)
  709. log.addObserver(logConnectionLostMsg)
  710. def stopReactor(ignored):
  711. log.removeObserver(logConnectionLostMsg)
  712. reactor.stop()
  713. def doStopListening():
  714. log.addObserver(logConnectionLostMsg)
  715. maybeDeferred(p.stopListening).addCallback(stopReactor)
  716. reactor.callWhenRunning(doStopListening)
  717. reactor.run()
  718. self.assertIn(expectedMessage, loggedMessages)
  719. def test_allNewStyle(self):
  720. """
  721. The L{IListeningPort} object is an instance of a class with no
  722. classic classes in its hierarchy.
  723. """
  724. reactor = self.buildReactor()
  725. port = self.getListeningPort(reactor, ServerFactory())
  726. self.assertFullyNewStyle(port)
  727. class ListenTCPMixin(object):
  728. """
  729. Mixin which uses L{IReactorTCP.listenTCP} to hand out listening TCP ports.
  730. """
  731. def getListeningPort(self, reactor, factory, port=0, interface=''):
  732. """
  733. Get a TCP port from a reactor.
  734. """
  735. return reactor.listenTCP(port, factory, interface=interface)
  736. class SocketTCPMixin(object):
  737. """
  738. Mixin which uses L{IReactorSocket.adoptStreamPort} to hand out listening TCP
  739. ports.
  740. """
  741. def getListeningPort(self, reactor, factory, port=0, interface=''):
  742. """
  743. Get a TCP port from a reactor, wrapping an already-initialized file
  744. descriptor.
  745. """
  746. if IReactorSocket.providedBy(reactor):
  747. if ':' in interface:
  748. domain = socket.AF_INET6
  749. address = socket.getaddrinfo(interface, port)[0][4]
  750. else:
  751. domain = socket.AF_INET
  752. address = (interface, port)
  753. portSock = socket.socket(domain)
  754. portSock.bind(address)
  755. portSock.listen(3)
  756. portSock.setblocking(False)
  757. try:
  758. return reactor.adoptStreamPort(
  759. portSock.fileno(), portSock.family, factory)
  760. finally:
  761. # The socket should still be open; fileno will raise if it is
  762. # not.
  763. portSock.fileno()
  764. # Now clean it up, because the rest of the test does not need
  765. # it.
  766. portSock.close()
  767. else:
  768. raise SkipTest("Reactor does not provide IReactorSocket")
  769. class TCPPortTestsMixin(object):
  770. """
  771. Tests for L{IReactorTCP.listenTCP}
  772. """
  773. requiredInterfaces = (IReactorTCP,)
  774. def getExpectedStartListeningLogMessage(self, port, factory):
  775. """
  776. Get the message expected to be logged when a TCP port starts listening.
  777. """
  778. return "%s starting on %d" % (
  779. factory, port.getHost().port)
  780. def getExpectedConnectionLostLogMsg(self, port):
  781. """
  782. Get the expected connection lost message for a TCP port.
  783. """
  784. return "(TCP Port %s Closed)" % (port.getHost().port,)
  785. def test_portGetHostOnIPv4(self):
  786. """
  787. When no interface is passed to L{IReactorTCP.listenTCP}, the returned
  788. listening port listens on an IPv4 address.
  789. """
  790. reactor = self.buildReactor()
  791. port = self.getListeningPort(reactor, ServerFactory())
  792. address = port.getHost()
  793. self.assertIsInstance(address, IPv4Address)
  794. def test_portGetHostOnIPv6(self):
  795. """
  796. When listening on an IPv6 address, L{IListeningPort.getHost} returns
  797. an L{IPv6Address} with C{host} and C{port} attributes reflecting the
  798. address the port is bound to.
  799. """
  800. reactor = self.buildReactor()
  801. host, portNumber = findFreePort(
  802. family=socket.AF_INET6, interface='::1')[:2]
  803. port = self.getListeningPort(
  804. reactor, ServerFactory(), portNumber, host)
  805. address = port.getHost()
  806. self.assertIsInstance(address, IPv6Address)
  807. self.assertEqual('::1', address.host)
  808. self.assertEqual(portNumber, address.port)
  809. if ipv6Skip:
  810. test_portGetHostOnIPv6.skip = ipv6Skip
  811. def test_portGetHostOnIPv6ScopeID(self):
  812. """
  813. When a link-local IPv6 address including a scope identifier is passed as
  814. the C{interface} argument to L{IReactorTCP.listenTCP}, the resulting
  815. L{IListeningPort} reports its address as an L{IPv6Address} with a host
  816. value that includes the scope identifier.
  817. """
  818. linkLocal = getLinkLocalIPv6Address()
  819. reactor = self.buildReactor()
  820. port = self.getListeningPort(reactor, ServerFactory(), 0, linkLocal)
  821. address = port.getHost()
  822. self.assertIsInstance(address, IPv6Address)
  823. self.assertEqual(linkLocal, address.host)
  824. if ipv6Skip:
  825. test_portGetHostOnIPv6ScopeID.skip = ipv6Skip
  826. def _buildProtocolAddressTest(self, client, interface):
  827. """
  828. Connect C{client} to a server listening on C{interface} started with
  829. L{IReactorTCP.listenTCP} and return the address passed to the factory's
  830. C{buildProtocol} method.
  831. @param client: A C{SOCK_STREAM} L{socket.socket} created with an address
  832. family such that it will be able to connect to a server listening on
  833. C{interface}.
  834. @param interface: A C{str} giving an address for a server to listen on.
  835. This should almost certainly be the loopback address for some
  836. address family supported by L{IReactorTCP.listenTCP}.
  837. @return: Whatever object, probably an L{IAddress} provider, is passed to
  838. a server factory's C{buildProtocol} method when C{client}
  839. establishes a connection.
  840. """
  841. class ObserveAddress(ServerFactory):
  842. def buildProtocol(self, address):
  843. reactor.stop()
  844. self.observedAddress = address
  845. return Protocol()
  846. factory = ObserveAddress()
  847. reactor = self.buildReactor()
  848. port = self.getListeningPort(reactor, factory, 0, interface)
  849. client.setblocking(False)
  850. try:
  851. connect(client, (port.getHost().host, port.getHost().port))
  852. except socket.error as e:
  853. self.assertIn(e.errno, (errno.EINPROGRESS, errno.EWOULDBLOCK))
  854. self.runReactor(reactor)
  855. return factory.observedAddress
  856. def test_buildProtocolIPv4Address(self):
  857. """
  858. When a connection is accepted over IPv4, an L{IPv4Address} is passed
  859. to the factory's C{buildProtocol} method giving the peer's address.
  860. """
  861. interface = '127.0.0.1'
  862. client = createTestSocket(self, socket.AF_INET, socket.SOCK_STREAM)
  863. observedAddress = self._buildProtocolAddressTest(client, interface)
  864. self.assertEqual(
  865. IPv4Address('TCP', *client.getsockname()), observedAddress)
  866. def test_buildProtocolIPv6Address(self):
  867. """
  868. When a connection is accepted to an IPv6 address, an L{IPv6Address} is
  869. passed to the factory's C{buildProtocol} method giving the peer's
  870. address.
  871. """
  872. interface = '::1'
  873. client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
  874. observedAddress = self._buildProtocolAddressTest(client, interface)
  875. self.assertEqual(
  876. IPv6Address('TCP', *client.getsockname()[:2]), observedAddress)
  877. if ipv6Skip:
  878. test_buildProtocolIPv6Address.skip = ipv6Skip
  879. def test_buildProtocolIPv6AddressScopeID(self):
  880. """
  881. When a connection is accepted to a link-local IPv6 address, an
  882. L{IPv6Address} is passed to the factory's C{buildProtocol} method
  883. giving the peer's address, including a scope identifier.
  884. """
  885. interface = getLinkLocalIPv6Address()
  886. client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
  887. observedAddress = self._buildProtocolAddressTest(client, interface)
  888. self.assertEqual(
  889. IPv6Address('TCP', *client.getsockname()[:2]), observedAddress)
  890. if ipv6Skip:
  891. test_buildProtocolIPv6AddressScopeID.skip = ipv6Skip
  892. def _serverGetConnectionAddressTest(self, client, interface, which):
  893. """
  894. Connect C{client} to a server listening on C{interface} started with
  895. L{IReactorTCP.listenTCP} and return the address returned by one of the
  896. server transport's address lookup methods, C{getHost} or C{getPeer}.
  897. @param client: A C{SOCK_STREAM} L{socket.socket} created with an address
  898. family such that it will be able to connect to a server listening on
  899. C{interface}.
  900. @param interface: A C{str} giving an address for a server to listen on.
  901. This should almost certainly be the loopback address for some
  902. address family supported by L{IReactorTCP.listenTCP}.
  903. @param which: A C{str} equal to either C{"getHost"} or C{"getPeer"}
  904. determining which address will be returned.
  905. @return: Whatever object, probably an L{IAddress} provider, is returned
  906. from the method indicated by C{which}.
  907. """
  908. class ObserveAddress(Protocol):
  909. def makeConnection(self, transport):
  910. reactor.stop()
  911. self.factory.address = getattr(transport, which)()
  912. reactor = self.buildReactor()
  913. factory = ServerFactory()
  914. factory.protocol = ObserveAddress
  915. port = self.getListeningPort(reactor, factory, 0, interface)
  916. client.setblocking(False)
  917. try:
  918. connect(client, (port.getHost().host, port.getHost().port))
  919. except socket.error as e:
  920. self.assertIn(e.errno, (errno.EINPROGRESS, errno.EWOULDBLOCK))
  921. self.runReactor(reactor)
  922. return factory.address
  923. def test_serverGetHostOnIPv4(self):
  924. """
  925. When a connection is accepted over IPv4, the server
  926. L{ITransport.getHost} method returns an L{IPv4Address} giving the
  927. address on which the server accepted the connection.
  928. """
  929. interface = '127.0.0.1'
  930. client = createTestSocket(self, socket.AF_INET, socket.SOCK_STREAM)
  931. hostAddress = self._serverGetConnectionAddressTest(
  932. client, interface, 'getHost')
  933. self.assertEqual(
  934. IPv4Address('TCP', *client.getpeername()), hostAddress)
  935. def test_serverGetHostOnIPv6(self):
  936. """
  937. When a connection is accepted over IPv6, the server
  938. L{ITransport.getHost} method returns an L{IPv6Address} giving the
  939. address on which the server accepted the connection.
  940. """
  941. interface = '::1'
  942. client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
  943. hostAddress = self._serverGetConnectionAddressTest(
  944. client, interface, 'getHost')
  945. self.assertEqual(
  946. IPv6Address('TCP', *client.getpeername()[:2]), hostAddress)
  947. if ipv6Skip:
  948. test_serverGetHostOnIPv6.skip = ipv6Skip
  949. def test_serverGetHostOnIPv6ScopeID(self):
  950. """
  951. When a connection is accepted over IPv6, the server
  952. L{ITransport.getHost} method returns an L{IPv6Address} giving the
  953. address on which the server accepted the connection, including the scope
  954. identifier.
  955. """
  956. interface = getLinkLocalIPv6Address()
  957. client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
  958. hostAddress = self._serverGetConnectionAddressTest(
  959. client, interface, 'getHost')
  960. self.assertEqual(
  961. IPv6Address('TCP', *client.getpeername()[:2]), hostAddress)
  962. if ipv6Skip:
  963. test_serverGetHostOnIPv6ScopeID.skip = ipv6Skip
  964. def test_serverGetPeerOnIPv4(self):
  965. """
  966. When a connection is accepted over IPv4, the server
  967. L{ITransport.getPeer} method returns an L{IPv4Address} giving the
  968. address of the remote end of the connection.
  969. """
  970. interface = '127.0.0.1'
  971. client = createTestSocket(self, socket.AF_INET, socket.SOCK_STREAM)
  972. peerAddress = self._serverGetConnectionAddressTest(
  973. client, interface, 'getPeer')
  974. self.assertEqual(
  975. IPv4Address('TCP', *client.getsockname()), peerAddress)
  976. def test_serverGetPeerOnIPv6(self):
  977. """
  978. When a connection is accepted over IPv6, the server
  979. L{ITransport.getPeer} method returns an L{IPv6Address} giving the
  980. address on the remote end of the connection.
  981. """
  982. interface = '::1'
  983. client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
  984. peerAddress = self._serverGetConnectionAddressTest(
  985. client, interface, 'getPeer')
  986. self.assertEqual(
  987. IPv6Address('TCP', *client.getsockname()[:2]), peerAddress)
  988. if ipv6Skip:
  989. test_serverGetPeerOnIPv6.skip = ipv6Skip
  990. def test_serverGetPeerOnIPv6ScopeID(self):
  991. """
  992. When a connection is accepted over IPv6, the server
  993. L{ITransport.getPeer} method returns an L{IPv6Address} giving the
  994. address on the remote end of the connection, including the scope
  995. identifier.
  996. """
  997. interface = getLinkLocalIPv6Address()
  998. client = createTestSocket(self, socket.AF_INET6, socket.SOCK_STREAM)
  999. peerAddress = self._serverGetConnectionAddressTest(
  1000. client, interface, 'getPeer')
  1001. self.assertEqual(
  1002. IPv6Address('TCP', *client.getsockname()[:2]), peerAddress)
  1003. if ipv6Skip:
  1004. test_serverGetPeerOnIPv6ScopeID.skip = ipv6Skip
  1005. class TCPPortTestsBuilder(ReactorBuilder, ListenTCPMixin, TCPPortTestsMixin,
  1006. ObjectModelIntegrationMixin,
  1007. StreamTransportTestsMixin):
  1008. pass
  1009. class TCPFDPortTestsBuilder(ReactorBuilder, SocketTCPMixin, TCPPortTestsMixin,
  1010. ObjectModelIntegrationMixin,
  1011. StreamTransportTestsMixin):
  1012. pass
  1013. class StopStartReadingProtocol(Protocol):
  1014. """
  1015. Protocol that pauses and resumes the transport a few times
  1016. """
  1017. def connectionMade(self):
  1018. self.data = b''
  1019. self.pauseResumeProducing(3)
  1020. def pauseResumeProducing(self, counter):
  1021. """
  1022. Toggle transport read state, then count down.
  1023. """
  1024. self.transport.pauseProducing()
  1025. self.transport.resumeProducing()
  1026. if counter:
  1027. self.factory.reactor.callLater(0,
  1028. self.pauseResumeProducing, counter - 1)
  1029. else:
  1030. self.factory.reactor.callLater(0,
  1031. self.factory.ready.callback, self)
  1032. def dataReceived(self, data):
  1033. log.msg('got data', len(data))
  1034. self.data += data
  1035. if len(self.data) == 4*4096:
  1036. self.factory.stop.callback(self.data)
  1037. def oneTransportTest(testMethod):
  1038. """
  1039. Decorate a L{ReactorBuilder} test function which tests one reactor and one
  1040. connected transport. Run that test method in the context of
  1041. C{connectionMade}, and immediately drop the connection (and end the test)
  1042. when that completes.
  1043. @param testMethod: A unit test method on a L{ReactorBuilder} test suite;
  1044. taking two additional parameters; a C{reactor} as built by the
  1045. L{ReactorBuilder}, and an L{ITCPTransport} provider.
  1046. @type testMethod: 3-argument C{function}
  1047. @return: a no-argument test method.
  1048. @rtype: 1-argument C{function}
  1049. """
  1050. @wraps(testMethod)
  1051. def actualTestMethod(builder):
  1052. other = ConnectableProtocol()
  1053. class ServerProtocol(ConnectableProtocol):
  1054. def connectionMade(self):
  1055. try:
  1056. testMethod(builder, self.reactor, self.transport)
  1057. finally:
  1058. if self.transport is not None:
  1059. self.transport.loseConnection()
  1060. if other.transport is not None:
  1061. other.transport.loseConnection()
  1062. serverProtocol = ServerProtocol()
  1063. runProtocolsWithReactor(builder, serverProtocol, other, TCPCreator())
  1064. return actualTestMethod
  1065. def assertReading(testCase, reactor, transport):
  1066. """
  1067. Use the given test to assert that the given transport is actively reading
  1068. in the given reactor.
  1069. @note: Maintainers; for more information on why this is a function rather
  1070. than a method on a test case, see U{this document on how we structure
  1071. test tools
  1072. <http://twistedmatrix.com/trac/wiki/Design/KeepTestToolsOutOfFixtures>}
  1073. @param testCase: a test case to perform the assertion upon.
  1074. @type testCase: L{TestCase}
  1075. @param reactor: A reactor, possibly one providing L{IReactorFDSet}, or an
  1076. IOCP reactor.
  1077. @param transport: An L{ITCPTransport}
  1078. """
  1079. if IReactorFDSet.providedBy(reactor):
  1080. testCase.assertIn(transport, reactor.getReaders())
  1081. else:
  1082. # IOCP.
  1083. testCase.assertIn(transport, reactor.handles)
  1084. testCase.assertTrue(transport.reading)
  1085. def assertNotReading(testCase, reactor, transport):
  1086. """
  1087. Use the given test to assert that the given transport is I{not} actively
  1088. reading in the given reactor.
  1089. @note: Maintainers; for more information on why this is a function rather
  1090. than a method on a test case, see U{this document on how we structure
  1091. test tools
  1092. <http://twistedmatrix.com/trac/wiki/Design/KeepTestToolsOutOfFixtures>}
  1093. @param testCase: a test case to perform the assertion upon.
  1094. @type testCase: L{TestCase}
  1095. @param reactor: A reactor, possibly one providing L{IReactorFDSet}, or an
  1096. IOCP reactor.
  1097. @param transport: An L{ITCPTransport}
  1098. """
  1099. if IReactorFDSet.providedBy(reactor):
  1100. testCase.assertNotIn(transport, reactor.getReaders())
  1101. else:
  1102. # IOCP.
  1103. testCase.assertFalse(transport.reading)
  1104. class TCPConnectionTestsBuilder(ReactorBuilder):
  1105. """
  1106. Builder defining tests relating to L{twisted.internet.tcp.Connection}.
  1107. """
  1108. requiredInterfaces = (IReactorTCP,)
  1109. def test_stopStartReading(self):
  1110. """
  1111. This test verifies transport socket read state after multiple
  1112. pause/resumeProducing calls.
  1113. """
  1114. sf = ServerFactory()
  1115. reactor = sf.reactor = self.buildReactor()
  1116. skippedReactors = ["Glib2Reactor", "Gtk2Reactor"]
  1117. reactorClassName = reactor.__class__.__name__
  1118. if reactorClassName in skippedReactors and platform.isWindows():
  1119. raise SkipTest(
  1120. "This test is broken on gtk/glib under Windows.")
  1121. sf.protocol = StopStartReadingProtocol
  1122. sf.ready = Deferred()
  1123. sf.stop = Deferred()
  1124. p = reactor.listenTCP(0, sf)
  1125. port = p.getHost().port
  1126. def proceed(protos, port):
  1127. """
  1128. Send several IOCPReactor's buffers' worth of data.
  1129. """
  1130. self.assertTrue(protos[0])
  1131. self.assertTrue(protos[1])
  1132. protos = protos[0][1], protos[1][1]
  1133. protos[0].transport.write(b'x' * (2 * 4096) + b'y' * (2 * 4096))
  1134. return (sf.stop.addCallback(cleanup, protos, port)
  1135. .addCallback(lambda ign: reactor.stop()))
  1136. def cleanup(data, protos, port):
  1137. """
  1138. Make sure IOCPReactor didn't start several WSARecv operations
  1139. that clobbered each other's results.
  1140. """
  1141. self.assertEqual(data, b'x'*(2*4096) + b'y'*(2*4096),
  1142. 'did not get the right data')
  1143. return DeferredList([
  1144. maybeDeferred(protos[0].transport.loseConnection),
  1145. maybeDeferred(protos[1].transport.loseConnection),
  1146. maybeDeferred(port.stopListening)])
  1147. cc = TCP4ClientEndpoint(reactor, '127.0.0.1', port)
  1148. cf = ClientFactory()
  1149. cf.protocol = Protocol
  1150. d = DeferredList([cc.connect(cf), sf.ready]).addCallback(proceed, p)
  1151. d.addErrback(log.err)
  1152. self.runReactor(reactor)
  1153. @oneTransportTest
  1154. def test_resumeProducing(self, reactor, server):
  1155. """
  1156. When a L{Server} is connected, its C{resumeProducing} method adds it as
  1157. a reader to the reactor.
  1158. """
  1159. server.pauseProducing()
  1160. assertNotReading(self, reactor, server)
  1161. server.resumeProducing()
  1162. assertReading(self, reactor, server)
  1163. @oneTransportTest
  1164. def test_resumeProducingWhileDisconnecting(self, reactor, server):
  1165. """
  1166. When a L{Server} has already started disconnecting via
  1167. C{loseConnection}, its C{resumeProducing} method does not add it as a
  1168. reader to its reactor.
  1169. """
  1170. server.loseConnection()
  1171. server.resumeProducing()
  1172. assertNotReading(self, reactor, server)
  1173. @oneTransportTest
  1174. def test_resumeProducingWhileDisconnected(self, reactor, server):
  1175. """
  1176. When a L{Server} has already lost its connection, its
  1177. C{resumeProducing} method does not add it as a reader to its reactor.
  1178. """
  1179. server.connectionLost(Failure(Exception("dummy")))
  1180. assertNotReading(self, reactor, server)
  1181. server.resumeProducing()
  1182. assertNotReading(self, reactor, server)
  1183. def test_connectionLostAfterPausedTransport(self):
  1184. """
  1185. Alice connects to Bob. Alice writes some bytes and then shuts down the
  1186. connection. Bob receives the bytes from the connection and then pauses
  1187. the transport object. Shortly afterwards Bob resumes the transport
  1188. object. At that point, Bob is notified that the connection has been
  1189. closed.
  1190. This is no problem for most reactors. The underlying event notification
  1191. API will probably just remind them that the connection has been closed.
  1192. It is a little tricky for win32eventreactor (MsgWaitForMultipleObjects).
  1193. MsgWaitForMultipleObjects will only deliver the close notification once.
  1194. The reactor needs to remember that notification until Bob resumes the
  1195. transport.
  1196. """
  1197. class Pauser(ConnectableProtocol):
  1198. def __init__(self):
  1199. self.events = []
  1200. def dataReceived(self, bytes):
  1201. self.events.append("paused")
  1202. self.transport.pauseProducing()
  1203. self.reactor.callLater(0, self.resume)
  1204. def resume(self):
  1205. self.events.append("resumed")
  1206. self.transport.resumeProducing()
  1207. def connectionLost(self, reason):
  1208. # This is the event you have been waiting for.
  1209. self.events.append("lost")
  1210. ConnectableProtocol.connectionLost(self, reason)
  1211. class Client(ConnectableProtocol):
  1212. def connectionMade(self):
  1213. self.transport.write(b"some bytes for you")
  1214. self.transport.loseConnection()
  1215. pauser = Pauser()
  1216. runProtocolsWithReactor(self, pauser, Client(), TCPCreator())
  1217. self.assertEqual(pauser.events, ["paused", "resumed", "lost"])
  1218. def test_doubleHalfClose(self):
  1219. """
  1220. If one side half-closes its connection, and then the other side of the
  1221. connection calls C{loseWriteConnection}, and then C{loseConnection} in
  1222. {writeConnectionLost}, the connection is closed correctly.
  1223. This rather obscure case used to fail (see ticket #3037).
  1224. """
  1225. @implementer(IHalfCloseableProtocol)
  1226. class ListenerProtocol(ConnectableProtocol):
  1227. def readConnectionLost(self):
  1228. self.transport.loseWriteConnection()
  1229. def writeConnectionLost(self):
  1230. self.transport.loseConnection()
  1231. class Client(ConnectableProtocol):
  1232. def connectionMade(self):
  1233. self.transport.loseConnection()
  1234. # If test fails, reactor won't stop and we'll hit timeout:
  1235. runProtocolsWithReactor(
  1236. self, ListenerProtocol(), Client(), TCPCreator())
  1237. class WriteSequenceTestsMixin(object):
  1238. """
  1239. Test for L{twisted.internet.abstract.FileDescriptor.writeSequence}.
  1240. """
  1241. requiredInterfaces = (IReactorTCP,)
  1242. def setWriteBufferSize(self, transport, value):
  1243. """
  1244. Set the write buffer size for the given transport, mananing possible
  1245. differences (ie, IOCP). Bug #4322 should remove the need of that hack.
  1246. """
  1247. if getattr(transport, "writeBufferSize", None) is not None:
  1248. transport.writeBufferSize = value
  1249. else:
  1250. transport.bufferSize = value
  1251. def test_writeSequeceWithoutWrite(self):
  1252. """
  1253. C{writeSequence} sends the data even if C{write} hasn't been called.
  1254. """
  1255. def connected(protocols):
  1256. client, server, port = protocols
  1257. def dataReceived(data):
  1258. log.msg("data received: %r" % data)
  1259. self.assertEqual(data, b"Some sequence splitted")
  1260. client.transport.loseConnection()
  1261. server.dataReceived = dataReceived
  1262. client.transport.writeSequence([b"Some ", b"sequence ", b"splitted"])
  1263. reactor = self.buildReactor()
  1264. d = self.getConnectedClientAndServer(reactor, "127.0.0.1",
  1265. socket.AF_INET)
  1266. d.addCallback(connected)
  1267. d.addErrback(log.err)
  1268. self.runReactor(reactor)
  1269. def test_writeSequenceWithUnicodeRaisesException(self):
  1270. """
  1271. C{writeSequence} with an element in the sequence of type unicode raises
  1272. C{TypeError}.
  1273. """
  1274. def connected(protocols):
  1275. client, server, port = protocols
  1276. exc = self.assertRaises(
  1277. TypeError,
  1278. server.transport.writeSequence, [u"Unicode is not kosher"])
  1279. self.assertEqual(str(exc), "Data must not be unicode")
  1280. server.transport.loseConnection()
  1281. reactor = self.buildReactor()
  1282. d = self.getConnectedClientAndServer(reactor, "127.0.0.1",
  1283. socket.AF_INET)
  1284. d.addCallback(connected)
  1285. d.addErrback(log.err)
  1286. self.runReactor(reactor)
  1287. def test_streamingProducer(self):
  1288. """
  1289. C{writeSequence} pauses its streaming producer if too much data is
  1290. buffered, and then resumes it.
  1291. """
  1292. @implementer(IPushProducer)
  1293. class SaveActionProducer(object):
  1294. client = None
  1295. server = None
  1296. def __init__(self):
  1297. self.actions = []
  1298. def pauseProducing(self):
  1299. self.actions.append("pause")
  1300. def resumeProducing(self):
  1301. self.actions.append("resume")
  1302. # Unregister the producer so the connection can close
  1303. self.client.transport.unregisterProducer()
  1304. # This is why the code below waits for the server connection
  1305. # first - so we have it to close here. We close the server
  1306. # side because win32evenreactor cannot reliably observe us
  1307. # closing the client side (#5285).
  1308. self.server.transport.loseConnection()
  1309. def stopProducing(self):
  1310. self.actions.append("stop")
  1311. producer = SaveActionProducer()
  1312. def connected(protocols):
  1313. client, server = protocols[:2]
  1314. producer.client = client
  1315. producer.server = server
  1316. # Register a streaming producer and verify that it gets paused
  1317. # after it writes more than the local send buffer can hold.
  1318. client.transport.registerProducer(producer, True)
  1319. self.assertEqual(producer.actions, [])
  1320. self.setWriteBufferSize(client.transport, 500)
  1321. client.transport.writeSequence([b"x" * 50] * 20)
  1322. self.assertEqual(producer.actions, ["pause"])
  1323. reactor = self.buildReactor()
  1324. d = self.getConnectedClientAndServer(reactor, "127.0.0.1",
  1325. socket.AF_INET)
  1326. d.addCallback(connected)
  1327. d.addErrback(log.err)
  1328. self.runReactor(reactor)
  1329. # After the send buffer gets a chance to empty out a bit, the producer
  1330. # should be resumed.
  1331. self.assertEqual(producer.actions, ["pause", "resume"])
  1332. def test_nonStreamingProducer(self):
  1333. """
  1334. C{writeSequence} pauses its producer if too much data is buffered only
  1335. if this is a streaming producer.
  1336. """
  1337. test = self
  1338. @implementer(IPullProducer)
  1339. class SaveActionProducer(object):
  1340. client = None
  1341. def __init__(self):
  1342. self.actions = []
  1343. def resumeProducing(self):
  1344. self.actions.append("resume")
  1345. if self.actions.count("resume") == 2:
  1346. self.client.transport.stopConsuming()
  1347. else:
  1348. test.setWriteBufferSize(self.client.transport, 500)
  1349. self.client.transport.writeSequence([b"x" * 50] * 20)
  1350. def stopProducing(self):
  1351. self.actions.append("stop")
  1352. producer = SaveActionProducer()
  1353. def connected(protocols):
  1354. client = protocols[0]
  1355. producer.client = client
  1356. # Register a non-streaming producer and verify that it is resumed
  1357. # immediately.
  1358. client.transport.registerProducer(producer, False)
  1359. self.assertEqual(producer.actions, ["resume"])
  1360. reactor = self.buildReactor()
  1361. d = self.getConnectedClientAndServer(reactor, "127.0.0.1",
  1362. socket.AF_INET)
  1363. d.addCallback(connected)
  1364. d.addErrback(log.err)
  1365. self.runReactor(reactor)
  1366. # After the local send buffer empties out, the producer should be
  1367. # resumed again.
  1368. self.assertEqual(producer.actions, ["resume", "resume"])
  1369. class TCPTransportServerAddressTestMixin(object):
  1370. """
  1371. Test mixing for TCP server address building and log prefix.
  1372. """
  1373. def getConnectedClientAndServer(self, reactor, interface, addressFamily):
  1374. """
  1375. Helper method returnine a L{Deferred} firing with a tuple of a client
  1376. protocol, a server protocol, and a running TCP port.
  1377. """
  1378. raise NotImplementedError()
  1379. def _testServerAddress(self, interface, addressFamily, adressClass):
  1380. """
  1381. Helper method to test TCP server addresses on either IPv4 or IPv6.
  1382. """
  1383. def connected(protocols):
  1384. client, server, port = protocols
  1385. try:
  1386. self.assertEqual(
  1387. "<AccumulatingProtocol #%s on %s>" %
  1388. (server.transport.sessionno, port.getHost().port),
  1389. str(server.transport))
  1390. self.assertEqual(
  1391. "AccumulatingProtocol,%s,%s" %
  1392. (server.transport.sessionno, interface),
  1393. server.transport.logstr)
  1394. [peerAddress] = server.factory.peerAddresses
  1395. self.assertIsInstance(peerAddress, adressClass)
  1396. self.assertEqual('TCP', peerAddress.type)
  1397. self.assertEqual(interface, peerAddress.host)
  1398. finally:
  1399. # Be certain to drop the connection so the test completes.
  1400. server.transport.loseConnection()
  1401. reactor = self.buildReactor()
  1402. d = self.getConnectedClientAndServer(reactor, interface, addressFamily)
  1403. d.addCallback(connected)
  1404. d.addErrback(log.err)
  1405. self.runReactor(reactor)
  1406. def test_serverAddressTCP4(self):
  1407. """
  1408. L{Server} instances have a string representation indicating on which
  1409. port they're running, and the connected address is stored on the
  1410. C{peerAddresses} attribute of the factory.
  1411. """
  1412. return self._testServerAddress("127.0.0.1", socket.AF_INET,
  1413. IPv4Address)
  1414. def test_serverAddressTCP6(self):
  1415. """
  1416. IPv6 L{Server} instances have a string representation indicating on
  1417. which port they're running, and the connected address is stored on the
  1418. C{peerAddresses} attribute of the factory.
  1419. """
  1420. return self._testServerAddress(getLinkLocalIPv6Address(),
  1421. socket.AF_INET6, IPv6Address)
  1422. if ipv6Skip:
  1423. test_serverAddressTCP6.skip = ipv6Skip
  1424. class TCPTransportTestsBuilder(TCPTransportServerAddressTestMixin,
  1425. WriteSequenceTestsMixin, ReactorBuilder):
  1426. """
  1427. Test standard L{ITCPTransport}s built with C{listenTCP} and C{connectTCP}.
  1428. """
  1429. def getConnectedClientAndServer(self, reactor, interface, addressFamily):
  1430. """
  1431. Return a L{Deferred} firing with a L{MyClientFactory} and
  1432. L{MyServerFactory} connected pair, and the listening C{Port}.
  1433. """
  1434. server = MyServerFactory()
  1435. server.protocolConnectionMade = Deferred()
  1436. server.protocolConnectionLost = Deferred()
  1437. client = MyClientFactory()
  1438. client.protocolConnectionMade = Deferred()
  1439. client.protocolConnectionLost = Deferred()
  1440. port = reactor.listenTCP(0, server, interface=interface)
  1441. lostDeferred = gatherResults([client.protocolConnectionLost,
  1442. server.protocolConnectionLost])
  1443. def stop(result):
  1444. reactor.stop()
  1445. return result
  1446. lostDeferred.addBoth(stop)
  1447. startDeferred = gatherResults([client.protocolConnectionMade,
  1448. server.protocolConnectionMade])
  1449. deferred = Deferred()
  1450. def start(protocols):
  1451. client, server = protocols
  1452. log.msg("client connected %s" % client)
  1453. log.msg("server connected %s" % server)
  1454. deferred.callback((client, server, port))
  1455. startDeferred.addCallback(start)
  1456. reactor.connectTCP(interface, port.getHost().port, client)
  1457. return deferred
  1458. class AdoptStreamConnectionTestsBuilder(TCPTransportServerAddressTestMixin,
  1459. WriteSequenceTestsMixin,
  1460. ReactorBuilder):
  1461. """
  1462. Test server transports built using C{adoptStreamConnection}.
  1463. """
  1464. requiredInterfaces = (IReactorFDSet, IReactorSocket)
  1465. def getConnectedClientAndServer(self, reactor, interface, addressFamily):
  1466. """
  1467. Return a L{Deferred} firing with a L{MyClientFactory} and
  1468. L{MyServerFactory} connected pair, and the listening C{Port}. The
  1469. particularity is that the server protocol has been obtained after doing
  1470. a C{adoptStreamConnection} against the original server connection.
  1471. """
  1472. firstServer = MyServerFactory()
  1473. firstServer.protocolConnectionMade = Deferred()
  1474. server = MyServerFactory()
  1475. server.protocolConnectionMade = Deferred()
  1476. server.protocolConnectionLost = Deferred()
  1477. client = MyClientFactory()
  1478. client.protocolConnectionMade = Deferred()
  1479. client.protocolConnectionLost = Deferred()
  1480. port = reactor.listenTCP(0, firstServer, interface=interface)
  1481. def firtServerConnected(proto):
  1482. reactor.removeReader(proto.transport)
  1483. reactor.removeWriter(proto.transport)
  1484. reactor.adoptStreamConnection(
  1485. proto.transport.fileno(), addressFamily, server)
  1486. firstServer.protocolConnectionMade.addCallback(firtServerConnected)
  1487. lostDeferred = gatherResults([client.protocolConnectionLost,
  1488. server.protocolConnectionLost])
  1489. def stop(result):
  1490. if reactor.running:
  1491. reactor.stop()
  1492. return result
  1493. lostDeferred.addBoth(stop)
  1494. deferred = Deferred()
  1495. deferred.addErrback(stop)
  1496. startDeferred = gatherResults([client.protocolConnectionMade,
  1497. server.protocolConnectionMade])
  1498. def start(protocols):
  1499. client, server = protocols
  1500. log.msg("client connected %s" % client)
  1501. log.msg("server connected %s" % server)
  1502. deferred.callback((client, server, port))
  1503. startDeferred.addCallback(start)
  1504. reactor.connectTCP(interface, port.getHost().port, client)
  1505. return deferred
  1506. globals().update(TCP4ClientTestsBuilder.makeTestCaseClasses())
  1507. globals().update(TCP6ClientTestsBuilder.makeTestCaseClasses())
  1508. globals().update(TCPPortTestsBuilder.makeTestCaseClasses())
  1509. globals().update(TCPFDPortTestsBuilder.makeTestCaseClasses())
  1510. globals().update(TCPConnectionTestsBuilder.makeTestCaseClasses())
  1511. globals().update(TCP4ConnectorTestsBuilder.makeTestCaseClasses())
  1512. globals().update(TCP6ConnectorTestsBuilder.makeTestCaseClasses())
  1513. globals().update(TCPTransportTestsBuilder.makeTestCaseClasses())
  1514. globals().update(AdoptStreamConnectionTestsBuilder.makeTestCaseClasses())
  1515. class ServerAbortsTwice(ConnectableProtocol):
  1516. """
  1517. Call abortConnection() twice.
  1518. """
  1519. def dataReceived(self, data):
  1520. self.transport.abortConnection()
  1521. self.transport.abortConnection()
  1522. class ServerAbortsThenLoses(ConnectableProtocol):
  1523. """
  1524. Call abortConnection() followed by loseConnection().
  1525. """
  1526. def dataReceived(self, data):
  1527. self.transport.abortConnection()
  1528. self.transport.loseConnection()
  1529. class AbortServerWritingProtocol(ConnectableProtocol):
  1530. """
  1531. Protocol that writes data upon connection.
  1532. """
  1533. def connectionMade(self):
  1534. """
  1535. Tell the client that the connection is set up and it's time to abort.
  1536. """
  1537. self.transport.write(b"ready")
  1538. class ReadAbortServerProtocol(AbortServerWritingProtocol):
  1539. """
  1540. Server that should never receive any data, except 'X's which are written
  1541. by the other side of the connection before abortConnection, and so might
  1542. possibly arrive.
  1543. """
  1544. def dataReceived(self, data):
  1545. if data.replace(b'X', b''):
  1546. raise Exception("Unexpectedly received data.")
  1547. class NoReadServer(ConnectableProtocol):
  1548. """
  1549. Stop reading immediately on connection.
  1550. This simulates a lost connection that will cause the other side to time
  1551. out, and therefore call abortConnection().
  1552. """
  1553. def connectionMade(self):
  1554. self.transport.stopReading()
  1555. class EventualNoReadServer(ConnectableProtocol):
  1556. """
  1557. Like NoReadServer, except we Wait until some bytes have been delivered
  1558. before stopping reading. This means TLS handshake has finished, where
  1559. applicable.
  1560. """
  1561. gotData = False
  1562. stoppedReading = False
  1563. def dataReceived(self, data):
  1564. if not self.gotData:
  1565. self.gotData = True
  1566. self.transport.registerProducer(self, False)
  1567. self.transport.write(b"hello")
  1568. def resumeProducing(self):
  1569. if self.stoppedReading:
  1570. return
  1571. self.stoppedReading = True
  1572. # We've written out the data:
  1573. self.transport.stopReading()
  1574. def pauseProducing(self):
  1575. pass
  1576. def stopProducing(self):
  1577. pass
  1578. class BaseAbortingClient(ConnectableProtocol):
  1579. """
  1580. Base class for abort-testing clients.
  1581. """
  1582. inReactorMethod = False
  1583. def connectionLost(self, reason):
  1584. if self.inReactorMethod:
  1585. raise RuntimeError("BUG: connectionLost was called re-entrantly!")
  1586. ConnectableProtocol.connectionLost(self, reason)
  1587. class WritingButNotAbortingClient(BaseAbortingClient):
  1588. """
  1589. Write data, but don't abort.
  1590. """
  1591. def connectionMade(self):
  1592. self.transport.write(b"hello")
  1593. class AbortingClient(BaseAbortingClient):
  1594. """
  1595. Call abortConnection() after writing some data.
  1596. """
  1597. def dataReceived(self, data):
  1598. """
  1599. Some data was received, so the connection is set up.
  1600. """
  1601. self.inReactorMethod = True
  1602. self.writeAndAbort()
  1603. self.inReactorMethod = False
  1604. def writeAndAbort(self):
  1605. # X is written before abortConnection, and so there is a chance it
  1606. # might arrive. Y is written after, and so no Ys should ever be
  1607. # delivered:
  1608. self.transport.write(b"X" * 10000)
  1609. self.transport.abortConnection()
  1610. self.transport.write(b"Y" * 10000)
  1611. class AbortingTwiceClient(AbortingClient):
  1612. """
  1613. Call abortConnection() twice, after writing some data.
  1614. """
  1615. def writeAndAbort(self):
  1616. AbortingClient.writeAndAbort(self)
  1617. self.transport.abortConnection()
  1618. class AbortingThenLosingClient(AbortingClient):
  1619. """
  1620. Call abortConnection() and then loseConnection().
  1621. """
  1622. def writeAndAbort(self):
  1623. AbortingClient.writeAndAbort(self)
  1624. self.transport.loseConnection()
  1625. class ProducerAbortingClient(ConnectableProtocol):
  1626. """
  1627. Call abortConnection from doWrite, via resumeProducing.
  1628. """
  1629. inReactorMethod = True
  1630. producerStopped = False
  1631. def write(self):
  1632. self.transport.write(b"lalala" * 127000)
  1633. self.inRegisterProducer = True
  1634. self.transport.registerProducer(self, False)
  1635. self.inRegisterProducer = False
  1636. def connectionMade(self):
  1637. self.write()
  1638. def resumeProducing(self):
  1639. self.inReactorMethod = True
  1640. if not self.inRegisterProducer:
  1641. self.transport.abortConnection()
  1642. self.inReactorMethod = False
  1643. def stopProducing(self):
  1644. self.producerStopped = True
  1645. def connectionLost(self, reason):
  1646. if not self.producerStopped:
  1647. raise RuntimeError("BUG: stopProducing() was never called.")
  1648. if self.inReactorMethod:
  1649. raise RuntimeError("BUG: connectionLost called re-entrantly!")
  1650. ConnectableProtocol.connectionLost(self, reason)
  1651. class StreamingProducerClient(ConnectableProtocol):
  1652. """
  1653. Call abortConnection() when the other side has stopped reading.
  1654. In particular, we want to call abortConnection() only once our local
  1655. socket hits a state where it is no longer writeable. This helps emulate
  1656. the most common use case for abortConnection(), closing a connection after
  1657. a timeout, with write buffers being full.
  1658. Since it's very difficult to know when this actually happens, we just
  1659. write a lot of data, and assume at that point no more writes will happen.
  1660. """
  1661. paused = False
  1662. extraWrites = 0
  1663. inReactorMethod = False
  1664. def connectionMade(self):
  1665. self.write()
  1666. def write(self):
  1667. """
  1668. Write large amount to transport, then wait for a while for buffers to
  1669. fill up.
  1670. """
  1671. self.transport.registerProducer(self, True)
  1672. for i in range(100):
  1673. self.transport.write(b"1234567890" * 32000)
  1674. def resumeProducing(self):
  1675. self.paused = False
  1676. def stopProducing(self):
  1677. pass
  1678. def pauseProducing(self):
  1679. """
  1680. Called when local buffer fills up.
  1681. The goal is to hit the point where the local file descriptor is not
  1682. writeable (or the moral equivalent). The fact that pauseProducing has
  1683. been called is not sufficient, since that can happen when Twisted's
  1684. buffers fill up but OS hasn't gotten any writes yet. We want to be as
  1685. close as possible to every buffer (including OS buffers) being full.
  1686. So, we wait a bit more after this for Twisted to write out a few
  1687. chunks, then abortConnection.
  1688. """
  1689. if self.paused:
  1690. return
  1691. self.paused = True
  1692. # The amount we wait is arbitrary, we just want to make sure some
  1693. # writes have happened and outgoing OS buffers filled up -- see
  1694. # http://twistedmatrix.com/trac/ticket/5303 for details:
  1695. self.reactor.callLater(0.01, self.doAbort)
  1696. def doAbort(self):
  1697. if not self.paused:
  1698. log.err(RuntimeError("BUG: We should be paused a this point."))
  1699. self.inReactorMethod = True
  1700. self.transport.abortConnection()
  1701. self.inReactorMethod = False
  1702. def connectionLost(self, reason):
  1703. # Tell server to start reading again so it knows to go away:
  1704. self.otherProtocol.transport.startReading()
  1705. ConnectableProtocol.connectionLost(self, reason)
  1706. class StreamingProducerClientLater(StreamingProducerClient):
  1707. """
  1708. Call abortConnection() from dataReceived, after bytes have been
  1709. exchanged.
  1710. """
  1711. def connectionMade(self):
  1712. self.transport.write(b"hello")
  1713. self.gotData = False
  1714. def dataReceived(self, data):
  1715. if not self.gotData:
  1716. self.gotData = True
  1717. self.write()
  1718. class ProducerAbortingClientLater(ProducerAbortingClient):
  1719. """
  1720. Call abortConnection from doWrite, via resumeProducing.
  1721. Try to do so after some bytes have already been exchanged, so we
  1722. don't interrupt SSL handshake.
  1723. """
  1724. def connectionMade(self):
  1725. # Override base class connectionMade().
  1726. pass
  1727. def dataReceived(self, data):
  1728. self.write()
  1729. class DataReceivedRaisingClient(AbortingClient):
  1730. """
  1731. Call abortConnection(), and then throw exception, from dataReceived.
  1732. """
  1733. def dataReceived(self, data):
  1734. self.transport.abortConnection()
  1735. raise ZeroDivisionError("ONO")
  1736. class ResumeThrowsClient(ProducerAbortingClient):
  1737. """
  1738. Call abortConnection() and throw exception from resumeProducing().
  1739. """
  1740. def resumeProducing(self):
  1741. if not self.inRegisterProducer:
  1742. self.transport.abortConnection()
  1743. raise ZeroDivisionError("ono!")
  1744. def connectionLost(self, reason):
  1745. # Base class assertion about stopProducing being called isn't valid;
  1746. # if the we blew up in resumeProducing, consumers are justified in
  1747. # giving up on the producer and not calling stopProducing.
  1748. ConnectableProtocol.connectionLost(self, reason)
  1749. class AbortConnectionMixin(object):
  1750. """
  1751. Unit tests for L{ITransport.abortConnection}.
  1752. """
  1753. # Override in subclasses, should be an EndpointCreator instance:
  1754. endpoints = None
  1755. def runAbortTest(self, clientClass, serverClass,
  1756. clientConnectionLostReason=None):
  1757. """
  1758. A test runner utility function, which hooks up a matched pair of client
  1759. and server protocols.
  1760. We then run the reactor until both sides have disconnected, and then
  1761. verify that the right exception resulted.
  1762. """
  1763. clientExpectedExceptions = (ConnectionAborted, ConnectionLost)
  1764. serverExpectedExceptions = (ConnectionLost, ConnectionDone)
  1765. # In TLS tests we may get SSL.Error instead of ConnectionLost,
  1766. # since we're trashing the TLS protocol layer.
  1767. if useSSL:
  1768. clientExpectedExceptions = clientExpectedExceptions + (SSL.Error,)
  1769. serverExpectedExceptions = serverExpectedExceptions + (SSL.Error,)
  1770. client = clientClass()
  1771. server = serverClass()
  1772. client.otherProtocol = server
  1773. server.otherProtocol = client
  1774. reactor = runProtocolsWithReactor(self, server, client, self.endpoints)
  1775. # Make sure everything was shutdown correctly:
  1776. self.assertEqual(reactor.removeAll(), [])
  1777. self.assertEqual(reactor.getDelayedCalls(), [])
  1778. if clientConnectionLostReason is not None:
  1779. self.assertIsInstance(
  1780. client.disconnectReason.value,
  1781. (clientConnectionLostReason,) + clientExpectedExceptions)
  1782. else:
  1783. self.assertIsInstance(client.disconnectReason.value,
  1784. clientExpectedExceptions)
  1785. self.assertIsInstance(server.disconnectReason.value, serverExpectedExceptions)
  1786. def test_dataReceivedAbort(self):
  1787. """
  1788. abortConnection() is called in dataReceived. The protocol should be
  1789. disconnected, but connectionLost should not be called re-entrantly.
  1790. """
  1791. return self.runAbortTest(AbortingClient, ReadAbortServerProtocol)
  1792. def test_clientAbortsConnectionTwice(self):
  1793. """
  1794. abortConnection() is called twice by client.
  1795. No exception should be thrown, and the connection will be closed.
  1796. """
  1797. return self.runAbortTest(AbortingTwiceClient, ReadAbortServerProtocol)
  1798. def test_clientAbortsConnectionThenLosesConnection(self):
  1799. """
  1800. Client calls abortConnection(), followed by loseConnection().
  1801. No exception should be thrown, and the connection will be closed.
  1802. """
  1803. return self.runAbortTest(AbortingThenLosingClient,
  1804. ReadAbortServerProtocol)
  1805. def test_serverAbortsConnectionTwice(self):
  1806. """
  1807. abortConnection() is called twice by server.
  1808. No exception should be thrown, and the connection will be closed.
  1809. """
  1810. return self.runAbortTest(WritingButNotAbortingClient, ServerAbortsTwice,
  1811. clientConnectionLostReason=ConnectionLost)
  1812. def test_serverAbortsConnectionThenLosesConnection(self):
  1813. """
  1814. Server calls abortConnection(), followed by loseConnection().
  1815. No exception should be thrown, and the connection will be closed.
  1816. """
  1817. return self.runAbortTest(WritingButNotAbortingClient,
  1818. ServerAbortsThenLoses,
  1819. clientConnectionLostReason=ConnectionLost)
  1820. def test_resumeProducingAbort(self):
  1821. """
  1822. abortConnection() is called in resumeProducing, before any bytes have
  1823. been exchanged. The protocol should be disconnected, but
  1824. connectionLost should not be called re-entrantly.
  1825. """
  1826. self.runAbortTest(ProducerAbortingClient,
  1827. ConnectableProtocol)
  1828. def test_resumeProducingAbortLater(self):
  1829. """
  1830. abortConnection() is called in resumeProducing, after some
  1831. bytes have been exchanged. The protocol should be disconnected.
  1832. """
  1833. return self.runAbortTest(ProducerAbortingClientLater,
  1834. AbortServerWritingProtocol)
  1835. def test_fullWriteBuffer(self):
  1836. """
  1837. abortConnection() triggered by the write buffer being full.
  1838. In particular, the server side stops reading. This is supposed
  1839. to simulate a realistic timeout scenario where the client
  1840. notices the server is no longer accepting data.
  1841. The protocol should be disconnected, but connectionLost should not be
  1842. called re-entrantly.
  1843. """
  1844. self.runAbortTest(StreamingProducerClient,
  1845. NoReadServer)
  1846. def test_fullWriteBufferAfterByteExchange(self):
  1847. """
  1848. abortConnection() is triggered by a write buffer being full.
  1849. However, this buffer is filled after some bytes have been exchanged,
  1850. allowing a TLS handshake if we're testing TLS. The connection will
  1851. then be lost.
  1852. """
  1853. return self.runAbortTest(StreamingProducerClientLater,
  1854. EventualNoReadServer)
  1855. def test_dataReceivedThrows(self):
  1856. """
  1857. dataReceived calls abortConnection(), and then raises an exception.
  1858. The connection will be lost, with the thrown exception
  1859. (C{ZeroDivisionError}) as the reason on the client. The idea here is
  1860. that bugs should not be masked by abortConnection, in particular
  1861. unexpected exceptions.
  1862. """
  1863. self.runAbortTest(DataReceivedRaisingClient,
  1864. AbortServerWritingProtocol,
  1865. clientConnectionLostReason=ZeroDivisionError)
  1866. errors = self.flushLoggedErrors(ZeroDivisionError)
  1867. self.assertEqual(len(errors), 1)
  1868. def test_resumeProducingThrows(self):
  1869. """
  1870. resumeProducing calls abortConnection(), and then raises an exception.
  1871. The connection will be lost, with the thrown exception
  1872. (C{ZeroDivisionError}) as the reason on the client. The idea here is
  1873. that bugs should not be masked by abortConnection, in particular
  1874. unexpected exceptions.
  1875. """
  1876. self.runAbortTest(ResumeThrowsClient,
  1877. ConnectableProtocol,
  1878. clientConnectionLostReason=ZeroDivisionError)
  1879. errors = self.flushLoggedErrors(ZeroDivisionError)
  1880. self.assertEqual(len(errors), 1)
  1881. class AbortConnectionTests(ReactorBuilder, AbortConnectionMixin):
  1882. """
  1883. TCP-specific L{AbortConnectionMixin} tests.
  1884. """
  1885. requiredInterfaces = (IReactorTCP,)
  1886. endpoints = TCPCreator()
  1887. globals().update(AbortConnectionTests.makeTestCaseClasses())
  1888. class SimpleUtilityTests(TestCase):
  1889. """
  1890. Simple, direct tests for helpers within L{twisted.internet.tcp}.
  1891. """
  1892. if ipv6Skip:
  1893. skip = ipv6Skip
  1894. def test_resolveNumericHost(self):
  1895. """
  1896. L{_resolveIPv6} raises a L{socket.gaierror} (L{socket.EAI_NONAME}) when
  1897. invoked with a non-numeric host. (In other words, it is passing
  1898. L{socket.AI_NUMERICHOST} to L{socket.getaddrinfo} and will not
  1899. accidentally block if it receives bad input.)
  1900. """
  1901. err = self.assertRaises(socket.gaierror, _resolveIPv6, "localhost", 1)
  1902. self.assertEqual(err.args[0], socket.EAI_NONAME)
  1903. def test_resolveNumericService(self):
  1904. """
  1905. L{_resolveIPv6} raises a L{socket.gaierror} (L{socket.EAI_NONAME}) when
  1906. invoked with a non-numeric port. (In other words, it is passing
  1907. L{socket.AI_NUMERICSERV} to L{socket.getaddrinfo} and will not
  1908. accidentally block if it receives bad input.)
  1909. """
  1910. err = self.assertRaises(socket.gaierror, _resolveIPv6, "::1", "http")
  1911. self.assertEqual(err.args[0], socket.EAI_NONAME)
  1912. if platform.isWindows():
  1913. test_resolveNumericService.skip = ("The AI_NUMERICSERV flag is not "
  1914. "supported by Microsoft providers.")
  1915. # http://msdn.microsoft.com/en-us/library/windows/desktop/ms738520.aspx
  1916. def test_resolveIPv6(self):
  1917. """
  1918. L{_resolveIPv6} discovers the flow info and scope ID of an IPv6
  1919. address.
  1920. """
  1921. result = _resolveIPv6("::1", 2)
  1922. self.assertEqual(len(result), 4)
  1923. # We can't say anything more useful about these than that they're
  1924. # integers, because the whole point of getaddrinfo is that you can never
  1925. # know a-priori know _anything_ about the network interfaces of the
  1926. # computer that you're on and you have to ask it.
  1927. self.assertIsInstance(result[2], (int, long)) # flow info
  1928. self.assertIsInstance(result[3], (int, long)) # scope id
  1929. # but, luckily, IP presentation format and what it means to be a port
  1930. # number are a little better specified.
  1931. self.assertEqual(result[:2], ("::1", 2))