12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757 |
- #
- # Copyright 2009 Facebook
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- """Utility classes to write to and read from non-blocking files and sockets.
- Contents:
- * `BaseIOStream`: Generic interface for reading and writing.
- * `IOStream`: Implementation of BaseIOStream using non-blocking sockets.
- * `SSLIOStream`: SSL-aware version of IOStream.
- * `PipeIOStream`: Pipe-based IOStream implementation.
- """
- from __future__ import absolute_import, division, print_function
- import collections
- import errno
- import io
- import numbers
- import os
- import socket
- import sys
- import re
- import warnings
- from tornado.concurrent import Future
- from tornado import ioloop
- from tornado.log import gen_log, app_log
- from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults
- from tornado import stack_context
- from tornado.util import errno_from_exception
- try:
- from tornado.platform.posix import _set_nonblocking
- except ImportError:
- _set_nonblocking = None
- try:
- import ssl
- except ImportError:
- # ssl is not available on Google App Engine
- ssl = None
# These errnos indicate that a non-blocking operation must be retried
# at a later time.  On most platforms they're the same value, but on
# some they differ.
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
if hasattr(errno, "WSAEWOULDBLOCK"):
    # WSA* variants exist only on Windows.
    _ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)  # type: ignore

# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE,
                    errno.ETIMEDOUT)
if hasattr(errno, "WSAECONNRESET"):
    _ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT)  # type: ignore # noqa: E501

if sys.platform == 'darwin':
    # OSX appears to have a race condition that causes send(2) to return
    # EPROTOTYPE if called while a socket is being torn down:
    # http://erickt.github.io/blog/2014/11/19/adventures-in-debugging-a-potential-osx-kernel-bug/
    # Since the socket is being closed anyway, treat this as an ECONNRESET
    # instead of an unexpected error.
    _ERRNO_CONNRESET += (errno.EPROTOTYPE,)  # type: ignore

# More non-portable errnos:
_ERRNO_INPROGRESS = (errno.EINPROGRESS,)
if hasattr(errno, "WSAEINPROGRESS"):
    _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)  # type: ignore

# True when running on any Windows platform.
_WINDOWS = sys.platform.startswith('win')
class StreamClosedError(IOError):
    """Exception raised by `IOStream` methods when the stream is closed.

    Note that the close callback is scheduled to run *after* other
    callbacks on the stream (to allow for buffered data to be processed),
    so you may see this error before you see the close callback.

    The ``real_error`` attribute contains the underlying error that caused
    the stream to close (if any).

    .. versionchanged:: 4.3
       Added the ``real_error`` attribute.
    """
    def __init__(self, real_error=None):
        # Single-inheritance: calling the base initializer directly is
        # equivalent to the two-argument super() form.
        IOError.__init__(self, 'Stream is closed')
        # The exception (if any) that originally caused the close.
        self.real_error = real_error
class UnsatisfiableReadError(Exception):
    """Exception raised when a read cannot be satisfied.

    Raised by ``read_until`` and ``read_until_regex`` with a
    ``max_bytes`` argument.
    """
class StreamBufferFullError(Exception):
    """Exception raised by `IOStream` methods when the buffer is full."""
