test_endpoints.py 142 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test the C{I...Endpoint} implementations that wrap the L{IReactorTCP},
  5. L{IReactorSSL}, and L{IReactorUNIX} interfaces found in
  6. L{twisted.internet.endpoints}.
  7. """
  8. from __future__ import division, absolute_import
  9. from errno import EPERM
  10. from socket import AF_INET, AF_INET6, SOCK_STREAM, IPPROTO_TCP, gaierror
  11. from unicodedata import normalize
  12. from types import FunctionType
  13. from zope.interface import implementer, providedBy, provider
  14. from zope.interface.interface import InterfaceClass
  15. from zope.interface.verify import verifyObject, verifyClass
  16. from twisted.trial import unittest
  17. from twisted.test.proto_helpers import MemoryReactorClock as MemoryReactor
  18. from twisted.test.proto_helpers import RaisingMemoryReactor, StringTransport
  19. from twisted.test.proto_helpers import StringTransportWithDisconnection
  20. from twisted import plugins
  21. from twisted.internet import error, interfaces, defer, endpoints, protocol
  22. from twisted.internet import reactor, threads, stdio
  23. from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress
  24. from twisted.internet.address import _ProcessAddress, HostnameAddress
  25. from twisted.internet.endpoints import StandardErrorBehavior
  26. from twisted.internet.interfaces import IConsumer, IPushProducer, ITransport
  27. from twisted.internet.protocol import ClientFactory, Protocol, Factory
  28. from twisted.internet.stdio import PipeAddress
  29. from twisted.internet.task import Clock
  30. from twisted.logger import ILogObserver, globalLogPublisher
  31. from twisted.plugin import getPlugins
  32. from twisted.python import log
  33. from twisted.python.failure import Failure
  34. from twisted.python.filepath import FilePath
  35. from twisted.python.modules import getModule
  36. from twisted.python.systemd import ListenFDs
  37. from twisted.protocols import basic, policies
  38. from twisted.test.iosim import connectedServerAndClient, connectableEndpoint
  39. from twisted.internet.error import ConnectingCancelledError
  40. from twisted.python.compat import nativeString
  41. from twisted.internet.interfaces import IHostnameResolver
  42. from twisted.internet.interfaces import IReactorPluggableNameResolver
  43. from twisted.python.components import proxyForInterface
  44. from twisted.internet.abstract import isIPv6Address
  45. pemPath = getModule("twisted.test").filePath.sibling("server.pem")
  46. casPath = getModule(__name__).filePath.sibling("fake_CAs")
  47. chainPath = casPath.child("chain.pem")
  48. escapedPEMPathName = endpoints.quoteStringArgument(pemPath.path)
  49. escapedCAsPathName = endpoints.quoteStringArgument(casPath.path)
  50. escapedChainPathName = endpoints.quoteStringArgument(chainPath.path)
  51. try:
  52. from twisted.test.test_sslverify import makeCertificate
  53. from twisted.internet.ssl import (
  54. PrivateCertificate, Certificate, CertificateOptions, KeyPair,
  55. DiffieHellmanParameters
  56. )
  57. from twisted.protocols.tls import TLSMemoryBIOFactory
  58. from OpenSSL.SSL import (
  59. ContextType, SSLv23_METHOD, TLSv1_METHOD, OP_NO_SSLv3
  60. )
  61. testCertificate = Certificate.loadPEM(pemPath.getContent())
  62. testPrivateCertificate = PrivateCertificate.loadPEM(pemPath.getContent())
  63. skipSSL = False
  64. except ImportError:
  65. skipSSL = "OpenSSL is required to construct SSL Endpoints"
  66. class TestProtocol(Protocol):
  67. """
  68. Protocol whose only function is to callback deferreds on the
  69. factory when it is connected or disconnected.
  70. """
  71. def __init__(self):
  72. self.data = []
  73. self.connectionsLost = []
  74. self.connectionMadeCalls = 0
  75. def logPrefix(self):
  76. return "A Test Protocol"
  77. def connectionMade(self):
  78. self.connectionMadeCalls += 1
  79. def dataReceived(self, data):
  80. self.data.append(data)
  81. def connectionLost(self, reason):
  82. self.connectionsLost.append(reason)
  83. @implementer(interfaces.IHalfCloseableProtocol)
  84. class TestHalfCloseableProtocol(TestProtocol):
  85. """
  86. A Protocol that implements L{IHalfCloseableProtocol} and records whether
  87. its C{readConnectionLost} and {writeConnectionLost} methods are called.
  88. @ivar readLost: A C{bool} indicating whether C{readConnectionLost} has been
  89. called.
  90. @ivar writeLost: A C{bool} indicating whether C{writeConnectionLost} has
  91. been called.
  92. """
  93. def __init__(self):
  94. TestProtocol.__init__(self)
  95. self.readLost = False
  96. self.writeLost = False
  97. def readConnectionLost(self):
  98. self.readLost = True
  99. def writeConnectionLost(self):
  100. self.writeLost = True
  101. @implementer(interfaces.IFileDescriptorReceiver)
  102. class TestFileDescriptorReceiverProtocol(TestProtocol):
  103. """
  104. A Protocol that implements L{IFileDescriptorReceiver} and records how its
  105. C{fileDescriptorReceived} method is called.
  106. @ivar receivedDescriptors: A C{list} containing all of the file descriptors
  107. passed to C{fileDescriptorReceived} calls made on this instance.
  108. """
  109. def connectionMade(self):
  110. TestProtocol.connectionMade(self)
  111. self.receivedDescriptors = []
  112. def fileDescriptorReceived(self, descriptor):
  113. self.receivedDescriptors.append(descriptor)
  114. @implementer(interfaces.IHandshakeListener)
  115. class TestHandshakeListener(TestProtocol):
  116. """
  117. A Protocol that implements L{IHandshakeListener} and records the
  118. number of times its C{handshakeCompleted} method has been called.
  119. @ivar handshakeCompletedCalls: The number of times
  120. C{handshakeCompleted}
  121. @type handshakeCompletedCalls: L{int}
  122. """
  123. def __init__(self):
  124. TestProtocol.__init__(self)
  125. self.handshakeCompletedCalls = 0
  126. def handshakeCompleted(self):
  127. """
  128. Called when a TLS handshake has completed. Implemented per
  129. L{IHandshakeListener}
  130. """
  131. self.handshakeCompletedCalls += 1
  132. class TestFactory(ClientFactory):
  133. """
  134. Simple factory to be used both when connecting and listening. It contains
  135. two deferreds which are called back when my protocol connects and
  136. disconnects.
  137. """
  138. protocol = TestProtocol
  139. class NoneFactory(ClientFactory):
  140. """
  141. A one off factory whose C{buildProtocol} returns L{None}.
  142. """
  143. def buildProtocol(self, addr):
  144. return None
  145. class WrappingFactoryTests(unittest.TestCase):
  146. """
  147. Test the behaviour of our ugly implementation detail C{_WrappingFactory}.
  148. """
  149. def test_doStart(self):
  150. """
  151. L{_WrappingFactory.doStart} passes through to the wrapped factory's
  152. C{doStart} method, allowing application-specific setup and logging.
  153. """
  154. factory = ClientFactory()
  155. wf = endpoints._WrappingFactory(factory)
  156. wf.doStart()
  157. self.assertEqual(1, factory.numPorts)
  158. def test_doStop(self):
  159. """
  160. L{_WrappingFactory.doStop} passes through to the wrapped factory's
  161. C{doStop} method, allowing application-specific cleanup and logging.
  162. """
  163. factory = ClientFactory()
  164. factory.numPorts = 3
  165. wf = endpoints._WrappingFactory(factory)
  166. wf.doStop()
  167. self.assertEqual(2, factory.numPorts)
  168. def test_failedBuildProtocol(self):
  169. """
  170. An exception raised in C{buildProtocol} of our wrappedFactory
  171. results in our C{onConnection} errback being fired.
  172. """
  173. class BogusFactory(ClientFactory):
  174. """
  175. A one off factory whose C{buildProtocol} raises an C{Exception}.
  176. """
  177. def buildProtocol(self, addr):
  178. raise ValueError("My protocol is poorly defined.")
  179. wf = endpoints._WrappingFactory(BogusFactory())
  180. wf.buildProtocol(None)
  181. d = self.assertFailure(wf._onConnection, ValueError)
  182. d.addCallback(lambda e: self.assertEqual(
  183. e.args,
  184. ("My protocol is poorly defined.",)))
  185. return d
  186. def test_buildNoneProtocol(self):
  187. """
  188. If the wrapped factory's C{buildProtocol} returns L{None} the
  189. C{onConnection} errback fires with L{error.NoProtocol}.
  190. """
  191. wrappingFactory = endpoints._WrappingFactory(NoneFactory())
  192. wrappingFactory.buildProtocol(None)
  193. self.failureResultOf(wrappingFactory._onConnection, error.NoProtocol)
  194. def test_buildProtocolReturnsNone(self):
  195. """
  196. If the wrapped factory's C{buildProtocol} returns L{None} then
  197. L{endpoints._WrappingFactory.buildProtocol} returns L{None}.
  198. """
  199. wrappingFactory = endpoints._WrappingFactory(NoneFactory())
  200. # Discard the failure this Deferred will get
  201. wrappingFactory._onConnection.addErrback(lambda reason: None)
  202. self.assertIsNone(wrappingFactory.buildProtocol(None))
  203. def test_logPrefixPassthrough(self):
  204. """
  205. If the wrapped protocol provides L{ILoggingContext}, whatever is
  206. returned from the wrapped C{logPrefix} method is returned from
  207. L{_WrappingProtocol.logPrefix}.
  208. """
  209. wf = endpoints._WrappingFactory(TestFactory())
  210. wp = wf.buildProtocol(None)
  211. self.assertEqual(wp.logPrefix(), "A Test Protocol")
  212. def test_logPrefixDefault(self):
  213. """
  214. If the wrapped protocol does not provide L{ILoggingContext}, the
  215. wrapped protocol's class name is returned from
  216. L{_WrappingProtocol.logPrefix}.
  217. """
  218. class NoProtocol(object):
  219. pass
  220. factory = TestFactory()
  221. factory.protocol = NoProtocol
  222. wf = endpoints._WrappingFactory(factory)
  223. wp = wf.buildProtocol(None)
  224. self.assertEqual(wp.logPrefix(), "NoProtocol")
  225. def test_wrappedProtocolDataReceived(self):
  226. """
  227. The wrapped C{Protocol}'s C{dataReceived} will get called when our
  228. C{_WrappingProtocol}'s C{dataReceived} gets called.
  229. """
  230. wf = endpoints._WrappingFactory(TestFactory())
  231. p = wf.buildProtocol(None)
  232. p.makeConnection(None)
  233. p.dataReceived(b'foo')
  234. self.assertEqual(p._wrappedProtocol.data, [b'foo'])
  235. p.dataReceived(b'bar')
  236. self.assertEqual(p._wrappedProtocol.data, [b'foo', b'bar'])
  237. def test_wrappedProtocolTransport(self):
  238. """
  239. Our transport is properly hooked up to the wrappedProtocol when a
  240. connection is made.
  241. """
  242. wf = endpoints._WrappingFactory(TestFactory())
  243. p = wf.buildProtocol(None)
  244. dummyTransport = object()
  245. p.makeConnection(dummyTransport)
  246. self.assertEqual(p.transport, dummyTransport)
  247. self.assertEqual(p._wrappedProtocol.transport, dummyTransport)
  248. def test_wrappedProtocolConnectionLost(self):
  249. """
  250. Our wrappedProtocol's connectionLost method is called when
  251. L{_WrappingProtocol.connectionLost} is called.
  252. """
  253. tf = TestFactory()
  254. wf = endpoints._WrappingFactory(tf)
  255. p = wf.buildProtocol(None)
  256. p.connectionLost("fail")
  257. self.assertEqual(p._wrappedProtocol.connectionsLost, ["fail"])
  258. def test_clientConnectionFailed(self):
  259. """
  260. Calls to L{_WrappingFactory.clientConnectionLost} should errback the
  261. L{_WrappingFactory._onConnection} L{Deferred}
  262. """
  263. wf = endpoints._WrappingFactory(TestFactory())
  264. expectedFailure = Failure(error.ConnectError(string="fail"))
  265. wf.clientConnectionFailed(None, expectedFailure)
  266. errors = []
  267. def gotError(f):
  268. errors.append(f)
  269. wf._onConnection.addErrback(gotError)
  270. self.assertEqual(errors, [expectedFailure])
  271. def test_wrappingProtocolFileDescriptorReceiver(self):
  272. """
  273. Our L{_WrappingProtocol} should be an L{IFileDescriptorReceiver} if the
  274. wrapped protocol is.
  275. """
  276. connectedDeferred = None
  277. applicationProtocol = TestFileDescriptorReceiverProtocol()
  278. wrapper = endpoints._WrappingProtocol(
  279. connectedDeferred, applicationProtocol)
  280. self.assertTrue(interfaces.IFileDescriptorReceiver.providedBy(wrapper))
  281. self.assertTrue(
  282. verifyObject(interfaces.IFileDescriptorReceiver, wrapper))
  283. def test_wrappingProtocolNotFileDescriptorReceiver(self):
  284. """
  285. Our L{_WrappingProtocol} does not provide L{IHalfCloseableProtocol} if
  286. the wrapped protocol doesn't.
  287. """
  288. tp = TestProtocol()
  289. p = endpoints._WrappingProtocol(None, tp)
  290. self.assertFalse(interfaces.IFileDescriptorReceiver.providedBy(p))
  291. def test_wrappedProtocolFileDescriptorReceived(self):
  292. """
  293. L{_WrappingProtocol.fileDescriptorReceived} calls the wrapped
  294. protocol's C{fileDescriptorReceived} method.
  295. """
  296. wrappedProtocol = TestFileDescriptorReceiverProtocol()
  297. wrapper = endpoints._WrappingProtocol(
  298. defer.Deferred(), wrappedProtocol)
  299. wrapper.makeConnection(StringTransport())
  300. wrapper.fileDescriptorReceived(42)
  301. self.assertEqual(wrappedProtocol.receivedDescriptors, [42])
  302. def test_wrappingProtocolHalfCloseable(self):
  303. """
  304. Our L{_WrappingProtocol} should be an L{IHalfCloseableProtocol} if the
  305. C{wrappedProtocol} is.
  306. """
  307. cd = object()
  308. hcp = TestHalfCloseableProtocol()
  309. p = endpoints._WrappingProtocol(cd, hcp)
  310. self.assertEqual(
  311. interfaces.IHalfCloseableProtocol.providedBy(p), True)
  312. def test_wrappingProtocolNotHalfCloseable(self):
  313. """
  314. Our L{_WrappingProtocol} should not provide L{IHalfCloseableProtocol}
  315. if the C{WrappedProtocol} doesn't.
  316. """
  317. tp = TestProtocol()
  318. p = endpoints._WrappingProtocol(None, tp)
  319. self.assertEqual(
  320. interfaces.IHalfCloseableProtocol.providedBy(p), False)
  321. def test_wrappingProtocolHandshakeListener(self):
  322. """
  323. Our L{_WrappingProtocol} should be an L{IHandshakeListener} if
  324. the C{wrappedProtocol} is.
  325. """
  326. handshakeListener = TestHandshakeListener()
  327. wrapped = endpoints._WrappingProtocol(None, handshakeListener)
  328. self.assertTrue(interfaces.IHandshakeListener.providedBy(wrapped))
  329. def test_wrappingProtocolNotHandshakeListener(self):
  330. """
  331. Our L{_WrappingProtocol} should not provide L{IHandshakeListener}
  332. if the C{wrappedProtocol} doesn't.
  333. """
  334. tp = TestProtocol()
  335. p = endpoints._WrappingProtocol(None, tp)
  336. self.assertFalse(interfaces.IHandshakeListener.providedBy(p))
  337. def test_wrappedProtocolReadConnectionLost(self):
  338. """
  339. L{_WrappingProtocol.readConnectionLost} should proxy to the wrapped
  340. protocol's C{readConnectionLost}
  341. """
  342. hcp = TestHalfCloseableProtocol()
  343. p = endpoints._WrappingProtocol(None, hcp)
  344. p.readConnectionLost()
  345. self.assertTrue(hcp.readLost)
  346. def test_wrappedProtocolWriteConnectionLost(self):
  347. """
  348. L{_WrappingProtocol.writeConnectionLost} should proxy to the wrapped
  349. protocol's C{writeConnectionLost}
  350. """
  351. hcp = TestHalfCloseableProtocol()
  352. p = endpoints._WrappingProtocol(None, hcp)
  353. p.writeConnectionLost()
  354. self.assertTrue(hcp.writeLost)
  355. def test_wrappedProtocolHandshakeCompleted(self):
  356. """
  357. L{_WrappingProtocol.handshakeCompleted} should proxy to the
  358. wrapped protocol's C{handshakeCompleted}
  359. """
  360. listener = TestHandshakeListener()
  361. wrapped = endpoints._WrappingProtocol(None, listener)
  362. wrapped.handshakeCompleted()
  363. self.assertEqual(listener.handshakeCompletedCalls, 1)
  364. class ClientEndpointTestCaseMixin(object):
  365. """
  366. Generic test methods to be mixed into all client endpoint test classes.
  367. """
  368. def test_interface(self):
  369. """
  370. The endpoint provides L{interfaces.IStreamClientEndpoint}
  371. """
  372. clientFactory = object()
  373. ep, ignoredArgs, address = self.createClientEndpoint(
  374. MemoryReactor(), clientFactory)
  375. self.assertTrue(verifyObject(interfaces.IStreamClientEndpoint, ep))
  376. def retrieveConnectedFactory(self, reactor):
  377. """
  378. Retrieve a single factory that has connected using the given reactor.
  379. (This behavior is valid for TCP and SSL but needs to be overridden for
  380. UNIX.)
  381. @param reactor: a L{MemoryReactor}
  382. """
  383. return self.expectedClients(reactor)[0][2]
  384. def test_endpointConnectSuccess(self):
  385. """
  386. A client endpoint can connect and returns a deferred who gets called
  387. back with a protocol instance.
  388. """
  389. proto = object()
  390. mreactor = MemoryReactor()
  391. clientFactory = object()
  392. ep, expectedArgs, ignoredDest = self.createClientEndpoint(
  393. mreactor, clientFactory)
  394. d = ep.connect(clientFactory)
  395. receivedProtos = []
  396. def checkProto(p):
  397. receivedProtos.append(p)
  398. d.addCallback(checkProto)
  399. factory = self.retrieveConnectedFactory(mreactor)
  400. factory._onConnection.callback(proto)
  401. self.assertEqual(receivedProtos, [proto])
  402. expectedClients = self.expectedClients(mreactor)
  403. self.assertEqual(len(expectedClients), 1)
  404. self.assertConnectArgs(expectedClients[0], expectedArgs)
  405. def test_endpointConnectFailure(self):
  406. """
  407. If an endpoint tries to connect to a non-listening port it gets
  408. a C{ConnectError} failure.
  409. """
  410. expectedError = error.ConnectError(string="Connection Failed")
  411. mreactor = RaisingMemoryReactor(connectException=expectedError)
  412. clientFactory = object()
  413. ep, ignoredArgs, ignoredDest = self.createClientEndpoint(
  414. mreactor, clientFactory)
  415. d = ep.connect(clientFactory)
  416. receivedExceptions = []
  417. def checkFailure(f):
  418. receivedExceptions.append(f.value)
  419. d.addErrback(checkFailure)
  420. self.assertEqual(receivedExceptions, [expectedError])
  421. def test_endpointConnectingCancelled(self):
  422. """
  423. Calling L{Deferred.cancel} on the L{Deferred} returned from
  424. L{IStreamClientEndpoint.connect} is errbacked with an expected
  425. L{ConnectingCancelledError} exception.
  426. """
  427. mreactor = MemoryReactor()
  428. clientFactory = object()
  429. ep, ignoredArgs, address = self.createClientEndpoint(
  430. mreactor, clientFactory)
  431. d = ep.connect(clientFactory)
  432. receivedFailures = []
  433. def checkFailure(f):
  434. receivedFailures.append(f)
  435. d.addErrback(checkFailure)
  436. d.cancel()
  437. # When canceled, the connector will immediately notify its factory that
  438. # the connection attempt has failed due to a UserError.
  439. attemptFactory = self.retrieveConnectedFactory(mreactor)
  440. attemptFactory.clientConnectionFailed(None, Failure(error.UserError()))
  441. # This should be a feature of MemoryReactor: <http://tm.tl/5630>.
  442. self.assertEqual(len(receivedFailures), 1)
  443. failure = receivedFailures[0]
  444. self.assertIsInstance(failure.value, error.ConnectingCancelledError)
  445. self.assertEqual(failure.value.address, address)
  446. def test_endpointConnectNonDefaultArgs(self):
  447. """
  448. The endpoint should pass it's connectArgs parameter to the reactor's
  449. listen methods.
  450. """
  451. factory = object()
  452. mreactor = MemoryReactor()
  453. ep, expectedArgs, ignoredHost = self.createClientEndpoint(
  454. mreactor, factory,
  455. **self.connectArgs())
  456. ep.connect(factory)
  457. expectedClients = self.expectedClients(mreactor)
  458. self.assertEqual(len(expectedClients), 1)
  459. self.assertConnectArgs(expectedClients[0], expectedArgs)
  460. class ServerEndpointTestCaseMixin(object):
  461. """
  462. Generic test methods to be mixed into all client endpoint test classes.
  463. """
  464. def test_interface(self):
  465. """
  466. The endpoint provides L{interfaces.IStreamServerEndpoint}.
  467. """
  468. factory = object()
  469. ep, ignoredArgs, ignoredDest = self.createServerEndpoint(
  470. MemoryReactor(), factory)
  471. self.assertTrue(verifyObject(interfaces.IStreamServerEndpoint, ep))
  472. def test_endpointListenSuccess(self):
  473. """
  474. An endpoint can listen and returns a deferred that gets called back
  475. with a port instance.
  476. """
  477. mreactor = MemoryReactor()
  478. factory = object()
  479. ep, expectedArgs, expectedHost = self.createServerEndpoint(
  480. mreactor, factory)
  481. d = ep.listen(factory)
  482. receivedHosts = []
  483. def checkPortAndServer(port):
  484. receivedHosts.append(port.getHost())
  485. d.addCallback(checkPortAndServer)
  486. self.assertEqual(receivedHosts, [expectedHost])
  487. self.assertEqual(self.expectedServers(mreactor), [expectedArgs])
  488. def test_endpointListenFailure(self):
  489. """
  490. When an endpoint tries to listen on an already listening port, a
  491. C{CannotListenError} failure is errbacked.
  492. """
  493. factory = object()
  494. exception = error.CannotListenError('', 80, factory)
  495. mreactor = RaisingMemoryReactor(listenException=exception)
  496. ep, ignoredArgs, ignoredDest = self.createServerEndpoint(
  497. mreactor, factory)
  498. d = ep.listen(object())
  499. receivedExceptions = []
  500. def checkFailure(f):
  501. receivedExceptions.append(f.value)
  502. d.addErrback(checkFailure)
  503. self.assertEqual(receivedExceptions, [exception])
  504. def test_endpointListenNonDefaultArgs(self):
  505. """
  506. The endpoint should pass it's listenArgs parameter to the reactor's
  507. listen methods.
  508. """
  509. factory = object()
  510. mreactor = MemoryReactor()
  511. ep, expectedArgs, ignoredHost = self.createServerEndpoint(
  512. mreactor, factory,
  513. **self.listenArgs())
  514. ep.listen(factory)
  515. expectedServers = self.expectedServers(mreactor)
  516. self.assertEqual(expectedServers, [expectedArgs])
  517. class EndpointTestCaseMixin(ServerEndpointTestCaseMixin,
  518. ClientEndpointTestCaseMixin):
  519. """
  520. Generic test methods to be mixed into all endpoint test classes.
  521. """
  522. class SpecificFactory(Factory):
  523. """
  524. An L{IProtocolFactory} whose C{buildProtocol} always returns its
  525. C{specificProtocol} and sets C{passedAddress}.
  526. Raising an exception if C{specificProtocol} has already been used.
  527. """
  528. def __init__(self, specificProtocol):
  529. self.specificProtocol = specificProtocol
  530. def buildProtocol(self, addr):
  531. if hasattr(self.specificProtocol, 'passedAddress'):
  532. raise ValueError("specificProtocol already used.")
  533. self.specificProtocol.passedAddress = addr
  534. return self.specificProtocol
  535. class FakeStdio(object):
  536. """
  537. A L{stdio.StandardIO} like object that simply captures its constructor
  538. arguments.
  539. """
  540. def __init__(self, protocolInstance, reactor=None):
  541. """
  542. @param protocolInstance: like the first argument of L{stdio.StandardIO}
  543. @param reactor: like the reactor keyword argument of
  544. L{stdio.StandardIO}
  545. """
  546. self.protocolInstance = protocolInstance
  547. self.reactor = reactor
  548. class StandardIOEndpointsTests(unittest.TestCase):
  549. """
  550. Tests for Standard I/O Endpoints
  551. """
  552. def setUp(self):
  553. """
  554. Construct a L{StandardIOEndpoint} with a dummy reactor and a fake
  555. L{stdio.StandardIO} like object. Listening on it with a
  556. L{SpecificFactory}.
  557. """
  558. self.reactor = object()
  559. endpoint = endpoints.StandardIOEndpoint(self.reactor)
  560. self.assertIs(endpoint._stdio, stdio.StandardIO)
  561. endpoint._stdio = FakeStdio
  562. self.specificProtocol = Protocol()
  563. self.fakeStdio = self.successResultOf(
  564. endpoint.listen(SpecificFactory(self.specificProtocol))
  565. )
  566. def test_protocolCreation(self):
  567. """
  568. L{StandardIOEndpoint} returns a L{Deferred} that fires with an instance
  569. of a L{stdio.StandardIO} like object that was passed the result of
  570. L{SpecificFactory.buildProtocol} which was passed a L{PipeAddress}.
  571. """
  572. self.assertIs(self.fakeStdio.protocolInstance,
  573. self.specificProtocol)
  574. self.assertIsInstance(self.fakeStdio.protocolInstance.passedAddress,
  575. PipeAddress)
  576. def test_passedReactor(self):
  577. """
  578. L{StandardIOEndpoint} passes its C{reactor} argument to the constructor
  579. of its L{stdio.StandardIO} like object.
  580. """
  581. self.assertIs(self.fakeStdio.reactor, self.reactor)
  582. class StubApplicationProtocol(protocol.Protocol):
  583. """
  584. An L{IProtocol} provider.
  585. """
  586. def dataReceived(self, data):
  587. """
  588. @param data: The data received by the protocol.
  589. @type data: str
  590. """
  591. self.data = data
  592. def connectionLost(self, reason):
  593. """
  594. @type reason: L{twisted.python.failure.Failure}
  595. """
  596. self.reason = reason
  597. @implementer(interfaces.IProcessTransport)
  598. class MemoryProcessTransport(StringTransportWithDisconnection, object):
  599. """
  600. A fake L{IProcessTransport} provider to be used in tests.
  601. """
  602. def __init__(self, protocol=None):
  603. super(MemoryProcessTransport, self).__init__(
  604. hostAddress=_ProcessAddress(),
  605. peerAddress=_ProcessAddress())
  606. self.signals = []
  607. self.closedChildFDs = set()
  608. self.protocol = Protocol()
  609. def writeToChild(self, childFD, data):
  610. if childFD == 0:
  611. self.write(data)
  612. def closeStdin(self):
  613. self.closeChildFD(0)
  614. def closeStdout(self):
  615. self.closeChildFD(1)
  616. def closeStderr(self):
  617. self.closeChildFD(2)
  618. def closeChildFD(self, fd):
  619. self.closedChildFDs.add(fd)
  620. def signalProcess(self, signal):
  621. self.signals.append(signal)
  622. verifyClass(interfaces.IConsumer, MemoryProcessTransport)
  623. verifyClass(interfaces.IPushProducer, MemoryProcessTransport)
  624. verifyClass(interfaces.IProcessTransport, MemoryProcessTransport)
  625. @implementer(interfaces.IReactorProcess)
  626. class MemoryProcessReactor(object):
  627. """
  628. A fake L{IReactorProcess} provider to be used in tests.
  629. """
  630. def spawnProcess(self, processProtocol, executable, args=(), env={},
  631. path=None, uid=None, gid=None, usePTY=0, childFDs=None):
  632. """
  633. @ivar processProtocol: Stores the protocol passed to the reactor.
  634. @return: An L{IProcessTransport} provider.
  635. """
  636. self.processProtocol = processProtocol
  637. self.executable = executable
  638. self.args = args
  639. self.env = env
  640. self.path = path
  641. self.uid = uid
  642. self.gid = gid
  643. self.usePTY = usePTY
  644. self.childFDs = childFDs
  645. self.processTransport = MemoryProcessTransport()
  646. self.processProtocol.makeConnection(self.processTransport)
  647. return self.processTransport
  648. class ProcessEndpointsTests(unittest.TestCase):
  649. """
  650. Tests for child process endpoints.
  651. """
  652. def setUp(self):
  653. self.reactor = MemoryProcessReactor()
  654. self.ep = endpoints.ProcessEndpoint(self.reactor, b'/bin/executable')
  655. self.factory = protocol.Factory()
  656. self.factory.protocol = StubApplicationProtocol
  657. def test_constructorDefaults(self):
  658. """
  659. Default values are set for the optional parameters in the endpoint.
  660. """
  661. self.assertIsInstance(self.ep._reactor, MemoryProcessReactor)
  662. self.assertEqual(self.ep._executable, b'/bin/executable')
  663. self.assertEqual(self.ep._args, ())
  664. self.assertEqual(self.ep._env, {})
  665. self.assertIsNone(self.ep._path)
  666. self.assertIsNone(self.ep._uid)
  667. self.assertIsNone(self.ep._gid)
  668. self.assertEqual(self.ep._usePTY, 0)
  669. self.assertIsNone(self.ep._childFDs)
  670. self.assertEqual(self.ep._errFlag, StandardErrorBehavior.LOG)
  671. def test_constructorNonDefaults(self):
  672. """
  673. The parameters passed to the endpoint are stored in it.
  674. """
  675. environ = {b'HOME': None}
  676. ep = endpoints.ProcessEndpoint(
  677. MemoryProcessReactor(), b'/bin/executable',
  678. [b'/bin/executable'], {b'HOME': environ[b'HOME']},
  679. b'/runProcessHere/', 1, 2, True, {3: 'w', 4: 'r', 5: 'r'},
  680. StandardErrorBehavior.DROP)
  681. self.assertIsInstance(ep._reactor, MemoryProcessReactor)
  682. self.assertEqual(ep._executable, b'/bin/executable')
  683. self.assertEqual(ep._args, [b'/bin/executable'])
  684. self.assertEqual(ep._env, {b'HOME': environ[b'HOME']})
  685. self.assertEqual(ep._path, b'/runProcessHere/')
  686. self.assertEqual(ep._uid, 1)
  687. self.assertEqual(ep._gid, 2)
  688. self.assertTrue(ep._usePTY)
  689. self.assertEqual(ep._childFDs, {3: 'w', 4: 'r', 5: 'r'})
  690. self.assertEqual(ep._errFlag, StandardErrorBehavior.DROP)
  691. def test_wrappedProtocol(self):
  692. """
  693. The wrapper function _WrapIProtocol gives an IProcessProtocol
  694. implementation that wraps over an IProtocol.
  695. """
  696. d = self.ep.connect(self.factory)
  697. self.successResultOf(d)
  698. wpp = self.reactor.processProtocol
  699. self.assertIsInstance(wpp, endpoints._WrapIProtocol)
  700. def test_spawnProcess(self):
  701. """
  702. The parameters for spawnProcess stored in the endpoint are passed when
  703. the endpoint's connect method is invoked.
  704. """
  705. environ = {b'HOME': None}
  706. memoryReactor = MemoryProcessReactor()
  707. ep = endpoints.ProcessEndpoint(
  708. memoryReactor, b'/bin/executable',
  709. [b'/bin/executable'], {b'HOME': environ[b'HOME']},
  710. b'/runProcessHere/', 1, 2, True, {3: 'w', 4: 'r', 5: 'r'})
  711. d = ep.connect(self.factory)
  712. self.successResultOf(d)
  713. self.assertIsInstance(memoryReactor.processProtocol,
  714. endpoints._WrapIProtocol)
  715. self.assertEqual(memoryReactor.executable, ep._executable)
  716. self.assertEqual(memoryReactor.args, ep._args)
  717. self.assertEqual(memoryReactor.env, ep._env)
  718. self.assertEqual(memoryReactor.path, ep._path)
  719. self.assertEqual(memoryReactor.uid, ep._uid)
  720. self.assertEqual(memoryReactor.gid, ep._gid)
  721. self.assertEqual(memoryReactor.usePTY, ep._usePTY)
  722. self.assertEqual(memoryReactor.childFDs, ep._childFDs)
  723. def test_processAddress(self):
  724. """
  725. The address passed to the factory's buildProtocol in the endpoint is a
  726. _ProcessAddress instance.
  727. """
  728. class TestAddrFactory(protocol.Factory):
  729. protocol = StubApplicationProtocol
  730. address = None
  731. def buildProtocol(self, addr):
  732. self.address = addr
  733. p = self.protocol()
  734. p.factory = self
  735. return p
  736. myFactory = TestAddrFactory()
  737. d = self.ep.connect(myFactory)
  738. self.successResultOf(d)
  739. self.assertIsInstance(myFactory.address, _ProcessAddress)
  740. def test_connect(self):
  741. """
  742. L{ProcessEndpoint.connect} returns a Deferred with the connected
  743. protocol.
  744. """
  745. proto = self.successResultOf(self.ep.connect(self.factory))
  746. self.assertIsInstance(proto, StubApplicationProtocol)
  747. def test_connectFailure(self):
  748. """
  749. In case of failure, L{ProcessEndpoint.connect} returns a Deferred that
  750. fails.
  751. """
  752. def testSpawnProcess(pp, executable, args, env, path,
  753. uid, gid, usePTY, childFDs):
  754. raise Exception()
  755. self.ep._spawnProcess = testSpawnProcess
  756. d = self.ep.connect(self.factory)
  757. error = self.failureResultOf(d)
  758. error.trap(Exception)
  759. class ProcessEndpointTransportTests(unittest.TestCase):
  760. """
  761. Test the behaviour of the implementation detail
  762. L{endpoints._ProcessEndpointTransport}.
  763. """
  764. def setUp(self):
  765. self.reactor = MemoryProcessReactor()
  766. self.endpoint = endpoints.ProcessEndpoint(self.reactor,
  767. b'/bin/executable')
  768. protocol = self.successResultOf(
  769. self.endpoint.connect(Factory.forProtocol(Protocol))
  770. )
  771. self.process = self.reactor.processTransport
  772. self.endpointTransport = protocol.transport
  773. def test_verifyConsumer(self):
  774. """
  775. L{_ProcessEndpointTransport}s provide L{IConsumer}.
  776. """
  777. verifyObject(IConsumer, self.endpointTransport)
  778. def test_verifyProducer(self):
  779. """
  780. L{_ProcessEndpointTransport}s provide L{IPushProducer}.
  781. """
  782. verifyObject(IPushProducer, self.endpointTransport)
  783. def test_verifyTransport(self):
  784. """
  785. L{_ProcessEndpointTransport}s provide L{ITransport}.
  786. """
  787. verifyObject(ITransport, self.endpointTransport)
  788. def test_constructor(self):
  789. """
  790. The L{_ProcessEndpointTransport} instance stores the process passed to
  791. it.
  792. """
  793. self.assertIs(self.endpointTransport._process, self.process)
  794. def test_registerProducer(self):
  795. """
  796. Registering a producer with the endpoint transport registers it with
  797. the underlying process transport.
  798. """
  799. @implementer(IPushProducer)
  800. class AProducer(object):
  801. pass
  802. aProducer = AProducer()
  803. self.endpointTransport.registerProducer(aProducer, False)
  804. self.assertIs(self.process.producer, aProducer)
  805. def test_pauseProducing(self):
  806. """
  807. Pausing the endpoint transport pauses the underlying process transport.
  808. """
  809. self.endpointTransport.pauseProducing()
  810. self.assertEqual(self.process.producerState, 'paused')
  811. def test_resumeProducing(self):
  812. """
  813. Resuming the endpoint transport resumes the underlying process
  814. transport.
  815. """
  816. self.test_pauseProducing()
  817. self.endpointTransport.resumeProducing()
  818. self.assertEqual(self.process.producerState, 'producing')
  819. def test_stopProducing(self):
  820. """
  821. Stopping the endpoint transport as a producer stops the underlying
  822. process transport.
  823. """
  824. self.endpointTransport.stopProducing()
  825. self.assertEqual(self.process.producerState, 'stopped')
  826. def test_unregisterProducer(self):
  827. """
  828. Unregistring the endpoint transport's producer unregisters the
  829. underlying process transport's producer.
  830. """
  831. self.test_registerProducer()
  832. self.endpointTransport.unregisterProducer()
  833. self.assertIsNone(self.process.producer)
  834. def test_extraneousAttributes(self):
  835. """
  836. L{endpoints._ProcessEndpointTransport} filters out extraneous
  837. attributes of its underlying transport, to present a more consistent
  838. cross-platform view of subprocesses and prevent accidental
  839. dependencies.
  840. """
  841. self.process.pipes = []
  842. self.assertRaises(AttributeError,
  843. getattr, self.endpointTransport, 'pipes')
  844. def test_writeSequence(self):
  845. """
  846. The writeSequence method of L{_ProcessEndpointTransport} writes a list
  847. of string passed to it to the transport's stdin.
  848. """
  849. self.endpointTransport.writeSequence([b'test1', b'test2', b'test3'])
  850. self.assertEqual(self.process.io.getvalue(), b'test1test2test3')
  851. def test_write(self):
  852. """
  853. The write method of L{_ProcessEndpointTransport} writes a string of
  854. data passed to it to the child process's stdin.
  855. """
  856. self.endpointTransport.write(b'test')
  857. self.assertEqual(self.process.io.getvalue(), b'test')
  858. def test_loseConnection(self):
  859. """
  860. A call to the loseConnection method of a L{_ProcessEndpointTransport}
  861. instance returns a call to the process transport's loseConnection.
  862. """
  863. self.endpointTransport.loseConnection()
  864. self.assertFalse(self.process.connected)
  865. def test_getHost(self):
  866. """
  867. L{_ProcessEndpointTransport.getHost} returns a L{_ProcessAddress}
  868. instance matching the process C{getHost}.
  869. """
  870. host = self.endpointTransport.getHost()
  871. self.assertIsInstance(host, _ProcessAddress)
  872. self.assertIs(host, self.process.getHost())
  873. def test_getPeer(self):
  874. """
  875. L{_ProcessEndpointTransport.getPeer} returns a L{_ProcessAddress}
  876. instance matching the process C{getPeer}.
  877. """
  878. peer = self.endpointTransport.getPeer()
  879. self.assertIsInstance(peer, _ProcessAddress)
  880. self.assertIs(peer, self.process.getPeer())
  881. class WrappedIProtocolTests(unittest.TestCase):
  882. """
  883. Test the behaviour of the implementation detail C{_WrapIProtocol}.
  884. """
  885. def setUp(self):
  886. self.reactor = MemoryProcessReactor()
  887. self.ep = endpoints.ProcessEndpoint(self.reactor, b'/bin/executable')
  888. self.eventLog = None
  889. self.factory = protocol.Factory()
  890. self.factory.protocol = StubApplicationProtocol
  891. def test_constructor(self):
  892. """
  893. Stores an L{IProtocol} provider and the flag to log/drop stderr
  894. """
  895. d = self.ep.connect(self.factory)
  896. self.successResultOf(d)
  897. wpp = self.reactor.processProtocol
  898. self.assertIsInstance(wpp.protocol, StubApplicationProtocol)
  899. self.assertEqual(wpp.errFlag, self.ep._errFlag)
  900. def test_makeConnection(self):
  901. """
  902. Our process transport is properly hooked up to the wrappedIProtocol
  903. when a connection is made.
  904. """
  905. d = self.ep.connect(self.factory)
  906. self.successResultOf(d)
  907. wpp = self.reactor.processProtocol
  908. self.assertEqual(wpp.protocol.transport, wpp.transport)
  909. def _stdLog(self, eventDict):
  910. """
  911. A log observer.
  912. """
  913. self.eventLog = eventDict
  914. def test_logStderr(self):
  915. """
  916. When the _errFlag is set to L{StandardErrorBehavior.LOG},
  917. L{endpoints._WrapIProtocol} logs stderr (in childDataReceived).
  918. """
  919. d = self.ep.connect(self.factory)
  920. self.successResultOf(d)
  921. wpp = self.reactor.processProtocol
  922. log.addObserver(self._stdLog)
  923. self.addCleanup(log.removeObserver, self._stdLog)
  924. wpp.childDataReceived(2, b'stderr1')
  925. self.assertEqual(self.eventLog['executable'], wpp.executable)
  926. self.assertEqual(self.eventLog['data'], b'stderr1')
  927. self.assertEqual(self.eventLog['protocol'], wpp.protocol)
  928. self.assertIn(
  929. 'wrote stderr unhandled by',
  930. log.textFromEventDict(self.eventLog))
  931. def test_stderrSkip(self):
  932. """
  933. When the _errFlag is set to L{StandardErrorBehavior.DROP},
  934. L{endpoints._WrapIProtocol} ignores stderr.
  935. """
  936. self.ep._errFlag = StandardErrorBehavior.DROP
  937. d = self.ep.connect(self.factory)
  938. self.successResultOf(d)
  939. wpp = self.reactor.processProtocol
  940. log.addObserver(self._stdLog)
  941. self.addCleanup(log.removeObserver, self._stdLog)
  942. wpp.childDataReceived(2, b'stderr2')
  943. self.assertIsNone(self.eventLog)
  944. def test_stdout(self):
  945. """
  946. In childDataReceived of L{_WrappedIProtocol} instance, the protocol's
  947. dataReceived is called when stdout is generated.
  948. """
  949. d = self.ep.connect(self.factory)
  950. self.successResultOf(d)
  951. wpp = self.reactor.processProtocol
  952. wpp.childDataReceived(1, b'stdout')
  953. self.assertEqual(wpp.protocol.data, b'stdout')
  954. def test_processDone(self):
  955. """
  956. L{error.ProcessDone} with status=0 is turned into a clean disconnect
  957. type, i.e. L{error.ConnectionDone}.
  958. """
  959. d = self.ep.connect(self.factory)
  960. self.successResultOf(d)
  961. wpp = self.reactor.processProtocol
  962. wpp.processEnded(Failure(error.ProcessDone(0)))
  963. self.assertEqual(
  964. wpp.protocol.reason.check(error.ConnectionDone),
  965. error.ConnectionDone)
  966. def test_processEnded(self):
  967. """
  968. Exceptions other than L{error.ProcessDone} with status=0 are turned
  969. into L{error.ConnectionLost}.
  970. """
  971. d = self.ep.connect(self.factory)
  972. self.successResultOf(d)
  973. wpp = self.reactor.processProtocol
  974. wpp.processEnded(Failure(error.ProcessTerminated()))
  975. self.assertEqual(wpp.protocol.reason.check(error.ConnectionLost),
  976. error.ConnectionLost)
  977. class TCP4EndpointsTests(EndpointTestCaseMixin, unittest.TestCase):
  978. """
  979. Tests for TCP IPv4 Endpoints.
  980. """
  981. def expectedServers(self, reactor):
  982. """
  983. @return: List of calls to L{IReactorTCP.listenTCP}
  984. """
  985. return reactor.tcpServers
  986. def expectedClients(self, reactor):
  987. """
  988. @return: List of calls to L{IReactorTCP.connectTCP}
  989. """
  990. return reactor.tcpClients
  991. def assertConnectArgs(self, receivedArgs, expectedArgs):
  992. """
  993. Compare host, port, timeout, and bindAddress in C{receivedArgs}
  994. to C{expectedArgs}. We ignore the factory because we don't
  995. only care what protocol comes out of the
  996. C{IStreamClientEndpoint.connect} call.
  997. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  998. C{timeout}, C{bindAddress}) that was passed to
  999. L{IReactorTCP.connectTCP}.
  1000. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1001. C{timeout}, C{bindAddress}) that we expect to have been passed
  1002. to L{IReactorTCP.connectTCP}.
  1003. """
  1004. (host, port, ignoredFactory, timeout, bindAddress) = receivedArgs
  1005. (expectedHost, expectedPort, _ignoredFactory,
  1006. expectedTimeout, expectedBindAddress) = expectedArgs
  1007. self.assertEqual(host, expectedHost)
  1008. self.assertEqual(port, expectedPort)
  1009. self.assertEqual(timeout, expectedTimeout)
  1010. self.assertEqual(bindAddress, expectedBindAddress)
  1011. def connectArgs(self):
  1012. """
  1013. @return: C{dict} of keyword arguments to pass to connect.
  1014. """
  1015. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  1016. def listenArgs(self):
  1017. """
  1018. @return: C{dict} of keyword arguments to pass to listen
  1019. """
  1020. return {'backlog': 100, 'interface': '127.0.0.1'}
  1021. def createServerEndpoint(self, reactor, factory, **listenArgs):
  1022. """
  1023. Create an L{TCP4ServerEndpoint} and return the values needed to verify
  1024. its behaviour.
  1025. @param reactor: A fake L{IReactorTCP} that L{TCP4ServerEndpoint} can
  1026. call L{IReactorTCP.listenTCP} on.
  1027. @param factory: The thing that we expect to be passed to our
  1028. L{IStreamServerEndpoint.listen} implementation.
  1029. @param listenArgs: Optional dictionary of arguments to
  1030. L{IReactorTCP.listenTCP}.
  1031. """
  1032. address = IPv4Address("TCP", "0.0.0.0", 0)
  1033. if listenArgs is None:
  1034. listenArgs = {}
  1035. return (endpoints.TCP4ServerEndpoint(reactor,
  1036. address.port,
  1037. **listenArgs),
  1038. (address.port, factory,
  1039. listenArgs.get('backlog', 50),
  1040. listenArgs.get('interface', '')),
  1041. address)
  1042. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1043. """
  1044. Create an L{TCP4ClientEndpoint} and return the values needed to verify
  1045. its behavior.
  1046. @param reactor: A fake L{IReactorTCP} that L{TCP4ClientEndpoint} can
  1047. call L{IReactorTCP.connectTCP} on.
  1048. @param clientFactory: The thing that we expect to be passed to our
  1049. L{IStreamClientEndpoint.connect} implementation.
  1050. @param connectArgs: Optional dictionary of arguments to
  1051. L{IReactorTCP.connectTCP}
  1052. """
  1053. address = IPv4Address("TCP", "localhost", 80)
  1054. return (endpoints.TCP4ClientEndpoint(reactor,
  1055. address.host,
  1056. address.port,
  1057. **connectArgs),
  1058. (address.host, address.port, clientFactory,
  1059. connectArgs.get('timeout', 30),
  1060. connectArgs.get('bindAddress', None)),
  1061. address)
  1062. class TCP6EndpointsTests(EndpointTestCaseMixin, unittest.TestCase):
  1063. """
  1064. Tests for TCP IPv6 Endpoints.
  1065. """
  1066. def expectedServers(self, reactor):
  1067. """
  1068. @return: List of calls to L{IReactorTCP.listenTCP}
  1069. """
  1070. return reactor.tcpServers
  1071. def expectedClients(self, reactor):
  1072. """
  1073. @return: List of calls to L{IReactorTCP.connectTCP}
  1074. """
  1075. return reactor.tcpClients
  1076. def assertConnectArgs(self, receivedArgs, expectedArgs):
  1077. """
  1078. Compare host, port, timeout, and bindAddress in C{receivedArgs}
  1079. to C{expectedArgs}. We ignore the factory because we don't
  1080. only care what protocol comes out of the
  1081. C{IStreamClientEndpoint.connect} call.
  1082. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1083. C{timeout}, C{bindAddress}) that was passed to
  1084. L{IReactorTCP.connectTCP}.
  1085. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1086. C{timeout}, C{bindAddress}) that we expect to have been passed
  1087. to L{IReactorTCP.connectTCP}.
  1088. """
  1089. (host, port, ignoredFactory, timeout, bindAddress) = receivedArgs
  1090. (expectedHost, expectedPort, _ignoredFactory,
  1091. expectedTimeout, expectedBindAddress) = expectedArgs
  1092. self.assertEqual(host, expectedHost)
  1093. self.assertEqual(port, expectedPort)
  1094. self.assertEqual(timeout, expectedTimeout)
  1095. self.assertEqual(bindAddress, expectedBindAddress)
  1096. def connectArgs(self):
  1097. """
  1098. @return: C{dict} of keyword arguments to pass to connect.
  1099. """
  1100. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  1101. def listenArgs(self):
  1102. """
  1103. @return: C{dict} of keyword arguments to pass to listen
  1104. """
  1105. return {'backlog': 100, 'interface': '::1'}
  1106. def createServerEndpoint(self, reactor, factory, **listenArgs):
  1107. """
  1108. Create a L{TCP6ServerEndpoint} and return the values needed to verify
  1109. its behaviour.
  1110. @param reactor: A fake L{IReactorTCP} that L{TCP6ServerEndpoint} can
  1111. call L{IReactorTCP.listenTCP} on.
  1112. @param factory: The thing that we expect to be passed to our
  1113. L{IStreamServerEndpoint.listen} implementation.
  1114. @param listenArgs: Optional dictionary of arguments to
  1115. L{IReactorTCP.listenTCP}.
  1116. """
  1117. interface = listenArgs.get('interface', '::')
  1118. address = IPv6Address("TCP", interface, 0)
  1119. if listenArgs is None:
  1120. listenArgs = {}
  1121. return (endpoints.TCP6ServerEndpoint(reactor,
  1122. address.port,
  1123. **listenArgs),
  1124. (address.port, factory,
  1125. listenArgs.get('backlog', 50),
  1126. interface),
  1127. address)
  1128. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1129. """
  1130. Create a L{TCP6ClientEndpoint} and return the values needed to verify
  1131. its behavior.
  1132. @param reactor: A fake L{IReactorTCP} that L{TCP6ClientEndpoint} can
  1133. call L{IReactorTCP.connectTCP} on.
  1134. @param clientFactory: The thing that we expect to be passed to our
  1135. L{IStreamClientEndpoint.connect} implementation.
  1136. @param connectArgs: Optional dictionary of arguments to
  1137. L{IReactorTCP.connectTCP}
  1138. """
  1139. address = IPv6Address("TCP", "::1", 80)
  1140. return (endpoints.TCP6ClientEndpoint(reactor,
  1141. address.host,
  1142. address.port,
  1143. **connectArgs),
  1144. (address.host, address.port, clientFactory,
  1145. connectArgs.get('timeout', 30),
  1146. connectArgs.get('bindAddress', None)),
  1147. address)
  1148. class TCP6EndpointNameResolutionTests(ClientEndpointTestCaseMixin,
  1149. unittest.TestCase):
  1150. """
  1151. Tests for a TCP IPv6 Client Endpoint pointed at a hostname instead
  1152. of an IPv6 address literal.
  1153. """
  1154. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1155. """
  1156. Create a L{TCP6ClientEndpoint} and return the values needed to verify
  1157. its behavior.
  1158. @param reactor: A fake L{IReactorTCP} that L{TCP6ClientEndpoint} can
  1159. call L{IReactorTCP.connectTCP} on.
  1160. @param clientFactory: The thing that we expect to be passed to our
  1161. L{IStreamClientEndpoint.connect} implementation.
  1162. @param connectArgs: Optional dictionary of arguments to
  1163. L{IReactorTCP.connectTCP}
  1164. """
  1165. address = IPv6Address("TCP", "::2", 80)
  1166. self.ep = endpoints.TCP6ClientEndpoint(
  1167. reactor, 'ipv6.example.com', address.port, **connectArgs)
  1168. def testNameResolution(host):
  1169. self.assertEqual("ipv6.example.com", host)
  1170. data = [(AF_INET6, SOCK_STREAM, IPPROTO_TCP, '', ('::2', 0, 0, 0)),
  1171. (AF_INET6, SOCK_STREAM, IPPROTO_TCP, '', ('::3', 0, 0, 0)),
  1172. (AF_INET6, SOCK_STREAM, IPPROTO_TCP, '', ('::4', 0, 0, 0))]
  1173. return defer.succeed(data)
  1174. self.ep._nameResolution = testNameResolution
  1175. return (self.ep,
  1176. (address.host, address.port, clientFactory,
  1177. connectArgs.get('timeout', 30),
  1178. connectArgs.get('bindAddress', None)),
  1179. address)
  1180. def connectArgs(self):
  1181. """
  1182. @return: C{dict} of keyword arguments to pass to connect.
  1183. """
  1184. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  1185. def expectedClients(self, reactor):
  1186. """
  1187. @return: List of calls to L{IReactorTCP.connectTCP}
  1188. """
  1189. return reactor.tcpClients
  1190. def assertConnectArgs(self, receivedArgs, expectedArgs):
  1191. """
  1192. Compare host, port, timeout, and bindAddress in C{receivedArgs}
  1193. to C{expectedArgs}. We ignore the factory because we don't
  1194. only care what protocol comes out of the
  1195. C{IStreamClientEndpoint.connect} call.
  1196. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1197. C{timeout}, C{bindAddress}) that was passed to
  1198. L{IReactorTCP.connectTCP}.
  1199. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1200. C{timeout}, C{bindAddress}) that we expect to have been passed
  1201. to L{IReactorTCP.connectTCP}.
  1202. """
  1203. (host, port, ignoredFactory, timeout, bindAddress) = receivedArgs
  1204. (expectedHost, expectedPort, _ignoredFactory,
  1205. expectedTimeout, expectedBindAddress) = expectedArgs
  1206. self.assertEqual(host, expectedHost)
  1207. self.assertEqual(port, expectedPort)
  1208. self.assertEqual(timeout, expectedTimeout)
  1209. self.assertEqual(bindAddress, expectedBindAddress)
  1210. def test_freeFunctionDeferToThread(self):
  1211. """
  1212. By default, L{TCP6ClientEndpoint._deferToThread} is
  1213. L{threads.deferToThread}.
  1214. """
  1215. ep = endpoints.TCP6ClientEndpoint(None, 'www.example.com', 1234)
  1216. self.assertEqual(ep._deferToThread, threads.deferToThread)
  1217. def test_nameResolution(self):
  1218. """
  1219. While resolving hostnames, _nameResolution calls
  1220. _deferToThread with _getaddrinfo.
  1221. """
  1222. calls = []
  1223. def fakeDeferToThread(f, *args, **kwargs):
  1224. calls.append((f, args, kwargs))
  1225. return defer.Deferred()
  1226. endpoint = endpoints.TCP6ClientEndpoint(
  1227. reactor, 'ipv6.example.com', 1234)
  1228. fakegetaddrinfo = object()
  1229. endpoint._getaddrinfo = fakegetaddrinfo
  1230. endpoint._deferToThread = fakeDeferToThread
  1231. endpoint.connect(TestFactory())
  1232. self.assertEqual(
  1233. [(fakegetaddrinfo, ("ipv6.example.com", 0, AF_INET6), {})], calls)
  1234. class RaisingMemoryReactorWithClock(RaisingMemoryReactor, Clock):
  1235. """
  1236. An extension of L{RaisingMemoryReactor} with L{task.Clock}.
  1237. """
  1238. def __init__(self, listenException=None, connectException=None):
  1239. Clock.__init__(self)
  1240. RaisingMemoryReactor.__init__(self, listenException, connectException)
  1241. def deterministicResolvingReactor(reactor, expectedAddresses=(),
  1242. hostMap=None):
  1243. """
  1244. Create a reactor that will deterministically resolve all hostnames it is
  1245. passed to the list of addresses given.
  1246. @param reactor: An object that we wish to add an
  1247. L{IReactorPluggableNameResolver} to.
  1248. @type reactor: Any object with some formally-declared interfaces (i.e. one
  1249. where C{list(providedBy(reactor))} is not empty); usually C{IReactor*}
  1250. interfaces.
  1251. @param expectedAddresses: (optional); the addresses expected to be returned
  1252. for every address. If these are strings, they should be IPv4 or IPv6
  1253. literals, and they will be wrapped in L{IPv4Address} and L{IPv6Address}
  1254. objects in the resolution result.
  1255. @type expectedAddresses: iterable of C{object} or C{str}
  1256. @param hostMap: (optional); the names (unicode) mapped to lists of
  1257. addresses (str or L{IAddress}); in the same format as expectedAddress,
  1258. which map the results for I{specific} hostnames to addresses.
  1259. @return: A new reactor which provides all the interfaces previously
  1260. provided by C{reactor} as well as L{IReactorPluggableNameResolver}.
  1261. All name resolutions performed with its C{nameResolver} attribute will
  1262. resolve reentrantly and synchronously with the given
  1263. C{expectedAddresses}. However, it is not a complete implementation as
  1264. it does not have an C{installNameResolver} method.
  1265. """
  1266. if hostMap is None:
  1267. hostMap = {}
  1268. hostMap = hostMap.copy()
  1269. @implementer(IHostnameResolver)
  1270. class SimpleNameResolver(object):
  1271. @staticmethod
  1272. def resolveHostName(resolutionReceiver, hostName, portNumber=0,
  1273. addressTypes=None, transportSemantics='TCP'):
  1274. resolutionReceiver.resolutionBegan(None)
  1275. for expectedAddress in hostMap.get(hostName, expectedAddresses):
  1276. if isinstance(expectedAddress, str):
  1277. expectedAddress = ([IPv4Address, IPv6Address]
  1278. [isIPv6Address(expectedAddress)]
  1279. ('TCP', expectedAddress, portNumber))
  1280. resolutionReceiver.addressResolved(expectedAddress)
  1281. resolutionReceiver.resolutionComplete()
  1282. @implementer(IReactorPluggableNameResolver)
  1283. class WithResolver(proxyForInterface(
  1284. InterfaceClass('*', tuple(providedBy(reactor)))
  1285. )):
  1286. nameResolver = SimpleNameResolver()
  1287. return WithResolver(reactor)
  1288. class SimpleHostnameResolverTests(unittest.SynchronousTestCase):
  1289. """
  1290. Tests for L{endpoints._SimpleHostnameResolver}.
  1291. @ivar fakeResolverCalls: Arguments with which L{fakeResolver} was
  1292. called.
  1293. @type fakeResolverCalls: L{list} of C{(hostName, port)} L{tuple}s.
  1294. @ivar fakeResolverReturns: The return value of L{fakeResolver}.
  1295. @type fakeResolverReturns: L{Deferred}
  1296. @ivar resolver: The instance to test.
  1297. @type resolver: L{endpoints._SimpleHostnameResolver}
  1298. @ivar resolutionBeganCalls: Arguments with which receiver's
  1299. C{resolutionBegan} method was called.
  1300. @type resolutionBeganCalls: L{list}
  1301. @ivar addressResolved: Arguments with which C{addressResolved} was
  1302. called.
  1303. @type addressResolved: L{list}
  1304. @ivar resolutionCompleteCallCount: The number of calls to the
  1305. receiver's C{resolutionComplete} method.
  1306. @type resolutionCompleteCallCount: L{int}
  1307. @ivar receiver: A L{interfaces.IResolutionReceiver} provider.
  1308. """
  1309. def setUp(self):
  1310. self.fakeResolverCalls = []
  1311. self.fakeResolverReturns = defer.Deferred()
  1312. self.resolver = endpoints._SimpleHostnameResolver(self.fakeResolver)
  1313. self.resolutionBeganCalls = []
  1314. self.addressResolvedCalls = []
  1315. self.resolutionCompleteCallCount = 0
  1316. @provider(interfaces.IResolutionReceiver)
  1317. class _Receiver(object):
  1318. @staticmethod
  1319. def resolutionBegan(resolutionInProgress):
  1320. self.resolutionBeganCalls.append(resolutionInProgress)
  1321. @staticmethod
  1322. def addressResolved(address):
  1323. self.addressResolvedCalls.append(address)
  1324. @staticmethod
  1325. def resolutionComplete():
  1326. self.resolutionCompleteCallCount += 1
  1327. self.receiver = _Receiver
  1328. def fakeResolver(self, hostName, portNumber):
  1329. """
  1330. A fake resolver callable.
  1331. @param hostName: The hostname to resolve.
  1332. @param portNumber: The port number the returned address should
  1333. include.
  1334. @return: L{fakeResolverCalls}
  1335. @rtype: L{Deferred}
  1336. """
  1337. self.fakeResolverCalls.append((hostName, portNumber))
  1338. return self.fakeResolverReturns
  1339. def test_interface(self):
  1340. """
  1341. A L{endpoints._SimpleHostnameResolver} instance provides
  1342. L{interfaces.IHostnameResolver}.
  1343. """
  1344. self.assertTrue(verifyObject(interfaces.IHostnameResolver,
  1345. self.resolver))
  1346. def test_resolveNameFailure(self):
  1347. """
  1348. A resolution failure is logged with the name that failed to
  1349. resolve and the callable that tried to resolve it. The
  1350. resolution receiver begins, receives no addresses, and
  1351. completes.
  1352. """
  1353. logs = []
  1354. @provider(ILogObserver)
  1355. def captureLogs(event):
  1356. logs.append(event)
  1357. globalLogPublisher.addObserver(captureLogs)
  1358. self.addCleanup(lambda: globalLogPublisher.removeObserver(captureLogs))
  1359. receiver = self.resolver.resolveHostName(self.receiver, "example.com")
  1360. self.assertIs(receiver, self.receiver)
  1361. self.fakeResolverReturns.errback(Exception())
  1362. self.assertEqual(1, len(logs))
  1363. self.assertEqual(1, len(self.flushLoggedErrors(Exception)))
  1364. [event] = logs
  1365. self.assertTrue(event.get("isError"))
  1366. self.assertTrue(event.get("name", "example.com"))
  1367. self.assertTrue(event.get("callable", repr(self.fakeResolver)))
  1368. self.assertEqual(1, len(self.resolutionBeganCalls))
  1369. self.assertEqual(self.resolutionBeganCalls[0].name, "example.com")
  1370. self.assertFalse(self.addressResolvedCalls)
  1371. self.assertEqual(1, self.resolutionCompleteCallCount)
  1372. def test_resolveNameDelivers(self):
  1373. """
  1374. The resolution receiver begins, and resolved hostnames are
  1375. delivered before it completes.
  1376. """
  1377. port = 80
  1378. ipv4Host = '1.2.3.4'
  1379. ipv6Host = '1::2::3::4'
  1380. receiver = self.resolver.resolveHostName(self.receiver, "example.com")
  1381. self.assertIs(receiver, self.receiver)
  1382. self.fakeResolverReturns.callback([
  1383. (AF_INET, SOCK_STREAM, IPPROTO_TCP, '', (ipv4Host, port)),
  1384. (AF_INET6, SOCK_STREAM, IPPROTO_TCP, '', (ipv6Host, port)),
  1385. ])
  1386. self.assertEqual(1, len(self.resolutionBeganCalls))
  1387. self.assertEqual(self.resolutionBeganCalls[0].name, "example.com")
  1388. self.assertEqual(self.addressResolvedCalls, [
  1389. IPv4Address("TCP", ipv4Host, port),
  1390. IPv6Address("TCP", ipv6Host, port)
  1391. ])
  1392. self.assertEqual(self.resolutionCompleteCallCount, 1)
  1393. class HostnameEndpointFallbackNameResolutionTests(unittest.TestCase):
  1394. """
  1395. L{HostnameEndpoint._fallbackNameResolution} defers a name
  1396. resolution call to a thread.
  1397. """
  1398. def test_fallbackNameResolution(self):
  1399. """
  1400. L{_fallbackNameResolution} returns a L{Deferred} that fires
  1401. with the resoution of the the host and request port.
  1402. """
  1403. from twisted.internet import reactor
  1404. ep = endpoints.HostnameEndpoint(reactor,
  1405. host='ignored',
  1406. port=0)
  1407. host, port = ("1.2.3.4", 1)
  1408. resolutionDeferred = ep._fallbackNameResolution(host, port)
  1409. def assertHostPortFamilySockType(result):
  1410. self.assertEqual(len(result), 1)
  1411. [(family, socktype, _, _, sockaddr)] = result
  1412. self.assertEqual(family, AF_INET)
  1413. self.assertEqual(socktype, SOCK_STREAM)
  1414. self.assertEqual(sockaddr, (host, port))
  1415. return resolutionDeferred.addCallback(assertHostPortFamilySockType)
  1416. class _HostnameEndpointMemoryReactorMixin(ClientEndpointTestCaseMixin):
  1417. """
  1418. Common methods for testing L{HostnameEndpoint} against
  1419. L{MemoryReactor} instances that do not provide
  1420. L{IReactorPluggableNameResolver}.
  1421. """
  1422. def synchronousDeferredToThread(self, f, *args, **kwargs):
  1423. """
  1424. A synchronous version of L{deferToThread}.
  1425. @param f: The callable to invoke.
  1426. @type f: L{callable}
  1427. @param args: Positional arguments to the callable.
  1428. @param kwargs: Keyword arguments to the callable.
  1429. @return: A L{Deferred} that fires with the result of applying
  1430. C{f} to C{args} and C{kwargs} or the exception raised.
  1431. """
  1432. try:
  1433. result = f(*args, **kwargs)
  1434. except:
  1435. return defer.fail()
  1436. else:
  1437. return defer.succeed(result)
  1438. def expectedClients(self, reactor):
  1439. """
  1440. Extract expected clients from the reactor.
  1441. @param reactor: The L{MemoryReactor} under test.
  1442. @return: List of calls to L{IReactorTCP.connectTCP}
  1443. """
  1444. return reactor.tcpClients
  1445. def connectArgs(self):
  1446. """
  1447. @return: C{dict} of keyword arguments to pass to connect.
  1448. """
  1449. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  1450. def assertConnectArgs(self, receivedArgs, expectedArgs):
  1451. """
  1452. Compare host, port, timeout, and bindAddress in C{receivedArgs}
  1453. to C{expectedArgs}. We ignore the factory because we don't
  1454. only care what protocol comes out of the
  1455. C{IStreamClientEndpoint.connect} call.
  1456. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1457. C{timeout}, C{bindAddress}) that was passed to
  1458. L{IReactorTCP.connectTCP}.
  1459. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1460. C{timeout}, C{bindAddress}) that we expect to have been passed
  1461. to L{IReactorTCP.connectTCP}.
  1462. """
  1463. (host, port, ignoredFactory, timeout, bindAddress) = receivedArgs
  1464. (expectedHost, expectedPort, _ignoredFactory,
  1465. expectedTimeout, expectedBindAddress) = expectedArgs
  1466. self.assertEqual(host, expectedHost)
  1467. self.assertEqual(port, expectedPort)
  1468. self.assertEqual(timeout, expectedTimeout)
  1469. self.assertEqual(bindAddress, expectedBindAddress)
  1470. def test_endpointConnectFailure(self):
  1471. """
  1472. When L{HostnameEndpoint.connect} cannot connect to its
  1473. destination, the returned L{Deferred} will fail with
  1474. C{ConnectError}.
  1475. """
  1476. expectedError = error.ConnectError(string="Connection Failed")
  1477. mreactor = RaisingMemoryReactorWithClock(
  1478. connectException=expectedError)
  1479. clientFactory = object()
  1480. ep, ignoredArgs, ignoredDest = self.createClientEndpoint(
  1481. mreactor, clientFactory)
  1482. d = ep.connect(clientFactory)
  1483. mreactor.advance(endpoints.HostnameEndpoint._DEFAULT_ATTEMPT_DELAY)
  1484. self.assertEqual(self.failureResultOf(d).value, expectedError)
  1485. self.assertEqual([], mreactor.getDelayedCalls())
  1486. def test_deprecation(self):
  1487. """
  1488. Instantiating L{HostnameEndpoint} with a reactor that does not
  1489. provide L{IReactorPluggableResolver} emits a deprecation warning.
  1490. """
  1491. mreactor = MemoryReactor()
  1492. clientFactory = object()
  1493. self.createClientEndpoint(mreactor, clientFactory)
  1494. warnings = self.flushWarnings()
  1495. self.assertEqual(1, len(warnings))
  1496. self.assertIs(DeprecationWarning, warnings[0]['category'])
  1497. self.assertTrue(warnings[0]['message'].startswith(
  1498. 'Passing HostnameEndpoint a reactor that does not provide'
  1499. ' IReactorPluggableNameResolver'
  1500. ' (twisted.test.proto_helpers.MemoryReactorClock)'
  1501. ' was deprecated in Twisted 17.5.0;'
  1502. ' please use a reactor that provides'
  1503. ' IReactorPluggableNameResolver instead'))
  1504. def test_errorsLogged(self):
  1505. """
  1506. Hostname resolution errors are logged.
  1507. """
  1508. mreactor = MemoryReactor()
  1509. clientFactory = object()
  1510. ep, ignoredArgs, ignoredDest = self.createClientEndpoint(
  1511. mreactor, clientFactory)
  1512. def getaddrinfoThatFails(*args, **kwargs):
  1513. raise gaierror(-5, 'No address associated with hostname')
  1514. ep._getaddrinfo = getaddrinfoThatFails
  1515. d = ep.connect(clientFactory)
  1516. self.assertIsInstance(self.failureResultOf(d).value,
  1517. error.DNSLookupError)
  1518. self.assertEqual(1, len(self.flushLoggedErrors(gaierror)))
  1519. class HostnameEndpointMemoryIPv4ReactorTests(
  1520. _HostnameEndpointMemoryReactorMixin, unittest.TestCase):
  1521. """
  1522. IPv4 resolution tests for L{HostnameEndpoint} with
  1523. L{MemoryReactor} subclasses that do not provide
  1524. L{IReactorPluggableNameResolver}.
  1525. """
  1526. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1527. """
  1528. Creates a L{HostnameEndpoint} instance where the hostname is
  1529. resolved into a single IPv4 address.
  1530. @param reactor: The L{MemoryReactor}
  1531. @param clientFactory: The client L{IProtocolFactory}
  1532. @param connectArgs: Additional arguments to
  1533. L{HostnameEndpoint.connect}
  1534. @return: A L{tuple} of the form C{(endpoint, (expectedAddress,
  1535. expectedPort, clientFactory, timeout, localBindAddress,
  1536. hostnameAddress))}
  1537. """
  1538. expectedAddress = '1.2.3.4'
  1539. address = HostnameAddress(b"example.com", 80)
  1540. endpoint = endpoints.HostnameEndpoint(
  1541. reactor, b"example.com", address.port, **connectArgs
  1542. )
  1543. def fakegetaddrinfo(host, port, family, socktype):
  1544. return [
  1545. (AF_INET, SOCK_STREAM, IPPROTO_TCP, '', (expectedAddress, 80)),
  1546. ]
  1547. endpoint._getaddrinfo = fakegetaddrinfo
  1548. endpoint._deferToThread = self.synchronousDeferredToThread
  1549. return (endpoint, (expectedAddress, address.port, clientFactory,
  1550. connectArgs.get('timeout', 30),
  1551. connectArgs.get('bindAddress', None)),
  1552. address)
  1553. class HostnameEndpointMemoryIPv6ReactorTests(
  1554. _HostnameEndpointMemoryReactorMixin, unittest.TestCase):
  1555. """
  1556. IPv6 resolution tests for L{HostnameEndpoint} with
  1557. L{MemoryReactor} subclasses that do not provide
  1558. L{IReactorPluggableNameResolver}.
  1559. """
  1560. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1561. """
  1562. Creates a L{HostnameEndpoint} instance where the hostname is
  1563. resolved into a single IPv6 address.
  1564. @param reactor: The L{MemoryReactor}
  1565. @param clientFactory: The client L{IProtocolFactory}
  1566. @param connectArgs: Additional arguments to
  1567. L{HostnameEndpoint.connect}
  1568. @return: A L{tuple} of the form C{(endpoint, (expectedAddress,
  1569. expectedPort, clientFactory, timeout, localBindAddress,
  1570. hostnameAddress))}
  1571. """
  1572. expectedAddress = '1:2::3:4'
  1573. address = HostnameAddress(b"ipv6.example.com", 80)
  1574. endpoint = endpoints.HostnameEndpoint(
  1575. reactor, b"ipv6.example.com", address.port, **connectArgs
  1576. )
  1577. def fakegetaddrinfo(host, port, family, socktype):
  1578. return [
  1579. (AF_INET6, SOCK_STREAM, IPPROTO_TCP, '',
  1580. (expectedAddress, 80)),
  1581. ]
  1582. endpoint._getaddrinfo = fakegetaddrinfo
  1583. endpoint._deferToThread = self.synchronousDeferredToThread
  1584. return (endpoint, (expectedAddress, address.port, clientFactory,
  1585. connectArgs.get('timeout', 30),
  1586. connectArgs.get('bindAddress', None)),
  1587. address)
  1588. class HostnameEndpointsOneIPv4Tests(ClientEndpointTestCaseMixin,
  1589. unittest.TestCase):
  1590. """
  1591. Tests for the hostname based endpoints when GAI returns only one
  1592. (IPv4) address.
  1593. """
  1594. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1595. """
  1596. Creates a L{HostnameEndpoint} instance where the hostname is resolved
  1597. into a single IPv4 address.
  1598. """
  1599. expectedAddress = '1.2.3.4'
  1600. address = HostnameAddress(b"example.com", 80)
  1601. endpoint = endpoints.HostnameEndpoint(
  1602. deterministicResolvingReactor(reactor, [expectedAddress]),
  1603. b"example.com", address.port, **connectArgs
  1604. )
  1605. return (endpoint, (expectedAddress, address.port, clientFactory,
  1606. connectArgs.get('timeout', 30),
  1607. connectArgs.get('bindAddress', None)),
  1608. address)
  1609. def expectedClients(self, reactor):
  1610. """
  1611. @return: List of calls to L{IReactorTCP.connectTCP}
  1612. """
  1613. return reactor.tcpClients
  1614. def assertConnectArgs(self, receivedArgs, expectedArgs):
  1615. """
  1616. Compare host, port, timeout, and bindAddress in C{receivedArgs}
  1617. to C{expectedArgs}. We ignore the factory because we don't
  1618. only care what protocol comes out of the
  1619. C{IStreamClientEndpoint.connect} call.
  1620. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1621. C{timeout}, C{bindAddress}) that was passed to
  1622. L{IReactorTCP.connectTCP}.
  1623. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1624. C{timeout}, C{bindAddress}) that we expect to have been passed
  1625. to L{IReactorTCP.connectTCP}.
  1626. """
  1627. (host, port, ignoredFactory, timeout, bindAddress) = receivedArgs
  1628. (expectedHost, expectedPort, _ignoredFactory,
  1629. expectedTimeout, expectedBindAddress) = expectedArgs
  1630. self.assertEqual(host, expectedHost)
  1631. self.assertEqual(port, expectedPort)
  1632. self.assertEqual(timeout, expectedTimeout)
  1633. self.assertEqual(bindAddress, expectedBindAddress)
  1634. def connectArgs(self):
  1635. """
  1636. @return: C{dict} of keyword arguments to pass to connect.
  1637. """
  1638. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  1639. def test_endpointConnectingCancelled(self, advance=None):
  1640. """
  1641. Calling L{Deferred.cancel} on the L{Deferred} returned from
  1642. L{IStreamClientEndpoint.connect} will cause it to be errbacked with a
  1643. L{ConnectingCancelledError} exception.
  1644. """
  1645. mreactor = MemoryReactor()
  1646. clientFactory = protocol.Factory()
  1647. clientFactory.protocol = protocol.Protocol
  1648. ep, ignoredArgs, address = self.createClientEndpoint(
  1649. mreactor, clientFactory)
  1650. d = ep.connect(clientFactory)
  1651. if advance is not None:
  1652. mreactor.advance(advance)
  1653. d.cancel()
  1654. # When canceled, the connector will immediately notify its factory that
  1655. # the connection attempt has failed due to a UserError.
  1656. attemptFactory = self.retrieveConnectedFactory(mreactor)
  1657. attemptFactory.clientConnectionFailed(None, Failure(error.UserError()))
  1658. # This should be a feature of MemoryReactor: <http://tm.tl/5630>.
  1659. failure = self.failureResultOf(d)
  1660. self.assertIsInstance(failure.value, error.ConnectingCancelledError)
  1661. self.assertEqual(failure.value.address, address)
  1662. self.assertTrue(mreactor.tcpClients[0][2]._connector.stoppedConnecting)
  1663. self.assertEqual([], mreactor.getDelayedCalls())
  1664. def test_endpointConnectingCancelledAfterAllAttemptsStarted(self):
  1665. """
  1666. Calling L{Deferred.cancel} on the L{Deferred} returned from
  1667. L{IStreamClientEndpoint.connect} after enough time has passed that all
  1668. connection attempts have been initiated will cause it to be errbacked
  1669. with a L{ConnectingCancelledError} exception.
  1670. """
  1671. oneBetween = endpoints.HostnameEndpoint._DEFAULT_ATTEMPT_DELAY
  1672. advance = oneBetween + (oneBetween / 2.0)
  1673. self.test_endpointConnectingCancelled(advance=advance)
  1674. def test_endpointConnectFailure(self):
  1675. """
  1676. If L{HostnameEndpoint.connect} is invoked and there is no server
  1677. listening for connections, the returned L{Deferred} will fail with
  1678. C{ConnectError}.
  1679. """
  1680. expectedError = error.ConnectError(string="Connection Failed")
  1681. mreactor = RaisingMemoryReactorWithClock(
  1682. connectException=expectedError)
  1683. clientFactory = object()
  1684. ep, ignoredArgs, ignoredDest = self.createClientEndpoint(
  1685. mreactor, clientFactory)
  1686. d = ep.connect(clientFactory)
  1687. mreactor.advance(endpoints.HostnameEndpoint._DEFAULT_ATTEMPT_DELAY)
  1688. self.assertEqual(self.failureResultOf(d).value, expectedError)
  1689. self.assertEqual([], mreactor.getDelayedCalls())
  1690. def test_endpointConnectFailureAfterIteration(self):
  1691. """
  1692. If a connection attempt initiated by
  1693. L{HostnameEndpoint.connect} fails only after
  1694. L{HostnameEndpoint} has exhausted the list of possible server
  1695. addresses, the returned L{Deferred} will fail with
  1696. C{ConnectError}.
  1697. """
  1698. expectedError = error.ConnectError(string="Connection Failed")
  1699. mreactor = MemoryReactor()
  1700. clientFactory = object()
  1701. ep, ignoredArgs, ignoredDest = self.createClientEndpoint(
  1702. mreactor, clientFactory)
  1703. d = ep.connect(clientFactory)
  1704. mreactor.advance(0.3)
  1705. host, port, factory, timeout, bindAddress = mreactor.tcpClients[0]
  1706. factory.clientConnectionFailed(mreactor.connectors[0], expectedError)
  1707. self.assertEqual(self.failureResultOf(d).value, expectedError)
  1708. self.assertEqual([], mreactor.getDelayedCalls())
  1709. def test_endpointConnectSuccessAfterIteration(self):
  1710. """
  1711. If a connection attempt initiated by
  1712. L{HostnameEndpoint.connect} succeeds only after
  1713. L{HostnameEndpoint} has exhausted the list of possible server
  1714. addresses, the returned L{Deferred} will fire with the
  1715. connected protocol instance and the endpoint will leave no
  1716. delayed calls in the reactor.
  1717. """
  1718. proto = object()
  1719. mreactor = MemoryReactor()
  1720. clientFactory = object()
  1721. ep, expectedArgs, ignoredDest = self.createClientEndpoint(
  1722. mreactor, clientFactory)
  1723. d = ep.connect(clientFactory)
  1724. receivedProtos = []
  1725. def checkProto(p):
  1726. receivedProtos.append(p)
  1727. d.addCallback(checkProto)
  1728. factory = self.retrieveConnectedFactory(mreactor)
  1729. mreactor.advance(0.3)
  1730. factory._onConnection.callback(proto)
  1731. self.assertEqual(receivedProtos, [proto])
  1732. expectedClients = self.expectedClients(mreactor)
  1733. self.assertEqual(len(expectedClients), 1)
  1734. self.assertConnectArgs(expectedClients[0], expectedArgs)
  1735. self.assertEqual([], mreactor.getDelayedCalls())
  1736. class HostnameEndpointsOneIPv6Tests(ClientEndpointTestCaseMixin,
  1737. unittest.TestCase):
  1738. """
  1739. Tests for the hostname based endpoints when GAI returns only one
  1740. (IPv6) address.
  1741. """
  1742. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  1743. """
  1744. Creates a L{HostnameEndpoint} instance where the hostname is resolved
  1745. into a single IPv6 address.
  1746. """
  1747. address = HostnameAddress(b"ipv6.example.com", 80)
  1748. endpoint = endpoints.HostnameEndpoint(
  1749. deterministicResolvingReactor(reactor, ['1:2::3:4']),
  1750. b"ipv6.example.com", address.port, **connectArgs
  1751. )
  1752. return (endpoint, ('1:2::3:4', address.port, clientFactory,
  1753. connectArgs.get('timeout', 30),
  1754. connectArgs.get('bindAddress', None)),
  1755. address)
  1756. def expectedClients(self, reactor):
  1757. """
  1758. @return: List of calls to L{IReactorTCP.connectTCP}
  1759. """
  1760. return reactor.tcpClients
  1761. def assertConnectArgs(self, receivedArgs, expectedArgs):
  1762. """
  1763. Compare host, port, timeout, and bindAddress in C{receivedArgs}
  1764. to C{expectedArgs}. We ignore the factory because we don't
  1765. only care what protocol comes out of the
  1766. C{IStreamClientEndpoint.connect} call.
  1767. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1768. C{timeout}, C{bindAddress}) that was passed to
  1769. L{IReactorTCP.connectTCP}.
  1770. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  1771. C{timeout}, C{bindAddress}) that we expect to have been passed
  1772. to L{IReactorTCP.connectTCP}.
  1773. """
  1774. (host, port, ignoredFactory, timeout, bindAddress) = receivedArgs
  1775. (expectedHost, expectedPort, _ignoredFactory,
  1776. expectedTimeout, expectedBindAddress) = expectedArgs
  1777. self.assertEqual(host, expectedHost)
  1778. self.assertEqual(port, expectedPort)
  1779. self.assertEqual(timeout, expectedTimeout)
  1780. self.assertEqual(bindAddress, expectedBindAddress)
  1781. def connectArgs(self):
  1782. """
  1783. @return: C{dict} of keyword arguments to pass to connect.
  1784. """
  1785. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  1786. def test_endpointConnectingCancelled(self):
  1787. """
  1788. Calling L{Deferred.cancel} on the L{Deferred} returned from
  1789. L{IStreamClientEndpoint.connect} is errbacked with an expected
  1790. L{ConnectingCancelledError} exception.
  1791. """
  1792. mreactor = MemoryReactor()
  1793. clientFactory = protocol.Factory()
  1794. clientFactory.protocol = protocol.Protocol
  1795. ep, ignoredArgs, address = self.createClientEndpoint(
  1796. deterministicResolvingReactor(mreactor, ['127.0.0.1']),
  1797. clientFactory
  1798. )
  1799. d = ep.connect(clientFactory)
  1800. d.cancel()
  1801. # When canceled, the connector will immediately notify its factory that
  1802. # the connection attempt has failed due to a UserError.
  1803. attemptFactory = self.retrieveConnectedFactory(mreactor)
  1804. attemptFactory.clientConnectionFailed(None, Failure(error.UserError()))
  1805. # This should be a feature of MemoryReactor: <http://tm.tl/5630>.
  1806. failure = self.failureResultOf(d)
  1807. self.assertIsInstance(failure.value, error.ConnectingCancelledError)
  1808. self.assertEqual(failure.value.address, address)
  1809. self.assertTrue(mreactor.tcpClients[0][2]._connector.stoppedConnecting)
  1810. self.assertEqual([], mreactor.getDelayedCalls())
  1811. def test_endpointConnectFailure(self):
  1812. """
  1813. If an endpoint tries to connect to a non-listening port it gets
  1814. a C{ConnectError} failure.
  1815. """
  1816. expectedError = error.ConnectError(string="Connection Failed")
  1817. mreactor = RaisingMemoryReactorWithClock(connectException=expectedError)
  1818. clientFactory = object()
  1819. ep, ignoredArgs, ignoredDest = self.createClientEndpoint(
  1820. mreactor, clientFactory)
  1821. d = ep.connect(clientFactory)
  1822. mreactor.advance(0.3)
  1823. self.assertEqual(self.failureResultOf(d).value, expectedError)
  1824. self.assertEqual([], mreactor.getDelayedCalls())
  1825. class HostnameEndpointIDNATests(unittest.SynchronousTestCase):
  1826. """
  1827. Tests for L{HostnameEndpoint}'s constructor's encoding behavior.
  1828. """
  1829. sampleIDNAText = u'b\xfccher.ch'
  1830. sampleIDNABytes = b'xn--bcher-kva.ch'
  1831. def test_idnaHostnameText(self):
  1832. """
  1833. A L{HostnameEndpoint} constructed with text will contain an
  1834. IDNA-encoded bytes representation of that text.
  1835. """
  1836. endpoint = endpoints.HostnameEndpoint(
  1837. deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
  1838. self.sampleIDNAText, 80
  1839. )
  1840. self.assertEqual(endpoint._hostBytes, self.sampleIDNABytes)
  1841. self.assertEqual(endpoint._hostText, self.sampleIDNAText)
  1842. def test_idnaHostnameBytes(self):
  1843. """
  1844. A L{HostnameEndpoint} constructed with bytes will contain an
  1845. IDNA-decoded textual representation of those bytes.
  1846. """
  1847. endpoint = endpoints.HostnameEndpoint(
  1848. deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
  1849. self.sampleIDNAText, 80
  1850. )
  1851. self.assertEqual(endpoint._hostBytes, self.sampleIDNABytes)
  1852. self.assertEqual(endpoint._hostText, self.sampleIDNAText)
  1853. def test_nonNormalizedText(self):
  1854. """
  1855. A L{HostnameEndpoint} constructed with NFD-normalized text will store
  1856. the NFC-normalized version of that text.
  1857. """
  1858. endpoint = endpoints.HostnameEndpoint(
  1859. deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
  1860. normalize('NFD', self.sampleIDNAText), 80
  1861. )
  1862. self.assertEqual(endpoint._hostBytes, self.sampleIDNABytes)
  1863. self.assertEqual(endpoint._hostText, self.sampleIDNAText)
  1864. def test_deferBadEncodingToConnect(self):
  1865. """
  1866. Since any client of L{IStreamClientEndpoint} needs to handle Deferred
  1867. failures from C{connect}, L{HostnameEndpoint}'s constructor will not
  1868. raise exceptions when given bad host names, instead deferring to
  1869. returning a failing L{Deferred} from C{connect}.
  1870. """
  1871. endpoint = endpoints.HostnameEndpoint(
  1872. deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
  1873. b'\xff-garbage-\xff', 80
  1874. )
  1875. deferred = endpoint.connect(Factory.forProtocol(Protocol))
  1876. err = self.failureResultOf(deferred, ValueError)
  1877. self.assertIn("\\xff-garbage-\\xff", str(err))
  1878. endpoint = endpoints.HostnameEndpoint(
  1879. deterministicResolvingReactor(MemoryReactor(), ['127.0.0.1']),
  1880. u'\u2ff0-garbage-\u2ff0', 80
  1881. )
  1882. deferred = endpoint.connect(Factory())
  1883. err = self.failureResultOf(deferred, ValueError)
  1884. self.assertIn("\\u2ff0-garbage-\\u2ff0", str(err))
  1885. class HostnameEndpointsGAIFailureTests(unittest.TestCase):
  1886. """
  1887. Tests for the hostname based endpoints when GAI returns no address.
  1888. """
  1889. def test_failure(self):
  1890. """
  1891. If no address is returned by GAI for a hostname, the connection attempt
  1892. fails with L{error.DNSLookupError}.
  1893. """
  1894. endpoint = endpoints.HostnameEndpoint(
  1895. deterministicResolvingReactor(Clock(), []),
  1896. b"example.com", 80
  1897. )
  1898. clientFactory = object()
  1899. dConnect = endpoint.connect(clientFactory)
  1900. exc = self.failureResultOf(dConnect, error.DNSLookupError)
  1901. self.assertIn("example.com", str(exc))
  1902. class HostnameEndpointsFasterConnectionTests(unittest.TestCase):
  1903. """
  1904. Tests for the hostname based endpoints when gai returns an IPv4 and
  1905. an IPv6 address, and one connection takes less time than the other.
  1906. """
  1907. def setUp(self):
  1908. self.mreactor = MemoryReactor()
  1909. self.endpoint = endpoints.HostnameEndpoint(
  1910. deterministicResolvingReactor(self.mreactor,
  1911. ['1.2.3.4', '1:2::3:4']),
  1912. b"www.example.com", 80)
  1913. def test_ignoreUnknownAddressTypes(self):
  1914. """
  1915. If an address type other than L{IPv4Address} and L{IPv6Address} is
  1916. returned by on address resolution, the endpoint ignores that address.
  1917. """
  1918. self.mreactor = MemoryReactor()
  1919. self.endpoint = endpoints.HostnameEndpoint(
  1920. deterministicResolvingReactor(self.mreactor, ['1.2.3.4', object(),
  1921. '1:2::3:4']),
  1922. b"www.example.com", 80
  1923. )
  1924. clientFactory = None
  1925. self.endpoint.connect(clientFactory)
  1926. self.mreactor.advance(0.3)
  1927. (host, port, factory, timeout, bindAddress) = self.mreactor.tcpClients[1]
  1928. self.assertEqual(len(self.mreactor.tcpClients), 2)
  1929. self.assertEqual(host, '1:2::3:4')
  1930. self.assertEqual(port, 80)
  1931. def test_IPv4IsFaster(self):
  1932. """
  1933. The endpoint returns a connection to the IPv4 address.
  1934. IPv4 ought to be the first attempt, since nameResolution (standing in
  1935. for GAI here) returns it first. The IPv4 attempt succeeds, the
  1936. connection is established, and a Deferred fires with the protocol
  1937. constructed.
  1938. """
  1939. clientFactory = protocol.Factory()
  1940. clientFactory.protocol = protocol.Protocol
  1941. d = self.endpoint.connect(clientFactory)
  1942. results = []
  1943. d.addCallback(results.append)
  1944. (host, port, factory, timeout, bindAddress) = self.mreactor.tcpClients[0]
  1945. self.assertEqual(host, '1.2.3.4')
  1946. self.assertEqual(port, 80)
  1947. proto = factory.buildProtocol((host, port))
  1948. fakeTransport = object()
  1949. self.assertEqual(results, [])
  1950. proto.makeConnection(fakeTransport)
  1951. self.assertEqual(len(results), 1)
  1952. self.assertEqual(results[0].factory, clientFactory)
  1953. self.assertEqual([], self.mreactor.getDelayedCalls())
  1954. def test_IPv6IsFaster(self):
  1955. """
  1956. The endpoint returns a connection to the IPv6 address.
  1957. IPv6 ought to be the second attempt, since nameResolution (standing in
  1958. for GAI here) returns it second. The IPv6 attempt succeeds, a
  1959. connection is established, and a Deferred fires with the protocol
  1960. constructed.
  1961. """
  1962. clientFactory = protocol.Factory()
  1963. clientFactory.protocol = protocol.Protocol
  1964. d = self.endpoint.connect(clientFactory)
  1965. results = []
  1966. d.addCallback(results.append)
  1967. self.mreactor.advance(0.3)
  1968. (host, port, factory, timeout, bindAddress) = self.mreactor.tcpClients[1]
  1969. self.assertEqual(host, '1:2::3:4')
  1970. self.assertEqual(port, 80)
  1971. proto = factory.buildProtocol((host, port))
  1972. fakeTransport = object()
  1973. self.assertEqual(results, [])
  1974. proto.makeConnection(fakeTransport)
  1975. self.assertEqual(len(results), 1)
  1976. self.assertEqual(results[0].factory, clientFactory)
  1977. self.assertEqual([], self.mreactor.getDelayedCalls())
  1978. def test_otherConnectionsCancelled(self):
  1979. """
  1980. Once the endpoint returns a successful connection, all the other
  1981. pending connections are cancelled.
  1982. Here, the second connection attempt, i.e. IPv6, succeeds, and the
  1983. pending first attempt, i.e. IPv4, is cancelled.
  1984. """
  1985. clientFactory = protocol.Factory()
  1986. clientFactory.protocol = protocol.Protocol
  1987. d = self.endpoint.connect(clientFactory)
  1988. results = []
  1989. d.addCallback(results.append)
  1990. self.mreactor.advance(0.3)
  1991. (host, port, factory, timeout, bindAddress) = self.mreactor.tcpClients[1]
  1992. proto = factory.buildProtocol((host, port))
  1993. fakeTransport = object()
  1994. proto.makeConnection(fakeTransport)
  1995. self.assertEqual(True,
  1996. self.mreactor.tcpClients[0][2]._connector.stoppedConnecting)
  1997. self.assertEqual([], self.mreactor.getDelayedCalls())
  1998. class SSL4EndpointsTests(EndpointTestCaseMixin,
  1999. unittest.TestCase):
  2000. """
  2001. Tests for SSL Endpoints.
  2002. """
  2003. if skipSSL:
  2004. skip = skipSSL
  2005. def expectedServers(self, reactor):
  2006. """
  2007. @return: List of calls to L{IReactorSSL.listenSSL}
  2008. """
  2009. return reactor.sslServers
  2010. def expectedClients(self, reactor):
  2011. """
  2012. @return: List of calls to L{IReactorSSL.connectSSL}
  2013. """
  2014. return reactor.sslClients
  2015. def assertConnectArgs(self, receivedArgs, expectedArgs):
  2016. """
  2017. Compare host, port, contextFactory, timeout, and bindAddress in
  2018. C{receivedArgs} to C{expectedArgs}. We ignore the factory because we
  2019. don't only care what protocol comes out of the
  2020. C{IStreamClientEndpoint.connect} call.
  2021. @param receivedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  2022. C{contextFactory}, C{timeout}, C{bindAddress}) that was passed to
  2023. L{IReactorSSL.connectSSL}.
  2024. @param expectedArgs: C{tuple} of (C{host}, C{port}, C{factory},
  2025. C{contextFactory}, C{timeout}, C{bindAddress}) that we expect to
  2026. have been passed to L{IReactorSSL.connectSSL}.
  2027. """
  2028. (host, port, ignoredFactory, contextFactory, timeout,
  2029. bindAddress) = receivedArgs
  2030. (expectedHost, expectedPort, _ignoredFactory, expectedContextFactory,
  2031. expectedTimeout, expectedBindAddress) = expectedArgs
  2032. self.assertEqual(host, expectedHost)
  2033. self.assertEqual(port, expectedPort)
  2034. self.assertEqual(contextFactory, expectedContextFactory)
  2035. self.assertEqual(timeout, expectedTimeout)
  2036. self.assertEqual(bindAddress, expectedBindAddress)
  2037. def connectArgs(self):
  2038. """
  2039. @return: C{dict} of keyword arguments to pass to connect.
  2040. """
  2041. return {'timeout': 10, 'bindAddress': ('localhost', 49595)}
  2042. def listenArgs(self):
  2043. """
  2044. @return: C{dict} of keyword arguments to pass to listen
  2045. """
  2046. return {'backlog': 100, 'interface': '127.0.0.1'}
  2047. def setUp(self):
  2048. """
  2049. Set up client and server SSL contexts for use later.
  2050. """
  2051. self.sKey, self.sCert = makeCertificate(
  2052. O="Server Test Certificate",
  2053. CN="server")
  2054. self.cKey, self.cCert = makeCertificate(
  2055. O="Client Test Certificate",
  2056. CN="client")
  2057. self.serverSSLContext = CertificateOptions(
  2058. privateKey=self.sKey,
  2059. certificate=self.sCert,
  2060. requireCertificate=False)
  2061. self.clientSSLContext = CertificateOptions(
  2062. requireCertificate=False)
  2063. def createServerEndpoint(self, reactor, factory, **listenArgs):
  2064. """
  2065. Create an L{SSL4ServerEndpoint} and return the tools to verify its
  2066. behaviour.
  2067. @param factory: The thing that we expect to be passed to our
  2068. L{IStreamServerEndpoint.listen} implementation.
  2069. @param reactor: A fake L{IReactorSSL} that L{SSL4ServerEndpoint} can
  2070. call L{IReactorSSL.listenSSL} on.
  2071. @param listenArgs: Optional dictionary of arguments to
  2072. L{IReactorSSL.listenSSL}.
  2073. """
  2074. address = IPv4Address("TCP", "0.0.0.0", 0)
  2075. return (endpoints.SSL4ServerEndpoint(reactor,
  2076. address.port,
  2077. self.serverSSLContext,
  2078. **listenArgs),
  2079. (address.port, factory, self.serverSSLContext,
  2080. listenArgs.get('backlog', 50),
  2081. listenArgs.get('interface', '')),
  2082. address)
  2083. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  2084. """
  2085. Create an L{SSL4ClientEndpoint} and return the values needed to verify
  2086. its behaviour.
  2087. @param reactor: A fake L{IReactorSSL} that L{SSL4ClientEndpoint} can
  2088. call L{IReactorSSL.connectSSL} on.
  2089. @param clientFactory: The thing that we expect to be passed to our
  2090. L{IStreamClientEndpoint.connect} implementation.
  2091. @param connectArgs: Optional dictionary of arguments to
  2092. L{IReactorSSL.connectSSL}
  2093. """
  2094. address = IPv4Address("TCP", "localhost", 80)
  2095. if connectArgs is None:
  2096. connectArgs = {}
  2097. return (endpoints.SSL4ClientEndpoint(reactor,
  2098. address.host,
  2099. address.port,
  2100. self.clientSSLContext,
  2101. **connectArgs),
  2102. (address.host, address.port, clientFactory,
  2103. self.clientSSLContext,
  2104. connectArgs.get('timeout', 30),
  2105. connectArgs.get('bindAddress', None)),
  2106. address)
  2107. class UNIXEndpointsTests(EndpointTestCaseMixin,
  2108. unittest.TestCase):
  2109. """
  2110. Tests for UnixSocket Endpoints.
  2111. """
  2112. def retrieveConnectedFactory(self, reactor):
  2113. """
  2114. Override L{EndpointTestCaseMixin.retrieveConnectedFactory} to account
  2115. for different index of 'factory' in C{connectUNIX} args.
  2116. """
  2117. return self.expectedClients(reactor)[0][1]
  2118. def expectedServers(self, reactor):
  2119. """
  2120. @return: List of calls to L{IReactorUNIX.listenUNIX}
  2121. """
  2122. return reactor.unixServers
  2123. def expectedClients(self, reactor):
  2124. """
  2125. @return: List of calls to L{IReactorUNIX.connectUNIX}
  2126. """
  2127. return reactor.unixClients
  2128. def assertConnectArgs(self, receivedArgs, expectedArgs):
  2129. """
  2130. Compare path, timeout, checkPID in C{receivedArgs} to C{expectedArgs}.
  2131. We ignore the factory because we don't only care what protocol comes
  2132. out of the C{IStreamClientEndpoint.connect} call.
  2133. @param receivedArgs: C{tuple} of (C{path}, C{timeout}, C{checkPID})
  2134. that was passed to L{IReactorUNIX.connectUNIX}.
  2135. @param expectedArgs: C{tuple} of (C{path}, C{timeout}, C{checkPID})
  2136. that we expect to have been passed to L{IReactorUNIX.connectUNIX}.
  2137. """
  2138. (path, ignoredFactory, timeout, checkPID) = receivedArgs
  2139. (expectedPath, _ignoredFactory, expectedTimeout,
  2140. expectedCheckPID) = expectedArgs
  2141. self.assertEqual(path, expectedPath)
  2142. self.assertEqual(timeout, expectedTimeout)
  2143. self.assertEqual(checkPID, expectedCheckPID)
  2144. def connectArgs(self):
  2145. """
  2146. @return: C{dict} of keyword arguments to pass to connect.
  2147. """
  2148. return {'timeout': 10, 'checkPID': 1}
  2149. def listenArgs(self):
  2150. """
  2151. @return: C{dict} of keyword arguments to pass to listen
  2152. """
  2153. return {'backlog': 100, 'mode': 0o600, 'wantPID': 1}
  2154. def createServerEndpoint(self, reactor, factory, **listenArgs):
  2155. """
  2156. Create an L{UNIXServerEndpoint} and return the tools to verify its
  2157. behaviour.
  2158. @param reactor: A fake L{IReactorUNIX} that L{UNIXServerEndpoint} can
  2159. call L{IReactorUNIX.listenUNIX} on.
  2160. @param factory: The thing that we expect to be passed to our
  2161. L{IStreamServerEndpoint.listen} implementation.
  2162. @param listenArgs: Optional dictionary of arguments to
  2163. L{IReactorUNIX.listenUNIX}.
  2164. """
  2165. address = UNIXAddress(self.mktemp())
  2166. return (endpoints.UNIXServerEndpoint(reactor, address.name,
  2167. **listenArgs),
  2168. (address.name, factory,
  2169. listenArgs.get('backlog', 50),
  2170. listenArgs.get('mode', 0o666),
  2171. listenArgs.get('wantPID', 0)),
  2172. address)
  2173. def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
  2174. """
  2175. Create an L{UNIXClientEndpoint} and return the values needed to verify
  2176. its behaviour.
  2177. @param reactor: A fake L{IReactorUNIX} that L{UNIXClientEndpoint} can
  2178. call L{IReactorUNIX.connectUNIX} on.
  2179. @param clientFactory: The thing that we expect to be passed to our
  2180. L{IStreamClientEndpoint.connect} implementation.
  2181. @param connectArgs: Optional dictionary of arguments to
  2182. L{IReactorUNIX.connectUNIX}
  2183. """
  2184. address = UNIXAddress(self.mktemp())
  2185. return (endpoints.UNIXClientEndpoint(reactor, address.name,
  2186. **connectArgs),
  2187. (address.name, clientFactory,
  2188. connectArgs.get('timeout', 30),
  2189. connectArgs.get('checkPID', 0)),
  2190. address)
  2191. class ParserTests(unittest.TestCase):
  2192. """
  2193. Tests for L{endpoints._parseServer}, the low-level parsing logic.
  2194. """
  2195. f = "Factory"
  2196. def parse(self, *a, **kw):
  2197. """
  2198. Provide a hook for test_strports to substitute the deprecated API.
  2199. """
  2200. return endpoints._parseServer(*a, **kw)
  2201. def test_simpleTCP(self):
  2202. """
  2203. Simple strings with a 'tcp:' prefix should be parsed as TCP.
  2204. """
  2205. self.assertEqual(
  2206. self.parse('tcp:80', self.f),
  2207. ('TCP', (80, self.f), {'interface': '', 'backlog': 50}))
  2208. def test_interfaceTCP(self):
  2209. """
  2210. TCP port descriptions parse their 'interface' argument as a string.
  2211. """
  2212. self.assertEqual(
  2213. self.parse('tcp:80:interface=127.0.0.1', self.f),
  2214. ('TCP', (80, self.f), {'interface': '127.0.0.1', 'backlog': 50}))
  2215. def test_backlogTCP(self):
  2216. """
  2217. TCP port descriptions parse their 'backlog' argument as an integer.
  2218. """
  2219. self.assertEqual(
  2220. self.parse('tcp:80:backlog=6', self.f),
  2221. ('TCP', (80, self.f), {'interface': '', 'backlog': 6}))
  2222. def test_simpleUNIX(self):
  2223. """
  2224. L{endpoints._parseServer} returns a C{'UNIX'} port description with
  2225. defaults for C{'mode'}, C{'backlog'}, and C{'wantPID'} when passed a
  2226. string with the C{'unix:'} prefix and no other parameter values.
  2227. """
  2228. self.assertEqual(
  2229. self.parse('unix:/var/run/finger', self.f),
  2230. ('UNIX', ('/var/run/finger', self.f),
  2231. {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
  2232. def test_modeUNIX(self):
  2233. """
  2234. C{mode} can be set by including C{"mode=<some integer>"}.
  2235. """
  2236. self.assertEqual(
  2237. self.parse('unix:/var/run/finger:mode=0660', self.f),
  2238. ('UNIX', ('/var/run/finger', self.f),
  2239. {'mode': 0o660, 'backlog': 50, 'wantPID': True}))
  2240. def test_wantPIDUNIX(self):
  2241. """
  2242. C{wantPID} can be set to false by included C{"lockfile=0"}.
  2243. """
  2244. self.assertEqual(
  2245. self.parse('unix:/var/run/finger:lockfile=0', self.f),
  2246. ('UNIX', ('/var/run/finger', self.f),
  2247. {'mode': 0o666, 'backlog': 50, 'wantPID': False}))
  2248. def test_escape(self):
  2249. """
  2250. Backslash can be used to escape colons and backslashes in port
  2251. descriptions.
  2252. """
  2253. self.assertEqual(
  2254. self.parse('unix:foo\x5c:bar\x5c=baz\x5c:qux\x5c\x5c', self.f),
  2255. ('UNIX', ('foo:bar=baz:qux\x5c', self.f),
  2256. {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
  2257. def test_quoteStringArgument(self):
  2258. """
  2259. L{endpoints.quoteStringArgument} should quote backslashes and colons
  2260. for interpolation into L{endpoints.serverFromString} and
  2261. L{endpoints.clientFactory} arguments.
  2262. """
  2263. self.assertEqual(endpoints.quoteStringArgument("some : stuff \x5c"),
  2264. "some \x5c: stuff \x5c\x5c")
  2265. def test_impliedEscape(self):
  2266. """
  2267. In strports descriptions, '=' in a parameter value does not need to be
  2268. quoted; it will simply be parsed as part of the value.
  2269. """
  2270. self.assertEqual(
  2271. self.parse(r'unix:address=foo=bar', self.f),
  2272. ('UNIX', ('foo=bar', self.f),
  2273. {'mode': 0o666, 'backlog': 50, 'wantPID': True}))
  2274. def test_unknownType(self):
  2275. """
  2276. L{strports.parse} raises C{ValueError} when given an unknown endpoint
  2277. type.
  2278. """
  2279. self.assertRaises(ValueError, self.parse, "bogus-type:nothing", self.f)
  2280. class ServerStringTests(unittest.TestCase):
  2281. """
  2282. Tests for L{twisted.internet.endpoints.serverFromString}.
  2283. """
  2284. def test_tcp(self):
  2285. """
  2286. When passed a TCP strports description, L{endpoints.serverFromString}
  2287. returns a L{TCP4ServerEndpoint} instance initialized with the values
  2288. from the string.
  2289. """
  2290. reactor = object()
  2291. server = endpoints.serverFromString(
  2292. reactor, "tcp:1234:backlog=12:interface=10.0.0.1")
  2293. self.assertIsInstance(server, endpoints.TCP4ServerEndpoint)
  2294. self.assertIs(server._reactor, reactor)
  2295. self.assertEqual(server._port, 1234)
  2296. self.assertEqual(server._backlog, 12)
  2297. self.assertEqual(server._interface, "10.0.0.1")
  2298. def test_ssl(self):
  2299. """
  2300. When passed an SSL strports description, L{endpoints.serverFromString}
  2301. returns a L{SSL4ServerEndpoint} instance initialized with the values
  2302. from the string.
  2303. """
  2304. reactor = object()
  2305. server = endpoints.serverFromString(
  2306. reactor,
  2307. "ssl:1234:backlog=12:privateKey=%s:"
  2308. "certKey=%s:sslmethod=TLSv1_METHOD:interface=10.0.0.1"
  2309. % (escapedPEMPathName, escapedPEMPathName))
  2310. self.assertIsInstance(server, endpoints.SSL4ServerEndpoint)
  2311. self.assertIs(server._reactor, reactor)
  2312. self.assertEqual(server._port, 1234)
  2313. self.assertEqual(server._backlog, 12)
  2314. self.assertEqual(server._interface, "10.0.0.1")
  2315. self.assertEqual(server._sslContextFactory.method, TLSv1_METHOD)
  2316. ctx = server._sslContextFactory.getContext()
  2317. self.assertIsInstance(ctx, ContextType)
  2318. def test_sslWithDefaults(self):
  2319. """
  2320. An SSL string endpoint description with minimal arguments returns
  2321. a properly initialized L{SSL4ServerEndpoint} instance.
  2322. """
  2323. reactor = object()
  2324. server = endpoints.serverFromString(
  2325. reactor, "ssl:4321:privateKey=%s" % (escapedPEMPathName,))
  2326. self.assertIsInstance(server, endpoints.SSL4ServerEndpoint)
  2327. self.assertIs(server._reactor, reactor)
  2328. self.assertEqual(server._port, 4321)
  2329. self.assertEqual(server._backlog, 50)
  2330. self.assertEqual(server._interface, "")
  2331. self.assertEqual(server._sslContextFactory.method, SSLv23_METHOD)
  2332. self.assertTrue(
  2333. server._sslContextFactory._options & OP_NO_SSLv3,
  2334. )
  2335. ctx = server._sslContextFactory.getContext()
  2336. self.assertIsInstance(ctx, ContextType)
  2337. # Use a class variable to ensure we use the exactly same endpoint string
  2338. # except for the chain file itself.
  2339. SSL_CHAIN_TEMPLATE = "ssl:1234:privateKey=%s:extraCertChain=%s"
  2340. def test_sslChainLoads(self):
  2341. """
  2342. Specifying a chain file loads the contained certificates in the right
  2343. order.
  2344. """
  2345. server = endpoints.serverFromString(
  2346. object(),
  2347. self.SSL_CHAIN_TEMPLATE % (escapedPEMPathName,
  2348. escapedChainPathName,)
  2349. )
  2350. # Test chain file is just a concatenation of thing1.pem and thing2.pem
  2351. # so we can check that loading has succeeded and order has been
  2352. # preserved.
  2353. expectedChainCerts = [
  2354. Certificate.loadPEM(casPath.child("thing%d.pem" % (n,))
  2355. .getContent())
  2356. for n in [1, 2]
  2357. ]
  2358. cf = server._sslContextFactory
  2359. self.assertEqual(cf.extraCertChain[0].digest('sha1'),
  2360. expectedChainCerts[0].digest('sha1'))
  2361. self.assertEqual(cf.extraCertChain[1].digest('sha1'),
  2362. expectedChainCerts[1].digest('sha1'))
  2363. def test_sslChainFileMustContainCert(self):
  2364. """
  2365. If C{extraCertChain} is passed, it has to contain at least one valid
  2366. certificate in PEM format.
  2367. """
  2368. fp = FilePath(self.mktemp())
  2369. fp.create().close()
  2370. # The endpoint string is the same as in the valid case except for
  2371. # a different chain file. We use an empty temp file which obviously
  2372. # will never contain any certificates.
  2373. with self.assertRaises(ValueError) as caught:
  2374. endpoints.serverFromString(
  2375. object(),
  2376. self.SSL_CHAIN_TEMPLATE % (
  2377. escapedPEMPathName,
  2378. endpoints.quoteStringArgument(fp.path),
  2379. )
  2380. )
  2381. # The raised exception should list what file it is attempting to find
  2382. # the chain in.
  2383. self.assertEqual(str(caught.exception),
  2384. ("Specified chain file '%s' doesn't contain any valid"
  2385. " certificates in PEM format.") % (fp.path,))
  2386. def test_sslDHparameters(self):
  2387. """
  2388. If C{dhParameters} are specified, they are passed as
  2389. L{DiffieHellmanParameters} into L{CertificateOptions}.
  2390. """
  2391. fileName = 'someFile'
  2392. reactor = object()
  2393. server = endpoints.serverFromString(
  2394. reactor,
  2395. "ssl:4321:privateKey={0}:certKey={1}:dhParameters={2}"
  2396. .format(escapedPEMPathName, escapedPEMPathName, fileName)
  2397. )
  2398. cf = server._sslContextFactory
  2399. self.assertIsInstance(cf.dhParameters, DiffieHellmanParameters)
  2400. self.assertEqual(FilePath(fileName), cf.dhParameters._dhFile)
  2401. if skipSSL:
  2402. test_ssl.skip = test_sslWithDefaults.skip = skipSSL
  2403. test_sslChainLoads.skip = skipSSL
  2404. test_sslChainFileMustContainCert.skip = skipSSL
  2405. test_sslDHparameters.skip = skipSSL
  2406. def test_unix(self):
  2407. """
  2408. When passed a UNIX strports description, L{endpoint.serverFromString}
  2409. returns a L{UNIXServerEndpoint} instance initialized with the values
  2410. from the string.
  2411. """
  2412. reactor = object()
  2413. endpoint = endpoints.serverFromString(
  2414. reactor,
  2415. "unix:/var/foo/bar:backlog=7:mode=0123:lockfile=1")
  2416. self.assertIsInstance(endpoint, endpoints.UNIXServerEndpoint)
  2417. self.assertIs(endpoint._reactor, reactor)
  2418. self.assertEqual(endpoint._address, "/var/foo/bar")
  2419. self.assertEqual(endpoint._backlog, 7)
  2420. self.assertEqual(endpoint._mode, 0o123)
  2421. self.assertTrue(endpoint._wantPID)
  2422. def test_unknownType(self):
  2423. """
  2424. L{endpoints.serverFromString} raises C{ValueError} when given an
  2425. unknown endpoint type.
  2426. """
  2427. value = self.assertRaises(
  2428. # faster-than-light communication not supported
  2429. ValueError, endpoints.serverFromString, None,
  2430. "ftl:andromeda/carcosa/hali/2387")
  2431. self.assertEqual(
  2432. str(value),
  2433. "Unknown endpoint type: 'ftl'")
  2434. def test_typeFromPlugin(self):
  2435. """
  2436. L{endpoints.serverFromString} looks up plugins of type
  2437. L{IStreamServerEndpoint} and constructs endpoints from them.
  2438. """
  2439. # Set up a plugin which will only be accessible for the duration of
  2440. # this test.
  2441. addFakePlugin(self)
  2442. # Plugin is set up: now actually test.
  2443. notAReactor = object()
  2444. fakeEndpoint = endpoints.serverFromString(
  2445. notAReactor, "fake:hello:world:yes=no:up=down")
  2446. from twisted.plugins.fakeendpoint import fake
  2447. self.assertIs(fakeEndpoint.parser, fake)
  2448. self.assertEqual(fakeEndpoint.args, (notAReactor, 'hello', 'world'))
  2449. self.assertEqual(fakeEndpoint.kwargs, dict(yes='no', up='down'))
  2450. def addFakePlugin(testCase, dropinSource="fakeendpoint.py"):
  2451. """
  2452. For the duration of C{testCase}, add a fake plugin to twisted.plugins which
  2453. contains some sample endpoint parsers.
  2454. """
  2455. import sys
  2456. savedModules = sys.modules.copy()
  2457. savedPluginPath = list(plugins.__path__)
  2458. def cleanup():
  2459. sys.modules.clear()
  2460. sys.modules.update(savedModules)
  2461. plugins.__path__[:] = savedPluginPath
  2462. testCase.addCleanup(cleanup)
  2463. fp = FilePath(testCase.mktemp())
  2464. fp.createDirectory()
  2465. getModule(__name__).filePath.sibling(dropinSource).copyTo(
  2466. fp.child(dropinSource))
  2467. plugins.__path__.append(fp.path)
  2468. class ClientStringTests(unittest.TestCase):
  2469. """
  2470. Tests for L{twisted.internet.endpoints.clientFromString}.
  2471. """
  2472. def test_tcp(self):
  2473. """
  2474. When passed a TCP strports description, L{endpoints.clientFromString}
  2475. returns a L{TCP4ClientEndpoint} instance initialized with the values
  2476. from the string.
  2477. """
  2478. reactor = object()
  2479. client = endpoints.clientFromString(
  2480. reactor,
  2481. "tcp:host=example.com:port=1234:timeout=7:bindAddress=10.0.0.2")
  2482. self.assertIsInstance(client, endpoints.TCP4ClientEndpoint)
  2483. self.assertIs(client._reactor, reactor)
  2484. self.assertEqual(client._host, "example.com")
  2485. self.assertEqual(client._port, 1234)
  2486. self.assertEqual(client._timeout, 7)
  2487. self.assertEqual(client._bindAddress, ("10.0.0.2", 0))
  2488. def test_tcpPositionalArgs(self):
  2489. """
  2490. When passed a TCP strports description using positional arguments,
  2491. L{endpoints.clientFromString} returns a L{TCP4ClientEndpoint} instance
  2492. initialized with the values from the string.
  2493. """
  2494. reactor = object()
  2495. client = endpoints.clientFromString(
  2496. reactor,
  2497. "tcp:example.com:1234:timeout=7:bindAddress=10.0.0.2")
  2498. self.assertIsInstance(client, endpoints.TCP4ClientEndpoint)
  2499. self.assertIs(client._reactor, reactor)
  2500. self.assertEqual(client._host, "example.com")
  2501. self.assertEqual(client._port, 1234)
  2502. self.assertEqual(client._timeout, 7)
  2503. self.assertEqual(client._bindAddress, ("10.0.0.2", 0))
  2504. def test_tcpHostPositionalArg(self):
  2505. """
  2506. When passed a TCP strports description specifying host as a positional
  2507. argument, L{endpoints.clientFromString} returns a L{TCP4ClientEndpoint}
  2508. instance initialized with the values from the string.
  2509. """
  2510. reactor = object()
  2511. client = endpoints.clientFromString(
  2512. reactor,
  2513. "tcp:example.com:port=1234:timeout=7:bindAddress=10.0.0.2")
  2514. self.assertEqual(client._host, "example.com")
  2515. self.assertEqual(client._port, 1234)
  2516. def test_tcpPortPositionalArg(self):
  2517. """
  2518. When passed a TCP strports description specifying port as a positional
  2519. argument, L{endpoints.clientFromString} returns a L{TCP4ClientEndpoint}
  2520. instance initialized with the values from the string.
  2521. """
  2522. reactor = object()
  2523. client = endpoints.clientFromString(
  2524. reactor,
  2525. "tcp:host=example.com:1234:timeout=7:bindAddress=10.0.0.2")
  2526. self.assertEqual(client._host, "example.com")
  2527. self.assertEqual(client._port, 1234)
  2528. def test_tcpDefaults(self):
  2529. """
  2530. A TCP strports description may omit I{timeout} or I{bindAddress} to
  2531. allow the default to be used.
  2532. """
  2533. reactor = object()
  2534. client = endpoints.clientFromString(
  2535. reactor,
  2536. "tcp:host=example.com:port=1234")
  2537. self.assertEqual(client._timeout, 30)
  2538. self.assertIsNone(client._bindAddress)
  2539. def test_unix(self):
  2540. """
  2541. When passed a UNIX strports description, L{endpoints.clientFromString}
  2542. returns a L{UNIXClientEndpoint} instance initialized with the values
  2543. from the string.
  2544. """
  2545. reactor = object()
  2546. client = endpoints.clientFromString(
  2547. reactor,
  2548. "unix:path=/var/foo/bar:lockfile=1:timeout=9")
  2549. self.assertIsInstance(client, endpoints.UNIXClientEndpoint)
  2550. self.assertIs(client._reactor, reactor)
  2551. self.assertEqual(client._path, "/var/foo/bar")
  2552. self.assertEqual(client._timeout, 9)
  2553. self.assertTrue(client._checkPID)
  2554. def test_unixDefaults(self):
  2555. """
  2556. A UNIX strports description may omit I{lockfile} or I{timeout} to allow
  2557. the defaults to be used.
  2558. """
  2559. client = endpoints.clientFromString(
  2560. object(), "unix:path=/var/foo/bar")
  2561. self.assertEqual(client._timeout, 30)
  2562. self.assertFalse(client._checkPID)
  2563. def test_unixPathPositionalArg(self):
  2564. """
  2565. When passed a UNIX strports description specifying path as a positional
  2566. argument, L{endpoints.clientFromString} returns a L{UNIXClientEndpoint}
  2567. instance initialized with the values from the string.
  2568. """
  2569. reactor = object()
  2570. client = endpoints.clientFromString(
  2571. reactor,
  2572. "unix:/var/foo/bar:lockfile=1:timeout=9")
  2573. self.assertIsInstance(client, endpoints.UNIXClientEndpoint)
  2574. self.assertIs(client._reactor, reactor)
  2575. self.assertEqual(client._path, "/var/foo/bar")
  2576. self.assertEqual(client._timeout, 9)
  2577. self.assertTrue(client._checkPID)
  2578. def test_typeFromPlugin(self):
  2579. """
  2580. L{endpoints.clientFromString} looks up plugins of type
  2581. L{IStreamClientEndpoint} and constructs endpoints from them.
  2582. """
  2583. addFakePlugin(self)
  2584. notAReactor = object()
  2585. clientEndpoint = endpoints.clientFromString(
  2586. notAReactor, "crfake:alpha:beta:cee=dee:num=1")
  2587. from twisted.plugins.fakeendpoint import fakeClientWithReactor
  2588. self.assertIs(clientEndpoint.parser, fakeClientWithReactor)
  2589. self.assertEqual(clientEndpoint.args, (notAReactor, 'alpha', 'beta'))
  2590. self.assertEqual(clientEndpoint.kwargs, dict(cee='dee', num='1'))
  2591. def test_unknownType(self):
  2592. """
  2593. L{endpoints.clientFromString} raises C{ValueError} when given an
  2594. unknown endpoint type.
  2595. """
  2596. value = self.assertRaises(
  2597. # faster-than-light communication not supported
  2598. ValueError, endpoints.clientFromString, None,
  2599. "ftl:andromeda/carcosa/hali/2387")
  2600. self.assertEqual(
  2601. str(value),
  2602. "Unknown endpoint type: 'ftl'")
  2603. def test_stringParserWithReactor(self):
  2604. """
  2605. L{endpoints.clientFromString} will pass a reactor to plugins
  2606. implementing the L{IStreamClientEndpointStringParserWithReactor}
  2607. interface.
  2608. """
  2609. addFakePlugin(self)
  2610. reactor = object()
  2611. clientEndpoint = endpoints.clientFromString(
  2612. reactor, 'crfake:alpha:beta:cee=dee:num=1')
  2613. from twisted.plugins.fakeendpoint import fakeClientWithReactor
  2614. self.assertEqual(
  2615. (clientEndpoint.parser,
  2616. clientEndpoint.args,
  2617. clientEndpoint.kwargs),
  2618. (fakeClientWithReactor,
  2619. (reactor, 'alpha', 'beta'),
  2620. dict(cee='dee', num='1')))
  2621. class SSLClientStringTests(unittest.TestCase):
  2622. """
  2623. Tests for L{twisted.internet.endpoints.clientFromString} which require SSL.
  2624. """
  2625. if skipSSL:
  2626. skip = skipSSL
  2627. def test_ssl(self):
  2628. """
  2629. When passed an SSL strports description, L{clientFromString} returns a
  2630. L{SSL4ClientEndpoint} instance initialized with the values from the
  2631. string.
  2632. """
  2633. reactor = object()
  2634. client = endpoints.clientFromString(
  2635. reactor,
  2636. "ssl:host=example.net:port=4321:privateKey=%s:"
  2637. "certKey=%s:bindAddress=10.0.0.3:timeout=3:caCertsDir=%s" %
  2638. (escapedPEMPathName, escapedPEMPathName, escapedCAsPathName))
  2639. self.assertIsInstance(client, endpoints.SSL4ClientEndpoint)
  2640. self.assertIs(client._reactor, reactor)
  2641. self.assertEqual(client._host, "example.net")
  2642. self.assertEqual(client._port, 4321)
  2643. self.assertEqual(client._timeout, 3)
  2644. self.assertEqual(client._bindAddress, ("10.0.0.3", 0))
  2645. certOptions = client._sslContextFactory
  2646. self.assertIsInstance(certOptions, CertificateOptions)
  2647. self.assertEqual(certOptions.method, SSLv23_METHOD)
  2648. self.assertTrue(certOptions._options & OP_NO_SSLv3)
  2649. ctx = certOptions.getContext()
  2650. self.assertIsInstance(ctx, ContextType)
  2651. self.assertEqual(Certificate(certOptions.certificate), testCertificate)
  2652. privateCert = PrivateCertificate(certOptions.certificate)
  2653. privateCert._setPrivateKey(KeyPair(certOptions.privateKey))
  2654. self.assertEqual(privateCert, testPrivateCertificate)
  2655. expectedCerts = [
  2656. Certificate.loadPEM(x.getContent()) for x in
  2657. [casPath.child("thing1.pem"), casPath.child("thing2.pem")]
  2658. if x.basename().lower().endswith('.pem')
  2659. ]
  2660. addedCerts = []
  2661. class ListCtx(object):
  2662. def get_cert_store(self):
  2663. class Store(object):
  2664. def add_cert(self, cert):
  2665. addedCerts.append(cert)
  2666. return Store()
  2667. certOptions.trustRoot._addCACertsToContext(ListCtx())
  2668. self.assertEqual(
  2669. sorted((Certificate(x) for x in addedCerts),
  2670. key=lambda cert: cert.digest()),
  2671. sorted(expectedCerts,
  2672. key=lambda cert: cert.digest())
  2673. )
  2674. def test_sslPositionalArgs(self):
  2675. """
  2676. When passed an SSL strports description, L{clientFromString} returns a
  2677. L{SSL4ClientEndpoint} instance initialized with the values from the
  2678. string.
  2679. """
  2680. reactor = object()
  2681. client = endpoints.clientFromString(
  2682. reactor,
  2683. "ssl:example.net:4321:privateKey=%s:"
  2684. "certKey=%s:bindAddress=10.0.0.3:timeout=3:caCertsDir=%s" %
  2685. (escapedPEMPathName, escapedPEMPathName, escapedCAsPathName))
  2686. self.assertIsInstance(client, endpoints.SSL4ClientEndpoint)
  2687. self.assertIs(client._reactor, reactor)
  2688. self.assertEqual(client._host, "example.net")
  2689. self.assertEqual(client._port, 4321)
  2690. self.assertEqual(client._timeout, 3)
  2691. self.assertEqual(client._bindAddress, ("10.0.0.3", 0))
  2692. def test_sslWithDefaults(self):
  2693. """
  2694. When passed an SSL strports description without extra arguments,
  2695. L{clientFromString} returns a L{SSL4ClientEndpoint} instance
  2696. whose context factory is initialized with default values.
  2697. """
  2698. reactor = object()
  2699. client = endpoints.clientFromString(reactor, "ssl:example.net:4321")
  2700. self.assertIsInstance(client, endpoints.SSL4ClientEndpoint)
  2701. self.assertIs(client._reactor, reactor)
  2702. self.assertEqual(client._host, "example.net")
  2703. self.assertEqual(client._port, 4321)
  2704. certOptions = client._sslContextFactory
  2705. self.assertEqual(certOptions.method, SSLv23_METHOD)
  2706. self.assertIsNone(certOptions.certificate)
  2707. self.assertIsNone(certOptions.privateKey)
  2708. def test_unreadableCertificate(self):
  2709. """
  2710. If a certificate in the directory is unreadable,
  2711. L{endpoints._loadCAsFromDir} will ignore that certificate.
  2712. """
  2713. class UnreadableFilePath(FilePath):
  2714. def getContent(self):
  2715. data = FilePath.getContent(self)
  2716. # There is a duplicate of thing2.pem, so ignore anything that
  2717. # looks like it.
  2718. if data == casPath.child("thing2.pem").getContent():
  2719. raise IOError(EPERM)
  2720. else:
  2721. return data
  2722. casPathClone = casPath.child("ignored").parent()
  2723. casPathClone.clonePath = UnreadableFilePath
  2724. self.assertEqual(
  2725. [Certificate(x) for x in
  2726. endpoints._loadCAsFromDir(casPathClone)._caCerts],
  2727. [Certificate.loadPEM(casPath.child("thing1.pem").getContent())])
  2728. def test_sslSimple(self):
  2729. """
  2730. When passed an SSL strports description without any extra parameters,
  2731. L{clientFromString} returns a simple non-verifying endpoint that will
  2732. speak SSL.
  2733. """
  2734. reactor = object()
  2735. client = endpoints.clientFromString(
  2736. reactor, "ssl:host=simple.example.org:port=4321")
  2737. certOptions = client._sslContextFactory
  2738. self.assertIsInstance(certOptions, CertificateOptions)
  2739. self.assertFalse(certOptions.verify)
  2740. ctx = certOptions.getContext()
  2741. self.assertIsInstance(ctx, ContextType)
  2742. class AdoptedStreamServerEndpointTests(ServerEndpointTestCaseMixin,
  2743. unittest.TestCase):
  2744. """
  2745. Tests for adopted socket-based stream server endpoints.
  2746. """
  2747. def _createStubbedAdoptedEndpoint(self, reactor, fileno, addressFamily):
  2748. """
  2749. Create an L{AdoptedStreamServerEndpoint} which may safely be used with
  2750. an invalid file descriptor. This is convenient for a number of unit
  2751. tests.
  2752. """
  2753. e = endpoints.AdoptedStreamServerEndpoint(reactor, fileno,
  2754. addressFamily)
  2755. # Stub out some syscalls which would fail, given our invalid file
  2756. # descriptor.
  2757. e._close = lambda fd: None
  2758. e._setNonBlocking = lambda fd: None
  2759. return e
  2760. def createServerEndpoint(self, reactor, factory):
  2761. """
  2762. Create a new L{AdoptedStreamServerEndpoint} for use by a test.
  2763. @return: A three-tuple:
  2764. - The endpoint
  2765. - A tuple of the arguments expected to be passed to the underlying
  2766. reactor method
  2767. - An IAddress object which will match the result of
  2768. L{IListeningPort.getHost} on the port returned by the endpoint.
  2769. """
  2770. fileno = 12
  2771. addressFamily = AF_INET
  2772. endpoint = self._createStubbedAdoptedEndpoint(
  2773. reactor, fileno, addressFamily)
  2774. # Magic numbers come from the implementation of MemoryReactor
  2775. address = IPv4Address("TCP", "0.0.0.0", 1234)
  2776. return (endpoint, (fileno, addressFamily, factory), address)
  2777. def expectedServers(self, reactor):
  2778. """
  2779. @return: The ports which were actually adopted by C{reactor} via calls
  2780. to its L{IReactorSocket.adoptStreamPort} implementation.
  2781. """
  2782. return reactor.adoptedPorts
  2783. def listenArgs(self):
  2784. """
  2785. @return: A C{dict} of additional keyword arguments to pass to the
  2786. C{createServerEndpoint}.
  2787. """
  2788. return {}
  2789. def test_singleUse(self):
  2790. """
  2791. L{AdoptedStreamServerEndpoint.listen} can only be used once. The file
  2792. descriptor given is closed after the first use, and subsequent calls to
  2793. C{listen} return a L{Deferred} that fails with L{AlreadyListened}.
  2794. """
  2795. reactor = MemoryReactor()
  2796. endpoint = self._createStubbedAdoptedEndpoint(reactor, 13, AF_INET)
  2797. endpoint.listen(object())
  2798. d = self.assertFailure(
  2799. endpoint.listen(object()), error.AlreadyListened)
  2800. def listenFailed(ignored):
  2801. self.assertEqual(1, len(reactor.adoptedPorts))
  2802. d.addCallback(listenFailed)
  2803. return d
  2804. def test_descriptionNonBlocking(self):
  2805. """
  2806. L{AdoptedStreamServerEndpoint.listen} sets the file description given
  2807. to it to non-blocking.
  2808. """
  2809. reactor = MemoryReactor()
  2810. endpoint = self._createStubbedAdoptedEndpoint(reactor, 13, AF_INET)
  2811. events = []
  2812. def setNonBlocking(fileno):
  2813. events.append(("setNonBlocking", fileno))
  2814. endpoint._setNonBlocking = setNonBlocking
  2815. d = endpoint.listen(object())
  2816. def listened(ignored):
  2817. self.assertEqual([("setNonBlocking", 13)], events)
  2818. d.addCallback(listened)
  2819. return d
  2820. def test_descriptorClosed(self):
  2821. """
  2822. L{AdoptedStreamServerEndpoint.listen} closes its file descriptor after
  2823. adding it to the reactor with L{IReactorSocket.adoptStreamPort}.
  2824. """
  2825. reactor = MemoryReactor()
  2826. endpoint = self._createStubbedAdoptedEndpoint(reactor, 13, AF_INET)
  2827. events = []
  2828. def close(fileno):
  2829. events.append(("close", fileno, len(reactor.adoptedPorts)))
  2830. endpoint._close = close
  2831. d = endpoint.listen(object())
  2832. def listened(ignored):
  2833. self.assertEqual([("close", 13, 1)], events)
  2834. d.addCallback(listened)
  2835. return d
  2836. class SystemdEndpointPluginTests(unittest.TestCase):
  2837. """
  2838. Unit tests for the systemd stream server endpoint and endpoint string
  2839. description parser.
  2840. @see: U{systemd<http://www.freedesktop.org/wiki/Software/systemd>}
  2841. """
  2842. _parserClass = endpoints._SystemdParser
  2843. def test_pluginDiscovery(self):
  2844. """
  2845. L{endpoints._SystemdParser} is found as a plugin for
  2846. L{interfaces.IStreamServerEndpointStringParser} interface.
  2847. """
  2848. parsers = list(getPlugins(
  2849. interfaces.IStreamServerEndpointStringParser))
  2850. for p in parsers:
  2851. if isinstance(p, self._parserClass):
  2852. break
  2853. else:
  2854. self.fail("Did not find systemd parser in %r" % (parsers,))
  2855. def test_interface(self):
  2856. """
  2857. L{endpoints._SystemdParser} instances provide
  2858. L{interfaces.IStreamServerEndpointStringParser}.
  2859. """
  2860. parser = self._parserClass()
  2861. self.assertTrue(verifyObject(
  2862. interfaces.IStreamServerEndpointStringParser, parser))
  2863. def _parseStreamServerTest(self, addressFamily, addressFamilyString):
  2864. """
  2865. Helper for unit tests for L{endpoints._SystemdParser.parseStreamServer}
  2866. for different address families.
  2867. Handling of the address family given will be verify. If there is a
  2868. problem a test-failing exception will be raised.
  2869. @param addressFamily: An address family constant, like
  2870. L{socket.AF_INET}.
  2871. @param addressFamilyString: A string which should be recognized by the
  2872. parser as representing C{addressFamily}.
  2873. """
  2874. reactor = object()
  2875. descriptors = [5, 6, 7, 8, 9]
  2876. index = 3
  2877. parser = self._parserClass()
  2878. parser._sddaemon = ListenFDs(descriptors)
  2879. server = parser.parseStreamServer(
  2880. reactor, domain=addressFamilyString, index=str(index))
  2881. self.assertIs(server.reactor, reactor)
  2882. self.assertEqual(server.addressFamily, addressFamily)
  2883. self.assertEqual(server.fileno, descriptors[index])
  2884. def test_parseStreamServerINET(self):
  2885. """
  2886. IPv4 can be specified using the string C{"INET"}.
  2887. """
  2888. self._parseStreamServerTest(AF_INET, "INET")
  2889. def test_parseStreamServerINET6(self):
  2890. """
  2891. IPv6 can be specified using the string C{"INET6"}.
  2892. """
  2893. self._parseStreamServerTest(AF_INET6, "INET6")
  2894. def test_parseStreamServerUNIX(self):
  2895. """
  2896. A UNIX domain socket can be specified using the string C{"UNIX"}.
  2897. """
  2898. try:
  2899. from socket import AF_UNIX
  2900. except ImportError:
  2901. raise unittest.SkipTest("Platform lacks AF_UNIX support")
  2902. else:
  2903. self._parseStreamServerTest(AF_UNIX, "UNIX")
  2904. class TCP6ServerEndpointPluginTests(unittest.TestCase):
  2905. """
  2906. Unit tests for the TCP IPv6 stream server endpoint string description
  2907. parser.
  2908. """
  2909. _parserClass = endpoints._TCP6ServerParser
  2910. def test_pluginDiscovery(self):
  2911. """
  2912. L{endpoints._TCP6ServerParser} is found as a plugin for
  2913. L{interfaces.IStreamServerEndpointStringParser} interface.
  2914. """
  2915. parsers = list(getPlugins(
  2916. interfaces.IStreamServerEndpointStringParser))
  2917. for p in parsers:
  2918. if isinstance(p, self._parserClass):
  2919. break
  2920. else:
  2921. self.fail(
  2922. "Did not find TCP6ServerEndpoint parser in %r" % (parsers,))
  2923. def test_interface(self):
  2924. """
  2925. L{endpoints._TCP6ServerParser} instances provide
  2926. L{interfaces.IStreamServerEndpointStringParser}.
  2927. """
  2928. parser = self._parserClass()
  2929. self.assertTrue(verifyObject(
  2930. interfaces.IStreamServerEndpointStringParser, parser))
  2931. def test_stringDescription(self):
  2932. """
  2933. L{serverFromString} returns a L{TCP6ServerEndpoint} instance with a
  2934. 'tcp6' endpoint string description.
  2935. """
  2936. ep = endpoints.serverFromString(
  2937. MemoryReactor(), "tcp6:8080:backlog=12:interface=\:\:1")
  2938. self.assertIsInstance(ep, endpoints.TCP6ServerEndpoint)
  2939. self.assertIsInstance(ep._reactor, MemoryReactor)
  2940. self.assertEqual(ep._port, 8080)
  2941. self.assertEqual(ep._backlog, 12)
  2942. self.assertEqual(ep._interface, '::1')
  2943. class StandardIOEndpointPluginTests(unittest.TestCase):
  2944. """
  2945. Unit tests for the Standard I/O endpoint string description parser.
  2946. """
  2947. _parserClass = endpoints._StandardIOParser
  2948. def test_pluginDiscovery(self):
  2949. """
  2950. L{endpoints._StandardIOParser} is found as a plugin for
  2951. L{interfaces.IStreamServerEndpointStringParser} interface.
  2952. """
  2953. parsers = list(getPlugins(
  2954. interfaces.IStreamServerEndpointStringParser))
  2955. for p in parsers:
  2956. if isinstance(p, self._parserClass):
  2957. break
  2958. else:
  2959. self.fail(
  2960. "Did not find StandardIOEndpoint parser in %r" % (parsers,))
  2961. def test_interface(self):
  2962. """
  2963. L{endpoints._StandardIOParser} instances provide
  2964. L{interfaces.IStreamServerEndpointStringParser}.
  2965. """
  2966. parser = self._parserClass()
  2967. self.assertTrue(verifyObject(
  2968. interfaces.IStreamServerEndpointStringParser, parser))
  2969. def test_stringDescription(self):
  2970. """
  2971. L{serverFromString} returns a L{StandardIOEndpoint} instance with a
  2972. 'stdio' endpoint string description.
  2973. """
  2974. ep = endpoints.serverFromString(MemoryReactor(), "stdio:")
  2975. self.assertIsInstance(ep, endpoints.StandardIOEndpoint)
  2976. self.assertIsInstance(ep._reactor, MemoryReactor)
  2977. class ConnectProtocolTests(unittest.TestCase):
  2978. """
  2979. Tests for C{connectProtocol}.
  2980. """
  2981. def test_connectProtocolCreatesFactory(self):
  2982. """
  2983. C{endpoints.connectProtocol} calls the given endpoint's C{connect()}
  2984. method with a factory that will build the given protocol.
  2985. """
  2986. reactor = MemoryReactor()
  2987. endpoint = endpoints.TCP4ClientEndpoint(reactor, "127.0.0.1", 0)
  2988. theProtocol = object()
  2989. endpoints.connectProtocol(endpoint, theProtocol)
  2990. # A TCP connection was made via the given endpoint:
  2991. self.assertEqual(len(reactor.tcpClients), 1)
  2992. # TCP4ClientEndpoint uses a _WrapperFactory around the underlying
  2993. # factory, so we need to unwrap it:
  2994. factory = reactor.tcpClients[0][2]._wrappedFactory
  2995. self.assertIsInstance(factory, protocol.Factory)
  2996. self.assertIs(factory.buildProtocol(None), theProtocol)
  2997. def test_connectProtocolReturnsConnectResult(self):
  2998. """
  2999. C{endpoints.connectProtocol} returns the result of calling the given
  3000. endpoint's C{connect()} method.
  3001. """
  3002. result = defer.Deferred()
  3003. class Endpoint:
  3004. def connect(self, factory):
  3005. """
  3006. Return a marker object for use in our assertion.
  3007. """
  3008. return result
  3009. endpoint = Endpoint()
  3010. self.assertIs(result, endpoints.connectProtocol(endpoint, object()))
  3011. class UppercaseWrapperProtocol(policies.ProtocolWrapper, object):
  3012. """
  3013. A wrapper protocol which uppercases all strings passed through it.
  3014. """
  3015. def dataReceived(self, data):
  3016. """
  3017. Uppercase a string passed in from the transport.
  3018. @param data: The string to uppercase.
  3019. @type data: L{bytes}
  3020. """
  3021. super(UppercaseWrapperProtocol, self).dataReceived(data.upper())
  3022. def write(self, data):
  3023. """
  3024. Uppercase a string passed out to the transport.
  3025. @param data: The string to uppercase.
  3026. @type data: L{bytes}
  3027. """
  3028. super(UppercaseWrapperProtocol, self).write(data.upper())
  3029. def writeSequence(self, seq):
  3030. """
  3031. Uppercase a series of strings passed out to the transport.
  3032. @param seq: An iterable of strings.
  3033. """
  3034. for data in seq:
  3035. self.write(data)
  3036. class UppercaseWrapperFactory(policies.WrappingFactory, object):
  3037. """
  3038. A wrapper factory which uppercases all strings passed through it.
  3039. """
  3040. protocol = UppercaseWrapperProtocol
  3041. class NetstringTracker(basic.NetstringReceiver, object):
  3042. """
  3043. A netstring receiver which keeps track of the strings received.
  3044. @ivar strings: A L{list} of received strings, in order.
  3045. """
  3046. def __init__(self):
  3047. self.strings = []
  3048. def stringReceived(self, string):
  3049. """
  3050. Receive a string and append it to C{self.strings}.
  3051. @param string: The string to be appended to C{self.strings}.
  3052. """
  3053. self.strings.append(string)
  3054. class FakeError(Exception):
  3055. """
  3056. An error which isn't really an error.
  3057. This is raised in the L{wrapClientTLS} tests in place of a
  3058. 'real' exception.
  3059. """
  3060. class WrapperClientEndpointTests(unittest.TestCase):
  3061. """
  3062. Tests for L{_WrapperClientEndpoint}.
  3063. """
  3064. def setUp(self):
  3065. self.endpoint, self.completer = connectableEndpoint()
  3066. self.context = object()
  3067. self.wrapper = endpoints._WrapperEndpoint(self.endpoint,
  3068. UppercaseWrapperFactory)
  3069. self.factory = Factory.forProtocol(NetstringTracker)
  3070. def test_wrappingBehavior(self):
  3071. """
  3072. Any modifications performed by the underlying L{ProtocolWrapper}
  3073. propagate through to the wrapped L{Protocol}.
  3074. """
  3075. connecting = self.wrapper.connect(self.factory)
  3076. pump = self.completer.succeedOnce()
  3077. proto = self.successResultOf(connecting)
  3078. pump.server.transport.write(b'5:hello,')
  3079. pump.flush()
  3080. self.assertEqual(proto.strings, [b'HELLO'])
  3081. def test_methodsAvailable(self):
  3082. """
  3083. Methods defined on the wrapped L{Protocol} are accessible from the
  3084. L{Protocol} returned from C{connect}'s L{Deferred}.
  3085. """
  3086. connecting = self.wrapper.connect(self.factory)
  3087. pump = self.completer.succeedOnce()
  3088. proto = self.successResultOf(connecting)
  3089. proto.sendString(b'spam')
  3090. self.assertEqual(pump.clientIO.getOutBuffer(), b'4:SPAM,')
  3091. def test_connectionFailure(self):
  3092. """
  3093. Connection failures propagate upward to C{connect}'s L{Deferred}.
  3094. """
  3095. d = self.wrapper.connect(self.factory)
  3096. self.assertNoResult(d)
  3097. self.completer.failOnce(FakeError())
  3098. self.failureResultOf(d, FakeError)
  3099. def test_connectionCancellation(self):
  3100. """
  3101. Cancellation propagates upward to C{connect}'s L{Deferred}.
  3102. """
  3103. d = self.wrapper.connect(self.factory)
  3104. self.assertNoResult(d)
  3105. d.cancel()
  3106. self.failureResultOf(d, ConnectingCancelledError)
  3107. def test_transportOfTransportOfWrappedProtocol(self):
  3108. """
  3109. The transport of the wrapped L{Protocol}'s transport is the transport
  3110. passed to C{makeConnection}.
  3111. """
  3112. connecting = self.wrapper.connect(self.factory)
  3113. pump = self.completer.succeedOnce()
  3114. proto = self.successResultOf(connecting)
  3115. self.assertIs(
  3116. proto.transport.transport, pump.clientIO)
  3117. def connectionCreatorFromEndpoint(memoryReactor, tlsEndpoint):
  3118. """
  3119. Given a L{MemoryReactor} and the result of calling L{wrapClientTLS},
  3120. extract the L{IOpenSSLClientConnectionCreator} associated with it.
  3121. Implementation presently uses private attributes but could (and should) be
  3122. refactored to just call C{.connect()} on the endpoint, when
  3123. L{HostnameEndpoint} starts directing its C{getaddrinfo} call through the
  3124. reactor it is passed somehow rather than via the global threadpool.
  3125. @param memoryReactor: the reactor attached to the given endpoint.
  3126. (Presently unused, but included so tests won't need to be modified to
  3127. honor it.)
  3128. @param tlsEndpoint: The result of calling L{wrapClientTLS}.
  3129. @return: the client connection creator associated with the endpoint
  3130. wrapper.
  3131. @rtype: L{IOpenSSLClientConnectionCreator}
  3132. """
  3133. return tlsEndpoint._wrapperFactory(None)._connectionCreator
  3134. class WrapClientTLSParserTests(unittest.TestCase):
  3135. """
  3136. Tests for L{_TLSClientEndpointParser}.
  3137. """
  3138. if skipSSL:
  3139. skip = skipSSL
  3140. def test_hostnameEndpointConstruction(self):
  3141. """
  3142. A L{HostnameEndpoint} is constructed from parameters passed to
  3143. L{clientFromString}.
  3144. """
  3145. reactor = object()
  3146. endpoint = endpoints.clientFromString(
  3147. reactor,
  3148. nativeString(
  3149. 'tls:example.com:443:timeout=10:bindAddress=127.0.0.1'))
  3150. hostnameEndpoint = endpoint._wrappedEndpoint
  3151. self.assertIs(hostnameEndpoint._reactor, reactor)
  3152. self.assertEqual(hostnameEndpoint._hostBytes, b'example.com')
  3153. self.assertEqual(hostnameEndpoint._port, 443)
  3154. self.assertEqual(hostnameEndpoint._timeout, 10)
  3155. self.assertEqual(hostnameEndpoint._bindAddress,
  3156. nativeString('127.0.0.1'))
  3157. def test_utf8Encoding(self):
  3158. """
  3159. The hostname passed to L{clientFromString} is treated as utf-8 bytes;
  3160. it is then encoded as IDNA when it is passed along to
  3161. L{HostnameEndpoint}, and passed as unicode to L{optionsForClientTLS}.
  3162. """
  3163. reactor = object()
  3164. endpoint = endpoints.clientFromString(
  3165. reactor, b'tls:\xc3\xa9xample.example.com:443'
  3166. )
  3167. self.assertEqual(
  3168. endpoint._wrappedEndpoint._hostBytes,
  3169. b'xn--xample-9ua.example.com'
  3170. )
  3171. connectionCreator = connectionCreatorFromEndpoint(
  3172. reactor, endpoint)
  3173. self.assertEqual(connectionCreator._hostname,
  3174. u'\xe9xample.example.com')
  3175. def test_tls(self):
  3176. """
  3177. When passed a string endpoint description beginning with C{tls:},
  3178. L{clientFromString} returns a client endpoint initialized with the
  3179. values from the string.
  3180. """
  3181. # We can't peer into the unknowable chaos of the heart of OpenSSL
  3182. # (there's no public API to extract from a Context what its trust roots
  3183. # or certificate is); instead, we have to somehow extract information
  3184. # about this stuff from how the context behaves. So this test is an
  3185. # integration test.
  3186. # There are good examples of how to construct relevant test-fixture
  3187. # data in
  3188. # twisted.test.test_sslverify.certificatesForAuthorityAndServer; that
  3189. # more directly tests the nuances of this code. Remember that this
  3190. # should test both positive and negative cases.
  3191. reactor = MemoryReactor()
  3192. # The certificate in question here is a self-signed certificate for
  3193. # 'localhost', so use 'localhost' as a hostname and the directory
  3194. # containing the cert itself for the CAs list.
  3195. endpoint = endpoints.clientFromString(
  3196. deterministicResolvingReactor(reactor, ['127.0.0.1']),
  3197. 'tls:localhost:4321:privateKey={}:certificate={}:trustRoots={}'
  3198. .format(
  3199. escapedPEMPathName, escapedPEMPathName,
  3200. endpoints.quoteStringArgument(pemPath.parent().path)
  3201. ).encode('ascii')
  3202. )
  3203. d = endpoint.connect(Factory.forProtocol(Protocol))
  3204. host, port, factory, timeout, bindAddress = reactor.tcpClients.pop()
  3205. clientProtocol = factory.buildProtocol(None)
  3206. self.assertNoResult(d)
  3207. assert clientProtocol is not None
  3208. serverCert = PrivateCertificate.loadPEM(pemPath.getContent())
  3209. serverOptions = CertificateOptions(
  3210. privateKey=serverCert.privateKey.original,
  3211. certificate=serverCert.original,
  3212. extraCertChain=[
  3213. Certificate.loadPEM(chainPath.getContent()).original],
  3214. trustRoot=serverCert,
  3215. )
  3216. plainServer = Protocol()
  3217. serverProtocol = TLSMemoryBIOFactory(
  3218. serverOptions, isClient=False,
  3219. wrappedFactory=Factory.forProtocol(lambda: plainServer)
  3220. ).buildProtocol(None)
  3221. sProto, cProto, pump = connectedServerAndClient(
  3222. lambda: serverProtocol,
  3223. lambda: clientProtocol,
  3224. )
  3225. # verify privateKey
  3226. plainServer.transport.write(b"hello\r\n")
  3227. plainClient = self.successResultOf(d)
  3228. plainClient.transport.write(b"hi you too\r\n")
  3229. pump.flush()
  3230. self.assertFalse(plainServer.transport.disconnecting)
  3231. self.assertFalse(plainClient.transport.disconnecting)
  3232. self.assertFalse(plainServer.transport.disconnected)
  3233. self.assertFalse(plainClient.transport.disconnected)
  3234. peerCertificate = Certificate.peerFromTransport(plainServer.transport)
  3235. self.assertEqual(peerCertificate,
  3236. Certificate.loadPEM(pemPath.getContent()))
  3237. def test_tlsWithDefaults(self):
  3238. """
  3239. When passed a C{tls:} strports description without extra arguments,
  3240. L{clientFromString} returns a client endpoint whose context factory is
  3241. initialized with default values.
  3242. """
  3243. reactor = object()
  3244. endpoint = endpoints.clientFromString(reactor, b'tls:example.com:443')
  3245. creator = connectionCreatorFromEndpoint(reactor, endpoint)
  3246. self.assertEqual(creator._hostname, u'example.com')
  3247. self.assertEqual(endpoint._wrappedEndpoint._hostBytes, b'example.com')
  3248. def replacingGlobals(function, **newGlobals):
  3249. """
  3250. Create a copy of the given function with the given globals substituted.
  3251. The globals must already exist in the function's existing global scope.
  3252. @param function: any function object.
  3253. @type function: L{types.FunctionType}
  3254. @param newGlobals: each keyword argument should be a global to set in the
  3255. new function's returned scope.
  3256. @type newGlobals: L{dict}
  3257. @return: a new function, like C{function}, but with new global scope.
  3258. """
  3259. try:
  3260. codeObject = function.func_code
  3261. funcGlobals = function.func_globals
  3262. except AttributeError:
  3263. codeObject = function.__code__
  3264. funcGlobals = function.__globals__
  3265. for key in newGlobals:
  3266. if key not in funcGlobals:
  3267. raise TypeError(
  3268. "Name bound by replacingGlobals but not present in module: {}"
  3269. .format(key)
  3270. )
  3271. mergedGlobals = {}
  3272. mergedGlobals.update(funcGlobals)
  3273. mergedGlobals.update(newGlobals)
  3274. newFunction = FunctionType(codeObject, mergedGlobals)
  3275. mergedGlobals[function.__name__] = newFunction
  3276. return newFunction
  3277. class WrapClientTLSTests(unittest.TestCase):
  3278. """
  3279. Tests for the error-reporting behavior of L{wrapClientTLS} when
  3280. C{pyOpenSSL} is unavailable.
  3281. """
  3282. def test_noOpenSSL(self):
  3283. """
  3284. If SSL is not supported, L{TLSMemoryBIOFactory} will be L{None}, which
  3285. causes C{_wrapper} to also be L{None}. If C{_wrapper} is L{None}, then
  3286. an exception is raised.
  3287. """
  3288. replaced = replacingGlobals(endpoints.wrapClientTLS,
  3289. TLSMemoryBIOFactory=None)
  3290. notImplemented = self.assertRaises(NotImplementedError, replaced,
  3291. None, None)
  3292. self.assertIn("OpenSSL not available", str(notImplemented))