123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.internet.epollreactor}.
- """
- from __future__ import division, absolute_import
- from twisted.trial.unittest import TestCase
- try:
- from twisted.internet.epollreactor import _ContinuousPolling
- except ImportError:
- _ContinuousPolling = None
- from twisted.internet.task import Clock
- from twisted.internet.error import ConnectionDone
- class Descriptor(object):
- """
- Records reads and writes, as if it were a C{FileDescriptor}.
- """
- def __init__(self):
- self.events = []
- def fileno(self):
- return 1
- def doRead(self):
- self.events.append("read")
- def doWrite(self):
- self.events.append("write")
- def connectionLost(self, reason):
- reason.trap(ConnectionDone)
- self.events.append("lost")
- class ContinuousPollingTests(TestCase):
- """
- L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
- objects.
- """
- def test_addReader(self):
- """
- Adding a reader when there was previously no reader starts up a
- C{LoopingCall}.
- """
- poller = _ContinuousPolling(Clock())
- self.assertIsNone(poller._loop)
- reader = object()
- self.assertFalse(poller.isReading(reader))
- poller.addReader(reader)
- self.assertIsNotNone(poller._loop)
- self.assertTrue(poller._loop.running)
- self.assertIs(poller._loop.clock, poller._reactor)
- self.assertTrue(poller.isReading(reader))
- def test_addWriter(self):
- """
- Adding a writer when there was previously no writer starts up a
- C{LoopingCall}.
- """
- poller = _ContinuousPolling(Clock())
- self.assertIsNone(poller._loop)
- writer = object()
- self.assertFalse(poller.isWriting(writer))
- poller.addWriter(writer)
- self.assertIsNotNone(poller._loop)
- self.assertTrue(poller._loop.running)
- self.assertIs(poller._loop.clock, poller._reactor)
- self.assertTrue(poller.isWriting(writer))
- def test_removeReader(self):
- """
- Removing a reader stops the C{LoopingCall}.
- """
- poller = _ContinuousPolling(Clock())
- reader = object()
- poller.addReader(reader)
- poller.removeReader(reader)
- self.assertIsNone(poller._loop)
- self.assertEqual(poller._reactor.getDelayedCalls(), [])
- self.assertFalse(poller.isReading(reader))
- def test_removeWriter(self):
- """
- Removing a writer stops the C{LoopingCall}.
- """
- poller = _ContinuousPolling(Clock())
- writer = object()
- poller.addWriter(writer)
- poller.removeWriter(writer)
- self.assertIsNone(poller._loop)
- self.assertEqual(poller._reactor.getDelayedCalls(), [])
- self.assertFalse(poller.isWriting(writer))
- def test_removeUnknown(self):
- """
- Removing unknown readers and writers silently does nothing.
- """
- poller = _ContinuousPolling(Clock())
- poller.removeWriter(object())
- poller.removeReader(object())
- def test_multipleReadersAndWriters(self):
- """
- Adding multiple readers and writers results in a single
- C{LoopingCall}.
- """
- poller = _ContinuousPolling(Clock())
- writer = object()
- poller.addWriter(writer)
- self.assertIsNotNone(poller._loop)
- poller.addWriter(object())
- self.assertIsNotNone(poller._loop)
- poller.addReader(object())
- self.assertIsNotNone(poller._loop)
- poller.addReader(object())
- poller.removeWriter(writer)
- self.assertIsNotNone(poller._loop)
- self.assertTrue(poller._loop.running)
- self.assertEqual(len(poller._reactor.getDelayedCalls()), 1)
- def test_readerPolling(self):
- """
- Adding a reader causes its C{doRead} to be called every 1
- milliseconds.
- """
- reactor = Clock()
- poller = _ContinuousPolling(reactor)
- desc = Descriptor()
- poller.addReader(desc)
- self.assertEqual(desc.events, [])
- reactor.advance(0.00001)
- self.assertEqual(desc.events, ["read"])
- reactor.advance(0.00001)
- self.assertEqual(desc.events, ["read", "read"])
- reactor.advance(0.00001)
- self.assertEqual(desc.events, ["read", "read", "read"])
- def test_writerPolling(self):
- """
- Adding a writer causes its C{doWrite} to be called every 1
- milliseconds.
- """
- reactor = Clock()
- poller = _ContinuousPolling(reactor)
- desc = Descriptor()
- poller.addWriter(desc)
- self.assertEqual(desc.events, [])
- reactor.advance(0.001)
- self.assertEqual(desc.events, ["write"])
- reactor.advance(0.001)
- self.assertEqual(desc.events, ["write", "write"])
- reactor.advance(0.001)
- self.assertEqual(desc.events, ["write", "write", "write"])
- def test_connectionLostOnRead(self):
- """
- If a C{doRead} returns a value indicating disconnection,
- C{connectionLost} is called on it.
- """
- reactor = Clock()
- poller = _ContinuousPolling(reactor)
- desc = Descriptor()
- desc.doRead = lambda: ConnectionDone()
- poller.addReader(desc)
- self.assertEqual(desc.events, [])
- reactor.advance(0.001)
- self.assertEqual(desc.events, ["lost"])
- def test_connectionLostOnWrite(self):
- """
- If a C{doWrite} returns a value indicating disconnection,
- C{connectionLost} is called on it.
- """
- reactor = Clock()
- poller = _ContinuousPolling(reactor)
- desc = Descriptor()
- desc.doWrite = lambda: ConnectionDone()
- poller.addWriter(desc)
- self.assertEqual(desc.events, [])
- reactor.advance(0.001)
- self.assertEqual(desc.events, ["lost"])
- def test_removeAll(self):
- """
- L{_ContinuousPolling.removeAll} removes all descriptors and returns
- the readers and writers.
- """
- poller = _ContinuousPolling(Clock())
- reader = object()
- writer = object()
- both = object()
- poller.addReader(reader)
- poller.addReader(both)
- poller.addWriter(writer)
- poller.addWriter(both)
- removed = poller.removeAll()
- self.assertEqual(poller.getReaders(), [])
- self.assertEqual(poller.getWriters(), [])
- self.assertEqual(len(removed), 3)
- self.assertEqual(set(removed), set([reader, writer, both]))
- def test_getReaders(self):
- """
- L{_ContinuousPolling.getReaders} returns a list of the read
- descriptors.
- """
- poller = _ContinuousPolling(Clock())
- reader = object()
- poller.addReader(reader)
- self.assertIn(reader, poller.getReaders())
- def test_getWriters(self):
- """
- L{_ContinuousPolling.getWriters} returns a list of the write
- descriptors.
- """
- poller = _ContinuousPolling(Clock())
- writer = object()
- poller.addWriter(writer)
- self.assertIn(writer, poller.getWriters())
- if _ContinuousPolling is None:
- skip = "epoll not supported in this environment."
|