- class _StreamBuffer(object):
- """
- A specialized buffer that tries to avoid copies when large pieces
- of data are encountered.
- """
- def __init__(self):
- # A sequence of (False, bytearray) and (True, memoryview) objects
- self._buffers = collections.deque()
- # Position in the first buffer
- self._first_pos = 0
- self._size = 0
- def __len__(self):
- return self._size
- # Data above this size will be appended separately instead
- # of extending an existing bytearray
- _large_buf_threshold = 2048
- def append(self, data):
- """
- Append the given piece of data (should be a buffer-compatible object).
- """
- size = len(data)
- if size > self._large_buf_threshold:
- if not isinstance(data, memoryview):
- data = memoryview(data)
- self._buffers.append((True, data))
- elif size > 0:
- if self._buffers:
- is_memview, b = self._buffers[-1]
- new_buf = is_memview or len(b) >= self._large_buf_threshold
- else:
- new_buf = True
- if new_buf:
- self._buffers.append((False, bytearray(data)))
- else:
- b += data
- self._size += size
- def peek(self, size):
- """
- Get a view over at most ``size`` bytes (possibly fewer) at the
- current buffer position.
- """
- assert size > 0
- try:
- is_memview, b = self._buffers[0]
- except IndexError:
- return memoryview(b'')
- pos = self._first_pos
- if is_memview:
- return b[pos:pos + size]
- else:
- return memoryview(b)[pos:pos + size]
- def advance(self, size):
- """
- Advance the current buffer position by ``size`` bytes.
- """
- assert 0 < size <= self._size
- self._size -= size
- pos = self._first_pos
- buffers = self._buffers
- while buffers and size > 0:
- is_large, b = buffers[0]
- b_remain = len(b) - size - pos
- if b_remain <= 0:
- buffers.popleft()
- size -= len(b) - pos
- pos = 0
- elif is_large:
- pos += size
- size = 0
- else:
- # Amortized O(1) shrink for Python 2
- pos += size
- if len(b) <= 2 * pos:
- del b[:pos]
- pos = 0
- size = 0
- assert size == 0
- self._first_pos = pos
class BaseIOStream(object):
    """A utility class to write to and read from a non-blocking file or socket.

    We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
    All of the methods take an optional ``callback`` argument and return a
    `.Future` only if no callback is given.  When the operation completes,
    the callback will be run or the `.Future` will resolve with the data
    read (or ``None`` for ``write()``).  All outstanding ``Futures`` will
    resolve with a `StreamClosedError` when the stream is closed; users
    of the callback interface will be notified via
    `.BaseIOStream.set_close_callback` instead.

    When a stream is closed due to an error, the IOStream's ``error``
    attribute contains the exception object.

    Subclasses must implement `fileno`, `close_fd`, `write_to_fd`,
    `read_from_fd`, and optionally `get_fd_error`.
    """
    def __init__(self, max_buffer_size=None,
                 read_chunk_size=None, max_write_buffer_size=None):
        """`BaseIOStream` constructor.

        :arg max_buffer_size: Maximum amount of incoming data to buffer;
            defaults to 100MB.
        :arg read_chunk_size: Amount of data to read at one time from the
            underlying transport; defaults to 64KB.
        :arg max_write_buffer_size: Amount of outgoing data to buffer;
            defaults to unlimited.

        .. versionchanged:: 4.0
           Add the ``max_write_buffer_size`` parameter.  Changed default
           ``read_chunk_size`` to 64KB.
        .. versionchanged:: 5.0
           The ``io_loop`` argument (deprecated since version 4.1) has been
           removed.
        """
        self.io_loop = ioloop.IOLoop.current()
        self.max_buffer_size = max_buffer_size or 104857600  # 100 MB default
        # A chunk size that is too close to max_buffer_size can cause
        # spurious failures.
        self.read_chunk_size = min(read_chunk_size or 65536,
                                   self.max_buffer_size // 2)
        self.max_write_buffer_size = max_write_buffer_size
        # Exception that caused the stream to close, if any.
        self.error = None
        # Incoming data: a single bytearray consumed from a moving offset.
        self._read_buffer = bytearray()
        self._read_buffer_pos = 0
        self._read_buffer_size = 0
        # True while read_into() has swapped a caller-supplied buffer in
        # as self._read_buffer; the original is saved below for restoring.
        self._user_read_buffer = False
        self._after_user_read_buffer = None
        # Outgoing data, plus running totals of bytes queued/flushed
        # (used to resolve write futures in order).
        self._write_buffer = _StreamBuffer()
        self._total_write_index = 0
        self._total_write_done_index = 0
        # Parameters and callbacks of the currently pending read, if any.
        self._read_delimiter = None
        self._read_regex = None
        self._read_max_bytes = None
        self._read_bytes = None
        self._read_partial = False
        self._read_until_close = False
        self._read_callback = None
        self._read_future = None
        self._streaming_callback = None
        # Write-completion notification state.
        self._write_callback = None
        self._write_futures = collections.deque()
        self._close_callback = None
        self._connect_callback = None
        self._connect_future = None
        # _ssl_connect_future should be defined in SSLIOStream
        # but it's here so we can clean it up in maybe_run_close_callback.
        # TODO: refactor that so subclasses can add additional futures
        # to be cancelled.
        self._ssl_connect_future = None
        self._connecting = False
        # Bitmask of IOLoop events currently registered for this fd
        # (None when not registered with the IOLoop).
        self._state = None
        self._pending_callbacks = 0
        self._closed = False
- def fileno(self):
- """Returns the file descriptor for this stream."""
- raise NotImplementedError()
- def close_fd(self):
- """Closes the file underlying this stream.
- ``close_fd`` is called by `BaseIOStream` and should not be called
- elsewhere; other users should call `close` instead.
- """
- raise NotImplementedError()
- def write_to_fd(self, data):
- """Attempts to write ``data`` to the underlying file.
- Returns the number of bytes written.
- """
- raise NotImplementedError()
- def read_from_fd(self, buf):
- """Attempts to read from the underlying file.
- Reads up to ``len(buf)`` bytes, storing them in the buffer.
- Returns the number of bytes read. Returns None if there was
- nothing to read (the socket returned `~errno.EWOULDBLOCK` or
- equivalent), and zero on EOF.
- .. versionchanged:: 5.0
- Interface redesigned to take a buffer and return a number
- of bytes instead of a freshly-allocated object.
- """
- raise NotImplementedError()
- def get_fd_error(self):
- """Returns information about any error on the underlying file.
- This method is called after the `.IOLoop` has signaled an error on the
- file descriptor, and should return an Exception (such as `socket.error`
- with additional information, or None if no such information is
- available.
- """
- return None
    def read_until_regex(self, regex, callback=None, max_bytes=None):
        """Asynchronously read until we have matched the given regex.

        The result includes the data that matches the regex and anything
        that came before it.  If a callback is given, it will be run
        with the data as an argument; if not, this method returns a
        `.Future`.

        If ``max_bytes`` is not None, the connection will be closed
        if more than ``max_bytes`` bytes have been read and the regex is
        not satisfied.

        .. versionchanged:: 4.0
            Added the ``max_bytes`` argument.  The ``callback`` argument is
            now optional and a `.Future` will be returned if it is omitted.

        .. deprecated:: 5.1

           The ``callback`` argument is deprecated and will be removed
           in Tornado 6.0. Use the returned `.Future` instead.
        """
        future = self._set_read_callback(callback)
        self._read_regex = re.compile(regex)
        self._read_max_bytes = max_bytes
        try:
            # The read may complete immediately from already-buffered data.
            self._try_inline_read()
        except UnsatisfiableReadError as e:
            # Handle this the same way as in _handle_events.
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=e)
            return future
        except:  # deliberate bare except: re-raised below
            if future is not None:
                # Ensure that the future doesn't log an error because its
                # failure was never examined.
                future.add_done_callback(lambda f: f.exception())
            raise
        return future
    def read_until(self, delimiter, callback=None, max_bytes=None):
        """Asynchronously read until we have found the given delimiter.

        The result includes all the data read including the delimiter.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``max_bytes`` is not None, the connection will be closed
        if more than ``max_bytes`` bytes have been read and the delimiter
        is not found.

        .. versionchanged:: 4.0
            Added the ``max_bytes`` argument.  The ``callback`` argument is
            now optional and a `.Future` will be returned if it is omitted.

        .. deprecated:: 5.1

           The ``callback`` argument is deprecated and will be removed
           in Tornado 6.0. Use the returned `.Future` instead.
        """
        future = self._set_read_callback(callback)
        self._read_delimiter = delimiter
        self._read_max_bytes = max_bytes
        try:
            # The read may complete immediately from already-buffered data.
            self._try_inline_read()
        except UnsatisfiableReadError as e:
            # Handle this the same way as in _handle_events.
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=e)
            return future
        except:  # deliberate bare except: re-raised below
            if future is not None:
                # Don't let the future log "exception was never retrieved".
                future.add_done_callback(lambda f: f.exception())
            raise
        return future
    def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
                   partial=False):
        """Asynchronously read a number of bytes.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as we have
        any bytes to return (but never more than ``num_bytes``)

        .. versionchanged:: 4.0
            Added the ``partial`` argument.  The callback argument is now
            optional and a `.Future` will be returned if it is omitted.

        .. deprecated:: 5.1

           The ``callback`` and ``streaming_callback`` arguments are
           deprecated and will be removed in Tornado 6.0. Use the
           returned `.Future` (and ``partial=True`` for
           ``streaming_callback``) instead.
        """
        future = self._set_read_callback(callback)
        assert isinstance(num_bytes, numbers.Integral)
        self._read_bytes = num_bytes
        self._read_partial = partial
        if streaming_callback is not None:
            warnings.warn("streaming_callback is deprecated, use partial instead",
                          DeprecationWarning)
            self._streaming_callback = stack_context.wrap(streaming_callback)
        try:
            # The read may complete immediately from already-buffered data.
            self._try_inline_read()
        except:  # deliberate bare except: re-raised below
            if future is not None:
                # Don't let the future log "exception was never retrieved".
                future.add_done_callback(lambda f: f.exception())
            raise
        return future
    def read_into(self, buf, callback=None, partial=False):
        """Asynchronously read a number of bytes.

        ``buf`` must be a writable buffer into which data will be read.
        If a callback is given, it will be run with the number of read
        bytes as an argument; if not, this method returns a `.Future`.

        If ``partial`` is true, the callback is run as soon as any bytes
        have been read.  Otherwise, it is run when the ``buf`` has been
        entirely filled with read data.

        .. versionadded:: 5.0

        .. deprecated:: 5.1

           The ``callback`` argument is deprecated and will be removed
           in Tornado 6.0. Use the returned `.Future` instead.
        """
        future = self._set_read_callback(callback)

        # First copy data already in read buffer
        available_bytes = self._read_buffer_size
        n = len(buf)
        if available_bytes >= n:
            # The request can be satisfied entirely from buffered data.
            end = self._read_buffer_pos + n
            buf[:] = memoryview(self._read_buffer)[self._read_buffer_pos:end]
            del self._read_buffer[:end]
            self._after_user_read_buffer = self._read_buffer
        elif available_bytes > 0:
            buf[:available_bytes] = memoryview(self._read_buffer)[self._read_buffer_pos:]

        # Set up the supplied buffer as our temporary read buffer.
        # The original (if it had any data remaining) has been
        # saved for later.
        self._user_read_buffer = True
        self._read_buffer = buf
        self._read_buffer_pos = 0
        self._read_buffer_size = available_bytes
        self._read_bytes = n
        self._read_partial = partial
        try:
            # The read may complete immediately from already-buffered data.
            self._try_inline_read()
        except:  # deliberate bare except: re-raised below
            if future is not None:
                # Don't let the future log "exception was never retrieved".
                future.add_done_callback(lambda f: f.exception())
            raise
        return future
    def read_until_close(self, callback=None, streaming_callback=None):
        """Asynchronously reads all data from the socket until it is closed.

        If a ``streaming_callback`` is given, it will be called with chunks
        of data as they become available, and the final result will be empty.
        Otherwise, the result is all the data that was read.
        If a callback is given, it will be run with the data as an argument;
        if not, this method returns a `.Future`.

        Note that if a ``streaming_callback`` is used, data will be
        read from the socket as quickly as it becomes available; there
        is no way to apply backpressure or cancel the reads. If flow
        control or cancellation are desired, use a loop with
        `read_bytes(partial=True) <.read_bytes>` instead.

        .. versionchanged:: 4.0
            The callback argument is now optional and a `.Future` will
            be returned if it is omitted.

        .. deprecated:: 5.1

           The ``callback`` and ``streaming_callback`` arguments are
           deprecated and will be removed in Tornado 6.0. Use the
           returned `.Future` (and `read_bytes` with ``partial=True``
           for ``streaming_callback``) instead.
        """
        future = self._set_read_callback(callback)
        if streaming_callback is not None:
            warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead",
                          DeprecationWarning)
            self._streaming_callback = stack_context.wrap(streaming_callback)
        if self.closed():
            # Already closed: deliver whatever is buffered immediately.
            if self._streaming_callback is not None:
                self._run_read_callback(self._read_buffer_size, True)
            self._run_read_callback(self._read_buffer_size, False)
            return future
        self._read_until_close = True
        try:
            self._try_inline_read()
        except:  # deliberate bare except: re-raised below
            if future is not None:
                # Don't let the future log "exception was never retrieved".
                future.add_done_callback(lambda f: f.exception())
            raise
        return future
    def write(self, data, callback=None):
        """Asynchronously write the given data to this stream.

        If ``callback`` is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.

        If no ``callback`` is given, this method returns a `.Future` that
        resolves (with a result of ``None``) when the write has been
        completed.

        The ``data`` argument may be of type `bytes` or `memoryview`.

        .. versionchanged:: 4.0
            Now returns a `.Future` if no callback is given.

        .. versionchanged:: 4.5
            Added support for `memoryview` arguments.

        .. deprecated:: 5.1

           The ``callback`` argument is deprecated and will be removed
           in Tornado 6.0. Use the returned `.Future` instead.
        """
        self._check_closed()
        if data:
            if (self.max_write_buffer_size is not None and
                    len(self._write_buffer) + len(data) > self.max_write_buffer_size):
                raise StreamBufferFullError("Reached maximum write buffer size")
            self._write_buffer.append(data)
            self._total_write_index += len(data)
        if callback is not None:
            warnings.warn("callback argument is deprecated, use returned Future instead",
                          DeprecationWarning)
            self._write_callback = stack_context.wrap(callback)
            future = None
        else:
            future = Future()
            # Don't let the future log "exception was never retrieved".
            future.add_done_callback(lambda f: f.exception())
            # Futures resolve when _total_write_done_index passes this mark.
            self._write_futures.append((self._total_write_index, future))
        if not self._connecting:
            # Try an immediate write; register with the IOLoop only if
            # data remains buffered afterwards.
            self._handle_write()
            if self._write_buffer:
                self._add_io_state(self.io_loop.WRITE)
            self._maybe_add_error_listener()
        return future
