123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- # -*- coding: utf-8 -*-
- """
- hyper/http20/bufsocket.py
- ~~~~~~~~~~~~~~~~~~~~~~~~~
- This file implements a buffered socket wrapper.
- The purpose of this is to avoid the overhead of unnecessary syscalls while
- allowing small reads from the network. This represents a potentially massive
- performance optimisation at the cost of burning some memory in the userspace
- process.
- """
- import select
- from .exceptions import ConnectionResetError, LineTooLongError
- class BufferedSocket(object):
- """
- A buffered socket wrapper.
- The purpose of this is to avoid the overhead of unnecessary syscalls while
- allowing small reads from the network. This represents a potentially
- massive performance optimisation at the cost of burning some memory in the
- userspace process.
- """
- def __init__(self, sck, buffer_size=1000):
- """
- Create the buffered socket.
- :param sck: The socket to wrap.
- :param buffer_size: The size of the backing buffer in bytes. This
- parameter should be set to an appropriate value for your use case.
- Small values of ``buffer_size`` increase the overhead of buffer
- management: large values cause more memory to be used.
- """
- # The wrapped socket.
- self._sck = sck
- # The buffer we're using.
- self._backing_buffer = bytearray(buffer_size)
- self._buffer_view = memoryview(self._backing_buffer)
- # The size of the buffer.
- self._buffer_size = buffer_size
- # The start index in the memory view.
- self._index = 0
- # The number of bytes in the buffer.
- self._bytes_in_buffer = 0
- @property
- def _remaining_capacity(self):
- """
- The maximum number of bytes the buffer could still contain.
- """
- return self._buffer_size - self._index
- @property
- def _buffer_end(self):
- """
- The index of the first free byte in the buffer.
- """
- return self._index + self._bytes_in_buffer
- @property
- def can_read(self):
- """
- Whether or not there is more data to read from the socket.
- """
- read = select.select([self._sck], [], [], 0)[0]
- if read:
- return True
- return False
- @property
- def buffer(self):
- """
- Get access to the buffer itself.
- """
- return self._buffer_view[self._index:self._buffer_end]
- def advance_buffer(self, count):
- """
- Advances the buffer by the amount of data consumed outside the socket.
- """
- self._index += count
- self._bytes_in_buffer -= count
- def new_buffer(self):
- """
- This method moves all the data in the backing buffer to the start of
- a new, fresh buffer. This gives the ability to read much more data.
- """
- def read_all_from_buffer():
- end = self._index + self._bytes_in_buffer
- return self._buffer_view[self._index:end]
- new_buffer = bytearray(self._buffer_size)
- new_buffer_view = memoryview(new_buffer)
- new_buffer_view[0:self._bytes_in_buffer] = read_all_from_buffer()
- self._index = 0
- self._backing_buffer = new_buffer
- self._buffer_view = new_buffer_view
- return
- def recv(self, amt):
- """
- Read some data from the socket.
- :param amt: The amount of data to read.
- :returns: A ``memoryview`` object containing the appropriate number of
- bytes. The data *must* be copied out by the caller before the next
- call to this function.
- """
- # In this implementation you can never read more than the number of
- # bytes in the buffer.
- if amt > self._buffer_size:
- amt = self._buffer_size
- # If the amount of data we've been asked to read is less than the
- # remaining space in the buffer, we need to clear out the buffer and
- # start over.
- if amt > self._remaining_capacity:
- self.new_buffer()
- # If there's still some room in the buffer, opportunistically attempt
- # to read into it.
- # If we don't actually _need_ the data (i.e. there's enough in the
- # buffer to satisfy the request), use select to work out if the read
- # attempt will block. If it will, don't bother reading. If we need the
- # data, always do the read.
- if self._bytes_in_buffer >= amt:
- should_read = select.select([self._sck], [], [], 0)[0]
- else:
- should_read = True
- if (self._remaining_capacity > self._bytes_in_buffer and should_read):
- count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
- # The socket just got closed. We should throw an exception if we
- # were asked for more data than we can return.
- if not count and amt > self._bytes_in_buffer:
- raise ConnectionResetError()
- self._bytes_in_buffer += count
- # Read out the bytes and update the index.
- amt = min(amt, self._bytes_in_buffer)
- data = self._buffer_view[self._index:self._index+amt]
- self._index += amt
- self._bytes_in_buffer -= amt
- return data
- def fill(self):
- """
- Attempts to fill the buffer as much as possible. It will block for at
- most the time required to have *one* ``recv_into`` call return.
- """
- if not self._remaining_capacity:
- self.new_buffer()
- count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
- if not count:
- raise ConnectionResetError()
- self._bytes_in_buffer += count
- return
- def readline(self):
- """
- Read up to a newline from the network and returns it. The implicit
- maximum line length is the buffer size of the buffered socket.
- Note that, unlike recv, this method absolutely *does* block until it
- can read the line.
- :returns: A ``memoryview`` object containing the appropriate number of
- bytes. The data *must* be copied out by the caller before the next
- call to this function.
- """
- # First, check if there's anything in the buffer. This is one of those
- # rare circumstances where this will work correctly on all platforms.
- index = self._backing_buffer.find(
- b'\n',
- self._index,
- self._index + self._bytes_in_buffer
- )
- if index != -1:
- length = index + 1 - self._index
- data = self._buffer_view[self._index:self._index+length]
- self._index += length
- self._bytes_in_buffer -= length
- return data
- # In this case, we didn't find a newline in the buffer. To fix that,
- # read some data into the buffer. To do our best to satisfy the read,
- # we should shunt the data down in the buffer so that it's right at
- # the start. We don't bother if we're already at the start of the
- # buffer.
- if self._index != 0:
- self.new_buffer()
- while self._bytes_in_buffer < self._buffer_size:
- count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
- if not count:
- raise ConnectionResetError()
- # We have some more data. Again, look for a newline in that gap.
- first_new_byte = self._buffer_end
- self._bytes_in_buffer += count
- index = self._backing_buffer.find(
- b'\n',
- first_new_byte,
- first_new_byte + count,
- )
- if index != -1:
- # The length of the buffer is the index into the
- # buffer at which we found the newline plus 1, minus the start
- # index of the buffer, which really should be zero.
- assert not self._index
- length = index + 1
- data = self._buffer_view[:length]
- self._index += length
- self._bytes_in_buffer -= length
- return data
- # If we got here, it means we filled the buffer without ever getting
- # a newline. Time to throw an exception.
- raise LineTooLongError()
- def __getattr__(self, name):
- return getattr(self._sck, name)
|