bufsocket.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
# -*- coding: utf-8 -*-
"""
hyper/http20/bufsocket.py
~~~~~~~~~~~~~~~~~~~~~~~~~

This file implements a buffered socket wrapper.

The purpose of this is to avoid the overhead of unnecessary syscalls while
allowing small reads from the network. This represents a potentially massive
performance optimisation at the cost of burning some memory in the userspace
process.

Data is received into a fixed-size ``bytearray`` and handed back to callers as
``memoryview`` slices of that buffer, so callers must copy anything they want
to keep before the next read.
"""
import select

# NOTE: this ConnectionResetError shadows the Python 3 builtin of the same
# name within this module (presumably kept for Python 2 compatibility).
from .exceptions import ConnectionResetError, LineTooLongError
  13. class BufferedSocket(object):
  14. """
  15. A buffered socket wrapper.
  16. The purpose of this is to avoid the overhead of unnecessary syscalls while
  17. allowing small reads from the network. This represents a potentially
  18. massive performance optimisation at the cost of burning some memory in the
  19. userspace process.
  20. """
  21. def __init__(self, sck, buffer_size=1000):
  22. """
  23. Create the buffered socket.
  24. :param sck: The socket to wrap.
  25. :param buffer_size: The size of the backing buffer in bytes. This
  26. parameter should be set to an appropriate value for your use case.
  27. Small values of ``buffer_size`` increase the overhead of buffer
  28. management: large values cause more memory to be used.
  29. """
  30. # The wrapped socket.
  31. self._sck = sck
  32. # The buffer we're using.
  33. self._backing_buffer = bytearray(buffer_size)
  34. self._buffer_view = memoryview(self._backing_buffer)
  35. # The size of the buffer.
  36. self._buffer_size = buffer_size
  37. # The start index in the memory view.
  38. self._index = 0
  39. # The number of bytes in the buffer.
  40. self._bytes_in_buffer = 0
  41. @property
  42. def _remaining_capacity(self):
  43. """
  44. The maximum number of bytes the buffer could still contain.
  45. """
  46. return self._buffer_size - self._index
  47. @property
  48. def _buffer_end(self):
  49. """
  50. The index of the first free byte in the buffer.
  51. """
  52. return self._index + self._bytes_in_buffer
  53. @property
  54. def can_read(self):
  55. """
  56. Whether or not there is more data to read from the socket.
  57. """
  58. read = select.select([self._sck], [], [], 0)[0]
  59. if read:
  60. return True
  61. return False
  62. @property
  63. def buffer(self):
  64. """
  65. Get access to the buffer itself.
  66. """
  67. return self._buffer_view[self._index:self._buffer_end]
  68. def advance_buffer(self, count):
  69. """
  70. Advances the buffer by the amount of data consumed outside the socket.
  71. """
  72. self._index += count
  73. self._bytes_in_buffer -= count
  74. def new_buffer(self):
  75. """
  76. This method moves all the data in the backing buffer to the start of
  77. a new, fresh buffer. This gives the ability to read much more data.
  78. """
  79. def read_all_from_buffer():
  80. end = self._index + self._bytes_in_buffer
  81. return self._buffer_view[self._index:end]
  82. new_buffer = bytearray(self._buffer_size)
  83. new_buffer_view = memoryview(new_buffer)
  84. new_buffer_view[0:self._bytes_in_buffer] = read_all_from_buffer()
  85. self._index = 0
  86. self._backing_buffer = new_buffer
  87. self._buffer_view = new_buffer_view
  88. return
  89. def recv(self, amt):
  90. """
  91. Read some data from the socket.
  92. :param amt: The amount of data to read.
  93. :returns: A ``memoryview`` object containing the appropriate number of
  94. bytes. The data *must* be copied out by the caller before the next
  95. call to this function.
  96. """
  97. # In this implementation you can never read more than the number of
  98. # bytes in the buffer.
  99. if amt > self._buffer_size:
  100. amt = self._buffer_size
  101. # If the amount of data we've been asked to read is less than the
  102. # remaining space in the buffer, we need to clear out the buffer and
  103. # start over.
  104. if amt > self._remaining_capacity:
  105. self.new_buffer()
  106. # If there's still some room in the buffer, opportunistically attempt
  107. # to read into it.
  108. # If we don't actually _need_ the data (i.e. there's enough in the
  109. # buffer to satisfy the request), use select to work out if the read
  110. # attempt will block. If it will, don't bother reading. If we need the
  111. # data, always do the read.
  112. if self._bytes_in_buffer >= amt:
  113. should_read = select.select([self._sck], [], [], 0)[0]
  114. else:
  115. should_read = True
  116. if (self._remaining_capacity > self._bytes_in_buffer and should_read):
  117. count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
  118. # The socket just got closed. We should throw an exception if we
  119. # were asked for more data than we can return.
  120. if not count and amt > self._bytes_in_buffer:
  121. raise ConnectionResetError()
  122. self._bytes_in_buffer += count
  123. # Read out the bytes and update the index.
  124. amt = min(amt, self._bytes_in_buffer)
  125. data = self._buffer_view[self._index:self._index+amt]
  126. self._index += amt
  127. self._bytes_in_buffer -= amt
  128. return data
  129. def fill(self):
  130. """
  131. Attempts to fill the buffer as much as possible. It will block for at
  132. most the time required to have *one* ``recv_into`` call return.
  133. """
  134. if not self._remaining_capacity:
  135. self.new_buffer()
  136. count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
  137. if not count:
  138. raise ConnectionResetError()
  139. self._bytes_in_buffer += count
  140. return
  141. def readline(self):
  142. """
  143. Read up to a newline from the network and returns it. The implicit
  144. maximum line length is the buffer size of the buffered socket.
  145. Note that, unlike recv, this method absolutely *does* block until it
  146. can read the line.
  147. :returns: A ``memoryview`` object containing the appropriate number of
  148. bytes. The data *must* be copied out by the caller before the next
  149. call to this function.
  150. """
  151. # First, check if there's anything in the buffer. This is one of those
  152. # rare circumstances where this will work correctly on all platforms.
  153. index = self._backing_buffer.find(
  154. b'\n',
  155. self._index,
  156. self._index + self._bytes_in_buffer
  157. )
  158. if index != -1:
  159. length = index + 1 - self._index
  160. data = self._buffer_view[self._index:self._index+length]
  161. self._index += length
  162. self._bytes_in_buffer -= length
  163. return data
  164. # In this case, we didn't find a newline in the buffer. To fix that,
  165. # read some data into the buffer. To do our best to satisfy the read,
  166. # we should shunt the data down in the buffer so that it's right at
  167. # the start. We don't bother if we're already at the start of the
  168. # buffer.
  169. if self._index != 0:
  170. self.new_buffer()
  171. while self._bytes_in_buffer < self._buffer_size:
  172. count = self._sck.recv_into(self._buffer_view[self._buffer_end:])
  173. if not count:
  174. raise ConnectionResetError()
  175. # We have some more data. Again, look for a newline in that gap.
  176. first_new_byte = self._buffer_end
  177. self._bytes_in_buffer += count
  178. index = self._backing_buffer.find(
  179. b'\n',
  180. first_new_byte,
  181. first_new_byte + count,
  182. )
  183. if index != -1:
  184. # The length of the buffer is the index into the
  185. # buffer at which we found the newline plus 1, minus the start
  186. # index of the buffer, which really should be zero.
  187. assert not self._index
  188. length = index + 1
  189. data = self._buffer_view[:length]
  190. self._index += length
  191. self._bytes_in_buffer -= length
  192. return data
  193. # If we got here, it means we filled the buffer without ever getting
  194. # a newline. Time to throw an exception.
  195. raise LineTooLongError()
  196. def __getattr__(self, name):
  197. return getattr(self._sck, name)