- def set_close_callback(self, callback):
- """Call the given callback when the stream is closed.
- This mostly is not necessary for applications that use the
- `.Future` interface; all outstanding ``Futures`` will resolve
- with a `StreamClosedError` when the stream is closed. However,
- it is still useful as a way to signal that the stream has been
- closed while no other read or write is in progress.
- Unlike other callback-based interfaces, ``set_close_callback``
- will not be removed in Tornado 6.0.
- """
- self._close_callback = stack_context.wrap(callback)
- self._maybe_add_error_listener()
    def close(self, exc_info=False):
        """Close this stream.

        If ``exc_info`` is true, set the ``error`` attribute to the current
        exception from `sys.exc_info` (or if ``exc_info`` is a tuple,
        use that instead of `sys.exc_info`).
        """
        if not self.closed():
            if exc_info:
                # ``exc_info`` may be a tuple, an exception instance, or
                # simply True (meaning "use the current exception").
                if isinstance(exc_info, tuple):
                    self.error = exc_info[1]
                elif isinstance(exc_info, BaseException):
                    self.error = exc_info
                else:
                    exc_info = sys.exc_info()
                    if any(exc_info):
                        self.error = exc_info[1]
            if self._read_until_close:
                # Deliver any buffered data to a pending read_until_close
                # before tearing down.
                if (self._streaming_callback is not None and
                        self._read_buffer_size):
                    self._run_read_callback(self._read_buffer_size, True)
                self._read_until_close = False
                self._run_read_callback(self._read_buffer_size, False)
            if self._state is not None:
                self.io_loop.remove_handler(self.fileno())
                self._state = None
            self.close_fd()
            self._closed = True
        self._maybe_run_close_callback()
    def _maybe_run_close_callback(self):
        # If there are pending callbacks, don't run the close callback
        # until they're done (see _maybe_add_error_handler)
        if self.closed() and self._pending_callbacks == 0:
            # Collect every outstanding future so they can all be
            # resolved with StreamClosedError.
            futures = []
            if self._read_future is not None:
                futures.append(self._read_future)
                self._read_future = None
            futures += [future for _, future in self._write_futures]
            self._write_futures.clear()
            if self._connect_future is not None:
                futures.append(self._connect_future)
                self._connect_future = None
            if self._ssl_connect_future is not None:
                futures.append(self._ssl_connect_future)
                self._ssl_connect_future = None
            for future in futures:
                future.set_exception(StreamClosedError(real_error=self.error))
                # Reference the exception so it is not logged as unhandled.
                future.exception()
            if self._close_callback is not None:
                cb = self._close_callback
                self._close_callback = None
                self._run_callback(cb)
            # Delete any unfinished callbacks to break up reference cycles.
            self._read_callback = self._write_callback = None
            # Clear the buffers so they can be cleared immediately even
            # if the IOStream object is kept alive by a reference cycle.
            # TODO: Clear the read buffer too; it currently breaks some tests.
            self._write_buffer = None
