| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.test.proto_helpers}.
- """
- from zope.interface.verify import verifyObject
- from twisted.internet.interfaces import (ITransport, IPushProducer, IConsumer,
- IReactorTCP, IReactorSSL, IReactorUNIX, IAddress, IListeningPort,
- IConnector)
- from twisted.internet.address import IPv4Address
- from twisted.trial.unittest import TestCase
- from twisted.test.proto_helpers import (StringTransport, MemoryReactor,
- RaisingMemoryReactor, NonStreamingProducer)
- from twisted.internet.protocol import ClientFactory, Factory
- class StringTransportTests(TestCase):
- """
- Tests for L{twisted.test.proto_helpers.StringTransport}.
- """
- def setUp(self):
- self.transport = StringTransport()
- def test_interfaces(self):
- """
- L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
- and L{IConsumer}.
- """
- self.assertTrue(verifyObject(ITransport, self.transport))
- self.assertTrue(verifyObject(IPushProducer, self.transport))
- self.assertTrue(verifyObject(IConsumer, self.transport))
- def test_registerProducer(self):
- """
- L{StringTransport.registerProducer} records the arguments supplied to
- it as instance attributes.
- """
- producer = object()
- streaming = object()
- self.transport.registerProducer(producer, streaming)
- self.assertIs(self.transport.producer, producer)
- self.assertIs(self.transport.streaming, streaming)
- def test_disallowedRegisterProducer(self):
- """
- L{StringTransport.registerProducer} raises L{RuntimeError} if a
- producer is already registered.
- """
- producer = object()
- self.transport.registerProducer(producer, True)
- self.assertRaises(
- RuntimeError, self.transport.registerProducer, object(), False)
- self.assertIs(self.transport.producer, producer)
- self.assertTrue(self.transport.streaming)
- def test_unregisterProducer(self):
- """
- L{StringTransport.unregisterProducer} causes the transport to forget
- about the registered producer and makes it possible to register a new
- one.
- """
- oldProducer = object()
- newProducer = object()
- self.transport.registerProducer(oldProducer, False)
- self.transport.unregisterProducer()
- self.assertIsNone(self.transport.producer)
- self.transport.registerProducer(newProducer, True)
- self.assertIs(self.transport.producer, newProducer)
- self.assertTrue(self.transport.streaming)
- def test_invalidUnregisterProducer(self):
- """
- L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
- when no producer is registered.
- """
- self.assertRaises(RuntimeError, self.transport.unregisterProducer)
- def test_initialProducerState(self):
- """
- L{StringTransport.producerState} is initially C{'producing'}.
- """
- self.assertEqual(self.transport.producerState, 'producing')
- def test_pauseProducing(self):
- """
- L{StringTransport.pauseProducing} changes the C{producerState} of the
- transport to C{'paused'}.
- """
- self.transport.pauseProducing()
- self.assertEqual(self.transport.producerState, 'paused')
- def test_resumeProducing(self):
- """
- L{StringTransport.resumeProducing} changes the C{producerState} of the
- transport to C{'producing'}.
- """
- self.transport.pauseProducing()
- self.transport.resumeProducing()
- self.assertEqual(self.transport.producerState, 'producing')
- def test_stopProducing(self):
- """
- L{StringTransport.stopProducing} changes the C{'producerState'} of the
- transport to C{'stopped'}.
- """
- self.transport.stopProducing()
- self.assertEqual(self.transport.producerState, 'stopped')
- def test_stoppedTransportCannotPause(self):
- """
- L{StringTransport.pauseProducing} raises L{RuntimeError} if the
- transport has been stopped.
- """
- self.transport.stopProducing()
- self.assertRaises(RuntimeError, self.transport.pauseProducing)
- def test_stoppedTransportCannotResume(self):
- """
- L{StringTransport.resumeProducing} raises L{RuntimeError} if the
- transport has been stopped.
- """
- self.transport.stopProducing()
- self.assertRaises(RuntimeError, self.transport.resumeProducing)
- def test_disconnectingTransportCannotPause(self):
- """
- L{StringTransport.pauseProducing} raises L{RuntimeError} if the
- transport is being disconnected.
- """
- self.transport.loseConnection()
- self.assertRaises(RuntimeError, self.transport.pauseProducing)
- def test_disconnectingTransportCannotResume(self):
- """
- L{StringTransport.resumeProducing} raises L{RuntimeError} if the
- transport is being disconnected.
- """
- self.transport.loseConnection()
- self.assertRaises(RuntimeError, self.transport.resumeProducing)
- def test_loseConnectionSetsDisconnecting(self):
- """
- L{StringTransport.loseConnection} toggles the C{disconnecting} instance
- variable to C{True}.
- """
- self.assertFalse(self.transport.disconnecting)
- self.transport.loseConnection()
- self.assertTrue(self.transport.disconnecting)
- def test_specifiedHostAddress(self):
- """
- If a host address is passed to L{StringTransport.__init__}, that
- value is returned from L{StringTransport.getHost}.
- """
- address = object()
- self.assertIs(StringTransport(address).getHost(), address)
- def test_specifiedPeerAddress(self):
- """
- If a peer address is passed to L{StringTransport.__init__}, that
- value is returned from L{StringTransport.getPeer}.
- """
- address = object()
- self.assertIs(
- StringTransport(peerAddress=address).getPeer(), address)
- def test_defaultHostAddress(self):
- """
- If no host address is passed to L{StringTransport.__init__}, an
- L{IPv4Address} is returned from L{StringTransport.getHost}.
- """
- address = StringTransport().getHost()
- self.assertIsInstance(address, IPv4Address)
- def test_defaultPeerAddress(self):
- """
- If no peer address is passed to L{StringTransport.__init__}, an
- L{IPv4Address} is returned from L{StringTransport.getPeer}.
- """
- address = StringTransport().getPeer()
- self.assertIsInstance(address, IPv4Address)
- class ReactorTests(TestCase):
- """
- Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
- """
- def test_memoryReactorProvides(self):
- """
- L{MemoryReactor} provides all of the attributes described by the
- interfaces it advertises.
- """
- memoryReactor = MemoryReactor()
- verifyObject(IReactorTCP, memoryReactor)
- verifyObject(IReactorSSL, memoryReactor)
- verifyObject(IReactorUNIX, memoryReactor)
- def test_raisingReactorProvides(self):
- """
- L{RaisingMemoryReactor} provides all of the attributes described by the
- interfaces it advertises.
- """
- raisingReactor = RaisingMemoryReactor()
- verifyObject(IReactorTCP, raisingReactor)
- verifyObject(IReactorSSL, raisingReactor)
- verifyObject(IReactorUNIX, raisingReactor)
- def test_connectDestination(self):
- """
- L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
- L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
- C{getDestination} method returns an L{IAddress} with attributes which
- reflect the values passed.
- """
- memoryReactor = MemoryReactor()
- for connector in [memoryReactor.connectTCP(
- "test.example.com", 8321, ClientFactory()),
- memoryReactor.connectSSL(
- "test.example.com", 8321, ClientFactory(),
- None)]:
- verifyObject(IConnector, connector)
- address = connector.getDestination()
- verifyObject(IAddress, address)
- self.assertEqual(address.host, "test.example.com")
- self.assertEqual(address.port, 8321)
- connector = memoryReactor.connectUNIX(b"/fake/path", ClientFactory())
- verifyObject(IConnector, connector)
- address = connector.getDestination()
- verifyObject(IAddress, address)
- self.assertEqual(address.name, b"/fake/path")
- def test_listenDefaultHost(self):
- """
- L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
- L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
- C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
- will have a default host of C{'0.0.0.0'}, and a port that reflects the
- value passed, and C{listenUNIX} will have a name that reflects the path
- passed.
- """
- memoryReactor = MemoryReactor()
- for port in [memoryReactor.listenTCP(8242, Factory()),
- memoryReactor.listenSSL(8242, Factory(), None)]:
- verifyObject(IListeningPort, port)
- address = port.getHost()
- verifyObject(IAddress, address)
- self.assertEqual(address.host, '0.0.0.0')
- self.assertEqual(address.port, 8242)
- port = memoryReactor.listenUNIX(b"/path/to/socket", Factory())
- verifyObject(IListeningPort, port)
- address = port.getHost()
- verifyObject(IAddress, address)
- self.assertEqual(address.name, b"/path/to/socket")
- def test_readers(self):
- """
- Adding, removing, and listing readers works.
- """
- reader = object()
- reactor = MemoryReactor()
- reactor.addReader(reader)
- reactor.addReader(reader)
- self.assertEqual(reactor.getReaders(), [reader])
- reactor.removeReader(reader)
- self.assertEqual(reactor.getReaders(), [])
- def test_writers(self):
- """
- Adding, removing, and listing writers works.
- """
- writer = object()
- reactor = MemoryReactor()
- reactor.addWriter(writer)
- reactor.addWriter(writer)
- self.assertEqual(reactor.getWriters(), [writer])
- reactor.removeWriter(writer)
- self.assertEqual(reactor.getWriters(), [])
- class TestConsumer(object):
- """
- A very basic test consumer for use with the NonStreamingProducerTests.
- """
- def __init__(self):
- self.writes = []
- self.producer = None
- self.producerStreaming = None
- def registerProducer(self, producer, streaming):
- """
- Registers a single producer with this consumer. Just keeps track of it.
- @param producer: The producer to register.
- @param streaming: Whether the producer is a streaming one or not.
- """
- self.producer = producer
- self.producerStreaming = streaming
- def unregisterProducer(self):
- """
- Forget the producer we had previously registered.
- """
- self.producer = None
- self.producerStreaming = None
- def write(self, data):
- """
- Some data was written to the consumer: stores it for later use.
- @param data: The data to write.
- """
- self.writes.append(data)
- class NonStreamingProducerTests(TestCase):
- """
- Tests for the L{NonStreamingProducer} to validate behaviour.
- """
- def test_producesOnly10Times(self):
- """
- When the L{NonStreamingProducer} has resumeProducing called 10 times,
- it writes the counter each time and then fails.
- """
- consumer = TestConsumer()
- producer = NonStreamingProducer(consumer)
- consumer.registerProducer(producer, False)
- self.assertIs(consumer.producer, producer)
- self.assertIs(producer.consumer, consumer)
- self.assertFalse(consumer.producerStreaming)
- for _ in range(10):
- producer.resumeProducing()
- # We should have unregistered the producer and printed the 10 results.
- expectedWrites = [
- b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9'
- ]
- self.assertIsNone(consumer.producer)
- self.assertIsNone(consumer.producerStreaming)
- self.assertIsNone(producer.consumer)
- self.assertEqual(consumer.writes, expectedWrites)
- # Another attempt to produce fails.
- self.assertRaises(RuntimeError, producer.resumeProducing)
- def test_cannotPauseProduction(self):
- """
- When the L{NonStreamingProducer} is paused, it raises a
- L{RuntimeError}.
- """
- consumer = TestConsumer()
- producer = NonStreamingProducer(consumer)
- consumer.registerProducer(producer, False)
- # Produce once, just to be safe.
- producer.resumeProducing()
- self.assertRaises(RuntimeError, producer.pauseProducing)
|