123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- from __future__ import absolute_import, division, print_function
- import errno
- import os
- import signal
- import socket
- from subprocess import Popen
- import sys
- import time
- from tornado.netutil import (
- BlockingResolver, OverrideResolver, ThreadedResolver, is_valid_ip, bind_sockets
- )
- from tornado.stack_context import ExceptionStackContext
- from tornado.testing import AsyncTestCase, gen_test, bind_unused_port
- from tornado.test.util import unittest, skipIfNoNetwork, ignore_deprecation
- try:
- from concurrent import futures
- except ImportError:
- futures = None
- try:
- import pycares # type: ignore
- except ImportError:
- pycares = None
- else:
- from tornado.platform.caresresolver import CaresResolver
- try:
- import twisted # type: ignore
- import twisted.names # type: ignore
- except ImportError:
- twisted = None
- else:
- from tornado.platform.twisted import TwistedResolver
- class _ResolverTestMixin(object):
- def test_localhost(self):
- with ignore_deprecation():
- self.resolver.resolve('localhost', 80, callback=self.stop)
- result = self.wait()
- self.assertIn((socket.AF_INET, ('127.0.0.1', 80)), result)
- @gen_test
- def test_future_interface(self):
- addrinfo = yield self.resolver.resolve('localhost', 80,
- socket.AF_UNSPEC)
- self.assertIn((socket.AF_INET, ('127.0.0.1', 80)),
- addrinfo)
- # It is impossible to quickly and consistently generate an error in name
- # resolution, so test this case separately, using mocks as needed.
- class _ResolverErrorTestMixin(object):
- def test_bad_host(self):
- def handler(exc_typ, exc_val, exc_tb):
- self.stop(exc_val)
- return True # Halt propagation.
- with ignore_deprecation():
- with ExceptionStackContext(handler):
- self.resolver.resolve('an invalid domain', 80, callback=self.stop)
- result = self.wait()
- self.assertIsInstance(result, Exception)
- @gen_test
- def test_future_interface_bad_host(self):
- with self.assertRaises(IOError):
- yield self.resolver.resolve('an invalid domain', 80,
- socket.AF_UNSPEC)
- def _failing_getaddrinfo(*args):
- """Dummy implementation of getaddrinfo for use in mocks"""
- raise socket.gaierror(errno.EIO, "mock: lookup failed")
- @skipIfNoNetwork
- class BlockingResolverTest(AsyncTestCase, _ResolverTestMixin):
- def setUp(self):
- super(BlockingResolverTest, self).setUp()
- self.resolver = BlockingResolver()
- # getaddrinfo-based tests need mocking to reliably generate errors;
- # some configurations are slow to produce errors and take longer than
- # our default timeout.
- class BlockingResolverErrorTest(AsyncTestCase, _ResolverErrorTestMixin):
- def setUp(self):
- super(BlockingResolverErrorTest, self).setUp()
- self.resolver = BlockingResolver()
- self.real_getaddrinfo = socket.getaddrinfo
- socket.getaddrinfo = _failing_getaddrinfo
- def tearDown(self):
- socket.getaddrinfo = self.real_getaddrinfo
- super(BlockingResolverErrorTest, self).tearDown()
- class OverrideResolverTest(AsyncTestCase, _ResolverTestMixin):
- def setUp(self):
- super(OverrideResolverTest, self).setUp()
- mapping = {
- ('google.com', 80): ('1.2.3.4', 80),
- ('google.com', 80, socket.AF_INET): ('1.2.3.4', 80),
- ('google.com', 80, socket.AF_INET6): ('2a02:6b8:7c:40c:c51e:495f:e23a:3', 80)
- }
- self.resolver = OverrideResolver(BlockingResolver(), mapping)
- @gen_test
- def test_resolve_multiaddr(self):
- result = yield self.resolver.resolve('google.com', 80, socket.AF_INET)
- self.assertIn((socket.AF_INET, ('1.2.3.4', 80)), result)
- result = yield self.resolver.resolve('google.com', 80, socket.AF_INET6)
- self.assertIn((socket.AF_INET6, ('2a02:6b8:7c:40c:c51e:495f:e23a:3', 80, 0, 0)), result)
- @skipIfNoNetwork
- @unittest.skipIf(futures is None, "futures module not present")
- class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
- def setUp(self):
- super(ThreadedResolverTest, self).setUp()
- self.resolver = ThreadedResolver()
- def tearDown(self):
- self.resolver.close()
- super(ThreadedResolverTest, self).tearDown()
- class ThreadedResolverErrorTest(AsyncTestCase, _ResolverErrorTestMixin):
- def setUp(self):
- super(ThreadedResolverErrorTest, self).setUp()
- self.resolver = BlockingResolver()
- self.real_getaddrinfo = socket.getaddrinfo
- socket.getaddrinfo = _failing_getaddrinfo
- def tearDown(self):
- socket.getaddrinfo = self.real_getaddrinfo
- super(ThreadedResolverErrorTest, self).tearDown()
- @skipIfNoNetwork
- @unittest.skipIf(futures is None, "futures module not present")
- @unittest.skipIf(sys.platform == 'win32', "preexec_fn not available on win32")
- class ThreadedResolverImportTest(unittest.TestCase):
- def test_import(self):
- TIMEOUT = 5
- # Test for a deadlock when importing a module that runs the
- # ThreadedResolver at import-time. See resolve_test.py for
- # full explanation.
- command = [
- sys.executable,
- '-c',
- 'import tornado.test.resolve_test_helper']
- start = time.time()
- popen = Popen(command, preexec_fn=lambda: signal.alarm(TIMEOUT))
- while time.time() - start < TIMEOUT:
- return_code = popen.poll()
- if return_code is not None:
- self.assertEqual(0, return_code)
- return # Success.
- time.sleep(0.05)
- self.fail("import timed out")
- # We do not test errors with CaresResolver:
- # Some DNS-hijacking ISPs (e.g. Time Warner) return non-empty results
- # with an NXDOMAIN status code. Most resolvers treat this as an error;
- # C-ares returns the results, making the "bad_host" tests unreliable.
- # C-ares will try to resolve even malformed names, such as the
- # name with spaces used in this test.
- @skipIfNoNetwork
- @unittest.skipIf(pycares is None, "pycares module not present")
- class CaresResolverTest(AsyncTestCase, _ResolverTestMixin):
- def setUp(self):
- super(CaresResolverTest, self).setUp()
- self.resolver = CaresResolver()
- # TwistedResolver produces consistent errors in our test cases so we
- # could test the regular and error cases in the same class. However,
- # in the error cases it appears that cleanup of socket objects is
- # handled asynchronously and occasionally results in "unclosed socket"
- # warnings if not given time to shut down (and there is no way to
- # explicitly shut it down). This makes the test flaky, so we do not
- # test error cases here.
- @skipIfNoNetwork
- @unittest.skipIf(twisted is None, "twisted module not present")
- @unittest.skipIf(getattr(twisted, '__version__', '0.0') < "12.1", "old version of twisted")
- class TwistedResolverTest(AsyncTestCase, _ResolverTestMixin):
- def setUp(self):
- super(TwistedResolverTest, self).setUp()
- self.resolver = TwistedResolver()
- class IsValidIPTest(unittest.TestCase):
- def test_is_valid_ip(self):
- self.assertTrue(is_valid_ip('127.0.0.1'))
- self.assertTrue(is_valid_ip('4.4.4.4'))
- self.assertTrue(is_valid_ip('::1'))
- self.assertTrue(is_valid_ip('2620:0:1cfe:face:b00c::3'))
- self.assertTrue(not is_valid_ip('www.google.com'))
- self.assertTrue(not is_valid_ip('localhost'))
- self.assertTrue(not is_valid_ip('4.4.4.4<'))
- self.assertTrue(not is_valid_ip(' 127.0.0.1'))
- self.assertTrue(not is_valid_ip(''))
- self.assertTrue(not is_valid_ip(' '))
- self.assertTrue(not is_valid_ip('\n'))
- self.assertTrue(not is_valid_ip('\x00'))
- class TestPortAllocation(unittest.TestCase):
- def test_same_port_allocation(self):
- if 'TRAVIS' in os.environ:
- self.skipTest("dual-stack servers often have port conflicts on travis")
- sockets = bind_sockets(None, 'localhost')
- try:
- port = sockets[0].getsockname()[1]
- self.assertTrue(all(s.getsockname()[1] == port
- for s in sockets[1:]))
- finally:
- for sock in sockets:
- sock.close()
- @unittest.skipIf(not hasattr(socket, "SO_REUSEPORT"), "SO_REUSEPORT is not supported")
- def test_reuse_port(self):
- sockets = []
- socket, port = bind_unused_port(reuse_port=True)
- try:
- sockets = bind_sockets(port, '127.0.0.1', reuse_port=True)
- self.assertTrue(all(s.getsockname()[1] == port for s in sockets))
- finally:
- socket.close()
- for sock in sockets:
- sock.close()
|