123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.internet.base}.
- """
- import socket
- try:
- from Queue import Queue
- except ImportError:
- from queue import Queue
- from zope.interface import implementer
- from twisted.python.threadpool import ThreadPool
- from twisted.internet.interfaces import (IReactorTime, IReactorThreads,
- IResolverSimple)
- from twisted.internet.error import DNSLookupError
- from twisted.internet._resolver import FirstOneWins
- from twisted.internet.defer import Deferred
- from twisted.internet.base import ThreadedResolver, DelayedCall, ReactorBase
- from twisted.internet.task import Clock
- from twisted.trial.unittest import TestCase
- @implementer(IReactorTime, IReactorThreads)
- class FakeReactor(object):
- """
- A fake reactor implementation which just supports enough reactor APIs for
- L{ThreadedResolver}.
- """
- def __init__(self):
- self._clock = Clock()
- self.callLater = self._clock.callLater
- self._threadpool = ThreadPool()
- self._threadpool.start()
- self.getThreadPool = lambda: self._threadpool
- self._threadCalls = Queue()
- def callFromThread(self, f, *args, **kwargs):
- self._threadCalls.put((f, args, kwargs))
- def _runThreadCalls(self):
- f, args, kwargs = self._threadCalls.get()
- f(*args, **kwargs)
- def _stop(self):
- self._threadpool.stop()
- class ThreadedResolverTests(TestCase):
- """
- Tests for L{ThreadedResolver}.
- """
- def test_success(self):
- """
- L{ThreadedResolver.getHostByName} returns a L{Deferred} which fires
- with the value returned by the call to L{socket.gethostbyname} in the
- threadpool of the reactor passed to L{ThreadedResolver.__init__}.
- """
- ip = "10.0.0.17"
- name = "foo.bar.example.com"
- timeout = 30
- reactor = FakeReactor()
- self.addCleanup(reactor._stop)
- lookedUp = []
- resolvedTo = []
- def fakeGetHostByName(name):
- lookedUp.append(name)
- return ip
- self.patch(socket, 'gethostbyname', fakeGetHostByName)
- resolver = ThreadedResolver(reactor)
- d = resolver.getHostByName(name, (timeout,))
- d.addCallback(resolvedTo.append)
- reactor._runThreadCalls()
- self.assertEqual(lookedUp, [name])
- self.assertEqual(resolvedTo, [ip])
- # Make sure that any timeout-related stuff gets cleaned up.
- reactor._clock.advance(timeout + 1)
- self.assertEqual(reactor._clock.calls, [])
- def test_failure(self):
- """
- L{ThreadedResolver.getHostByName} returns a L{Deferred} which fires a
- L{Failure} if the call to L{socket.gethostbyname} raises an exception.
- """
- timeout = 30
- reactor = FakeReactor()
- self.addCleanup(reactor._stop)
- def fakeGetHostByName(name):
- raise IOError("ENOBUFS (this is a funny joke)")
- self.patch(socket, 'gethostbyname', fakeGetHostByName)
- failedWith = []
- resolver = ThreadedResolver(reactor)
- d = resolver.getHostByName("some.name", (timeout,))
- self.assertFailure(d, DNSLookupError)
- d.addCallback(failedWith.append)
- reactor._runThreadCalls()
- self.assertEqual(len(failedWith), 1)
- # Make sure that any timeout-related stuff gets cleaned up.
- reactor._clock.advance(timeout + 1)
- self.assertEqual(reactor._clock.calls, [])
- def test_timeout(self):
- """
- If L{socket.gethostbyname} does not complete before the specified
- timeout elapsed, the L{Deferred} returned by
- L{ThreadedResolver.getHostByName} fails with L{DNSLookupError}.
- """
- timeout = 10
- reactor = FakeReactor()
- self.addCleanup(reactor._stop)
- result = Queue()
- def fakeGetHostByName(name):
- raise result.get()
- self.patch(socket, 'gethostbyname', fakeGetHostByName)
- failedWith = []
- resolver = ThreadedResolver(reactor)
- d = resolver.getHostByName("some.name", (timeout,))
- self.assertFailure(d, DNSLookupError)
- d.addCallback(failedWith.append)
- reactor._clock.advance(timeout - 1)
- self.assertEqual(failedWith, [])
- reactor._clock.advance(1)
- self.assertEqual(len(failedWith), 1)
- # Eventually the socket.gethostbyname does finish - in this case, with
- # an exception. Nobody cares, though.
- result.put(IOError("The I/O was errorful"))
- def test_resolverGivenStr(self):
- """
- L{ThreadedResolver.getHostByName} is passed L{str}, encoded using IDNA
- if required.
- """
- calls = []
- @implementer(IResolverSimple)
- class FakeResolver(object):
- def getHostByName(self, name, timeouts=()):
- calls.append(name)
- return Deferred()
- class JustEnoughReactor(ReactorBase):
- def installWaker(self):
- pass
- fake = FakeResolver()
- reactor = JustEnoughReactor()
- reactor.installResolver(fake)
- rec = FirstOneWins(Deferred())
- reactor.nameResolver.resolveHostName(
- rec, u"example.example")
- reactor.nameResolver.resolveHostName(
- rec, "example.example")
- reactor.nameResolver.resolveHostName(
- rec, u"v\xe4\xe4ntynyt.example")
- reactor.nameResolver.resolveHostName(
- rec, u"\u0440\u0444.example")
- reactor.nameResolver.resolveHostName(
- rec, "xn----7sbb4ac0ad0be6cf.xn--p1ai")
- self.assertEqual(len(calls), 5)
- self.assertEqual(list(map(type, calls)), [str]*5)
- self.assertEqual("example.example", calls[0])
- self.assertEqual("example.example", calls[1])
- self.assertEqual("xn--vntynyt-5waa.example", calls[2])
- self.assertEqual("xn--p1ai.example", calls[3])
- self.assertEqual("xn----7sbb4ac0ad0be6cf.xn--p1ai", calls[4])
- def nothing():
- """
- Function used by L{DelayedCallTests.test_str}.
- """
- class DelayedCallMixin(object):
- """
- L{DelayedCall}
- """
- def _getDelayedCallAt(self, time):
- """
- Get a L{DelayedCall} instance at a given C{time}.
- @param time: The absolute time at which the returned L{DelayedCall}
- will be scheduled.
- """
- def noop(call):
- pass
- return DelayedCall(time, lambda: None, (), {}, noop, noop, None)
- def setUp(self):
- """
- Create two L{DelayedCall} instanced scheduled to run at different
- times.
- """
- self.zero = self._getDelayedCallAt(0)
- self.one = self._getDelayedCallAt(1)
- def test_str(self):
- """
- The string representation of a L{DelayedCall} instance, as returned by
- L{str}, includes the unsigned id of the instance, as well as its state,
- the function to be called, and the function arguments.
- """
- dc = DelayedCall(12, nothing, (3, ), {"A": 5}, None, None, lambda: 1.5)
- self.assertEqual(
- str(dc),
- "<DelayedCall 0x%x [10.5s] called=0 cancelled=0 nothing(3, A=5)>"
- % (id(dc),))
- def test_lt(self):
- """
- For two instances of L{DelayedCall} C{a} and C{b}, C{a < b} is true
- if and only if C{a} is scheduled to run before C{b}.
- """
- zero, one = self.zero, self.one
- self.assertTrue(zero < one)
- self.assertFalse(one < zero)
- self.assertFalse(zero < zero)
- self.assertFalse(one < one)
- def test_le(self):
- """
- For two instances of L{DelayedCall} C{a} and C{b}, C{a <= b} is true
- if and only if C{a} is scheduled to run before C{b} or at the same
- time as C{b}.
- """
- zero, one = self.zero, self.one
- self.assertTrue(zero <= one)
- self.assertFalse(one <= zero)
- self.assertTrue(zero <= zero)
- self.assertTrue(one <= one)
- def test_gt(self):
- """
- For two instances of L{DelayedCall} C{a} and C{b}, C{a > b} is true
- if and only if C{a} is scheduled to run after C{b}.
- """
- zero, one = self.zero, self.one
- self.assertTrue(one > zero)
- self.assertFalse(zero > one)
- self.assertFalse(zero > zero)
- self.assertFalse(one > one)
- def test_ge(self):
- """
- For two instances of L{DelayedCall} C{a} and C{b}, C{a > b} is true
- if and only if C{a} is scheduled to run after C{b} or at the same
- time as C{b}.
- """
- zero, one = self.zero, self.one
- self.assertTrue(one >= zero)
- self.assertFalse(zero >= one)
- self.assertTrue(zero >= zero)
- self.assertTrue(one >= one)
- def test_eq(self):
- """
- A L{DelayedCall} instance is only equal to itself.
- """
- # Explicitly use == here, instead of assertEqual, to be more
- # confident __eq__ is being tested.
- self.assertFalse(self.zero == self.one)
- self.assertTrue(self.zero == self.zero)
- self.assertTrue(self.one == self.one)
- def test_ne(self):
- """
- A L{DelayedCall} instance is not equal to any other object.
- """
- # Explicitly use != here, instead of assertEqual, to be more
- # confident __ne__ is being tested.
- self.assertTrue(self.zero != self.one)
- self.assertFalse(self.zero != self.zero)
- self.assertFalse(self.one != self.one)
- class DelayedCallNoDebugTests(DelayedCallMixin, TestCase):
- """
- L{DelayedCall}
- """
- def setUp(self):
- """
- Turn debug off.
- """
- self.patch(DelayedCall, 'debug', False)
- DelayedCallMixin.setUp(self)
- def test_str(self):
- """
- The string representation of a L{DelayedCall} instance, as returned by
- L{str}, includes the unsigned id of the instance, as well as its state,
- the function to be called, and the function arguments.
- """
- dc = DelayedCall(12, nothing, (3, ), {"A": 5}, None, None, lambda: 1.5)
- expected = (
- "<DelayedCall 0x{:x} [10.5s] called=0 cancelled=0 "
- "nothing(3, A=5)>".format(id(dc)))
- self.assertEqual(str(dc), expected)
- class DelayedCallDebugTests(DelayedCallMixin, TestCase):
- """
- L{DelayedCall}
- """
- def setUp(self):
- """
- Turn debug on.
- """
- self.patch(DelayedCall, 'debug', True)
- DelayedCallMixin.setUp(self)
- def test_str(self):
- """
- The string representation of a L{DelayedCall} instance, as returned by
- L{str}, includes the unsigned id of the instance, as well as its state,
- the function to be called, and the function arguments.
- """
- dc = DelayedCall(12, nothing, (3, ), {"A": 5}, None, None, lambda: 1.5)
- expectedRegexp = (
- "<DelayedCall 0x{:x} \\[10.5s\\] called=0 cancelled=0 "
- "nothing\\(3, A=5\\)\n\n"
- "traceback at creation:".format(id(dc)))
- self.assertRegex(
- str(dc), expectedRegexp)
|