tcpclient_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. #
  2. # Copyright 2014 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. from __future__ import absolute_import, division, print_function
  16. from contextlib import closing
  17. import os
  18. import socket
  19. from tornado.concurrent import Future
  20. from tornado.netutil import bind_sockets, Resolver
  21. from tornado.queues import Queue
  22. from tornado.tcpclient import TCPClient, _Connector
  23. from tornado.tcpserver import TCPServer
  24. from tornado.testing import AsyncTestCase, gen_test
  25. from tornado.test.util import skipIfNoIPv6, unittest, refusing_port, skipIfNonUnix
  26. from tornado.gen import TimeoutError
  27. # Fake address families for testing. Used in place of AF_INET
  28. # and AF_INET6 because some installations do not have AF_INET6.
  29. AF1, AF2 = 1, 2
  30. class TestTCPServer(TCPServer):
  31. def __init__(self, family):
  32. super(TestTCPServer, self).__init__()
  33. self.streams = []
  34. self.queue = Queue()
  35. sockets = bind_sockets(None, 'localhost', family)
  36. self.add_sockets(sockets)
  37. self.port = sockets[0].getsockname()[1]
  38. def handle_stream(self, stream, address):
  39. self.streams.append(stream)
  40. self.queue.put(stream)
  41. def stop(self):
  42. super(TestTCPServer, self).stop()
  43. for stream in self.streams:
  44. stream.close()
  45. class TCPClientTest(AsyncTestCase):
  46. def setUp(self):
  47. super(TCPClientTest, self).setUp()
  48. self.server = None
  49. self.client = TCPClient()
  50. def start_server(self, family):
  51. if family == socket.AF_UNSPEC and 'TRAVIS' in os.environ:
  52. self.skipTest("dual-stack servers often have port conflicts on travis")
  53. self.server = TestTCPServer(family)
  54. return self.server.port
  55. def stop_server(self):
  56. if self.server is not None:
  57. self.server.stop()
  58. self.server = None
  59. def tearDown(self):
  60. self.client.close()
  61. self.stop_server()
  62. super(TCPClientTest, self).tearDown()
  63. def skipIfLocalhostV4(self):
  64. # The port used here doesn't matter, but some systems require it
  65. # to be non-zero if we do not also pass AI_PASSIVE.
  66. addrinfo = self.io_loop.run_sync(lambda: Resolver().resolve('localhost', 80))
  67. families = set(addr[0] for addr in addrinfo)
  68. if socket.AF_INET6 not in families:
  69. self.skipTest("localhost does not resolve to ipv6")
  70. @gen_test
  71. def do_test_connect(self, family, host, source_ip=None, source_port=None):
  72. port = self.start_server(family)
  73. stream = yield self.client.connect(host, port,
  74. source_ip=source_ip,
  75. source_port=source_port)
  76. server_stream = yield self.server.queue.get()
  77. with closing(stream):
  78. stream.write(b"hello")
  79. data = yield server_stream.read_bytes(5)
  80. self.assertEqual(data, b"hello")
  81. def test_connect_ipv4_ipv4(self):
  82. self.do_test_connect(socket.AF_INET, '127.0.0.1')
  83. def test_connect_ipv4_dual(self):
  84. self.do_test_connect(socket.AF_INET, 'localhost')
  85. @skipIfNoIPv6
  86. def test_connect_ipv6_ipv6(self):
  87. self.skipIfLocalhostV4()
  88. self.do_test_connect(socket.AF_INET6, '::1')
  89. @skipIfNoIPv6
  90. def test_connect_ipv6_dual(self):
  91. self.skipIfLocalhostV4()
  92. if Resolver.configured_class().__name__.endswith('TwistedResolver'):
  93. self.skipTest('TwistedResolver does not support multiple addresses')
  94. self.do_test_connect(socket.AF_INET6, 'localhost')
  95. def test_connect_unspec_ipv4(self):
  96. self.do_test_connect(socket.AF_UNSPEC, '127.0.0.1')
  97. @skipIfNoIPv6
  98. def test_connect_unspec_ipv6(self):
  99. self.skipIfLocalhostV4()
  100. self.do_test_connect(socket.AF_UNSPEC, '::1')
  101. def test_connect_unspec_dual(self):
  102. self.do_test_connect(socket.AF_UNSPEC, 'localhost')
  103. @gen_test
  104. def test_refused_ipv4(self):
  105. cleanup_func, port = refusing_port()
  106. self.addCleanup(cleanup_func)
  107. with self.assertRaises(IOError):
  108. yield self.client.connect('127.0.0.1', port)
  109. def test_source_ip_fail(self):
  110. '''
  111. Fail when trying to use the source IP Address '8.8.8.8'.
  112. '''
  113. self.assertRaises(socket.error,
  114. self.do_test_connect,
  115. socket.AF_INET,
  116. '127.0.0.1',
  117. source_ip='8.8.8.8')
  118. def test_source_ip_success(self):
  119. '''
  120. Success when trying to use the source IP Address '127.0.0.1'
  121. '''
  122. self.do_test_connect(socket.AF_INET, '127.0.0.1', source_ip='127.0.0.1')
  123. @skipIfNonUnix
  124. def test_source_port_fail(self):
  125. '''
  126. Fail when trying to use source port 1.
  127. '''
  128. self.assertRaises(socket.error,
  129. self.do_test_connect,
  130. socket.AF_INET,
  131. '127.0.0.1',
  132. source_port=1)
  133. @gen_test
  134. def test_connect_timeout(self):
  135. timeout = 0.05
  136. class TimeoutResolver(Resolver):
  137. def resolve(self, *args, **kwargs):
  138. return Future() # never completes
  139. with self.assertRaises(TimeoutError):
  140. yield TCPClient(resolver=TimeoutResolver()).connect(
  141. '1.2.3.4', 12345, timeout=timeout)
  142. class TestConnectorSplit(unittest.TestCase):
  143. def test_one_family(self):
  144. # These addresses aren't in the right format, but split doesn't care.
  145. primary, secondary = _Connector.split(
  146. [(AF1, 'a'),
  147. (AF1, 'b')])
  148. self.assertEqual(primary, [(AF1, 'a'),
  149. (AF1, 'b')])
  150. self.assertEqual(secondary, [])
  151. def test_mixed(self):
  152. primary, secondary = _Connector.split(
  153. [(AF1, 'a'),
  154. (AF2, 'b'),
  155. (AF1, 'c'),
  156. (AF2, 'd')])
  157. self.assertEqual(primary, [(AF1, 'a'), (AF1, 'c')])
  158. self.assertEqual(secondary, [(AF2, 'b'), (AF2, 'd')])
  159. class ConnectorTest(AsyncTestCase):
  160. class FakeStream(object):
  161. def __init__(self):
  162. self.closed = False
  163. def close(self):
  164. self.closed = True
  165. def setUp(self):
  166. super(ConnectorTest, self).setUp()
  167. self.connect_futures = {}
  168. self.streams = {}
  169. self.addrinfo = [(AF1, 'a'), (AF1, 'b'),
  170. (AF2, 'c'), (AF2, 'd')]
  171. def tearDown(self):
  172. # Unless explicitly checked (and popped) in the test, we shouldn't
  173. # be closing any streams
  174. for stream in self.streams.values():
  175. self.assertFalse(stream.closed)
  176. super(ConnectorTest, self).tearDown()
  177. def create_stream(self, af, addr):
  178. stream = ConnectorTest.FakeStream()
  179. self.streams[addr] = stream
  180. future = Future()
  181. self.connect_futures[(af, addr)] = future
  182. return stream, future
  183. def assert_pending(self, *keys):
  184. self.assertEqual(sorted(self.connect_futures.keys()), sorted(keys))
  185. def resolve_connect(self, af, addr, success):
  186. future = self.connect_futures.pop((af, addr))
  187. if success:
  188. future.set_result(self.streams[addr])
  189. else:
  190. self.streams.pop(addr)
  191. future.set_exception(IOError())
  192. # Run the loop to allow callbacks to be run.
  193. self.io_loop.add_callback(self.stop)
  194. self.wait()
  195. def assert_connector_streams_closed(self, conn):
  196. for stream in conn.streams:
  197. self.assertTrue(stream.closed)
  198. def start_connect(self, addrinfo):
  199. conn = _Connector(addrinfo, self.create_stream)
  200. # Give it a huge timeout; we'll trigger timeouts manually.
  201. future = conn.start(3600, connect_timeout=self.io_loop.time() + 3600)
  202. return conn, future
  203. def test_immediate_success(self):
  204. conn, future = self.start_connect(self.addrinfo)
  205. self.assertEqual(list(self.connect_futures.keys()),
  206. [(AF1, 'a')])
  207. self.resolve_connect(AF1, 'a', True)
  208. self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
  209. def test_immediate_failure(self):
  210. # Fail with just one address.
  211. conn, future = self.start_connect([(AF1, 'a')])
  212. self.assert_pending((AF1, 'a'))
  213. self.resolve_connect(AF1, 'a', False)
  214. self.assertRaises(IOError, future.result)
  215. def test_one_family_second_try(self):
  216. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  217. self.assert_pending((AF1, 'a'))
  218. self.resolve_connect(AF1, 'a', False)
  219. self.assert_pending((AF1, 'b'))
  220. self.resolve_connect(AF1, 'b', True)
  221. self.assertEqual(future.result(), (AF1, 'b', self.streams['b']))
  222. def test_one_family_second_try_failure(self):
  223. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  224. self.assert_pending((AF1, 'a'))
  225. self.resolve_connect(AF1, 'a', False)
  226. self.assert_pending((AF1, 'b'))
  227. self.resolve_connect(AF1, 'b', False)
  228. self.assertRaises(IOError, future.result)
  229. def test_one_family_second_try_timeout(self):
  230. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  231. self.assert_pending((AF1, 'a'))
  232. # trigger the timeout while the first lookup is pending;
  233. # nothing happens.
  234. conn.on_timeout()
  235. self.assert_pending((AF1, 'a'))
  236. self.resolve_connect(AF1, 'a', False)
  237. self.assert_pending((AF1, 'b'))
  238. self.resolve_connect(AF1, 'b', True)
  239. self.assertEqual(future.result(), (AF1, 'b', self.streams['b']))
  240. def test_two_families_immediate_failure(self):
  241. conn, future = self.start_connect(self.addrinfo)
  242. self.assert_pending((AF1, 'a'))
  243. self.resolve_connect(AF1, 'a', False)
  244. self.assert_pending((AF1, 'b'), (AF2, 'c'))
  245. self.resolve_connect(AF1, 'b', False)
  246. self.resolve_connect(AF2, 'c', True)
  247. self.assertEqual(future.result(), (AF2, 'c', self.streams['c']))
  248. def test_two_families_timeout(self):
  249. conn, future = self.start_connect(self.addrinfo)
  250. self.assert_pending((AF1, 'a'))
  251. conn.on_timeout()
  252. self.assert_pending((AF1, 'a'), (AF2, 'c'))
  253. self.resolve_connect(AF2, 'c', True)
  254. self.assertEqual(future.result(), (AF2, 'c', self.streams['c']))
  255. # resolving 'a' after the connection has completed doesn't start 'b'
  256. self.resolve_connect(AF1, 'a', False)
  257. self.assert_pending()
  258. def test_success_after_timeout(self):
  259. conn, future = self.start_connect(self.addrinfo)
  260. self.assert_pending((AF1, 'a'))
  261. conn.on_timeout()
  262. self.assert_pending((AF1, 'a'), (AF2, 'c'))
  263. self.resolve_connect(AF1, 'a', True)
  264. self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
  265. # resolving 'c' after completion closes the connection.
  266. self.resolve_connect(AF2, 'c', True)
  267. self.assertTrue(self.streams.pop('c').closed)
  268. def test_all_fail(self):
  269. conn, future = self.start_connect(self.addrinfo)
  270. self.assert_pending((AF1, 'a'))
  271. conn.on_timeout()
  272. self.assert_pending((AF1, 'a'), (AF2, 'c'))
  273. self.resolve_connect(AF2, 'c', False)
  274. self.assert_pending((AF1, 'a'), (AF2, 'd'))
  275. self.resolve_connect(AF2, 'd', False)
  276. # one queue is now empty
  277. self.assert_pending((AF1, 'a'))
  278. self.resolve_connect(AF1, 'a', False)
  279. self.assert_pending((AF1, 'b'))
  280. self.assertFalse(future.done())
  281. self.resolve_connect(AF1, 'b', False)
  282. self.assertRaises(IOError, future.result)
  283. def test_one_family_timeout_after_connect_timeout(self):
  284. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  285. self.assert_pending((AF1, 'a'))
  286. conn.on_connect_timeout()
  287. # the connector will close all streams on connect timeout, we
  288. # should explicitly pop the connect_future.
  289. self.connect_futures.pop((AF1, 'a'))
  290. self.assertTrue(self.streams.pop('a').closed)
  291. conn.on_timeout()
  292. # if the future is set with TimeoutError, we will not iterate next
  293. # possible address.
  294. self.assert_pending()
  295. self.assertEqual(len(conn.streams), 1)
  296. self.assert_connector_streams_closed(conn)
  297. self.assertRaises(TimeoutError, future.result)
  298. def test_one_family_success_before_connect_timeout(self):
  299. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  300. self.assert_pending((AF1, 'a'))
  301. self.resolve_connect(AF1, 'a', True)
  302. conn.on_connect_timeout()
  303. self.assert_pending()
  304. self.assertEqual(self.streams['a'].closed, False)
  305. # success stream will be pop
  306. self.assertEqual(len(conn.streams), 0)
  307. # streams in connector should be closed after connect timeout
  308. self.assert_connector_streams_closed(conn)
  309. self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
  310. def test_one_family_second_try_after_connect_timeout(self):
  311. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  312. self.assert_pending((AF1, 'a'))
  313. self.resolve_connect(AF1, 'a', False)
  314. self.assert_pending((AF1, 'b'))
  315. conn.on_connect_timeout()
  316. self.connect_futures.pop((AF1, 'b'))
  317. self.assertTrue(self.streams.pop('b').closed)
  318. self.assert_pending()
  319. self.assertEqual(len(conn.streams), 2)
  320. self.assert_connector_streams_closed(conn)
  321. self.assertRaises(TimeoutError, future.result)
  322. def test_one_family_second_try_failure_before_connect_timeout(self):
  323. conn, future = self.start_connect([(AF1, 'a'), (AF1, 'b')])
  324. self.assert_pending((AF1, 'a'))
  325. self.resolve_connect(AF1, 'a', False)
  326. self.assert_pending((AF1, 'b'))
  327. self.resolve_connect(AF1, 'b', False)
  328. conn.on_connect_timeout()
  329. self.assert_pending()
  330. self.assertEqual(len(conn.streams), 2)
  331. self.assert_connector_streams_closed(conn)
  332. self.assertRaises(IOError, future.result)
  333. def test_two_family_timeout_before_connect_timeout(self):
  334. conn, future = self.start_connect(self.addrinfo)
  335. self.assert_pending((AF1, 'a'))
  336. conn.on_timeout()
  337. self.assert_pending((AF1, 'a'), (AF2, 'c'))
  338. conn.on_connect_timeout()
  339. self.connect_futures.pop((AF1, 'a'))
  340. self.assertTrue(self.streams.pop('a').closed)
  341. self.connect_futures.pop((AF2, 'c'))
  342. self.assertTrue(self.streams.pop('c').closed)
  343. self.assert_pending()
  344. self.assertEqual(len(conn.streams), 2)
  345. self.assert_connector_streams_closed(conn)
  346. self.assertRaises(TimeoutError, future.result)
  347. def test_two_family_success_after_timeout(self):
  348. conn, future = self.start_connect(self.addrinfo)
  349. self.assert_pending((AF1, 'a'))
  350. conn.on_timeout()
  351. self.assert_pending((AF1, 'a'), (AF2, 'c'))
  352. self.resolve_connect(AF1, 'a', True)
  353. # if one of streams succeed, connector will close all other streams
  354. self.connect_futures.pop((AF2, 'c'))
  355. self.assertTrue(self.streams.pop('c').closed)
  356. self.assert_pending()
  357. self.assertEqual(len(conn.streams), 1)
  358. self.assert_connector_streams_closed(conn)
  359. self.assertEqual(future.result(), (AF1, 'a', self.streams['a']))
  360. def test_two_family_timeout_after_connect_timeout(self):
  361. conn, future = self.start_connect(self.addrinfo)
  362. self.assert_pending((AF1, 'a'))
  363. conn.on_connect_timeout()
  364. self.connect_futures.pop((AF1, 'a'))
  365. self.assertTrue(self.streams.pop('a').closed)
  366. self.assert_pending()
  367. conn.on_timeout()
  368. # if the future is set with TimeoutError, connector will not
  369. # trigger secondary address.
  370. self.assert_pending()
  371. self.assertEqual(len(conn.streams), 1)
  372. self.assert_connector_streams_closed(conn)
  373. self.assertRaises(TimeoutError, future.result)