# tcpserver.py
#
# Copyright 2011 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

  15. """A non-blocking, single-threaded TCP server."""
  16. from __future__ import absolute_import, division, print_function
  17. import errno
  18. import os
  19. import socket
  20. from tornado import gen
  21. from tornado.log import app_log
  22. from tornado.ioloop import IOLoop
  23. from tornado.iostream import IOStream, SSLIOStream
  24. from tornado.netutil import bind_sockets, add_accept_handler, ssl_wrap_socket
  25. from tornado import process
  26. from tornado.util import errno_from_exception
  27. try:
  28. import ssl
  29. except ImportError:
  30. # ssl is not available on Google App Engine.
  31. ssl = None
  32. class TCPServer(object):
  33. r"""A non-blocking, single-threaded TCP server.
  34. To use `TCPServer`, define a subclass which overrides the `handle_stream`
  35. method. For example, a simple echo server could be defined like this::
  36. from tornado.tcpserver import TCPServer
  37. from tornado.iostream import StreamClosedError
  38. from tornado import gen
  39. class EchoServer(TCPServer):
  40. async def handle_stream(self, stream, address):
  41. while True:
  42. try:
  43. data = await stream.read_until(b"\n")
  44. await stream.write(data)
  45. except StreamClosedError:
  46. break
  47. To make this server serve SSL traffic, send the ``ssl_options`` keyword
  48. argument with an `ssl.SSLContext` object. For compatibility with older
  49. versions of Python ``ssl_options`` may also be a dictionary of keyword
  50. arguments for the `ssl.wrap_socket` method.::
  51. ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
  52. ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),
  53. os.path.join(data_dir, "mydomain.key"))
  54. TCPServer(ssl_options=ssl_ctx)
  55. `TCPServer` initialization follows one of three patterns:
  56. 1. `listen`: simple single-process::
  57. server = TCPServer()
  58. server.listen(8888)
  59. IOLoop.current().start()
  60. 2. `bind`/`start`: simple multi-process::
  61. server = TCPServer()
  62. server.bind(8888)
  63. server.start(0) # Forks multiple sub-processes
  64. IOLoop.current().start()
  65. When using this interface, an `.IOLoop` must *not* be passed
  66. to the `TCPServer` constructor. `start` will always start
  67. the server on the default singleton `.IOLoop`.
  68. 3. `add_sockets`: advanced multi-process::
  69. sockets = bind_sockets(8888)
  70. tornado.process.fork_processes(0)
  71. server = TCPServer()
  72. server.add_sockets(sockets)
  73. IOLoop.current().start()
  74. The `add_sockets` interface is more complicated, but it can be
  75. used with `tornado.process.fork_processes` to give you more
  76. flexibility in when the fork happens. `add_sockets` can
  77. also be used in single-process servers if you want to create
  78. your listening sockets in some way other than
  79. `~tornado.netutil.bind_sockets`.
  80. .. versionadded:: 3.1
  81. The ``max_buffer_size`` argument.
  82. .. versionchanged:: 5.0
  83. The ``io_loop`` argument has been removed.
  84. """
  85. def __init__(self, ssl_options=None, max_buffer_size=None,
  86. read_chunk_size=None):
  87. self.ssl_options = ssl_options
  88. self._sockets = {} # fd -> socket object
  89. self._handlers = {} # fd -> remove_handler callable
  90. self._pending_sockets = []
  91. self._started = False
  92. self._stopped = False
  93. self.max_buffer_size = max_buffer_size
  94. self.read_chunk_size = read_chunk_size
  95. # Verify the SSL options. Otherwise we don't get errors until clients
  96. # connect. This doesn't verify that the keys are legitimate, but
  97. # the SSL module doesn't do that until there is a connected socket
  98. # which seems like too much work
  99. if self.ssl_options is not None and isinstance(self.ssl_options, dict):
  100. # Only certfile is required: it can contain both keys
  101. if 'certfile' not in self.ssl_options:
  102. raise KeyError('missing key "certfile" in ssl_options')
  103. if not os.path.exists(self.ssl_options['certfile']):
  104. raise ValueError('certfile "%s" does not exist' %
  105. self.ssl_options['certfile'])
  106. if ('keyfile' in self.ssl_options and
  107. not os.path.exists(self.ssl_options['keyfile'])):
  108. raise ValueError('keyfile "%s" does not exist' %
  109. self.ssl_options['keyfile'])
  110. def listen(self, port, address=""):
  111. """Starts accepting connections on the given port.
  112. This method may be called more than once to listen on multiple ports.
  113. `listen` takes effect immediately; it is not necessary to call
  114. `TCPServer.start` afterwards. It is, however, necessary to start
  115. the `.IOLoop`.
  116. """
  117. sockets = bind_sockets(port, address=address)
  118. self.add_sockets(sockets)
  119. def add_sockets(self, sockets):
  120. """Makes this server start accepting connections on the given sockets.
  121. The ``sockets`` parameter is a list of socket objects such as
  122. those returned by `~tornado.netutil.bind_sockets`.
  123. `add_sockets` is typically used in combination with that
  124. method and `tornado.process.fork_processes` to provide greater
  125. control over the initialization of a multi-process server.
  126. """
  127. for sock in sockets:
  128. self._sockets[sock.fileno()] = sock
  129. self._handlers[sock.fileno()] = add_accept_handler(
  130. sock, self._handle_connection)
  131. def add_socket(self, socket):
  132. """Singular version of `add_sockets`. Takes a single socket object."""
  133. self.add_sockets([socket])
  134. def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,
  135. reuse_port=False):
  136. """Binds this server to the given port on the given address.
  137. To start the server, call `start`. If you want to run this server
  138. in a single process, you can call `listen` as a shortcut to the
  139. sequence of `bind` and `start` calls.
  140. Address may be either an IP address or hostname. If it's a hostname,
  141. the server will listen on all IP addresses associated with the
  142. name. Address may be an empty string or None to listen on all
  143. available interfaces. Family may be set to either `socket.AF_INET`
  144. or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
  145. both will be used if available.
  146. The ``backlog`` argument has the same meaning as for
  147. `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument
  148. has the same meaning as for `.bind_sockets`.
  149. This method may be called multiple times prior to `start` to listen
  150. on multiple ports or interfaces.
  151. .. versionchanged:: 4.4
  152. Added the ``reuse_port`` argument.
  153. """
  154. sockets = bind_sockets(port, address=address, family=family,
  155. backlog=backlog, reuse_port=reuse_port)
  156. if self._started:
  157. self.add_sockets(sockets)
  158. else:
  159. self._pending_sockets.extend(sockets)
  160. def start(self, num_processes=1):
  161. """Starts this server in the `.IOLoop`.
  162. By default, we run the server in this process and do not fork any
  163. additional child process.
  164. If num_processes is ``None`` or <= 0, we detect the number of cores
  165. available on this machine and fork that number of child
  166. processes. If num_processes is given and > 1, we fork that
  167. specific number of sub-processes.
  168. Since we use processes and not threads, there is no shared memory
  169. between any server code.
  170. Note that multiple processes are not compatible with the autoreload
  171. module (or the ``autoreload=True`` option to `tornado.web.Application`
  172. which defaults to True when ``debug=True``).
  173. When using multiple processes, no IOLoops can be created or
  174. referenced until after the call to ``TCPServer.start(n)``.
  175. """
  176. assert not self._started
  177. self._started = True
  178. if num_processes != 1:
  179. process.fork_processes(num_processes)
  180. sockets = self._pending_sockets
  181. self._pending_sockets = []
  182. self.add_sockets(sockets)
  183. def stop(self):
  184. """Stops listening for new connections.
  185. Requests currently in progress may still continue after the
  186. server is stopped.
  187. """
  188. if self._stopped:
  189. return
  190. self._stopped = True
  191. for fd, sock in self._sockets.items():
  192. assert sock.fileno() == fd
  193. # Unregister socket from IOLoop
  194. self._handlers.pop(fd)()
  195. sock.close()
  196. def handle_stream(self, stream, address):
  197. """Override to handle a new `.IOStream` from an incoming connection.
  198. This method may be a coroutine; if so any exceptions it raises
  199. asynchronously will be logged. Accepting of incoming connections
  200. will not be blocked by this coroutine.
  201. If this `TCPServer` is configured for SSL, ``handle_stream``
  202. may be called before the SSL handshake has completed. Use
  203. `.SSLIOStream.wait_for_handshake` if you need to verify the client's
  204. certificate or use NPN/ALPN.
  205. .. versionchanged:: 4.2
  206. Added the option for this method to be a coroutine.
  207. """
  208. raise NotImplementedError()
  209. def _handle_connection(self, connection, address):
  210. if self.ssl_options is not None:
  211. assert ssl, "Python 2.6+ and OpenSSL required for SSL"
  212. try:
  213. connection = ssl_wrap_socket(connection,
  214. self.ssl_options,
  215. server_side=True,
  216. do_handshake_on_connect=False)
  217. except ssl.SSLError as err:
  218. if err.args[0] == ssl.SSL_ERROR_EOF:
  219. return connection.close()
  220. else:
  221. raise
  222. except socket.error as err:
  223. # If the connection is closed immediately after it is created
  224. # (as in a port scan), we can get one of several errors.
  225. # wrap_socket makes an internal call to getpeername,
  226. # which may return either EINVAL (Mac OS X) or ENOTCONN
  227. # (Linux). If it returns ENOTCONN, this error is
  228. # silently swallowed by the ssl module, so we need to
  229. # catch another error later on (AttributeError in
  230. # SSLIOStream._do_ssl_handshake).
  231. # To test this behavior, try nmap with the -sT flag.
  232. # https://github.com/tornadoweb/tornado/pull/750
  233. if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
  234. return connection.close()
  235. else:
  236. raise
  237. try:
  238. if self.ssl_options is not None:
  239. stream = SSLIOStream(connection,
  240. max_buffer_size=self.max_buffer_size,
  241. read_chunk_size=self.read_chunk_size)
  242. else:
  243. stream = IOStream(connection,
  244. max_buffer_size=self.max_buffer_size,
  245. read_chunk_size=self.read_chunk_size)
  246. future = self.handle_stream(stream, address)
  247. if future is not None:
  248. IOLoop.current().add_future(gen.convert_yielded(future),
  249. lambda f: f.result())
  250. except Exception:
  251. app_log.error("Error in connection callback", exc_info=True)