nbio_interface.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. """Non-blocking I/O interface for pika connection adapters.
  2. I/O interface expected by `pika.adapters.base_connection.BaseConnection`
  3. NOTE: This API is modeled after asyncio in python3 for a couple of reasons
  4. 1. It's a sensible API
  5. 2. To make it easy to implement at least on top of the built-in asyncio
  6. Furthermore, the API caters to the needs of pika core and lack of generalization
  7. is intentional for the sake of reducing complexity of the implementation and
  8. testing and lessening the maintenance burden.
  9. """
  10. import abc
  11. import pika.compat
  12. class AbstractIOServices(pika.compat.AbstractBase):
  13. """Interface to I/O services required by `pika.adapters.BaseConnection` and
  14. related utilities.
  15. NOTE: This is not a public API. Pika users should rely on the native I/O
  16. loop APIs (e.g., asyncio event loop, tornado ioloop, twisted reactor, etc.)
  17. that corresponds to the chosen Connection adapter.
  18. """
  19. @abc.abstractmethod
  20. def get_native_ioloop(self):
  21. """Returns the native I/O loop instance, such as Twisted reactor,
  22. asyncio's or tornado's event loop
  23. """
  24. raise NotImplementedError
  25. @abc.abstractmethod
  26. def close(self):
  27. """Release IOLoop's resources.
  28. the `close()` method is intended to be called by Pika's own test
  29. code only after `start()` returns. After calling `close()`, no other
  30. interaction with the closed instance of `IOLoop` should be performed.
  31. NOTE: This method is provided for Pika's own test scripts that need to
  32. be able to run I/O loops generically to test multiple Connection Adapter
  33. implementations. Pika users should use the native I/O loop's API
  34. instead.
  35. """
  36. raise NotImplementedError
  37. @abc.abstractmethod
  38. def run(self):
  39. """Run the I/O loop. It will loop until requested to exit. See `stop()`.
  40. NOTE: the outcome or restarting an instance that had been stopped is
  41. UNDEFINED!
  42. NOTE: This method is provided for Pika's own test scripts that need to
  43. be able to run I/O loops generically to test multiple Connection Adapter
  44. implementations (not all of the supported I/O Loop frameworks have
  45. methods named start/stop). Pika users should use the native I/O loop's
  46. API instead.
  47. """
  48. raise NotImplementedError
  49. @abc.abstractmethod
  50. def stop(self):
  51. """Request exit from the ioloop. The loop is NOT guaranteed to
  52. stop before this method returns.
  53. NOTE: The outcome of calling `stop()` on a non-running instance is
  54. UNDEFINED!
  55. NOTE: This method is provided for Pika's own test scripts that need to
  56. be able to run I/O loops generically to test multiple Connection Adapter
  57. implementations (not all of the supported I/O Loop frameworks have
  58. methods named start/stop). Pika users should use the native I/O loop's
  59. API instead.
  60. To invoke `stop()` safely from a thread other than this IOLoop's thread,
  61. call it via `add_callback_threadsafe`; e.g.,
  62. `ioloop.add_callback_threadsafe(ioloop.stop)`
  63. """
  64. raise NotImplementedError
  65. @abc.abstractmethod
  66. def add_callback_threadsafe(self, callback):
  67. """Requests a call to the given function as soon as possible. It will be
  68. called from this IOLoop's thread.
  69. NOTE: This is the only thread-safe method offered by the IOLoop adapter.
  70. All other manipulations of the IOLoop adapter and objects governed
  71. by it must be performed from the IOLoop's thread.
  72. NOTE: if you know that the requester is running on the same thread as
  73. the connection it is more efficient to use the
  74. `ioloop.call_later()` method with a delay of 0.
  75. :param callable callback: The callback method; must be callable.
  76. """
  77. raise NotImplementedError
  78. @abc.abstractmethod
  79. def call_later(self, delay, callback):
  80. """Add the callback to the IOLoop timer to be called after delay seconds
  81. from the time of call on best-effort basis. Returns a handle to the
  82. timeout.
  83. If two are scheduled for the same time, it's undefined which one will
  84. be called first.
  85. :param float delay: The number of seconds to wait to call callback
  86. :param callable callback: The callback method
  87. :returns: A handle that can be used to cancel the request.
  88. :rtype: AbstractTimerReference
  89. """
  90. raise NotImplementedError
  91. @abc.abstractmethod
  92. def getaddrinfo(self,
  93. host,
  94. port,
  95. on_done,
  96. family=0,
  97. socktype=0,
  98. proto=0,
  99. flags=0):
  100. """Perform the equivalent of `socket.getaddrinfo()` asynchronously.
  101. See `socket.getaddrinfo()` for the standard args.
  102. :param callable on_done: user callback that takes the return value of
  103. `socket.getaddrinfo()` upon successful completion or exception upon
  104. failure (check for `BaseException`) as its only arg. It will not be
  105. called if the operation was cancelled.
  106. :rtype: AbstractIOReference
  107. """
  108. raise NotImplementedError
  109. @abc.abstractmethod
  110. def connect_socket(self, sock, resolved_addr, on_done):
  111. """Perform the equivalent of `socket.connect()` on a previously-resolved
  112. address asynchronously.
  113. IMPLEMENTATION NOTE: Pika's connection logic resolves the addresses
  114. prior to making socket connections, so we don't need to burden the
  115. implementations of this method with the extra logic of asynchronous
  116. DNS resolution. Implementations can use `socket.inet_pton()` to
  117. verify the address.
  118. :param socket.socket sock: non-blocking socket that needs to be
  119. connected via `socket.socket.connect()`
  120. :param tuple resolved_addr: resolved destination address/port two-tuple
  121. as per `socket.socket.connect()`, except that the first element must
  122. be an actual IP address that's consistent with the given socket's
  123. address family.
  124. :param callable on_done: user callback that takes None upon successful
  125. completion or exception (check for `BaseException`) upon error as
  126. its only arg. It will not be called if the operation was cancelled.
  127. :rtype: AbstractIOReference
  128. :raises ValueError: if host portion of `resolved_addr` is not an IP
  129. address or is inconsistent with the socket's address family as
  130. validated via `socket.inet_pton()`
  131. """
  132. raise NotImplementedError
  133. @abc.abstractmethod
  134. def create_streaming_connection(self,
  135. protocol_factory,
  136. sock,
  137. on_done,
  138. ssl_context=None,
  139. server_hostname=None):
  140. """Perform SSL session establishment, if requested, on the already-
  141. connected socket and link the streaming transport/protocol pair.
  142. NOTE: This method takes ownership of the socket.
  143. :param callable protocol_factory: called without args, returns an
  144. instance with the `AbstractStreamProtocol` interface. The protocol's
  145. `connection_made(transport)` method will be called to link it to
  146. the transport after remaining connection activity (e.g., SSL session
  147. establishment), if any, is completed successfully.
  148. :param socket.socket sock: Already-connected, non-blocking
  149. `socket.SOCK_STREAM` socket to be used by the transport. We take
  150. ownership of this socket.
  151. :param callable on_done: User callback
  152. `on_done(BaseException | (transport, protocol))` to be notified when
  153. the asynchronous operation completes. An exception arg indicates
  154. failure (check for `BaseException`); otherwise the two-tuple will
  155. contain the linked transport/protocol pair having
  156. AbstractStreamTransport and AbstractStreamProtocol interfaces
  157. respectively.
  158. :param None | ssl.SSLContext ssl_context: if None, this will proceed as
  159. a plaintext connection; otherwise, if not None, SSL session
  160. establishment will be performed prior to linking the transport and
  161. protocol.
  162. :param str | None server_hostname: For use during SSL session
  163. establishment to match against the target server's certificate. The
  164. value `None` disables this check (which is a huge security risk)
  165. :rtype: AbstractIOReference
  166. """
  167. raise NotImplementedError
  168. class AbstractFileDescriptorServices(pika.compat.AbstractBase):
  169. """Interface definition of common non-blocking file descriptor services
  170. required by some utility implementations.
  171. NOTE: This is not a public API. Pika users should rely on the native I/O
  172. loop APIs (e.g., asyncio event loop, tornado ioloop, twisted reactor, etc.)
  173. that corresponds to the chosen Connection adapter.
  174. """
  175. @abc.abstractmethod
  176. def set_reader(self, fd, on_readable):
  177. """Call the given callback when the file descriptor is readable.
  178. Replace prior reader, if any, for the given file descriptor.
  179. :param fd: file descriptor
  180. :param callable on_readable: a callback taking no args to be notified
  181. when fd becomes readable.
  182. """
  183. raise NotImplementedError
  184. @abc.abstractmethod
  185. def remove_reader(self, fd):
  186. """Stop watching the given file descriptor for readability
  187. :param fd: file descriptor
  188. :returns: True if reader was removed; False if none was registered.
  189. :rtype: bool
  190. """
  191. raise NotImplementedError
  192. @abc.abstractmethod
  193. def set_writer(self, fd, on_writable):
  194. """Call the given callback whenever the file descriptor is writable.
  195. Replace prior writer callback, if any, for the given file descriptor.
  196. IMPLEMENTATION NOTE: For portability, implementations of
  197. `set_writable()` should also watch for indication of error on the
  198. socket and treat it as equivalent to the writable indication (e.g.,
  199. also adding the socket to the `exceptfds` arg of `socket.select()`
  200. and calling the `on_writable` callback if `select.select()`
  201. indicates that the socket is in error state). Specifically, Windows
  202. (unlike POSIX) only indicates error on the socket (but not writable)
  203. when connection establishment fails.
  204. :param fd: file descriptor
  205. :param callable on_writable: a callback taking no args to be notified
  206. when fd becomes writable.
  207. """
  208. raise NotImplementedError
  209. @abc.abstractmethod
  210. def remove_writer(self, fd):
  211. """Stop watching the given file descriptor for writability
  212. :param fd: file descriptor
  213. :returns: True if reader was removed; False if none was registered.
  214. :rtype: bool
  215. """
  216. raise NotImplementedError
  217. class AbstractTimerReference(pika.compat.AbstractBase):
  218. """Reference to asynchronous operation"""
  219. @abc.abstractmethod
  220. def cancel(self):
  221. """Cancel callback. If already cancelled, has no affect.
  222. """
  223. raise NotImplementedError
  224. class AbstractIOReference(pika.compat.AbstractBase):
  225. """Reference to asynchronous I/O operation"""
  226. @abc.abstractmethod
  227. def cancel(self):
  228. """Cancel pending operation
  229. :returns: False if was already done or cancelled; True otherwise
  230. :rtype: bool
  231. """
  232. raise NotImplementedError
  233. class AbstractStreamProtocol(pika.compat.AbstractBase):
  234. """Stream protocol interface. It's compatible with a subset of
  235. `asyncio.protocols.Protocol` for compatibility with asyncio-based
  236. `AbstractIOServices` implementation.
  237. """
  238. @abc.abstractmethod
  239. def connection_made(self, transport):
  240. """Introduces transport to protocol after transport is connected.
  241. :param AbstractStreamTransport transport:
  242. :raises Exception: Exception-based exception on error
  243. """
  244. raise NotImplementedError
  245. @abc.abstractmethod
  246. def connection_lost(self, error):
  247. """Called upon loss or closing of connection.
  248. NOTE: `connection_made()` and `connection_lost()` are each called just
  249. once and in that order. All other callbacks are called between them.
  250. :param BaseException | None error: An exception (check for
  251. `BaseException`) indicates connection failure. None indicates that
  252. connection was closed on this side, such as when it's aborted or
  253. when `AbstractStreamProtocol.eof_received()` returns a result that
  254. doesn't evaluate to True.
  255. :raises Exception: Exception-based exception on error
  256. """
  257. raise NotImplementedError
  258. @abc.abstractmethod
  259. def eof_received(self):
  260. """Called after the remote peer shuts its write end of the connection.
  261. :returns: A falsy value (including None) will cause the transport to
  262. close itself, resulting in an eventual `connection_lost()` call
  263. from the transport. If a truthy value is returned, it will be the
  264. protocol's responsibility to close/abort the transport.
  265. :rtype: falsy|truthy
  266. :raises Exception: Exception-based exception on error
  267. """
  268. raise NotImplementedError
  269. @abc.abstractmethod
  270. def data_received(self, data):
  271. """Called to deliver incoming data to the protocol.
  272. :param data: Non-empty data bytes.
  273. :raises Exception: Exception-based exception on error
  274. """
  275. raise NotImplementedError
  276. # pylint: disable=W0511
  277. # TODO Undecided whether we need write flow-control yet, although it seems
  278. # like a good idea.
  279. # @abc.abstractmethod
  280. # def pause_writing(self):
  281. # """Called when the transport's write buffer size becomes greater than or
  282. # equal to the transport's high-water mark. It won't be called again until
  283. # the transport's write buffer gets back to its low-water mark and then
  284. # returns to/past the hight-water mark again.
  285. # """
  286. # raise NotImplementedError
  287. #
  288. # @abc.abstractmethod
  289. # def resume_writing(self):
  290. # """Called when the transport's write buffer size becomes less than or
  291. # equal to the transport's low-water mark.
  292. # """
  293. # raise NotImplementedError
  294. class AbstractStreamTransport(pika.compat.AbstractBase):
  295. """Stream transport interface. It's compatible with a subset of
  296. `asyncio.transports.Transport` for compatibility with asyncio-based
  297. `AbstractIOServices` implementation.
  298. """
  299. @abc.abstractmethod
  300. def abort(self):
  301. """Close connection abruptly without waiting for pending I/O to
  302. complete. Will invoke the corresponding protocol's `connection_lost()`
  303. method asynchronously (not in context of the abort() call).
  304. :raises Exception: Exception-based exception on error
  305. """
  306. raise NotImplementedError
  307. @abc.abstractmethod
  308. def get_protocol(self):
  309. """Return the protocol linked to this transport.
  310. :rtype: AbstractStreamProtocol
  311. :raises Exception: Exception-based exception on error
  312. """
  313. raise NotImplementedError
  314. @abc.abstractmethod
  315. def write(self, data):
  316. """Buffer the given data until it can be sent asynchronously.
  317. :param bytes data:
  318. :raises ValueError: if called with empty data
  319. :raises Exception: Exception-based exception on error
  320. """
  321. raise NotImplementedError
  322. @abc.abstractmethod
  323. def get_write_buffer_size(self):
  324. """
  325. :returns: Current size of output data buffered by the transport
  326. :rtype: int
  327. """
  328. raise NotImplementedError
  329. # pylint: disable=W0511
  330. # TODO Udecided whether we need write flow-control yet, although it seems
  331. # like a good idea.
  332. # @abc.abstractmethod
  333. # def set_write_buffer_limits(self, high, low):
  334. # """Set thresholds for calling the protocol's `pause_writing()`
  335. # and `resume_writing()` methods. `low` must be less than or equal to
  336. # `high`.
  337. #
  338. # NOTE The unintuitive order of the args is preserved to match the
  339. # corresponding method in `asyncio.WriteTransport`. I would expect `low`
  340. # to be the first arg, especially since
  341. # `asyncio.WriteTransport.get_write_buffer_limits()` returns them in the
  342. # opposite order. This seems error-prone.
  343. #
  344. # See `asyncio.WriteTransport.get_write_buffer_limits()` for more details
  345. # about the args.
  346. #
  347. # :param int high: non-negative high-water mark.
  348. # :param int low: non-negative low-water mark.
  349. # """
  350. # raise NotImplementedError