123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065 |
- # Port of Python 3.3's socket module to gevent
- """
- Python 3 socket module.
- """
- # Our import magic sadly makes this warning useless
- # pylint: disable=undefined-variable
- # pylint: disable=too-many-statements,too-many-branches
- # pylint: disable=too-many-public-methods,unused-argument
- from __future__ import absolute_import
- import io
- import os
- import sys
- import time
- from gevent import _socketcommon
- from gevent._util import copy_globals
- from gevent._compat import PYPY
- import _socket
- from os import dup
- copy_globals(_socketcommon, globals(),
- names_to_ignore=_socketcommon.__extensions__,
- dunder_names_to_keep=())
- __socket__ = _socketcommon.__socket__
- __implements__ = _socketcommon._implements
- __extensions__ = _socketcommon.__extensions__
- __imports__ = _socketcommon.__imports__
- __dns__ = _socketcommon.__dns__
- SocketIO = __socket__.SocketIO # pylint:disable=no-member
- def _get_memory(data):
- mv = memoryview(data)
- if mv.shape:
- return mv
- # No shape, probably working with a ctypes object,
- # or something else exotic that supports the buffer interface
- return mv.tobytes()
- timeout_default = object()
- class _wrefsocket(_socket.socket):
- # Plain stdlib socket.socket objects subclass _socket.socket
- # and add weakref ability. The ssl module, for one, counts on this.
- # We don't create socket.socket objects (because they may have been
- # monkey patched to be the object from this module), but we still
- # need to make sure what we do create can be weakrefd.
- __slots__ = ("__weakref__", )
- if PYPY:
- # server.py unwraps the socket object to get the raw _sock;
- # it depends on having a timeout property alias, which PyPy does not
- # provide.
- timeout = property(lambda s: s.gettimeout(),
- lambda s, nv: s.settimeout(nv))
- class socket(object):
- """
- gevent `socket.socket <https://docs.python.org/3/library/socket.html#socket-objects>`_
- for Python 3.
- This object should have the same API as the standard library socket linked to above. Not all
- methods are specifically documented here; when they are they may point out a difference
- to be aware of or may document a method the standard library does not.
- """
- # Subclasses can set this to customize the type of the
- # native _socket.socket we create. It MUST be a subclass
- # of _wrefsocket. (gevent internal usage only)
- _gevent_sock_class = _wrefsocket
- def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None):
- # Take the same approach as socket2: wrap a real socket object,
- # don't subclass it. This lets code that needs the raw _sock (not tied to the hub)
- # get it. This shows up in tests like test__example_udp_server.
- self._sock = self._gevent_sock_class(family, type, proto, fileno)
- self._io_refs = 0
- self._closed = False
- _socket.socket.setblocking(self._sock, False)
- fileno = _socket.socket.fileno(self._sock)
- self.hub = get_hub()
- io_class = self.hub.loop.io
- self._read_event = io_class(fileno, 1)
- self._write_event = io_class(fileno, 2)
- self.timeout = _socket.getdefaulttimeout()
- def __getattr__(self, name):
- return getattr(self._sock, name)
- if hasattr(_socket, 'SOCK_NONBLOCK'):
- # Only defined under Linux
- @property
- def type(self):
- # See https://github.com/gevent/gevent/pull/399
- if self.timeout != 0.0:
- return self._sock.type & ~_socket.SOCK_NONBLOCK # pylint:disable=no-member
- return self._sock.type
- def __enter__(self):
- return self
- def __exit__(self, *args):
- if not self._closed:
- self.close()
- def __repr__(self):
- """Wrap __repr__() to reveal the real class name."""
- try:
- s = _socket.socket.__repr__(self._sock)
- except Exception as ex: # pylint:disable=broad-except
- # Observed on Windows Py3.3, printing the repr of a socket
- # that just sufferred a ConnectionResetError [WinError 10054]:
- # "OverflowError: no printf formatter to display the socket descriptor in decimal"
- # Not sure what the actual cause is or if there's a better way to handle this
- s = '<socket [%r]>' % ex
- if s.startswith("<socket object"):
- s = "<%s.%s%s%s" % (self.__class__.__module__,
- self.__class__.__name__,
- getattr(self, '_closed', False) and " [closed] " or "",
- s[7:])
- return s
- def __getstate__(self):
- raise TypeError("Cannot serialize socket object")
- def _get_ref(self):
- return self._read_event.ref or self._write_event.ref
- def _set_ref(self, value):
- self._read_event.ref = value
- self._write_event.ref = value
- ref = property(_get_ref, _set_ref)
- def _wait(self, watcher, timeout_exc=timeout('timed out')):
- """Block the current greenlet until *watcher* has pending events.
- If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
- By default *timeout_exc* is ``socket.timeout('timed out')``.
- If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
- """
- if watcher.callback is not None:
- raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
- if self.timeout is not None:
- timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
- else:
- timeout = None
- try:
- self.hub.wait(watcher)
- finally:
- if timeout is not None:
- timeout.cancel()
- def dup(self):
- """dup() -> socket object
- Return a new socket object connected to the same system resource.
- """
- fd = dup(self.fileno())
- sock = self.__class__(self.family, self.type, self.proto, fileno=fd)
- sock.settimeout(self.gettimeout())
- return sock
- def accept(self):
- """accept() -> (socket object, address info)
- Wait for an incoming connection. Return a new socket
- representing the connection, and the address of the client.
- For IP sockets, the address info is a pair (hostaddr, port).
- """
- while True:
- try:
- fd, addr = self._accept()
- break
- except BlockingIOError:
- if self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- sock = socket(self.family, self.type, self.proto, fileno=fd)
- # Python Issue #7995: if no default timeout is set and the listening
- # socket had a (non-zero) timeout, force the new socket in blocking
- # mode to override platform-specific socket flags inheritance.
- # XXX do we need to do this?
- if getdefaulttimeout() is None and self.gettimeout():
- sock.setblocking(True)
- return sock, addr
- def makefile(self, mode="r", buffering=None, *,
- encoding=None, errors=None, newline=None):
- """Return an I/O stream connected to the socket
- The arguments are as for io.open() after the filename,
- except the only mode characters supported are 'r', 'w' and 'b'.
- The semantics are similar too.
- """
- # (XXX refactor to share code?)
- for c in mode:
- if c not in {"r", "w", "b"}:
- raise ValueError("invalid mode %r (only r, w, b allowed)")
- writing = "w" in mode
- reading = "r" in mode or not writing
- assert reading or writing
- binary = "b" in mode
- rawmode = ""
- if reading:
- rawmode += "r"
- if writing:
- rawmode += "w"
- raw = SocketIO(self, rawmode)
- self._io_refs += 1
- if buffering is None:
- buffering = -1
- if buffering < 0:
- buffering = io.DEFAULT_BUFFER_SIZE
- if buffering == 0:
- if not binary:
- raise ValueError("unbuffered streams must be binary")
- return raw
- if reading and writing:
- buffer = io.BufferedRWPair(raw, raw, buffering)
- elif reading:
- buffer = io.BufferedReader(raw, buffering)
- else:
- assert writing
- buffer = io.BufferedWriter(raw, buffering)
- if binary:
- return buffer
- text = io.TextIOWrapper(buffer, encoding, errors, newline)
- text.mode = mode
- return text
- def _decref_socketios(self):
- if self._io_refs > 0:
- self._io_refs -= 1
- if self._closed:
- self.close()
- def _real_close(self, _ss=_socket.socket, cancel_wait_ex=cancel_wait_ex):
- # This function should not reference any globals. See Python issue #808164.
- self.hub.cancel_wait(self._read_event, cancel_wait_ex)
- self.hub.cancel_wait(self._write_event, cancel_wait_ex)
- _ss.close(self._sock)
- # Break any references to the underlying socket object. Tested
- # by test__refcount. (Why does this matter?). Be sure to
- # preserve our same family/type/proto if possible (if we
- # don't, we can get TypeError instead of OSError; see
- # test_socket.SendmsgUDP6Test.testSendmsgAfterClose)... but
- # this isn't always possible (see test_socket.test_unknown_socket_family_repr)
- # TODO: Can we use a simpler proxy, like _socket2 does?
- try:
- self._sock = self._gevent_sock_class(self.family, self.type, self.proto)
- except OSError:
- pass
- else:
- _ss.close(self._sock)
- def close(self):
- # This function should not reference any globals. See Python issue #808164.
- self._closed = True
- if self._io_refs <= 0:
- self._real_close()
- @property
- def closed(self):
- return self._closed
- def detach(self):
- """detach() -> file descriptor
- Close the socket object without closing the underlying file descriptor.
- The object cannot be used after this call, but the file descriptor
- can be reused for other purposes. The file descriptor is returned.
- """
- self._closed = True
- return self._sock.detach()
- def connect(self, address):
- if self.timeout == 0.0:
- return _socket.socket.connect(self._sock, address)
- if isinstance(address, tuple):
- r = getaddrinfo(address[0], address[1], self.family)
- address = r[0][-1]
- if self.timeout is not None:
- timer = Timeout.start_new(self.timeout, timeout('timed out'))
- else:
- timer = None
- try:
- while True:
- err = self.getsockopt(SOL_SOCKET, SO_ERROR)
- if err:
- raise error(err, strerror(err))
- result = _socket.socket.connect_ex(self._sock, address)
- if not result or result == EISCONN:
- break
- elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows):
- self._wait(self._write_event)
- else:
- raise error(result, strerror(result))
- finally:
- if timer is not None:
- timer.cancel()
- def connect_ex(self, address):
- try:
- return self.connect(address) or 0
- except timeout:
- return EAGAIN
- except gaierror:
- # gaierror/overflowerror/typerror is not silenced by connect_ex;
- # gaierror extends OSError (aka error) so catch it first
- raise
- except error as ex:
- # error is now OSError and it has various subclasses.
- # Only those that apply to actually connecting are silenced by
- # connect_ex.
- if ex.errno:
- return ex.errno
- raise # pragma: no cover
- def recv(self, *args):
- while True:
- try:
- return _socket.socket.recv(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- if hasattr(_socket.socket, 'sendmsg'):
- # Only on Unix
- def recvmsg(self, *args):
- while True:
- try:
- return _socket.socket.recvmsg(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- def recvmsg_into(self, *args):
- while True:
- try:
- return _socket.socket.recvmsg_into(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- def recvfrom(self, *args):
- while True:
- try:
- return _socket.socket.recvfrom(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- def recvfrom_into(self, *args):
- while True:
- try:
- return _socket.socket.recvfrom_into(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- def recv_into(self, *args):
- while True:
- try:
- return _socket.socket.recv_into(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._read_event)
- def send(self, data, flags=0, timeout=timeout_default):
- if timeout is timeout_default:
- timeout = self.timeout
- try:
- return _socket.socket.send(self._sock, data, flags)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or timeout == 0.0:
- raise
- self._wait(self._write_event)
- try:
- return _socket.socket.send(self._sock, data, flags)
- except error as ex2:
- if ex2.args[0] == EWOULDBLOCK:
- return 0
- raise
- def sendall(self, data, flags=0):
- # XXX Now that we run on PyPy3, see the notes in _socket2.py's sendall()
- # and implement that here if needed.
- # PyPy3 is not optimized for performance yet, and is known to be slower than
- # PyPy2, so it's possibly premature to do this. However, there is a 3.5 test case that
- # possibly exposes this in a severe way.
- data_memory = _get_memory(data)
- len_data_memory = len(data_memory)
- if not len_data_memory:
- # Don't try to send empty data at all, no point, and breaks ssl
- # See issue 719
- return 0
- if self.timeout is None:
- data_sent = 0
- while data_sent < len_data_memory:
- data_sent += self.send(data_memory[data_sent:], flags)
- else:
- timeleft = self.timeout
- end = time.time() + timeleft
- data_sent = 0
- while True:
- data_sent += self.send(data_memory[data_sent:], flags, timeout=timeleft)
- if data_sent >= len_data_memory:
- break
- timeleft = end - time.time()
- if timeleft <= 0:
- raise timeout('timed out')
- def sendto(self, *args):
- try:
- return _socket.socket.sendto(self._sock, *args)
- except error as ex:
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._write_event)
- try:
- return _socket.socket.sendto(self._sock, *args)
- except error as ex2:
- if ex2.args[0] == EWOULDBLOCK:
- return 0
- raise
- if hasattr(_socket.socket, 'sendmsg'):
- # Only on Unix
- def sendmsg(self, buffers, ancdata=(), flags=0, address=None):
- try:
- return _socket.socket.sendmsg(self._sock, buffers, ancdata, flags, address)
- except error as ex:
- if flags & getattr(_socket, 'MSG_DONTWAIT', 0):
- # Enable non-blocking behaviour
- # XXX: Do all platforms that have sendmsg have MSG_DONTWAIT?
- raise
- if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
- raise
- self._wait(self._write_event)
- try:
- return _socket.socket.sendmsg(self._sock, buffers, ancdata, flags, address)
- except error as ex2:
- if ex2.args[0] == EWOULDBLOCK:
- return 0
- raise
- def setblocking(self, flag):
- if flag:
- self.timeout = None
- else:
- self.timeout = 0.0
- def settimeout(self, howlong):
- if howlong is not None:
- try:
- f = howlong.__float__
- except AttributeError:
- raise TypeError('a float is required')
- howlong = f()
- if howlong < 0.0:
- raise ValueError('Timeout value out of range')
- self.__dict__['timeout'] = howlong
- def gettimeout(self):
- return self.__dict__['timeout']
- def shutdown(self, how):
- if how == 0: # SHUT_RD
- self.hub.cancel_wait(self._read_event, cancel_wait_ex)
- elif how == 1: # SHUT_WR
- self.hub.cancel_wait(self._write_event, cancel_wait_ex)
- else:
- self.hub.cancel_wait(self._read_event, cancel_wait_ex)
- self.hub.cancel_wait(self._write_event, cancel_wait_ex)
- self._sock.shutdown(how)
- # sendfile: new in 3.5. But there's no real reason to not
- # support it everywhere. Note that we can't use os.sendfile()
- # because it's not cooperative.
- def _sendfile_use_sendfile(self, file, offset=0, count=None):
- # This is called directly by tests
- raise __socket__._GiveupOnSendfile() # pylint:disable=no-member
- def _sendfile_use_send(self, file, offset=0, count=None):
- self._check_sendfile_params(file, offset, count)
- if self.gettimeout() == 0:
- raise ValueError("non-blocking sockets are not supported")
- if offset:
- file.seek(offset)
- blocksize = min(count, 8192) if count else 8192
- total_sent = 0
- # localize variable access to minimize overhead
- file_read = file.read
- sock_send = self.send
- try:
- while True:
- if count:
- blocksize = min(count - total_sent, blocksize)
- if blocksize <= 0:
- break
- data = memoryview(file_read(blocksize))
- if not data:
- break # EOF
- while True:
- try:
- sent = sock_send(data)
- except BlockingIOError:
- continue
- else:
- total_sent += sent
- if sent < len(data):
- data = data[sent:]
- else:
- break
- return total_sent
- finally:
- if total_sent > 0 and hasattr(file, 'seek'):
- file.seek(offset + total_sent)
- def _check_sendfile_params(self, file, offset, count):
- if 'b' not in getattr(file, 'mode', 'b'):
- raise ValueError("file should be opened in binary mode")
- if not self.type & SOCK_STREAM:
- raise ValueError("only SOCK_STREAM type sockets are supported")
- if count is not None:
- if not isinstance(count, int):
- raise TypeError(
- "count must be a positive integer (got {!r})".format(count))
- if count <= 0:
- raise ValueError(
- "count must be a positive integer (got {!r})".format(count))
- def sendfile(self, file, offset=0, count=None):
- """sendfile(file[, offset[, count]]) -> sent
- Send a file until EOF is reached by using high-performance
- os.sendfile() and return the total number of bytes which
- were sent.
- *file* must be a regular file object opened in binary mode.
- If os.sendfile() is not available (e.g. Windows) or file is
- not a regular file socket.send() will be used instead.
- *offset* tells from where to start reading the file.
- If specified, *count* is the total number of bytes to transmit
- as opposed to sending the file until EOF is reached.
- File position is updated on return or also in case of error in
- which case file.tell() can be used to figure out the number of
- bytes which were sent.
- The socket must be of SOCK_STREAM type.
- Non-blocking sockets are not supported.
- .. versionadded:: 1.1rc4
- Added in Python 3.5, but available under all Python 3 versions in
- gevent.
- """
- return self._sendfile_use_send(file, offset, count)
- # get/set_inheritable new in 3.4
- if hasattr(os, 'get_inheritable') or hasattr(os, 'get_handle_inheritable'):
- # pylint:disable=no-member
- if os.name == 'nt':
- def get_inheritable(self):
- return os.get_handle_inheritable(self.fileno())
- def set_inheritable(self, inheritable):
- os.set_handle_inheritable(self.fileno(), inheritable)
- else:
- def get_inheritable(self):
- return os.get_inheritable(self.fileno())
- def set_inheritable(self, inheritable):
- os.set_inheritable(self.fileno(), inheritable)
- _added = "\n\n.. versionadded:: 1.1rc4 Added in Python 3.4"
- get_inheritable.__doc__ = "Get the inheritable flag of the socket" + _added
- set_inheritable.__doc__ = "Set the inheritable flag of the socket" + _added
- del _added
- if sys.version_info[:2] == (3, 4) and sys.version_info[:3] <= (3, 4, 2):
- # Python 3.4, up to and including 3.4.2, had a bug where the
- # SocketType enumeration overwrote the SocketType class imported
- # from _socket. This was fixed in 3.4.3 (http://bugs.python.org/issue20386
- # and https://github.com/python/cpython/commit/0d2f85f38a9691efdfd1e7285c4262cab7f17db7).
- # Prior to that, if we replace SocketType with our own class, the implementation
- # of socket.type breaks with "OSError: [Errno 97] Address family not supported by protocol".
- # Therefore, on these old versions, we must preserve it as an enum; while this
- # seems like it could lead to non-green behaviour, code on those versions
- # cannot possibly be using SocketType as a class anyway.
- SocketType = __socket__.SocketType # pylint:disable=no-member
- # Fixup __all__; note that we get exec'd multiple times during unit tests
- if 'SocketType' in __implements__:
- __implements__.remove('SocketType')
- if 'SocketType' not in __imports__:
- __imports__.append('SocketType')
- else:
- SocketType = socket
- def fromfd(fd, family, type, proto=0):
- """ fromfd(fd, family, type[, proto]) -> socket object
- Create a socket object from a duplicate of the given file
- descriptor. The remaining arguments are the same as for socket().
- """
- nfd = dup(fd)
- return socket(family, type, proto, nfd)
- if hasattr(_socket.socket, "share"):
- def fromshare(info):
- """ fromshare(info) -> socket object
- Create a socket object from a the bytes object returned by
- socket.share(pid).
- """
- return socket(0, 0, 0, info)
- __implements__.append('fromshare')
- if hasattr(_socket, "socketpair"):
- def socketpair(family=None, type=SOCK_STREAM, proto=0):
- """socketpair([family[, type[, proto]]]) -> (socket object, socket object)
- Create a pair of socket objects from the sockets returned by the platform
- socketpair() function.
- The arguments are the same as for socket() except the default family is
- AF_UNIX if defined on the platform; otherwise, the default is AF_INET.
- .. versionchanged:: 1.2
- All Python 3 versions on Windows supply this function (natively
- supplied by Python 3.5 and above).
- """
- if family is None:
- try:
- family = AF_UNIX
- except NameError:
- family = AF_INET
- a, b = _socket.socketpair(family, type, proto)
- a = socket(family, type, proto, a.detach())
- b = socket(family, type, proto, b.detach())
- return a, b
- else:
- # Origin: https://gist.github.com/4325783, by Geert Jansen. Public domain.
- # gevent: taken from 3.6 release. Expected to be used only on Win. Added to Win/3.5
- # gevent: for < 3.5, pass the default value of 128 to lsock.listen()
- # (3.5+ uses this as a default and the original code passed no value)
- _LOCALHOST = '127.0.0.1'
- _LOCALHOST_V6 = '::1'
- def socketpair(family=AF_INET, type=SOCK_STREAM, proto=0):
- if family == AF_INET:
- host = _LOCALHOST
- elif family == AF_INET6:
- host = _LOCALHOST_V6
- else:
- raise ValueError("Only AF_INET and AF_INET6 socket address families "
- "are supported")
- if type != SOCK_STREAM:
- raise ValueError("Only SOCK_STREAM socket type is supported")
- if proto != 0:
- raise ValueError("Only protocol zero is supported")
- # We create a connected TCP socket. Note the trick with
- # setblocking(False) that prevents us from having to create a thread.
- lsock = socket(family, type, proto)
- try:
- lsock.bind((host, 0))
- lsock.listen(128)
- # On IPv6, ignore flow_info and scope_id
- addr, port = lsock.getsockname()[:2]
- csock = socket(family, type, proto)
- try:
- csock.setblocking(False)
- try:
- csock.connect((addr, port))
- except (BlockingIOError, InterruptedError):
- pass
- csock.setblocking(True)
- ssock, _ = lsock.accept()
- except:
- csock.close()
- raise
- finally:
- lsock.close()
- return (ssock, csock)
- if sys.version_info[:2] < (3, 5):
- # Not provided natively
- if 'socketpair' in __implements__:
- # Multiple imports can cause this to be missing if _socketcommon
- # was successfully imported, leading to subsequent imports to cause
- # ValueError
- __implements__.remove('socketpair')
- # PyPy needs drop and reuse
- def _do_reuse_or_drop(sock, methname):
- try:
- method = getattr(sock, methname)
- except (AttributeError, TypeError):
- pass
- else:
- method()
- from io import BytesIO
- class _basefileobject(object):
- """Faux file object attached to a socket object."""
- default_bufsize = 8192
- name = "<socket>"
- __slots__ = ["mode", "bufsize", "softspace",
- # "closed" is a property, see below
- "_sock", "_rbufsize", "_wbufsize", "_rbuf", "_wbuf", "_wbuf_len",
- "_close"]
- def __init__(self, sock, mode='rb', bufsize=-1, close=False):
- _do_reuse_or_drop(sock, '_reuse')
- self._sock = sock
- self.mode = mode # Not actually used in this version
- if bufsize < 0:
- bufsize = self.default_bufsize
- self.bufsize = bufsize
- self.softspace = False
- # _rbufsize is the suggested recv buffer size. It is *strictly*
- # obeyed within readline() for recv calls. If it is larger than
- # default_bufsize it will be used for recv calls within read().
- if bufsize == 0:
- self._rbufsize = 1
- elif bufsize == 1:
- self._rbufsize = self.default_bufsize
- else:
- self._rbufsize = bufsize
- self._wbufsize = bufsize
- # We use BytesIO for the read buffer to avoid holding a list
- # of variously sized string objects which have been known to
- # fragment the heap due to how they are malloc()ed and often
- # realloc()ed down much smaller than their original allocation.
- self._rbuf = BytesIO()
- self._wbuf = [] # A list of strings
- self._wbuf_len = 0
- self._close = close
- def _getclosed(self):
- return self._sock is None
- closed = property(_getclosed, doc="True if the file is closed")
- def close(self):
- try:
- if self._sock:
- self.flush()
- finally:
- s = self._sock
- self._sock = None
- if s is not None:
- if self._close:
- s.close()
- else:
- _do_reuse_or_drop(s, '_drop')
- def __del__(self):
- try:
- self.close()
- except: # pylint:disable=bare-except
- # close() may fail if __init__ didn't complete
- pass
- def flush(self):
- if self._wbuf:
- data = b"".join(self._wbuf)
- self._wbuf = []
- self._wbuf_len = 0
- buffer_size = max(self._rbufsize, self.default_bufsize)
- data_size = len(data)
- write_offset = 0
- view = memoryview(data)
- try:
- while write_offset < data_size:
- self._sock.sendall(view[write_offset:write_offset + buffer_size])
- write_offset += buffer_size
- finally:
- if write_offset < data_size:
- remainder = data[write_offset:]
- del view, data # explicit free
- self._wbuf.append(remainder)
- self._wbuf_len = len(remainder)
- def fileno(self):
- return self._sock.fileno()
- def write(self, data):
- if not isinstance(data, bytes):
- raise TypeError("Non-bytes data")
- if not data:
- return
- self._wbuf.append(data)
- self._wbuf_len += len(data)
- if (self._wbufsize == 0 or (self._wbufsize == 1 and b'\n' in data) or
- (self._wbufsize > 1 and self._wbuf_len >= self._wbufsize)):
- self.flush()
- def writelines(self, list):
- # XXX We could do better here for very long lists
- # XXX Should really reject non-string non-buffers
- lines = filter(None, map(str, list))
- self._wbuf_len += sum(map(len, lines))
- self._wbuf.extend(lines)
- if self._wbufsize <= 1 or self._wbuf_len >= self._wbufsize:
- self.flush()
- def read(self, size=-1):
- # Use max, disallow tiny reads in a loop as they are very inefficient.
- # We never leave read() with any leftover data from a new recv() call
- # in our internal buffer.
- rbufsize = max(self._rbufsize, self.default_bufsize)
- # Our use of BytesIO rather than lists of string objects returned by
- # recv() minimizes memory usage and fragmentation that occurs when
- # rbufsize is large compared to the typical return value of recv().
- buf = self._rbuf
- buf.seek(0, 2) # seek end
- if size < 0:
- # Read until EOF
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- try:
- data = self._sock.recv(rbufsize)
- except InterruptedError:
- continue
- if not data:
- break
- buf.write(data)
- return buf.getvalue()
- else:
- # Read until size bytes or EOF seen, whichever comes first
- buf_len = buf.tell()
- if buf_len >= size:
- # Already have size bytes in our buffer? Extract and return.
- buf.seek(0)
- rv = buf.read(size)
- self._rbuf = BytesIO()
- self._rbuf.write(buf.read())
- return rv
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- left = size - buf_len
- # recv() will malloc the amount of memory given as its
- # parameter even though it often returns much less data
- # than that. The returned data string is short lived
- # as we copy it into a BytesIO and free it. This avoids
- # fragmentation issues on many platforms.
- try:
- data = self._sock.recv(left)
- except InterruptedError:
- continue
- if not data:
- break
- n = len(data)
- if n == size and not buf_len:
- # Shortcut. Avoid buffer data copies when:
- # - We have no data in our buffer.
- # AND
- # - Our call to recv returned exactly the
- # number of bytes we were asked to read.
- return data
- if n == left:
- buf.write(data)
- del data # explicit free
- break
- assert n <= left, "recv(%d) returned %d bytes" % (left, n)
- buf.write(data)
- buf_len += n
- del data # explicit free
- #assert buf_len == buf.tell()
- return buf.getvalue()
- def readline(self, size=-1):
- # pylint:disable=too-many-return-statements
- buf = self._rbuf
- buf.seek(0, 2) # seek end
- if buf.tell() > 0:
- # check if we already have it in our buffer
- buf.seek(0)
- bline = buf.readline(size)
- if bline.endswith(b'\n') or len(bline) == size:
- self._rbuf = BytesIO()
- self._rbuf.write(buf.read())
- return bline
- del bline
- if size < 0: # pylint:disable=too-many-nested-blocks
- # Read until \n or EOF, whichever comes first
- if self._rbufsize <= 1:
- # Speed up unbuffered case
- buf.seek(0)
- buffers = [buf.read()]
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- data = None
- recv = self._sock.recv
- while True:
- try:
- while data != b"\n":
- data = recv(1)
- if not data:
- break
- buffers.append(data)
- except InterruptedError:
- # The try..except to catch EINTR was moved outside the
- # recv loop to avoid the per byte overhead.
- continue
- break
- return b"".join(buffers)
- buf.seek(0, 2) # seek end
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- try:
- data = self._sock.recv(self._rbufsize)
- except InterruptedError:
- continue
- if not data:
- break
- nl = data.find(b'\n')
- if nl >= 0:
- nl += 1
- buf.write(data[:nl])
- self._rbuf.write(data[nl:])
- del data
- break
- buf.write(data)
- return buf.getvalue()
- else:
- # Read until size bytes or \n or EOF seen, whichever comes first
- buf.seek(0, 2) # seek end
- buf_len = buf.tell()
- if buf_len >= size:
- buf.seek(0)
- rv = buf.read(size)
- self._rbuf = BytesIO()
- self._rbuf.write(buf.read())
- return rv
- self._rbuf = BytesIO() # reset _rbuf. we consume it via buf.
- while True:
- try:
- data = self._sock.recv(self._rbufsize)
- except InterruptedError:
- continue
- if not data:
- break
- left = size - buf_len
- # did we just receive a newline?
- nl = data.find(b'\n', 0, left)
- if nl >= 0:
- nl += 1
- # save the excess data to _rbuf
- self._rbuf.write(data[nl:])
- if buf_len:
- buf.write(data[:nl])
- break
- else:
- # Shortcut. Avoid data copy through buf when returning
- # a substring of our first recv().
- return data[:nl]
- n = len(data)
- if n == size and not buf_len:
- # Shortcut. Avoid data copy through buf when
- # returning exactly all of our first recv().
- return data
- if n >= left:
- buf.write(data[:left])
- self._rbuf.write(data[left:])
- break
- buf.write(data)
- buf_len += n
- #assert buf_len == buf.tell()
- return buf.getvalue()
- def readlines(self, sizehint=0):
- total = 0
- list = []
- while True:
- line = self.readline()
- if not line:
- break
- list.append(line)
- total += len(line)
- if sizehint and total >= sizehint:
- break
- return list
- # Iterator protocols
- def __iter__(self):
- return self
- def next(self):
- line = self.readline()
- if not line:
- raise StopIteration
- return line
- __next__ = next
- try:
- from gevent.fileobject import FileObjectPosix
- except ImportError:
- # Manual implementation
- _fileobject = _basefileobject
- else:
- class _fileobject(FileObjectPosix):
- # Add the proper drop/reuse support for pypy, and match
- # the close=False default in the constructor
- def __init__(self, sock, mode='rb', bufsize=-1, close=False):
- _do_reuse_or_drop(sock, '_reuse')
- self._sock = sock
- FileObjectPosix.__init__(self, sock, mode, bufsize, close)
- def close(self):
- try:
- if self._sock:
- self.flush()
- finally:
- s = self._sock
- self._sock = None
- if s is not None:
- if self._close:
- FileObjectPosix.close(self)
- else:
- _do_reuse_or_drop(s, '_drop')
- def __del__(self):
- try:
- self.close()
- except: # pylint:disable=bare-except
- # close() may fail if __init__ didn't complete
- pass
- __all__ = __implements__ + __extensions__ + __imports__
|