- def reading(self):
- """Returns true if we are currently reading from the stream."""
- return self._read_callback is not None or self._read_future is not None
- def writing(self):
- """Returns true if we are currently writing to the stream."""
- return bool(self._write_buffer)
- def closed(self):
- """Returns true if the stream has been closed."""
- return self._closed
- def set_nodelay(self, value):
- """Sets the no-delay flag for this stream.
- By default, data written to TCP streams may be held for a time
- to make the most efficient use of bandwidth (according to
- Nagle's algorithm). The no-delay flag requests that data be
- written as soon as possible, even if doing so would consume
- additional bandwidth.
- This flag is currently defined only for TCP-based ``IOStreams``.
- .. versionadded:: 3.1
- """
- pass
    def _handle_events(self, fd, events):
        # IOLoop callback: dispatch READ/WRITE/ERROR events for our fd,
        # then recompute the event mask we want for the next iteration.
        if self.closed():
            gen_log.warning("Got events for closed stream %s", fd)
            return
        try:
            if self._connecting:
                # Most IOLoops will report a write failed connect
                # with the WRITE event, but SelectIOLoop reports a
                # READ as well so we must check for connecting before
                # either.
                self._handle_connect()
            if self.closed():
                return
            if events & self.io_loop.READ:
                self._handle_read()
                if self.closed():
                    return
            if events & self.io_loop.WRITE:
                self._handle_write()
                if self.closed():
                    return
            if events & self.io_loop.ERROR:
                self.error = self.get_fd_error()
                # We may have queued up a user callback in _handle_read or
                # _handle_write, so don't close the IOStream until those
                # callbacks have had a chance to run.
                self.io_loop.add_callback(self.close)
                return
            # Recompute the interest set based on pending reads/writes.
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state == self.io_loop.ERROR and self._read_buffer_size == 0:
                # If the connection is idle, listen for reads too so
                # we can tell if the connection is closed.  If there is
                # data in the read buffer we won't run the close callback
                # yet anyway, so we don't need to listen in this case.
                state |= self.io_loop.READ
            if state != self._state:
                assert self._state is not None, \
                    "shouldn't happen: _handle_events without self._state"
                self._state = state
                self.io_loop.update_handler(self.fileno(), self._state)
        except UnsatisfiableReadError as e:
            gen_log.info("Unsatisfiable read, closing connection: %s" % e)
            self.close(exc_info=e)
        except Exception as e:
            gen_log.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close(exc_info=e)
            raise
    def _run_callback(self, callback, *args):
        # Schedule a user callback on the IOLoop, closing the stream if
        # the callback raises.
        def wrapper():
            self._pending_callbacks -= 1
            try:
                return callback(*args)
            except Exception as e:
                app_log.error("Uncaught exception, closing connection.",
                              exc_info=True)
                # Close the socket on an uncaught exception from a user callback
                # (It would eventually get closed when the socket object is
                # gc'd, but we don't want to rely on gc happening before we
                # run out of file descriptors)
                self.close(exc_info=e)
                # Re-raise the exception so that IOLoop.handle_callback_exception
                # can see it and log the error
                raise
            finally:
                self._maybe_add_error_listener()
        # We schedule callbacks to be run on the next IOLoop iteration
        # rather than running them directly for several reasons:
        # * Prevents unbounded stack growth when a callback calls an
        #   IOLoop operation that immediately runs another callback
        # * Provides a predictable execution context for e.g.
        #   non-reentrant mutexes
        # * Ensures that the try/except in wrapper() is run outside
        #   of the application's StackContexts
        with stack_context.NullContext():
            # stack_context was already captured in callback, we don't need to
            # capture it again for IOStream's wrapper.  This is especially
            # important if the callback was pre-wrapped before entry to
            # IOStream (as in HTTPConnection._header_callback), as we could
            # capture and leak the wrong context here.
            self._pending_callbacks += 1
            self.io_loop.add_callback(wrapper)
    def _read_to_buffer_loop(self):
        # This method is called from _handle_read and _try_inline_read.
        # Fills the read buffer from the fd and returns the result of
        # self._find_read_pos (the position of a completed read, if any).
        try:
            if self._read_bytes is not None:
                target_bytes = self._read_bytes
            elif self._read_max_bytes is not None:
                target_bytes = self._read_max_bytes
            elif self.reading():
                # For read_until without max_bytes, or
                # read_until_close, read as much as we can before
                # scanning for the delimiter.
                target_bytes = None
            else:
                target_bytes = 0
            next_find_pos = 0
            # Pretend to have a pending callback so that an EOF in
            # _read_to_buffer doesn't trigger an immediate close
            # callback.  At the end of this method we'll either
            # establish a real pending callback via
            # _read_from_buffer or run the close callback.
            #
            # We need two try statements here so that
            # pending_callbacks is decremented before the `except`
            # clause below (which calls `close` and does need to
            # trigger the callback)
            self._pending_callbacks += 1
            while not self.closed():
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object's buffer select() and friends
                # can't see it; the only way to find out if it's there is to
                # try to read it.
                if self._read_to_buffer() == 0:
                    break
                self._run_streaming_callback()
                # If we've read all the bytes we can use, break out of
                # this loop.  We can't just call read_from_buffer here
                # because of subtle interactions with the
                # pending_callback and error_listener mechanisms.
                #
                # If we've reached target_bytes, we know we're done.
                if (target_bytes is not None and
                        self._read_buffer_size >= target_bytes):
                    break
                # Otherwise, we need to call the more expensive find_read_pos.
                # It's inefficient to do this on every read, so instead
                # do it on the first read and whenever the read buffer
                # size has doubled.
                if self._read_buffer_size >= next_find_pos:
                    pos = self._find_read_pos()
                    if pos is not None:
                        return pos
                    next_find_pos = self._read_buffer_size * 2
            return self._find_read_pos()
        finally:
            self._pending_callbacks -= 1
def _handle_read(self):
    """IOLoop read-event handler: fill the buffer and finish the read."""
    try:
        pos = self._read_to_buffer_loop()
    except UnsatisfiableReadError:
        # Let _handle_events deal with this (it logs and closes).
        raise
    except Exception as e:
        gen_log.warning("error on read: %s" % e)
        self.close(exc_info=e)
        return
    if pos is None:
        # Read not yet satisfiable; check whether the stream closed.
        self._maybe_run_close_callback()
    else:
        self._read_from_buffer(pos)
def _set_read_callback(self, callback):
    """Arrange for the completion of a newly-started read.

    With a ``callback`` (deprecated interface), stores the wrapped
    callback and returns None.  Without one, creates and returns a
    `.Future` that will resolve with the read result.
    """
    assert self._read_callback is None, "Already reading"
    assert self._read_future is None, "Already reading"
    if callback is None:
        self._read_future = Future()
        return self._read_future
    warnings.warn("callbacks are deprecated, use returned Future instead",
                  DeprecationWarning)
    self._read_callback = stack_context.wrap(callback)
    return None
