tcpserver_test.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from __future__ import absolute_import, division, print_function
  2. import socket
  3. import subprocess
  4. import sys
  5. import textwrap
  6. from tornado.escape import utf8, to_unicode
  7. from tornado import gen
  8. from tornado.iostream import IOStream
  9. from tornado.log import app_log
  10. from tornado.stack_context import NullContext
  11. from tornado.tcpserver import TCPServer
  12. from tornado.test.util import skipBefore35, skipIfNonUnix, exec_test, unittest
  13. from tornado.testing import AsyncTestCase, ExpectLog, bind_unused_port, gen_test
  14. class TCPServerTest(AsyncTestCase):
  15. @gen_test
  16. def test_handle_stream_coroutine_logging(self):
  17. # handle_stream may be a coroutine and any exception in its
  18. # Future will be logged.
  19. class TestServer(TCPServer):
  20. @gen.coroutine
  21. def handle_stream(self, stream, address):
  22. yield stream.read_bytes(len(b'hello'))
  23. stream.close()
  24. 1 / 0
  25. server = client = None
  26. try:
  27. sock, port = bind_unused_port()
  28. with NullContext():
  29. server = TestServer()
  30. server.add_socket(sock)
  31. client = IOStream(socket.socket())
  32. with ExpectLog(app_log, "Exception in callback"):
  33. yield client.connect(('localhost', port))
  34. yield client.write(b'hello')
  35. yield client.read_until_close()
  36. yield gen.moment
  37. finally:
  38. if server is not None:
  39. server.stop()
  40. if client is not None:
  41. client.close()
  42. @skipBefore35
  43. @gen_test
  44. def test_handle_stream_native_coroutine(self):
  45. # handle_stream may be a native coroutine.
  46. namespace = exec_test(globals(), locals(), """
  47. class TestServer(TCPServer):
  48. async def handle_stream(self, stream, address):
  49. stream.write(b'data')
  50. stream.close()
  51. """)
  52. sock, port = bind_unused_port()
  53. server = namespace['TestServer']()
  54. server.add_socket(sock)
  55. client = IOStream(socket.socket())
  56. yield client.connect(('localhost', port))
  57. result = yield client.read_until_close()
  58. self.assertEqual(result, b'data')
  59. server.stop()
  60. client.close()
  61. def test_stop_twice(self):
  62. sock, port = bind_unused_port()
  63. server = TCPServer()
  64. server.add_socket(sock)
  65. server.stop()
  66. server.stop()
  67. @gen_test
  68. def test_stop_in_callback(self):
  69. # Issue #2069: calling server.stop() in a loop callback should not
  70. # raise EBADF when the loop handles other server connection
  71. # requests in the same loop iteration
  72. class TestServer(TCPServer):
  73. @gen.coroutine
  74. def handle_stream(self, stream, address):
  75. server.stop()
  76. yield stream.read_until_close()
  77. sock, port = bind_unused_port()
  78. server = TestServer()
  79. server.add_socket(sock)
  80. server_addr = ('localhost', port)
  81. N = 40
  82. clients = [IOStream(socket.socket()) for i in range(N)]
  83. connected_clients = []
  84. @gen.coroutine
  85. def connect(c):
  86. try:
  87. yield c.connect(server_addr)
  88. except EnvironmentError:
  89. pass
  90. else:
  91. connected_clients.append(c)
  92. yield [connect(c) for c in clients]
  93. self.assertGreater(len(connected_clients), 0,
  94. "all clients failed connecting")
  95. try:
  96. if len(connected_clients) == N:
  97. # Ideally we'd make the test deterministic, but we're testing
  98. # for a race condition in combination with the system's TCP stack...
  99. self.skipTest("at least one client should fail connecting "
  100. "for the test to be meaningful")
  101. finally:
  102. for c in connected_clients:
  103. c.close()
  104. # Here tearDown() would re-raise the EBADF encountered in the IO loop
  105. @skipIfNonUnix
  106. class TestMultiprocess(unittest.TestCase):
  107. # These tests verify that the two multiprocess examples from the
  108. # TCPServer docs work. Both tests start a server with three worker
  109. # processes, each of which prints its task id to stdout (a single
  110. # byte, so we don't have to worry about atomicity of the shared
  111. # stdout stream) and then exits.
  112. def run_subproc(self, code):
  113. proc = subprocess.Popen(sys.executable,
  114. stdin=subprocess.PIPE,
  115. stdout=subprocess.PIPE)
  116. proc.stdin.write(utf8(code))
  117. proc.stdin.close()
  118. proc.wait()
  119. stdout = proc.stdout.read()
  120. proc.stdout.close()
  121. if proc.returncode != 0:
  122. raise RuntimeError("Process returned %d. stdout=%r" % (
  123. proc.returncode, stdout))
  124. return to_unicode(stdout)
  125. def test_single(self):
  126. # As a sanity check, run the single-process version through this test
  127. # harness too.
  128. code = textwrap.dedent("""
  129. from __future__ import print_function
  130. from tornado.ioloop import IOLoop
  131. from tornado.tcpserver import TCPServer
  132. server = TCPServer()
  133. server.listen(0, address='127.0.0.1')
  134. IOLoop.current().run_sync(lambda: None)
  135. print('012', end='')
  136. """)
  137. out = self.run_subproc(code)
  138. self.assertEqual(''.join(sorted(out)), "012")
  139. def test_simple(self):
  140. code = textwrap.dedent("""
  141. from __future__ import print_function
  142. from tornado.ioloop import IOLoop
  143. from tornado.process import task_id
  144. from tornado.tcpserver import TCPServer
  145. server = TCPServer()
  146. server.bind(0, address='127.0.0.1')
  147. server.start(3)
  148. IOLoop.current().run_sync(lambda: None)
  149. print(task_id(), end='')
  150. """)
  151. out = self.run_subproc(code)
  152. self.assertEqual(''.join(sorted(out)), "012")
  153. def test_advanced(self):
  154. code = textwrap.dedent("""
  155. from __future__ import print_function
  156. from tornado.ioloop import IOLoop
  157. from tornado.netutil import bind_sockets
  158. from tornado.process import fork_processes, task_id
  159. from tornado.ioloop import IOLoop
  160. from tornado.tcpserver import TCPServer
  161. sockets = bind_sockets(0, address='127.0.0.1')
  162. fork_processes(3)
  163. server = TCPServer()
  164. server.add_sockets(sockets)
  165. IOLoop.current().run_sync(lambda: None)
  166. print(task_id(), end='')
  167. """)
  168. out = self.run_subproc(code)
  169. self.assertEqual(''.join(sorted(out)), "012")