123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- #
- # Copyright 2014 Facebook
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- """A non-blocking TCP connection factory.
- """
- from __future__ import absolute_import, division, print_function
- import functools
- import socket
- import numbers
- import datetime
- from tornado.concurrent import Future, future_add_done_callback
- from tornado.ioloop import IOLoop
- from tornado.iostream import IOStream
- from tornado import gen
- from tornado.netutil import Resolver
- from tornado.platform.auto import set_close_exec
- from tornado.gen import TimeoutError
- from tornado.util import timedelta_to_seconds
- _INITIAL_CONNECT_TIMEOUT = 0.3
- class _Connector(object):
- """A stateless implementation of the "Happy Eyeballs" algorithm.
- "Happy Eyeballs" is documented in RFC6555 as the recommended practice
- for when both IPv4 and IPv6 addresses are available.
- In this implementation, we partition the addresses by family, and
- make the first connection attempt to whichever address was
- returned first by ``getaddrinfo``. If that connection fails or
- times out, we begin a connection in parallel to the first address
- of the other family. If there are additional failures we retry
- with other addresses, keeping one connection attempt per family
- in flight at a time.
- http://tools.ietf.org/html/rfc6555
- """
- def __init__(self, addrinfo, connect):
- self.io_loop = IOLoop.current()
- self.connect = connect
- self.future = Future()
- self.timeout = None
- self.connect_timeout = None
- self.last_error = None
- self.remaining = len(addrinfo)
- self.primary_addrs, self.secondary_addrs = self.split(addrinfo)
- self.streams = set()
- @staticmethod
- def split(addrinfo):
- """Partition the ``addrinfo`` list by address family.
- Returns two lists. The first list contains the first entry from
- ``addrinfo`` and all others with the same family, and the
- second list contains all other addresses (normally one list will
- be AF_INET and the other AF_INET6, although non-standard resolvers
- may return additional families).
- """
- primary = []
- secondary = []
- primary_af = addrinfo[0][0]
- for af, addr in addrinfo:
- if af == primary_af:
- primary.append((af, addr))
- else:
- secondary.append((af, addr))
- return primary, secondary
- def start(self, timeout=_INITIAL_CONNECT_TIMEOUT, connect_timeout=None):
- self.try_connect(iter(self.primary_addrs))
- self.set_timeout(timeout)
- if connect_timeout is not None:
- self.set_connect_timeout(connect_timeout)
- return self.future
- def try_connect(self, addrs):
- try:
- af, addr = next(addrs)
- except StopIteration:
- # We've reached the end of our queue, but the other queue
- # might still be working. Send a final error on the future
- # only when both queues are finished.
- if self.remaining == 0 and not self.future.done():
- self.future.set_exception(self.last_error or
- IOError("connection failed"))
- return
- stream, future = self.connect(af, addr)
- self.streams.add(stream)
- future_add_done_callback(
- future, functools.partial(self.on_connect_done, addrs, af, addr))
- def on_connect_done(self, addrs, af, addr, future):
- self.remaining -= 1
- try:
- stream = future.result()
- except Exception as e:
- if self.future.done():
- return
- # Error: try again (but remember what happened so we have an
- # error to raise in the end)
- self.last_error = e
- self.try_connect(addrs)
- if self.timeout is not None:
- # If the first attempt failed, don't wait for the
- # timeout to try an address from the secondary queue.
- self.io_loop.remove_timeout(self.timeout)
- self.on_timeout()
- return
- self.clear_timeouts()
- if self.future.done():
- # This is a late arrival; just drop it.
- stream.close()
- else:
- self.streams.discard(stream)
- self.future.set_result((af, addr, stream))
- self.close_streams()
- def set_timeout(self, timeout):
- self.timeout = self.io_loop.add_timeout(self.io_loop.time() + timeout,
- self.on_timeout)
- def on_timeout(self):
- self.timeout = None
- if not self.future.done():
- self.try_connect(iter(self.secondary_addrs))
- def clear_timeout(self):
- if self.timeout is not None:
- self.io_loop.remove_timeout(self.timeout)
- def set_connect_timeout(self, connect_timeout):
- self.connect_timeout = self.io_loop.add_timeout(
- connect_timeout, self.on_connect_timeout)
- def on_connect_timeout(self):
- if not self.future.done():
- self.future.set_exception(TimeoutError())
- self.close_streams()
- def clear_timeouts(self):
- if self.timeout is not None:
- self.io_loop.remove_timeout(self.timeout)
- if self.connect_timeout is not None:
- self.io_loop.remove_timeout(self.connect_timeout)
- def close_streams(self):
- for stream in self.streams:
- stream.close()
class TCPClient(object):
    """A non-blocking TCP connection factory.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.
    """

    def __init__(self, resolver=None):
        """Use ``resolver`` if given, otherwise create a private `Resolver`.

        A resolver created here is owned by this client and closed by
        `close`; a caller-supplied resolver is left alone.
        """
        if resolver is not None:
            self.resolver = resolver
            self._own_resolver = False
        else:
            self.resolver = Resolver()
            self._own_resolver = True

    def close(self):
        """Close the resolver, but only if this client created it."""
        if self._own_resolver:
            self.resolver.close()

    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None, source_ip=None, source_port=None,
                timeout=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).

        Using the ``source_ip`` kwarg, one can specify the source
        IP address to use when establishing the connection.
        In case the user needs to resolve and
        use a specific interface, it has to be handled outside
        of Tornado as this depends very much on the platform.

        Similarly, when the user requires a certain source port, it can
        be specified using the ``source_port`` arg.

        ``timeout`` may be a number of seconds or a `datetime.timedelta`,
        both measured from the moment of the call.  The resulting deadline
        is shared by name resolution, connection establishment and (when
        ``ssl_options`` is given) the TLS handshake.  Raises `TimeoutError`
        if the deadline passes first, and `TypeError` if ``timeout`` is of
        any other type.

        .. versionchanged:: 4.5
           Added the ``source_ip`` and ``source_port`` arguments.

        .. versionchanged:: 5.0
           Added the ``timeout`` argument.
        """
        # Turn the relative timeout into one absolute deadline so that
        # every stage below counts against the same budget.
        deadline = None
        if timeout is not None:
            if isinstance(timeout, numbers.Real):
                deadline = IOLoop.current().time() + timeout
            elif isinstance(timeout, datetime.timedelta):
                deadline = (IOLoop.current().time() +
                            timedelta_to_seconds(timeout))
            else:
                # The one-element tuple keeps the message intact even
                # when ``timeout`` is itself a tuple.
                raise TypeError("Unsupported timeout %r" % (timeout,))

        resolved = self.resolver.resolve(host, port, af)
        if deadline is not None:
            resolved = gen.with_timeout(deadline, resolved)
        addrinfo = yield resolved

        connector = _Connector(
            addrinfo,
            functools.partial(self._create_stream, max_buffer_size,
                              source_ip=source_ip, source_port=source_port)
        )
        af, addr, stream = yield connector.start(connect_timeout=deadline)
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            handshake = stream.start_tls(False, ssl_options=ssl_options,
                                         server_hostname=host)
            if deadline is not None:
                handshake = gen.with_timeout(deadline, handshake)
            stream = yield handshake
        raise gen.Return(stream)

    def _create_stream(self, max_buffer_size, af, addr, source_ip=None,
                       source_port=None):
        """Create a socket for ``af`` and start connecting it to ``addr``.

        Returns a ``(stream, future)`` pair where the future resolves once
        the connection attempt finishes.  If the `IOStream` cannot be
        constructed, the socket is closed and ``(None, future)`` is
        returned with the error set on the future.

        Raises ``socket.error`` if binding to the requested source
        address or port fails.
        """
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        source_port_bind = source_port if isinstance(source_port, int) else 0
        source_ip_bind = source_ip
        if source_port_bind and not source_ip:
            # User required a specific port, but did not specify
            # a certain source IP, will bind to the default loopback.
            source_ip_bind = '::1' if af == socket.AF_INET6 else '127.0.0.1'
            # Trying to use the same address family as the requested af socket:
            # - 127.0.0.1 for IPv4
            # - ::1 for IPv6
        socket_obj = socket.socket(af)
        set_close_exec(socket_obj.fileno())
        if source_port_bind or source_ip_bind:
            # If the user requires binding also to a specific IP/port.
            try:
                socket_obj.bind((source_ip_bind, source_port_bind))
            except socket.error:
                socket_obj.close()
                # Fail loudly if unable to use the IP/port.
                raise
        try:
            stream = IOStream(socket_obj,
                              max_buffer_size=max_buffer_size)
        except socket.error as e:
            # No stream took ownership of the socket, so release it here.
            socket_obj.close()
            fu = Future()
            fu.set_exception(e)
            # Callers unpack a (stream, future) pair, so keep that shape
            # on failure too; there is no stream to hand back.
            return None, fu
        else:
            return stream, stream.connect(addr)
|