def _run_read_callback(self, size, streaming):
    """Deliver ``size`` bytes to the pending read callback or future.

    ``streaming`` selects the streaming_callback (partial data) instead
    of the final read callback/future.
    """
    if self._user_read_buffer:
        # read_into mode: the data is already in the caller's buffer, so
        # swap our internal buffer back in and report only the count.
        self._read_buffer = self._after_user_read_buffer or bytearray()
        self._after_user_read_buffer = None
        self._read_buffer_pos = 0
        self._read_buffer_size = len(self._read_buffer)
        self._user_read_buffer = False
        result = size
    else:
        result = self._consume(size)
    if streaming:
        callback = self._streaming_callback
    else:
        callback = self._read_callback
        self._read_callback = self._streaming_callback = None
        if self._read_future is not None:
            # Future-based read: resolve it directly (no callback set).
            assert callback is None
            future = self._read_future
            self._read_future = None
            future.set_result(result)
    if callback is not None:
        assert (self._read_future is None) or streaming
        self._run_callback(callback, result)
    else:
        # If we scheduled a callback, we will add the error listener
        # afterwards.  If we didn't, we have to do it now.
        self._maybe_add_error_listener()
def _try_inline_read(self):
    """Attempt to complete the current read operation from buffered data.

    If the read can be completed without blocking, schedules the
    read callback on the next IOLoop iteration; otherwise starts
    listening for reads on the socket.
    """
    # See if we've already got the data from a previous read
    self._run_streaming_callback()
    pos = self._find_read_pos()
    if pos is not None:
        self._read_from_buffer(pos)
        return
    self._check_closed()
    try:
        pos = self._read_to_buffer_loop()
    except Exception:
        # If there was an error in _read_to_buffer, we called close()
        # already, but couldn't run the close callback because of
        # _pending_callbacks.  Before we escape from this function, run
        # the close callback if applicable.
        self._maybe_run_close_callback()
        raise
    if pos is not None:
        self._read_from_buffer(pos)
        return
    # We couldn't satisfy the read inline, so either close the stream
    # or listen for new data.
    if self.closed():
        self._maybe_run_close_callback()
    else:
        self._add_io_state(ioloop.IOLoop.READ)
def _read_to_buffer(self):
    """Reads from the socket and appends the result to the read buffer.

    Returns the number of bytes read.  Returns 0 if there is nothing
    to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
    error closes the socket and raises an exception.
    """
    try:
        while True:
            try:
                if self._user_read_buffer:
                    # read_into mode: read directly into the tail of the
                    # caller-supplied buffer, without copying.
                    buf = memoryview(self._read_buffer)[self._read_buffer_size:]
                else:
                    buf = bytearray(self.read_chunk_size)
                bytes_read = self.read_from_fd(buf)
            except (socket.error, IOError, OSError) as e:
                if errno_from_exception(e) == errno.EINTR:
                    # Interrupted system call; retry the read.
                    continue
                # ssl.SSLError is a subclass of socket.error
                if self._is_connreset(e):
                    # Treat ECONNRESET as a connection close rather than
                    # an error to minimize log spam (the exception will
                    # be available on self.error for apps that care).
                    self.close(exc_info=e)
                    return
                self.close(exc_info=e)
                raise
            break
        if bytes_read is None:
            # EWOULDBLOCK or equivalent: nothing available right now.
            return 0
        elif bytes_read == 0:
            # EOF: the remote end closed the connection.
            self.close()
            return 0
        if not self._user_read_buffer:
            self._read_buffer += memoryview(buf)[:bytes_read]
        self._read_buffer_size += bytes_read
    finally:
        # Break the reference to buf so we don't waste a chunk's worth of
        # memory in case an exception hangs on to our stack frame.
        buf = None
    if self._read_buffer_size > self.max_buffer_size:
        gen_log.error("Reached maximum read buffer size")
        self.close()
        raise StreamBufferFullError("Reached maximum read buffer size")
    return bytes_read
- def _run_streaming_callback(self):
- if self._streaming_callback is not None and self._read_buffer_size:
- bytes_to_consume = self._read_buffer_size
- if self._read_bytes is not None:
- bytes_to_consume = min(self._read_bytes, bytes_to_consume)
- self._read_bytes -= bytes_to_consume
- self._run_read_callback(bytes_to_consume, True)
- def _read_from_buffer(self, pos):
- """Attempts to complete the currently-pending read from the buffer.
- The argument is either a position in the read buffer or None,
- as returned by _find_read_pos.
- """
- self._read_bytes = self._read_delimiter = self._read_regex = None
- self._read_partial = False
- self._run_read_callback(pos, False)
def _find_read_pos(self):
    """Attempts to find a position in the read buffer that satisfies
    the currently-pending read.

    Returns a position in the buffer if the current read can be satisfied,
    or None if it cannot.
    """
    if (self._read_bytes is not None and
        (self._read_buffer_size >= self._read_bytes or
         (self._read_partial and self._read_buffer_size > 0))):
        # read_bytes: satisfied by a full count, or by any data at all
        # when partial reads are allowed.
        num_bytes = min(self._read_bytes, self._read_buffer_size)
        return num_bytes
    elif self._read_delimiter is not None:
        # Multi-byte delimiters (e.g. '\r\n') may straddle two
        # chunks in the read buffer, so we can't easily find them
        # without collapsing the buffer.  However, since protocols
        # using delimited reads (as opposed to reads of a known
        # length) tend to be "line" oriented, the delimiter is likely
        # to be in the first few chunks.  Merge the buffer gradually
        # since large merges are relatively expensive and get undone in
        # _consume().
        if self._read_buffer:
            loc = self._read_buffer.find(self._read_delimiter,
                                         self._read_buffer_pos)
            if loc != -1:
                # Convert to a position relative to the unconsumed data.
                loc -= self._read_buffer_pos
                delimiter_len = len(self._read_delimiter)
                self._check_max_bytes(self._read_delimiter,
                                      loc + delimiter_len)
                return loc + delimiter_len
        # Not found yet; fail fast if we already exceeded max_bytes.
        self._check_max_bytes(self._read_delimiter,
                              self._read_buffer_size)
    elif self._read_regex is not None:
        if self._read_buffer:
            m = self._read_regex.search(self._read_buffer,
                                        self._read_buffer_pos)
            if m is not None:
                loc = m.end() - self._read_buffer_pos
                self._check_max_bytes(self._read_regex, loc)
                return loc
        self._check_max_bytes(self._read_regex, self._read_buffer_size)
    return None
- def _check_max_bytes(self, delimiter, size):
- if (self._read_max_bytes is not None and
- size > self._read_max_bytes):
- raise UnsatisfiableReadError(
- "delimiter %r not found within %d bytes" % (
- delimiter, self._read_max_bytes))
def _handle_write(self):
    """Write as much buffered data as the fd will accept, then resolve
    any write futures/callbacks whose data has been fully written."""
    while True:
        size = len(self._write_buffer)
        if not size:
            break
        assert size > 0
        try:
            if _WINDOWS:
                # On windows, socket.send blows up if given a
                # write buffer that's too large, instead of just
                # returning the number of bytes it was able to
                # process.  Therefore we must not call socket.send
                # with more than 128KB at a time.
                size = 128 * 1024
            num_bytes = self.write_to_fd(self._write_buffer.peek(size))
            if num_bytes == 0:
                # Kernel buffer full; wait for the next write event.
                break
            self._write_buffer.advance(num_bytes)
            self._total_write_done_index += num_bytes
        except (socket.error, IOError, OSError) as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                break
            else:
                if not self._is_connreset(e):
                    # Broken pipe errors are usually caused by connection
                    # reset, and its better to not log EPIPE errors to
                    # minimize log spam
                    gen_log.warning("Write error on %s: %s",
                                    self.fileno(), e)
                self.close(exc_info=e)
                return
    # Resolve futures for writes that have now been fully flushed
    # (each future is keyed by the total-bytes index of its data's end).
    while self._write_futures:
        index, future = self._write_futures[0]
        if index > self._total_write_done_index:
            break
        self._write_futures.popleft()
        future.set_result(None)
    if not len(self._write_buffer):
        if self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)
