123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- #
- # Copyright 2011 Facebook
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- """A non-blocking, single-threaded TCP server."""
- from __future__ import absolute_import, division, print_function
- import errno
- import os
- import socket
- from tornado import gen
- from tornado.log import app_log
- from tornado.ioloop import IOLoop
- from tornado.iostream import IOStream, SSLIOStream
- from tornado.netutil import bind_sockets, add_accept_handler, ssl_wrap_socket
- from tornado import process
- from tornado.util import errno_from_exception
- try:
- import ssl
- except ImportError:
- # ssl is not available on Google App Engine.
- ssl = None
class TCPServer(object):
    r"""A non-blocking, single-threaded TCP server.

    To use `TCPServer`, define a subclass which overrides the `handle_stream`
    method. For example, a simple echo server could be defined like this::

      from tornado.tcpserver import TCPServer
      from tornado.iostream import StreamClosedError
      from tornado import gen

      class EchoServer(TCPServer):
          async def handle_stream(self, stream, address):
              while True:
                  try:
                      data = await stream.read_until(b"\n")
                      await stream.write(data)
                  except StreamClosedError:
                      break

    To make this server serve SSL traffic, send the ``ssl_options`` keyword
    argument with an `ssl.SSLContext` object. For compatibility with older
    versions of Python ``ssl_options`` may also be a dictionary of keyword
    arguments for the `ssl.wrap_socket` method.::

       ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
       ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),
                               os.path.join(data_dir, "mydomain.key"))
       TCPServer(ssl_options=ssl_ctx)

    `TCPServer` initialization follows one of three patterns:

    1. `listen`: simple single-process::

            server = TCPServer()
            server.listen(8888)
            IOLoop.current().start()

    2. `bind`/`start`: simple multi-process::

            server = TCPServer()
            server.bind(8888)
            server.start(0)  # Forks multiple sub-processes
            IOLoop.current().start()

       When using this interface, an `.IOLoop` must *not* be passed
       to the `TCPServer` constructor.  `start` will always start
       the server on the default singleton `.IOLoop`.

    3. `add_sockets`: advanced multi-process::

            sockets = bind_sockets(8888)
            tornado.process.fork_processes(0)
            server = TCPServer()
            server.add_sockets(sockets)
            IOLoop.current().start()

       The `add_sockets` interface is more complicated, but it can be
       used with `tornado.process.fork_processes` to give you more
       flexibility in when the fork happens.  `add_sockets` can
       also be used in single-process servers if you want to create
       your listening sockets in some way other than
       `~tornado.netutil.bind_sockets`.

    .. versionadded:: 3.1
       The ``max_buffer_size`` argument.

    .. versionchanged:: 5.0
       The ``io_loop`` argument has been removed.
    """

    def __init__(self, ssl_options=None, max_buffer_size=None,
                 read_chunk_size=None):
        """Initializes the server without binding or listening on anything.

        ``ssl_options`` is stored as-is; when it is a dictionary, its
        ``certfile`` (required) and ``keyfile`` (optional) entries are
        checked for existence right away.  ``max_buffer_size`` and
        ``read_chunk_size`` are passed through to every stream created for
        an incoming connection.

        Raises `KeyError` if a dictionary ``ssl_options`` has no
        ``certfile`` entry and `ValueError` if a referenced file does not
        exist.
        """
        self.ssl_options = ssl_options
        self._sockets = {}  # fd -> socket object
        self._handlers = {}  # fd -> remove_handler callable
        self._pending_sockets = []  # bound by `bind`, not yet accepting
        self._started = False
        self._stopped = False
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size

        # Verify the SSL options. Otherwise we don't get errors until clients
        # connect. This doesn't verify that the keys are legitimate, but
        # the SSL module doesn't do that until there is a connected socket
        # which seems like too much work
        if isinstance(self.ssl_options, dict):
            # Only certfile is required: it can contain both keys
            if 'certfile' not in self.ssl_options:
                raise KeyError('missing key "certfile" in ssl_options')

            if not os.path.exists(self.ssl_options['certfile']):
                raise ValueError('certfile "%s" does not exist' %
                                 self.ssl_options['certfile'])
            if ('keyfile' in self.ssl_options and
                    not os.path.exists(self.ssl_options['keyfile'])):
                raise ValueError('keyfile "%s" does not exist' %
                                 self.ssl_options['keyfile'])

    def listen(self, port, address=""):
        """Starts accepting connections on the given port.

        This method may be called more than once to listen on multiple ports.
        `listen` takes effect immediately; it is not necessary to call
        `TCPServer.start` afterwards.  It is, however, necessary to start
        the `.IOLoop`.
        """
        sockets = bind_sockets(port, address=address)
        self.add_sockets(sockets)

    def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `~tornado.netutil.bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            # Keep the returned callable so `stop` can unregister the socket.
            self._handlers[sock.fileno()] = add_accept_handler(
                sock, self._handle_connection)

    def add_socket(self, socket):
        """Singular version of `add_sockets`.  Takes a single socket object."""
        self.add_sockets([socket])

    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,
             reuse_port=False):
        """Binds this server to the given port on the given address.

        To start the server, call `start`. If you want to run this server
        in a single process, you can call `listen` as a shortcut to the
        sequence of `bind` and `start` calls.

        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument
        has the same meaning as for `.bind_sockets`.

        This method may be called multiple times prior to `start` to listen
        on multiple ports or interfaces.

        .. versionchanged:: 4.4
           Added the ``reuse_port`` argument.
        """
        sockets = bind_sockets(port, address=address, family=family,
                               backlog=backlog, reuse_port=reuse_port)
        if self._started:
            self.add_sockets(sockets)
        else:
            # Defer registration until `start` so that forking (if any)
            # happens before accept handlers are installed.
            self._pending_sockets.extend(sockets)

    def start(self, num_processes=1):
        """Starts this server in the `.IOLoop`.

        By default, we run the server in this process and do not fork any
        additional child process.

        If num_processes is ``None`` or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If num_processes is given and > 1, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``autoreload=True`` option to `tornado.web.Application`
        which defaults to True when ``debug=True``).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``TCPServer.start(n)``.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes)
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)

    def stop(self):
        """Stops listening for new connections.

        Requests currently in progress may still continue after the
        server is stopped.

        Sockets that were bound with `bind` but never activated by `start`
        are closed as well, so that they are not leaked.  Calling `stop`
        more than once is a no-op.
        """
        if self._stopped:
            return
        self._stopped = True
        for fd, sock in self._sockets.items():
            assert sock.fileno() == fd
            # Unregister socket from IOLoop
            self._handlers.pop(fd)()
            sock.close()
        # Sockets still waiting for `start` have no accept handler to
        # remove, but they are bound and must be closed too.
        pending, self._pending_sockets = self._pending_sockets, []
        for sock in pending:
            sock.close()

    def handle_stream(self, stream, address):
        """Override to handle a new `.IOStream` from an incoming connection.

        This method may be a coroutine; if so any exceptions it raises
        asynchronously will be logged. Accepting of incoming connections
        will not be blocked by this coroutine.

        If this `TCPServer` is configured for SSL, ``handle_stream``
        may be called before the SSL handshake has completed. Use
        `.SSLIOStream.wait_for_handshake` if you need to verify the client's
        certificate or use NPN/ALPN.

        .. versionchanged:: 4.2
           Added the option for this method to be a coroutine.
        """
        raise NotImplementedError()

    def _handle_connection(self, connection, address):
        """Accept callback: wraps ``connection`` in a stream and dispatches it.

        When ``ssl_options`` is set the raw socket is first wrapped for
        server-side SSL (handshake deferred).  The resulting stream is
        handed to `handle_stream`; any exception raised while creating the
        stream or calling `handle_stream` is logged rather than propagated.
        """
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl_wrap_socket(connection,
                                             self.ssl_options,
                                             server_side=True,
                                             do_handshake_on_connect=False)
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux).  If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if errno_from_exception(err) in (errno.ECONNABORTED,
                                                 errno.EINVAL):
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(connection,
                                     max_buffer_size=self.max_buffer_size,
                                     read_chunk_size=self.read_chunk_size)
            else:
                stream = IOStream(connection,
                                  max_buffer_size=self.max_buffer_size,
                                  read_chunk_size=self.read_chunk_size)

            future = self.handle_stream(stream, address)
            if future is not None:
                # Calling result() in the callback re-raises any exception
                # from the coroutine so that it gets reported.
                IOLoop.current().add_future(gen.convert_yielded(future),
                                            lambda f: f.result())
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)
|