selector_ioloop_adapter.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. """
  2. Implementation of `nbio_interface.AbstractIOServices` on top of a
  3. selector-based I/O loop, such as tornado's and our home-grown
  4. select_connection's I/O loops.
  5. """
  6. import abc
  7. import logging
  8. import socket
  9. import threading
  10. from pika.adapters.utils import nbio_interface, io_services_utils
  11. from pika.adapters.utils.io_services_utils import (check_callback_arg,
  12. check_fd_arg)
  13. LOGGER = logging.getLogger(__name__)
  14. class AbstractSelectorIOLoop(object):
  15. """Selector-based I/O loop interface expected by
  16. `selector_ioloop_adapter.SelectorIOServicesAdapter`
  17. NOTE: this interface follows the corresponding methods and attributes
  18. of `tornado.ioloop.IOLoop` in order to avoid additional adapter layering
  19. when wrapping tornado's IOLoop.
  20. """
  21. @property
  22. @abc.abstractmethod
  23. def READ(self): # pylint: disable=C0103
  24. """The value of the I/O loop's READ flag; READ/WRITE/ERROR may be used
  25. with bitwise operators as expected.
  26. Implementation note: the implementations can simply replace these
  27. READ/WRITE/ERROR properties with class-level attributes
  28. """
  29. @property
  30. @abc.abstractmethod
  31. def WRITE(self): # pylint: disable=C0103
  32. """The value of the I/O loop's WRITE flag; READ/WRITE/ERROR may be used
  33. with bitwise operators as expected
  34. """
  35. @property
  36. @abc.abstractmethod
  37. def ERROR(self): # pylint: disable=C0103
  38. """The value of the I/O loop's ERROR flag; READ/WRITE/ERROR may be used
  39. with bitwise operators as expected
  40. """
  41. @abc.abstractmethod
  42. def close(self):
  43. """Release IOLoop's resources.
  44. the `close()` method is intended to be called by the application or test
  45. code only after `start()` returns. After calling `close()`, no other
  46. interaction with the closed instance of `IOLoop` should be performed.
  47. """
  48. @abc.abstractmethod
  49. def start(self):
  50. """Run the I/O loop. It will loop until requested to exit. See `stop()`.
  51. """
  52. @abc.abstractmethod
  53. def stop(self):
  54. """Request exit from the ioloop. The loop is NOT guaranteed to
  55. stop before this method returns.
  56. To invoke `stop()` safely from a thread other than this IOLoop's thread,
  57. call it via `add_callback_threadsafe`; e.g.,
  58. `ioloop.add_callback(ioloop.stop)`
  59. """
  60. @abc.abstractmethod
  61. def call_later(self, delay, callback):
  62. """Add the callback to the IOLoop timer to be called after delay seconds
  63. from the time of call on best-effort basis. Returns a handle to the
  64. timeout.
  65. :param float delay: The number of seconds to wait to call callback
  66. :param callable callback: The callback method
  67. :returns: handle to the created timeout that may be passed to
  68. `remove_timeout()`
  69. :rtype: object
  70. """
  71. @abc.abstractmethod
  72. def remove_timeout(self, timeout_handle):
  73. """Remove a timeout
  74. :param timeout_handle: Handle of timeout to remove
  75. """
  76. @abc.abstractmethod
  77. def add_callback(self, callback):
  78. """Requests a call to the given function as soon as possible in the
  79. context of this IOLoop's thread.
  80. NOTE: This is the only thread-safe method in IOLoop. All other
  81. manipulations of IOLoop must be performed from the IOLoop's thread.
  82. For example, a thread may request a call to the `stop` method of an
  83. ioloop that is running in a different thread via
  84. `ioloop.add_callback_threadsafe(ioloop.stop)`
  85. :param callable callback: The callback method
  86. """
  87. @abc.abstractmethod
  88. def add_handler(self, fd, handler, events):
  89. """Start watching the given file descriptor for events
  90. :param int fd: The file descriptor
  91. :param callable handler: When requested event(s) occur,
  92. `handler(fd, events)` will be called.
  93. :param int events: The event mask using READ, WRITE, ERROR.
  94. """
  95. @abc.abstractmethod
  96. def update_handler(self, fd, events):
  97. """Changes the events we watch for
  98. :param int fd: The file descriptor
  99. :param int events: The event mask using READ, WRITE, ERROR
  100. """
  101. @abc.abstractmethod
  102. def remove_handler(self, fd):
  103. """Stop watching the given file descriptor for events
  104. :param int fd: The file descriptor
  105. """
  106. class SelectorIOServicesAdapter(io_services_utils.SocketConnectionMixin,
  107. io_services_utils.StreamingConnectionMixin,
  108. nbio_interface.AbstractIOServices,
  109. nbio_interface.AbstractFileDescriptorServices):
  110. """Implements the
  111. :py:class:`.nbio_interface.AbstractIOServices` interface
  112. on top of selector-style native loop having the
  113. :py:class:`AbstractSelectorIOLoop` interface, such as
  114. :py:class:`pika.selection_connection.IOLoop` and :py:class:`tornado.IOLoop`.
  115. NOTE:
  116. :py:class:`.nbio_interface.AbstractFileDescriptorServices`
  117. interface is only required by the mixins.
  118. """
  119. def __init__(self, native_loop):
  120. """
  121. :param AbstractSelectorIOLoop native_loop: An instance compatible with
  122. the `AbstractSelectorIOLoop` interface, but not necessarily derived
  123. from it.
  124. """
  125. self._loop = native_loop
  126. # Active watchers: maps file descriptors to `_FileDescriptorCallbacks`
  127. self._watchers = dict()
  128. # Native loop-specific event masks of interest
  129. self._readable_mask = self._loop.READ
  130. # NOTE: tying ERROR to WRITE is particularly handy for Windows, whose
  131. # `select.select()` differs from Posix by reporting
  132. # connection-establishment failure only through exceptfds (ERROR event),
  133. # while the typical application workflow is to wait for the socket to
  134. # become writable when waiting for socket connection to be established.
  135. self._writable_mask = self._loop.WRITE | self._loop.ERROR
  136. def get_native_ioloop(self):
  137. """Implement
  138. :py:meth:`.nbio_interface.AbstractIOServices.get_native_ioloop()`.
  139. """
  140. return self._loop
  141. def close(self):
  142. """Implement :py:meth:`.nbio_interface.AbstractIOServices.close()`.
  143. """
  144. self._loop.close()
  145. def run(self):
  146. """Implement :py:meth:`.nbio_interface.AbstractIOServices.run()`.
  147. """
  148. self._loop.start()
  149. def stop(self):
  150. """Implement :py:meth:`.nbio_interface.AbstractIOServices.stop()`.
  151. """
  152. self._loop.stop()
  153. def add_callback_threadsafe(self, callback):
  154. """Implement
  155. :py:meth:`.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.
  156. """
  157. self._loop.add_callback(callback)
  158. def call_later(self, delay, callback):
  159. """Implement :py:meth:`.nbio_interface.AbstractIOServices.call_later()`.
  160. """
  161. return _TimerHandle(self._loop.call_later(delay, callback), self._loop)
  162. def getaddrinfo(self,
  163. host,
  164. port,
  165. on_done,
  166. family=0,
  167. socktype=0,
  168. proto=0,
  169. flags=0):
  170. """Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
  171. """
  172. return _SelectorIOLoopIOHandle(
  173. _AddressResolver(
  174. native_loop=self._loop,
  175. host=host,
  176. port=port,
  177. family=family,
  178. socktype=socktype,
  179. proto=proto,
  180. flags=flags,
  181. on_done=on_done).start())
  182. def set_reader(self, fd, on_readable):
  183. """Implement
  184. :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_reader()`.
  185. """
  186. LOGGER.debug('SelectorIOServicesAdapter.set_reader(%s, %r)', fd,
  187. on_readable)
  188. check_fd_arg(fd)
  189. check_callback_arg(on_readable, 'on_readable')
  190. try:
  191. callbacks = self._watchers[fd]
  192. except KeyError:
  193. self._loop.add_handler(fd, self._on_reader_writer_fd_events,
  194. self._readable_mask)
  195. self._watchers[fd] = _FileDescriptorCallbacks(reader=on_readable)
  196. LOGGER.debug('set_reader(%s, _) added handler Rd', fd)
  197. else:
  198. if callbacks.reader is None:
  199. assert callbacks.writer is not None
  200. self._loop.update_handler(
  201. fd, self._readable_mask | self._writable_mask)
  202. LOGGER.debug('set_reader(%s, _) updated handler RdWr', fd)
  203. else:
  204. LOGGER.debug('set_reader(%s, _) replacing reader', fd)
  205. callbacks.reader = on_readable
  206. def remove_reader(self, fd):
  207. """Implement
  208. :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.
  209. """
  210. LOGGER.debug('SelectorIOServicesAdapter.remove_reader(%s)', fd)
  211. check_fd_arg(fd)
  212. try:
  213. callbacks = self._watchers[fd]
  214. except KeyError:
  215. LOGGER.debug('remove_reader(%s) neither was set', fd)
  216. return False
  217. if callbacks.reader is None:
  218. assert callbacks.writer is not None
  219. LOGGER.debug('remove_reader(%s) reader wasn\'t set Wr', fd)
  220. return False
  221. callbacks.reader = None
  222. if callbacks.writer is None:
  223. del self._watchers[fd]
  224. self._loop.remove_handler(fd)
  225. LOGGER.debug('remove_reader(%s) removed handler', fd)
  226. else:
  227. self._loop.update_handler(fd, self._writable_mask)
  228. LOGGER.debug('remove_reader(%s) updated handler Wr', fd)
  229. return True
  230. def set_writer(self, fd, on_writable):
  231. """Implement
  232. :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_writer()`.
  233. """
  234. LOGGER.debug('SelectorIOServicesAdapter.set_writer(%s, %r)', fd,
  235. on_writable)
  236. check_fd_arg(fd)
  237. check_callback_arg(on_writable, 'on_writable')
  238. try:
  239. callbacks = self._watchers[fd]
  240. except KeyError:
  241. self._loop.add_handler(fd, self._on_reader_writer_fd_events,
  242. self._writable_mask)
  243. self._watchers[fd] = _FileDescriptorCallbacks(writer=on_writable)
  244. LOGGER.debug('set_writer(%s, _) added handler Wr', fd)
  245. else:
  246. if callbacks.writer is None:
  247. assert callbacks.reader is not None
  248. self._loop.update_handler(
  249. fd, self._readable_mask | self._writable_mask)
  250. LOGGER.debug('set_writer(%s, _) updated handler RdWr', fd)
  251. else:
  252. LOGGER.debug('set_writer(%s, _) replacing writer', fd)
  253. callbacks.writer = on_writable
  254. def remove_writer(self, fd):
  255. """Implement
  256. :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.
  257. """
  258. LOGGER.debug('SelectorIOServicesAdapter.remove_writer(%s)', fd)
  259. check_fd_arg(fd)
  260. try:
  261. callbacks = self._watchers[fd]
  262. except KeyError:
  263. LOGGER.debug('remove_writer(%s) neither was set.', fd)
  264. return False
  265. if callbacks.writer is None:
  266. assert callbacks.reader is not None
  267. LOGGER.debug('remove_writer(%s) writer wasn\'t set Rd', fd)
  268. return False
  269. callbacks.writer = None
  270. if callbacks.reader is None:
  271. del self._watchers[fd]
  272. self._loop.remove_handler(fd)
  273. LOGGER.debug('remove_writer(%s) removed handler', fd)
  274. else:
  275. self._loop.update_handler(fd, self._readable_mask)
  276. LOGGER.debug('remove_writer(%s) updated handler Rd', fd)
  277. return True
  278. def _on_reader_writer_fd_events(self, fd, events):
  279. """Handle indicated file descriptor events requested via `set_reader()`
  280. and `set_writer()`.
  281. :param fd: file descriptor
  282. :param events: event mask using native loop's READ/WRITE/ERROR. NOTE:
  283. depending on the underlying poller mechanism, ERROR may be indicated
  284. upon certain file description state even though we don't request it.
  285. We ignore ERROR here since `set_reader()`/`set_writer()` don't
  286. request for it.
  287. """
  288. callbacks = self._watchers[fd]
  289. if events & self._readable_mask and callbacks.reader is None:
  290. # NOTE: we check for consistency here ahead of the writer callback
  291. # because the writer callback, if any, can change the events being
  292. # watched
  293. LOGGER.warning(
  294. 'READ indicated on fd=%s, but reader callback is None; '
  295. 'events=%s', fd, bin(events))
  296. if events & self._writable_mask:
  297. if callbacks.writer is not None:
  298. callbacks.writer()
  299. else:
  300. LOGGER.warning(
  301. 'WRITE indicated on fd=%s, but writer callback is None; '
  302. 'events=%s', fd, bin(events))
  303. if events & self._readable_mask:
  304. if callbacks.reader is not None:
  305. callbacks.reader()
  306. else:
  307. # Reader callback might have been removed in the scope of writer
  308. # callback.
  309. pass
  310. class _FileDescriptorCallbacks(object):
  311. """Holds reader and writer callbacks for a file descriptor"""
  312. __slots__ = ('reader', 'writer')
  313. def __init__(self, reader=None, writer=None):
  314. self.reader = reader
  315. self.writer = writer
  316. class _TimerHandle(nbio_interface.AbstractTimerReference):
  317. """This module's adaptation of `nbio_interface.AbstractTimerReference`.
  318. """
  319. def __init__(self, handle, loop):
  320. """
  321. :param opaque handle: timer handle from the underlying loop
  322. implementation that may be passed to its `remove_timeout()` method
  323. :param AbstractSelectorIOLoop loop: the I/O loop instance that created
  324. the timeout.
  325. """
  326. self._handle = handle
  327. self._loop = loop
  328. def cancel(self):
  329. if self._loop is not None:
  330. self._loop.remove_timeout(self._handle)
  331. self._handle = None
  332. self._loop = None
  333. class _SelectorIOLoopIOHandle(nbio_interface.AbstractIOReference):
  334. """This module's adaptation of `nbio_interface.AbstractIOReference`
  335. """
  336. def __init__(self, subject):
  337. """
  338. :param subject: subject of the reference containing a `cancel()` method
  339. """
  340. self._cancel = subject.cancel
  341. def cancel(self):
  342. """Cancel pending operation
  343. :returns: False if was already done or cancelled; True otherwise
  344. :rtype: bool
  345. """
  346. return self._cancel()
  347. class _AddressResolver(object):
  348. """Performs getaddrinfo asynchronously using a thread, then reports result
  349. via callback from the given I/O loop.
  350. NOTE: at this stage, we're using a thread per request, which may prove
  351. inefficient and even prohibitive if the app performs many of these
  352. operations concurrently.
  353. """
  354. NOT_STARTED = 0
  355. ACTIVE = 1
  356. CANCELED = 2
  357. COMPLETED = 3
  358. def __init__(self, native_loop, host, port, family, socktype, proto, flags,
  359. on_done):
  360. """
  361. :param AbstractSelectorIOLoop native_loop:
  362. :param host: `see socket.getaddrinfo()`
  363. :param port: `see socket.getaddrinfo()`
  364. :param family: `see socket.getaddrinfo()`
  365. :param socktype: `see socket.getaddrinfo()`
  366. :param proto: `see socket.getaddrinfo()`
  367. :param flags: `see socket.getaddrinfo()`
  368. :param on_done: on_done(records|BaseException) callback for reporting
  369. result from the given I/O loop. The single arg will be either an
  370. exception object (check for `BaseException`) in case of failure or
  371. the result returned by `socket.getaddrinfo()`.
  372. """
  373. check_callback_arg(on_done, 'on_done')
  374. self._state = self.NOT_STARTED
  375. self._result = None
  376. self._loop = native_loop
  377. self._host = host
  378. self._port = port
  379. self._family = family
  380. self._socktype = socktype
  381. self._proto = proto
  382. self._flags = flags
  383. self._on_done = on_done
  384. self._mutex = threading.Lock()
  385. self._threading_timer = None
  386. def _cleanup(self):
  387. """Release resources
  388. """
  389. self._loop = None
  390. self._threading_timer = None
  391. self._on_done = None
  392. def start(self):
  393. """Start asynchronous DNS lookup.
  394. :rtype: nbio_interface.AbstractIOReference
  395. """
  396. assert self._state == self.NOT_STARTED, self._state
  397. self._state = self.ACTIVE
  398. self._threading_timer = threading.Timer(0, self._resolve)
  399. self._threading_timer.start()
  400. return _SelectorIOLoopIOHandle(self)
  401. def cancel(self):
  402. """Cancel the pending resolver
  403. :returns: False if was already done or cancelled; True otherwise
  404. :rtype: bool
  405. """
  406. # Try to cancel, but no guarantees
  407. with self._mutex:
  408. if self._state == self.ACTIVE:
  409. LOGGER.debug('Canceling resolver for %s:%s', self._host,
  410. self._port)
  411. self._state = self.CANCELED
  412. # Attempt to cancel, but not guaranteed
  413. self._threading_timer.cancel()
  414. self._cleanup()
  415. return True
  416. else:
  417. LOGGER.debug(
  418. 'Ignoring _AddressResolver cancel request when not ACTIVE; '
  419. '(%s:%s); state=%s', self._host, self._port, self._state)
  420. return False
  421. def _resolve(self):
  422. """Call `socket.getaddrinfo()` and return result via user's callback
  423. function on the given I/O loop
  424. """
  425. try:
  426. # NOTE: on python 2.x, can't pass keyword args to getaddrinfo()
  427. result = socket.getaddrinfo(self._host, self._port, self._family,
  428. self._socktype, self._proto,
  429. self._flags)
  430. except Exception as exc: # pylint: disable=W0703
  431. LOGGER.error('Address resolution failed: %r', exc)
  432. result = exc
  433. self._result = result
  434. # Schedule result to be returned to user via user's event loop
  435. with self._mutex:
  436. if self._state == self.ACTIVE:
  437. self._loop.add_callback(self._dispatch_result)
  438. else:
  439. LOGGER.debug(
  440. 'Asynchronous getaddrinfo cancellation detected; '
  441. 'in thread; host=%r', self._host)
  442. def _dispatch_result(self):
  443. """This is called from the user's I/O loop to pass the result to the
  444. user via the user's on_done callback
  445. """
  446. if self._state == self.ACTIVE:
  447. self._state = self.COMPLETED
  448. try:
  449. LOGGER.debug(
  450. 'Invoking asynchronous getaddrinfo() completion callback; '
  451. 'host=%r', self._host)
  452. self._on_done(self._result)
  453. finally:
  454. self._cleanup()
  455. else:
  456. LOGGER.debug(
  457. 'Asynchronous getaddrinfo cancellation detected; '
  458. 'in I/O loop context; host=%r', self._host)