- def _consume(self, loc):
- # Consume loc bytes from the read buffer and return them
- if loc == 0:
- return b""
- assert loc <= self._read_buffer_size
- # Slice the bytearray buffer into bytes, without intermediate copying
- b = (memoryview(self._read_buffer)
- [self._read_buffer_pos:self._read_buffer_pos + loc]
- ).tobytes()
- self._read_buffer_pos += loc
- self._read_buffer_size -= loc
- # Amortized O(1) shrink
- # (this heuristic is implemented natively in Python 3.4+
- # but is replicated here for Python 2)
- if self._read_buffer_pos > self._read_buffer_size:
- del self._read_buffer[:self._read_buffer_pos]
- self._read_buffer_pos = 0
- return b
- def _check_closed(self):
- if self.closed():
- raise StreamClosedError(real_error=self.error)
def _maybe_add_error_listener(self):
    """Start listening for read events if the stream has gone idle.

    Only acts when no callbacks are pending and no IO state (or only
    the ERROR state) is registered.
    """
    # This method is part of an optimization: to detect a connection that
    # is closed when we're not actively reading or writing, we must listen
    # for read events.  However, it is inefficient to do this when the
    # connection is first established because we are going to read or write
    # immediately anyway.  Instead, we insert checks at various times to
    # see if the connection is idle and add the read listener then.
    if self._pending_callbacks != 0:
        return
    if self._state is None or self._state == ioloop.IOLoop.ERROR:
        if self.closed():
            self._maybe_run_close_callback()
        elif (self._read_buffer_size == 0 and
              self._close_callback is not None):
            self._add_io_state(ioloop.IOLoop.READ)
def _add_io_state(self, state):
    """Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

    Implementation notes: Reads and writes have a fast path and a
    slow path.  The fast path reads synchronously from socket
    buffers, while the slow path uses `_add_io_state` to schedule
    an IOLoop callback.  Note that in both cases, the callback is
    run asynchronously with `_run_callback`.

    To detect closed connections, we must have called
    `_add_io_state` at some point, but we want to delay this as
    much as possible so we don't have to set an `IOLoop.ERROR`
    listener that will be overwritten by the next slow-path
    operation.  As long as there are callbacks scheduled for
    fast-path ops, those callbacks may do more reads.
    If a sequence of fast-path ops do not end in a slow-path op,
    (e.g. for an @asynchronous long-poll request), we must add
    the error handler.  This is done in `_run_callback` and `write`
    (since the write callback is optional so we can have a
    fast-path write with no `_run_callback`)
    """
    if self.closed():
        # connection has been closed, so there can be no future events
        return
    if self._state is None:
        # First registration: always include ERROR so close is detected.
        self._state = ioloop.IOLoop.ERROR | state
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.fileno(), self._handle_events, self._state)
    elif not self._state & state:
        self._state = self._state | state
        self.io_loop.update_handler(self.fileno(), self._state)
def _is_connreset(self, exc):
    """Return True if ``exc`` is ECONNRESET or equivalent.

    May be overridden in subclasses.
    """
    if not isinstance(exc, (socket.error, IOError)):
        return False
    return errno_from_exception(exc) in _ERRNO_CONNRESET
class IOStream(BaseIOStream):
    r"""Socket-based `IOStream` implementation.

    This class supports the read and write methods from `BaseIOStream`
    plus a `connect` method.

    The ``socket`` parameter may either be connected or unconnected.
    For server operations the socket is the result of calling
    `socket.accept <socket.socket.accept>`.  For client operations the
    socket is created with `socket.socket`, and may either be
    connected before passing it to the `IOStream` or connected with
    `IOStream.connect`.

    A very simple (and broken) HTTP client using this class:

    .. testcode::

        import tornado.ioloop
        import tornado.iostream
        import socket

        async def main():
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            stream = tornado.iostream.IOStream(s)
            await stream.connect(("friendfeed.com", 80))
            await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
            header_data = await stream.read_until(b"\r\n\r\n")
            headers = {}
            for line in header_data.split(b"\r\n"):
                parts = line.split(b":")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
            body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
            print(body_data)
            stream.close()

        if __name__ == '__main__':
            tornado.ioloop.IOLoop.current().run_sync(main)

    .. testoutput::
       :hide:
    """
    def __init__(self, socket, *args, **kwargs):
        self.socket = socket
        # The IOLoop requires non-blocking sockets.
        self.socket.setblocking(False)
        super(IOStream, self).__init__(*args, **kwargs)

    def fileno(self):
        # The socket object itself is used as the IOLoop handler key;
        # the IOLoop accepts file-like objects as well as integer fds.
        return self.socket

    def close_fd(self):
        self.socket.close()
        self.socket = None

    def get_fd_error(self):
        # Fetch the pending socket-level error, if any.
        # (Local renamed from `errno` to avoid shadowing the errno module.)
        err = self.socket.getsockopt(socket.SOL_SOCKET,
                                     socket.SO_ERROR)
        return socket.error(err, os.strerror(err))

    def read_from_fd(self, buf):
        try:
            return self.socket.recv_into(buf)
        except socket.error as e:
            if e.args[0] in _ERRNO_WOULDBLOCK:
                # No data available right now.
                return None
            else:
                raise
        finally:
            # Break the reference to buf so the memory can be reclaimed
            # even if an exception keeps this frame alive.
            buf = None

    def write_to_fd(self, data):
        try:
            return self.socket.send(data)
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def connect(self, address, callback=None, server_hostname=None):
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for `socket.connect <socket.socket.connect>` for
        the type of socket passed to the IOStream constructor,
        e.g. an ``(ip, port)`` tuple.  Hostnames are accepted here,
        but will be resolved synchronously and block the IOLoop.
        If you have a hostname instead of an IP address, the `.TCPClient`
        class is recommended instead of calling this method directly.
        `.TCPClient` will do asynchronous DNS resolution and handle
        both IPv4 and IPv6.

        If ``callback`` is specified, it will be called with no
        arguments when the connection is completed; if not this method
        returns a `.Future` (whose result after a successful
        connection will be the stream itself).

        In SSL mode, the ``server_hostname`` parameter will be used
        for certificate validation (unless disabled in the
        ``ssl_options``) and SNI (if supported; requires Python
        2.7.9+).

        Note that it is safe to call `IOStream.write
        <BaseIOStream.write>` while the connection is pending, in
        which case the data will be written as soon as the connection
        is ready.  Calling `IOStream` read methods before the socket is
        connected works on some platforms but is non-portable.

        .. versionchanged:: 4.0
           If no callback is given, returns a `.Future`.

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to the
           `SSLIOStream` constructor to disable.

        .. deprecated:: 5.1
           The ``callback`` argument is deprecated and will be removed
           in Tornado 6.0.  Use the returned `.Future` instead.
        """
        self._connecting = True
        if callback is not None:
            warnings.warn("callback argument is deprecated, use returned Future instead",
                          DeprecationWarning)
            self._connect_callback = stack_context.wrap(callback)
            future = None
        else:
            future = self._connect_future = Future()
        try:
            self.socket.connect(address)
        except socket.error as e:
            # In non-blocking mode we expect connect() to raise an
            # exception with EINPROGRESS or EWOULDBLOCK.
            #
            # On freebsd, other errors such as ECONNREFUSED may be
            # returned immediately when attempting to connect to
            # localhost, so handle them the same way as an error
            # reported later in _handle_connect.
            if (errno_from_exception(e) not in _ERRNO_INPROGRESS and
                    errno_from_exception(e) not in _ERRNO_WOULDBLOCK):
                if future is None:
                    gen_log.warning("Connect error on fd %s: %s",
                                    self.socket.fileno(), e)
                self.close(exc_info=e)
                return future
        self._add_io_state(self.io_loop.WRITE)
        return future

    def start_tls(self, server_side, ssl_options=None, server_hostname=None):
        """Convert this `IOStream` to an `SSLIOStream`.

        This enables protocols that begin in clear-text mode and
        switch to SSL after some initial negotiation (such as the
        ``STARTTLS`` extension to SMTP and IMAP).

        This method cannot be used if there are outstanding reads
        or writes on the stream, or if there is any data in the
        IOStream's buffer (data in the operating system's socket
        buffer is allowed).  This means it must generally be used
        immediately after reading or writing the last clear-text
        data.  It can also be used immediately after connecting,
        before any reads or writes.

        The ``ssl_options`` argument may be either an `ssl.SSLContext`
        object or a dictionary of keyword arguments for the
        `ssl.wrap_socket` function.  The ``server_hostname`` argument
        will be used for certificate validation unless disabled
        in the ``ssl_options``.

        This method returns a `.Future` whose result is the new
        `SSLIOStream`.  After this method has been called,
        any other operation on the original stream is undefined.

        If a close callback is defined on this stream, it will be
        transferred to the new stream.

        .. versionadded:: 4.0

        .. versionchanged:: 4.2
           SSL certificates are validated by default; pass
           ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
           suitably-configured `ssl.SSLContext` to disable.
        """
        if (self._read_callback or self._read_future or
                self._write_callback or self._write_futures or
                self._connect_callback or self._connect_future or
                self._pending_callbacks or self._closed or
                self._read_buffer or self._write_buffer):
            raise ValueError("IOStream is not idle; cannot convert to SSL")
        if ssl_options is None:
            if server_side:
                ssl_options = _server_ssl_defaults
            else:
                ssl_options = _client_ssl_defaults

        # (Local renamed from `socket` to avoid shadowing the socket module.)
        sock = self.socket
        self.io_loop.remove_handler(sock)
        self.socket = None
        sock = ssl_wrap_socket(sock, ssl_options,
                               server_hostname=server_hostname,
                               server_side=server_side,
                               do_handshake_on_connect=False)
        orig_close_callback = self._close_callback
        self._close_callback = None

        future = Future()
        ssl_stream = SSLIOStream(sock, ssl_options=ssl_options)
        # Wrap the original close callback so we can fail our Future as well.
        # If we had an "unwrap" counterpart to this method we would need
        # to restore the original callback after our Future resolves
        # so that repeated wrap/unwrap calls don't build up layers.

        def close_callback():
            if not future.done():
                # Note that unlike most Futures returned by IOStream,
                # this one passes the underlying error through directly
                # instead of wrapping everything in a StreamClosedError
                # with a real_error attribute.  This is because once the
                # connection is established it's more helpful to raise
                # the SSLError directly than to hide it behind a
                # StreamClosedError (and the client is expecting SSL
                # issues rather than network issues since this method is
                # named start_tls).
                future.set_exception(ssl_stream.error or StreamClosedError())
            if orig_close_callback is not None:
                orig_close_callback()

        ssl_stream.set_close_callback(close_callback)
        ssl_stream._ssl_connect_callback = lambda: future.set_result(ssl_stream)
        ssl_stream.max_buffer_size = self.max_buffer_size
        ssl_stream.read_chunk_size = self.read_chunk_size
        return future

    def _handle_connect(self):
        try:
            err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        except socket.error as e:
            # Hurd doesn't allow SO_ERROR for loopback sockets because all
            # errors for such sockets are reported synchronously.
            if errno_from_exception(e) == errno.ENOPROTOOPT:
                err = 0
            else:
                # Fix: previously any other error here left `err` unbound,
                # producing a NameError below.  Re-raise so _handle_events
                # can close the stream with the real error.
                raise
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False

    def set_nodelay(self, value):
        if (self.socket is not None and
                self.socket.family in (socket.AF_INET, socket.AF_INET6)):
            try:
                self.socket.setsockopt(socket.IPPROTO_TCP,
                                       socket.TCP_NODELAY, 1 if value else 0)
            except socket.error as e:
                # Sometimes setsockopt will fail if the socket is closed
                # at the wrong time.  This can happen with HTTPServer
                # resetting the value to false between requests.
                if e.errno != errno.EINVAL and not self._is_connreset(e):
                    raise
class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking SSL socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with::

        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)

    before constructing the `SSLIOStream`.  Unconnected sockets will be
    wrapped when `IOStream.connect` is finished.
    """
    def __init__(self, *args, **kwargs):
        """The ``ssl_options`` keyword argument may either be an
        `ssl.SSLContext` object or a dictionary of keywords arguments
        for `ssl.wrap_socket`
        """
        self._ssl_options = kwargs.pop('ssl_options', _client_ssl_defaults)
        super(SSLIOStream, self).__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False
        self._ssl_connect_callback = None
        # Fix: this attribute was never initialized, so a server-side
        # handshake that completed before wait_for_handshake was called
        # raised AttributeError in _run_ssl_connect_callback.
        self._ssl_connect_future = None
        self._server_hostname = None

        # If the socket is already connected, attempt to start the handshake.
        try:
            self.socket.getpeername()
        except socket.error:
            pass
        else:
            # Indirectly start the handshake, which will run on the next
            # IOLoop iteration and then the real IO state will be set in
            # _handle_events.
            self._add_io_state(self.io_loop.WRITE)

    def reading(self):
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError as err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close(exc_info=err)
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                try:
                    peer = self.socket.getpeername()
                except Exception:
                    peer = '(not connected)'
                gen_log.warning("SSL Error on %s %s: %s",
                                self.socket.fileno(), peer, err)
                return self.close(exc_info=err)
            raise
        except socket.error as err:
            # Some port scans (e.g. nmap in -sT mode) have been known
            # to cause do_handshake to raise EBADF and ENOTCONN, so make
            # those errors quiet as well.
            # https://groups.google.com/forum/?fromgroups#!topic/python-tornado/ApucKJat1_0
            if (self._is_connreset(err) or
                    err.args[0] in (errno.EBADF, errno.ENOTCONN)):
                return self.close(exc_info=err)
            raise
        except AttributeError as err:
            # On Linux, if the connection was reset before the call to
            # wrap_socket, do_handshake will fail with an
            # AttributeError.
            return self.close(exc_info=err)
        else:
            self._ssl_accepting = False
            if not self._verify_cert(self.socket.getpeercert()):
                self.close()
                return
            self._run_ssl_connect_callback()

    def _run_ssl_connect_callback(self):
        if self._ssl_connect_callback is not None:
            callback = self._ssl_connect_callback
            self._ssl_connect_callback = None
            self._run_callback(callback)
        if self._ssl_connect_future is not None:
            future = self._ssl_connect_future
            self._ssl_connect_future = None
            future.set_result(self)

    def _verify_cert(self, peercert):
        """Returns True if peercert is valid according to the configured
        validation mode and hostname.

        The ssl handshake already tested the certificate for a valid
        CA signature; the only thing that remains is to check
        the hostname.
        """
        if isinstance(self._ssl_options, dict):
            verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(self._ssl_options, ssl.SSLContext):
            verify_mode = self._ssl_options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True
        # NOTE(review): `cert` should be the same object as `peercert`
        # (the only caller passes self.socket.getpeercert()) — confirm
        # before consolidating the two.
        cert = self.socket.getpeercert()
        if cert is None and verify_mode == ssl.CERT_REQUIRED:
            gen_log.warning("No SSL certificate given")
            return False
        try:
            ssl.match_hostname(peercert, self._server_hostname)
        except ssl.CertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        else:
            return True

    def _handle_read(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def connect(self, address, callback=None, server_hostname=None):
        self._server_hostname = server_hostname
        # Ignore the result of connect(). If it fails,
        # wait_for_handshake will raise an error too. This is
        # necessary for the old semantics of the connect callback
        # (which takes no arguments). In 6.0 this can be refactored to
        # be a regular coroutine.
        fut = super(SSLIOStream, self).connect(address)
        fut.add_done_callback(lambda f: f.exception())
        return self.wait_for_handshake(callback)

    def _handle_connect(self):
        # Call the superclass method to check for errors.
        super(SSLIOStream, self)._handle_connect()
        if self.closed():
            return
        # When the connection is complete, wrap the socket for SSL
        # traffic.  Note that we do this by overriding _handle_connect
        # instead of by passing a callback to super().connect because
        # user callbacks are enqueued asynchronously on the IOLoop,
        # but since _handle_events calls _handle_connect immediately
        # followed by _handle_write we need this to be synchronous.
        #
        # The IOLoop will get confused if we swap out self.socket while the
        # fd is registered, so remove it now and re-register after
        # wrap_socket().
        self.io_loop.remove_handler(self.socket)
        old_state = self._state
        self._state = None
        self.socket = ssl_wrap_socket(self.socket, self._ssl_options,
                                      server_hostname=self._server_hostname,
                                      do_handshake_on_connect=False)
        self._add_io_state(old_state)

    def wait_for_handshake(self, callback=None):
        """Wait for the initial SSL handshake to complete.

        If a ``callback`` is given, it will be called with no
        arguments once the handshake is complete; otherwise this
        method returns a `.Future` which will resolve to the
        stream itself after the handshake is complete.

        Once the handshake is complete, information such as
        the peer's certificate and NPN/ALPN selections may be
        accessed on ``self.socket``.

        This method is intended for use on server-side streams
        or after using `IOStream.start_tls`; it should not be used
        with `IOStream.connect` (which already waits for the
        handshake to complete). It may only be called once per stream.

        .. versionadded:: 4.2

        .. deprecated:: 5.1
           The ``callback`` argument is deprecated and will be removed
           in Tornado 6.0. Use the returned `.Future` instead.
        """
        if (self._ssl_connect_callback is not None or
                self._ssl_connect_future is not None):
            raise RuntimeError("Already waiting")
        if callback is not None:
            warnings.warn("callback argument is deprecated, use returned Future instead",
                          DeprecationWarning)
            self._ssl_connect_callback = stack_context.wrap(callback)
            future = None
        else:
            future = self._ssl_connect_future = Future()
        if not self._ssl_accepting:
            self._run_ssl_connect_callback()
        return future

    def write_to_fd(self, data):
        try:
            return self.socket.send(data)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                # In Python 3.5+, SSLSocket.send raises a WANT_WRITE error if
                # the socket is not writeable; we need to transform this into
                # an EWOULDBLOCK socket.error or a zero return value,
                # either of which will be recognized by the caller of this
                # method. Prior to Python 3.5, an unwriteable socket would
                # simply return 0 bytes written.
                return 0
            raise
        finally:
            # Avoid keeping a reference to data, which can be a memoryview.
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf):
        try:
            if self._ssl_accepting:
                # If the handshake hasn't finished yet, there can't be anything
                # to read (attempting to read may or may not raise an exception
                # depending on the SSL version)
                return None
            try:
                return self.socket.recv_into(buf)
            except ssl.SSLError as e:
                # SSLError is a subclass of socket.error, so this except
                # block must come first.
                if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                    return None
                else:
                    raise
            except socket.error as e:
                if e.args[0] in _ERRNO_WOULDBLOCK:
                    return None
                else:
                    raise
        finally:
            # Break the reference to buf so the memory can be reclaimed
            # even if an exception keeps this frame alive.
            buf = None

    def _is_connreset(self, e):
        if isinstance(e, ssl.SSLError) and e.args[0] == ssl.SSL_ERROR_EOF:
            return True
        return super(SSLIOStream, self)._is_connreset(e)
class PipeIOStream(BaseIOStream):
    """Pipe-based `IOStream` implementation.

    Wraps an integer file descriptor (such as one end of `os.pipe`)
    rather than an open file object.  Pipes are generally one-way, so
    a `PipeIOStream` is used for either reading or writing, not both.
    """
    def __init__(self, fd, *args, **kwargs):
        self.fd = fd
        # FileIO gives us readinto() support over the raw descriptor.
        self._fio = io.FileIO(self.fd, "r+")
        _set_nonblocking(fd)
        super(PipeIOStream, self).__init__(*args, **kwargs)

    def fileno(self):
        return self.fd

    def close_fd(self):
        # Closing the FileIO wrapper also closes the underlying fd.
        self._fio.close()

    def write_to_fd(self, data):
        try:
            return os.write(self.fd, data)
        finally:
            # Drop our reference to data (it may be a memoryview).
            # See https://github.com/tornadoweb/tornado/pull/2008
            del data

    def read_from_fd(self, buf):
        try:
            return self._fio.readinto(buf)
        except (IOError, OSError) as e:
            if errno_from_exception(e) != errno.EBADF:
                raise
            # When the writing half of a pipe is closed, select will
            # report the fd as readable but reads fail with EBADF;
            # treat that as a normal close.
            self.close(exc_info=e)
            return None
        finally:
            buf = None
def doctests():
    """Build a `unittest` suite from this module's doctests."""
    import doctest

    return doctest.DocTestSuite()
|