select_connection.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264
  1. """A connection adapter that tries to use the best polling method for the
  2. platform pika is running on.
  3. """
  4. import abc
  5. import collections
  6. import errno
  7. import heapq
  8. import logging
  9. import select
  10. import time
  11. import threading
  12. import pika.compat
  13. from pika.adapters.utils import nbio_interface
  14. from pika.adapters.base_connection import BaseConnection
  15. from pika.adapters.utils.selector_ioloop_adapter import (
  16. SelectorIOServicesAdapter, AbstractSelectorIOLoop)
  17. LOGGER = logging.getLogger(__name__)
  18. # One of select, epoll, kqueue or poll
  19. SELECT_TYPE = None
  20. # Reason for this unconventional dict initialization is the fact that on some
  21. # platforms select.error is an aliases for OSError. We don't want the lambda
  22. # for select.error to win over one for OSError.
  23. _SELECT_ERROR_CHECKERS = {}
  24. if pika.compat.PY3:
  25. # InterruptedError is undefined in PY2
  26. # pylint: disable=E0602
  27. _SELECT_ERROR_CHECKERS[InterruptedError] = lambda e: True
  28. _SELECT_ERROR_CHECKERS[select.error] = lambda e: e.args[0] == errno.EINTR
  29. _SELECT_ERROR_CHECKERS[IOError] = lambda e: e.errno == errno.EINTR
  30. _SELECT_ERROR_CHECKERS[OSError] = lambda e: e.errno == errno.EINTR
  31. # We can reduce the number of elements in the list by looking at super-sub
  32. # class relationship because only the most generic ones needs to be caught.
  33. # For now the optimization is left out.
  34. # Following is better but still incomplete.
  35. # _SELECT_ERRORS = tuple(filter(lambda e: not isinstance(e, OSError),
  36. # _SELECT_ERROR_CHECKERS.keys())
  37. # + [OSError])
  38. _SELECT_ERRORS = tuple(_SELECT_ERROR_CHECKERS.keys())
  39. def _is_resumable(exc):
  40. """Check if caught exception represents EINTR error.
  41. :param exc: exception; must be one of classes in _SELECT_ERRORS
  42. """
  43. checker = _SELECT_ERROR_CHECKERS.get(exc.__class__, None)
  44. if checker is not None:
  45. return checker(exc)
  46. else:
  47. return False
  48. class SelectConnection(BaseConnection):
  49. """An asynchronous connection adapter that attempts to use the fastest
  50. event loop adapter for the given platform.
  51. """
  52. def __init__(
  53. self, # pylint: disable=R0913
  54. parameters=None,
  55. on_open_callback=None,
  56. on_open_error_callback=None,
  57. on_close_callback=None,
  58. custom_ioloop=None,
  59. internal_connection_workflow=True):
  60. """Create a new instance of the Connection object.
  61. :param pika.connection.Parameters parameters: Connection parameters
  62. :param callable on_open_callback: Method to call on connection open
  63. :param None | method on_open_error_callback: Called if the connection
  64. can't be established or connection establishment is interrupted by
  65. `Connection.close()`: on_open_error_callback(Connection, exception).
  66. :param None | method on_close_callback: Called when a previously fully
  67. open connection is closed:
  68. `on_close_callback(Connection, exception)`, where `exception` is
  69. either an instance of `exceptions.ConnectionClosed` if closed by
  70. user or broker or exception of another type that describes the cause
  71. of connection failure.
  72. :param None | IOLoop | nbio_interface.AbstractIOServices custom_ioloop:
  73. Provide a custom I/O Loop object.
  74. :param bool internal_connection_workflow: True for autonomous connection
  75. establishment which is default; False for externally-managed
  76. connection workflow via the `create_connection()` factory.
  77. :raises: RuntimeError
  78. """
  79. if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
  80. nbio = custom_ioloop
  81. else:
  82. nbio = SelectorIOServicesAdapter(custom_ioloop or IOLoop())
  83. super(SelectConnection, self).__init__(
  84. parameters,
  85. on_open_callback,
  86. on_open_error_callback,
  87. on_close_callback,
  88. nbio,
  89. internal_connection_workflow=internal_connection_workflow)
  90. @classmethod
  91. def create_connection(cls,
  92. connection_configs,
  93. on_done,
  94. custom_ioloop=None,
  95. workflow=None):
  96. """Implement
  97. :py:classmethod:`pika.adapters.BaseConnection.create_connection()`.
  98. """
  99. nbio = SelectorIOServicesAdapter(custom_ioloop or IOLoop())
  100. def connection_factory(params):
  101. """Connection factory."""
  102. if params is None:
  103. raise ValueError('Expected pika.connection.Parameters '
  104. 'instance, but got None in params arg.')
  105. return cls(
  106. parameters=params,
  107. custom_ioloop=nbio,
  108. internal_connection_workflow=False)
  109. return cls._start_connection_workflow(
  110. connection_configs=connection_configs,
  111. connection_factory=connection_factory,
  112. nbio=nbio,
  113. workflow=workflow,
  114. on_done=on_done)
  115. def _get_write_buffer_size(self):
  116. """
  117. :returns: Current size of output data buffered by the transport
  118. :rtype: int
  119. """
  120. return self._transport.get_write_buffer_size()
  121. class _Timeout(object):
  122. """Represents a timeout"""
  123. __slots__ = (
  124. 'deadline',
  125. 'callback',
  126. )
  127. def __init__(self, deadline, callback):
  128. """
  129. :param float deadline: timer expiration as non-negative epoch number
  130. :param callable callback: callback to call when timeout expires
  131. :raises ValueError, TypeError:
  132. """
  133. if deadline < 0:
  134. raise ValueError(
  135. 'deadline must be non-negative epoch number, but got %r' %
  136. (deadline,))
  137. if not callable(callback):
  138. raise TypeError(
  139. 'callback must be a callable, but got %r' % (callback,))
  140. self.deadline = deadline
  141. self.callback = callback
  142. def __eq__(self, other):
  143. """NOTE: not supporting sort stability"""
  144. if isinstance(other, _Timeout):
  145. return self.deadline == other.deadline
  146. return NotImplemented
  147. def __ne__(self, other):
  148. """NOTE: not supporting sort stability"""
  149. result = self.__eq__(other)
  150. if result is not NotImplemented:
  151. return not result
  152. return NotImplemented
  153. def __lt__(self, other):
  154. """NOTE: not supporting sort stability"""
  155. if isinstance(other, _Timeout):
  156. return self.deadline < other.deadline
  157. return NotImplemented
  158. def __gt__(self, other):
  159. """NOTE: not supporting sort stability"""
  160. if isinstance(other, _Timeout):
  161. return self.deadline > other.deadline
  162. return NotImplemented
  163. def __le__(self, other):
  164. """NOTE: not supporting sort stability"""
  165. if isinstance(other, _Timeout):
  166. return self.deadline <= other.deadline
  167. return NotImplemented
  168. def __ge__(self, other):
  169. """NOTE: not supporting sort stability"""
  170. if isinstance(other, _Timeout):
  171. return self.deadline >= other.deadline
  172. return NotImplemented
  173. class _Timer(object):
  174. """Manage timeouts for use in ioloop"""
  175. # Cancellation count threshold for triggering garbage collection of
  176. # cancelled timers
  177. _GC_CANCELLATION_THRESHOLD = 1024
  178. def __init__(self):
  179. self._timeout_heap = []
  180. # Number of canceled timeouts on heap; for scheduling garbage
  181. # collection of canceled timeouts
  182. self._num_cancellations = 0
  183. def close(self):
  184. """Release resources. Don't use the `_Timer` instance after closing
  185. it
  186. """
  187. # Eliminate potential reference cycles to aid garbage-collection
  188. if self._timeout_heap is not None:
  189. for timeout in self._timeout_heap:
  190. timeout.callback = None
  191. self._timeout_heap = None
  192. def call_later(self, delay, callback):
  193. """Schedule a one-shot timeout given delay seconds.
  194. NOTE: you may cancel the timer before dispatch of the callback. Timer
  195. Manager cancels the timer upon dispatch of the callback.
  196. :param float delay: Non-negative number of seconds from now until
  197. expiration
  198. :param callable callback: The callback method, having the signature
  199. `callback()`
  200. :rtype: _Timeout
  201. :raises ValueError, TypeError
  202. """
  203. if delay < 0:
  204. raise ValueError(
  205. 'call_later: delay must be non-negative, but got %r' % (delay,))
  206. now = pika.compat.time_now()
  207. timeout = _Timeout(now + delay, callback)
  208. heapq.heappush(self._timeout_heap, timeout)
  209. LOGGER.debug(
  210. 'call_later: added timeout %r with deadline=%r and '
  211. 'callback=%r; now=%s; delay=%s', timeout, timeout.deadline,
  212. timeout.callback, now, delay)
  213. return timeout
  214. def remove_timeout(self, timeout):
  215. """Cancel the timeout
  216. :param _Timeout timeout: The timer to cancel
  217. """
  218. # NOTE removing from the heap is difficult, so we just deactivate the
  219. # timeout and garbage-collect it at a later time; see discussion
  220. # in http://docs.python.org/library/heapq.html
  221. if timeout.callback is None:
  222. LOGGER.debug(
  223. 'remove_timeout: timeout was already removed or called %r',
  224. timeout)
  225. else:
  226. LOGGER.debug(
  227. 'remove_timeout: removing timeout %r with deadline=%r '
  228. 'and callback=%r', timeout, timeout.deadline, timeout.callback)
  229. timeout.callback = None
  230. self._num_cancellations += 1
  231. def get_remaining_interval(self):
  232. """Get the interval to the next timeout expiration
  233. :returns: non-negative number of seconds until next timer expiration;
  234. None if there are no timers
  235. :rtype: float
  236. """
  237. if self._timeout_heap:
  238. now = pika.compat.time_now()
  239. interval = max(0, self._timeout_heap[0].deadline - now)
  240. else:
  241. interval = None
  242. return interval
  243. def process_timeouts(self):
  244. """Process pending timeouts, invoking callbacks for those whose time has
  245. come
  246. """
  247. if self._timeout_heap:
  248. now = pika.compat.time_now()
  249. # Remove ready timeouts from the heap now to prevent IO starvation
  250. # from timeouts added during callback processing
  251. ready_timeouts = []
  252. while self._timeout_heap and self._timeout_heap[0].deadline <= now:
  253. timeout = heapq.heappop(self._timeout_heap)
  254. if timeout.callback is not None:
  255. ready_timeouts.append(timeout)
  256. else:
  257. self._num_cancellations -= 1
  258. # Invoke ready timeout callbacks
  259. for timeout in ready_timeouts:
  260. if timeout.callback is None:
  261. # Must have been canceled from a prior callback
  262. self._num_cancellations -= 1
  263. continue
  264. timeout.callback()
  265. timeout.callback = None
  266. # Garbage-collect canceled timeouts if they exceed threshold
  267. if (self._num_cancellations >= self._GC_CANCELLATION_THRESHOLD and
  268. self._num_cancellations > (len(self._timeout_heap) >> 1)):
  269. self._num_cancellations = 0
  270. self._timeout_heap = [
  271. t for t in self._timeout_heap if t.callback is not None
  272. ]
  273. heapq.heapify(self._timeout_heap)
  274. class PollEvents(object):
  275. """Event flags for I/O"""
  276. # Use epoll's constants to keep life easy
  277. READ = getattr(select, 'POLLIN', 0x01) # available for read
  278. WRITE = getattr(select, 'POLLOUT', 0x04) # available for write
  279. ERROR = getattr(select, 'POLLERR', 0x08) # error on associated fd
  280. class IOLoop(AbstractSelectorIOLoop):
  281. """I/O loop implementation that picks a suitable poller (`select`,
  282. `poll`, `epoll`, `kqueue`) to use based on platform.
  283. Implements the
  284. `pika.adapters.utils.selector_ioloop_adapter.AbstractSelectorIOLoop`
  285. interface.
  286. """
  287. # READ/WRITE/ERROR per `AbstractSelectorIOLoop` requirements
  288. READ = PollEvents.READ
  289. WRITE = PollEvents.WRITE
  290. ERROR = PollEvents.ERROR
  291. def __init__(self):
  292. self._timer = _Timer()
  293. # Callbacks requested via `add_callback`
  294. self._callbacks = collections.deque()
  295. self._poller = self._get_poller(self._get_remaining_interval,
  296. self.process_timeouts)
  297. def close(self):
  298. """Release IOLoop's resources.
  299. `IOLoop.close` is intended to be called by the application or test code
  300. only after `IOLoop.start()` returns. After calling `close()`, no other
  301. interaction with the closed instance of `IOLoop` should be performed.
  302. """
  303. if self._callbacks is not None:
  304. self._poller.close()
  305. self._timer.close()
  306. # Set _callbacks to empty list rather than None so that race from
  307. # another thread calling add_callback_threadsafe() won't result in
  308. # AttributeError
  309. self._callbacks = []
  310. @staticmethod
  311. def _get_poller(get_wait_seconds, process_timeouts):
  312. """Determine the best poller to use for this environment and instantiate
  313. it.
  314. :param get_wait_seconds: Function for getting the maximum number of
  315. seconds to wait for IO for use by the poller
  316. :param process_timeouts: Function for processing timeouts for use by the
  317. poller
  318. :returns: The instantiated poller instance supporting `_PollerBase` API
  319. :rtype: object
  320. """
  321. poller = None
  322. kwargs = dict(
  323. get_wait_seconds=get_wait_seconds,
  324. process_timeouts=process_timeouts)
  325. if hasattr(select, 'epoll'):
  326. if not SELECT_TYPE or SELECT_TYPE == 'epoll':
  327. LOGGER.debug('Using EPollPoller')
  328. poller = EPollPoller(**kwargs)
  329. if not poller and hasattr(select, 'kqueue'):
  330. if not SELECT_TYPE or SELECT_TYPE == 'kqueue':
  331. LOGGER.debug('Using KQueuePoller')
  332. poller = KQueuePoller(**kwargs)
  333. if (not poller and hasattr(select, 'poll') and
  334. hasattr(select.poll(), 'modify')): # pylint: disable=E1101
  335. if not SELECT_TYPE or SELECT_TYPE == 'poll':
  336. LOGGER.debug('Using PollPoller')
  337. poller = PollPoller(**kwargs)
  338. if not poller:
  339. LOGGER.debug('Using SelectPoller')
  340. poller = SelectPoller(**kwargs)
  341. return poller
  342. def call_later(self, delay, callback):
  343. """Add the callback to the IOLoop timer to be called after delay seconds
  344. from the time of call on best-effort basis. Returns a handle to the
  345. timeout.
  346. :param float delay: The number of seconds to wait to call callback
  347. :param callable callback: The callback method
  348. :returns: handle to the created timeout that may be passed to
  349. `remove_timeout()`
  350. :rtype: object
  351. """
  352. return self._timer.call_later(delay, callback)
  353. def remove_timeout(self, timeout_handle):
  354. """Remove a timeout
  355. :param timeout_handle: Handle of timeout to remove
  356. """
  357. self._timer.remove_timeout(timeout_handle)
  358. def add_callback_threadsafe(self, callback):
  359. """Requests a call to the given function as soon as possible in the
  360. context of this IOLoop's thread.
  361. NOTE: This is the only thread-safe method in IOLoop. All other
  362. manipulations of IOLoop must be performed from the IOLoop's thread.
  363. For example, a thread may request a call to the `stop` method of an
  364. ioloop that is running in a different thread via
  365. `ioloop.add_callback_threadsafe(ioloop.stop)`
  366. :param callable callback: The callback method
  367. """
  368. if not callable(callback):
  369. raise TypeError(
  370. 'callback must be a callable, but got %r' % (callback,))
  371. # NOTE: `deque.append` is atomic
  372. self._callbacks.append(callback)
  373. # Wake up the IOLoop which may be running in another thread
  374. self._poller.wake_threadsafe()
  375. LOGGER.debug('add_callback_threadsafe: added callback=%r', callback)
  376. # To satisfy `AbstractSelectorIOLoop` requirement
  377. add_callback = add_callback_threadsafe
  378. def process_timeouts(self):
  379. """[Extension] Process pending callbacks and timeouts, invoking those
  380. whose time has come. Internal use only.
  381. """
  382. # Avoid I/O starvation by postponing new callbacks to the next iteration
  383. for _ in pika.compat.xrange(len(self._callbacks)):
  384. callback = self._callbacks.popleft()
  385. LOGGER.debug('process_timeouts: invoking callback=%r', callback)
  386. callback()
  387. self._timer.process_timeouts()
  388. def _get_remaining_interval(self):
  389. """Get the remaining interval to the next callback or timeout
  390. expiration.
  391. :returns: non-negative number of seconds until next callback or timer
  392. expiration; None if there are no callbacks and timers
  393. :rtype: float
  394. """
  395. if self._callbacks:
  396. return 0
  397. return self._timer.get_remaining_interval()
  398. def add_handler(self, fd, handler, events):
  399. """Start watching the given file descriptor for events
  400. :param int fd: The file descriptor
  401. :param callable handler: When requested event(s) occur,
  402. `handler(fd, events)` will be called.
  403. :param int events: The event mask using READ, WRITE, ERROR.
  404. """
  405. self._poller.add_handler(fd, handler, events)
  406. def update_handler(self, fd, events):
  407. """Changes the events we watch for
  408. :param int fd: The file descriptor
  409. :param int events: The event mask using READ, WRITE, ERROR
  410. """
  411. self._poller.update_handler(fd, events)
  412. def remove_handler(self, fd):
  413. """Stop watching the given file descriptor for events
  414. :param int fd: The file descriptor
  415. """
  416. self._poller.remove_handler(fd)
  417. def start(self):
  418. """[API] Start the main poller loop. It will loop until requested to
  419. exit. See `IOLoop.stop`.
  420. """
  421. self._poller.start()
  422. def stop(self):
  423. """[API] Request exit from the ioloop. The loop is NOT guaranteed to
  424. stop before this method returns.
  425. To invoke `stop()` safely from a thread other than this IOLoop's thread,
  426. call it via `add_callback_threadsafe`; e.g.,
  427. `ioloop.add_callback_threadsafe(ioloop.stop)`
  428. """
  429. self._poller.stop()
  430. def activate_poller(self):
  431. """[Extension] Activate the poller
  432. """
  433. self._poller.activate_poller()
  434. def deactivate_poller(self):
  435. """[Extension] Deactivate the poller
  436. """
  437. self._poller.deactivate_poller()
  438. def poll(self):
  439. """[Extension] Wait for events of interest on registered file
  440. descriptors until an event of interest occurs or next timer deadline or
  441. `_PollerBase._MAX_POLL_TIMEOUT`, whichever is sooner, and dispatch the
  442. corresponding event handlers.
  443. """
  444. self._poller.poll()
  445. class _PollerBase(pika.compat.AbstractBase): # pylint: disable=R0902
  446. """Base class for select-based IOLoop implementations"""
  447. # Drop out of the poll loop every _MAX_POLL_TIMEOUT secs as a worst case;
  448. # this is only a backstop value; we will run timeouts when they are
  449. # scheduled.
  450. _MAX_POLL_TIMEOUT = 5
  451. # if the poller uses MS override with 1000
  452. POLL_TIMEOUT_MULT = 1
  453. def __init__(self, get_wait_seconds, process_timeouts):
  454. """
  455. :param get_wait_seconds: Function for getting the maximum number of
  456. seconds to wait for IO for use by the poller
  457. :param process_timeouts: Function for processing timeouts for use by the
  458. poller
  459. """
  460. self._get_wait_seconds = get_wait_seconds
  461. self._process_timeouts = process_timeouts
  462. # We guard access to the waking file descriptors to avoid races from
  463. # closing them while another thread is calling our `wake()` method.
  464. self._waking_mutex = threading.Lock()
  465. # fd-to-handler function mappings
  466. self._fd_handlers = dict()
  467. # event-to-fdset mappings
  468. self._fd_events = {
  469. PollEvents.READ: set(),
  470. PollEvents.WRITE: set(),
  471. PollEvents.ERROR: set()
  472. }
  473. self._processing_fd_event_map = {}
  474. # Reentrancy tracker of the `start` method
  475. self._running = False
  476. self._stopping = False
  477. # Create ioloop-interrupt socket pair and register read handler.
  478. self._r_interrupt, self._w_interrupt = self._get_interrupt_pair()
  479. self.add_handler(self._r_interrupt.fileno(), self._read_interrupt,
  480. PollEvents.READ)
  481. def close(self):
  482. """Release poller's resources.
  483. `close()` is intended to be called after the poller's `start()` method
  484. returns. After calling `close()`, no other interaction with the closed
  485. poller instance should be performed.
  486. """
  487. # Unregister and close ioloop-interrupt socket pair; mutual exclusion is
  488. # necessary to avoid race condition with `wake_threadsafe` executing in
  489. # another thread's context
  490. assert not self._running, 'Cannot call close() before start() unwinds.'
  491. with self._waking_mutex:
  492. if self._w_interrupt is not None:
  493. self.remove_handler(self._r_interrupt.fileno()) # pylint: disable=E1101
  494. self._r_interrupt.close()
  495. self._r_interrupt = None
  496. self._w_interrupt.close()
  497. self._w_interrupt = None
  498. self.deactivate_poller()
  499. self._fd_handlers = None
  500. self._fd_events = None
  501. self._processing_fd_event_map = None
  502. def wake_threadsafe(self):
  503. """Wake up the poller as soon as possible. As the name indicates, this
  504. method is thread-safe.
  505. """
  506. with self._waking_mutex:
  507. if self._w_interrupt is None:
  508. return
  509. try:
  510. # Send byte to interrupt the poll loop, use send() instead of
  511. # os.write for Windows compatibility
  512. self._w_interrupt.send(b'X')
  513. except pika.compat.SOCKET_ERROR as err:
  514. if err.errno != errno.EWOULDBLOCK:
  515. raise
  516. except Exception as err:
  517. # There's nothing sensible to do here, we'll exit the interrupt
  518. # loop after POLL_TIMEOUT secs in worst case anyway.
  519. LOGGER.warning("Failed to send interrupt to poller: %s", err)
  520. raise
  521. def _get_max_wait(self):
  522. """Get the interval to the next timeout event, or a default interval
  523. :returns: maximum number of self.POLL_TIMEOUT_MULT-scaled time units
  524. to wait for IO events
  525. :rtype: int
  526. """
  527. delay = self._get_wait_seconds()
  528. if delay is None:
  529. delay = self._MAX_POLL_TIMEOUT
  530. else:
  531. delay = min(delay, self._MAX_POLL_TIMEOUT)
  532. return delay * self.POLL_TIMEOUT_MULT
  533. def add_handler(self, fileno, handler, events):
  534. """Add a new fileno to the set to be monitored
  535. :param int fileno: The file descriptor
  536. :param callable handler: What is called when an event happens
  537. :param int events: The event mask using READ, WRITE, ERROR
  538. """
  539. self._fd_handlers[fileno] = handler
  540. self._set_handler_events(fileno, events)
  541. # Inform the derived class
  542. self._register_fd(fileno, events)
  543. def update_handler(self, fileno, events):
  544. """Set the events to the current events
  545. :param int fileno: The file descriptor
  546. :param int events: The event mask using READ, WRITE, ERROR
  547. """
  548. # Record the change
  549. events_cleared, events_set = self._set_handler_events(fileno, events)
  550. # Inform the derived class
  551. self._modify_fd_events(
  552. fileno,
  553. events=events,
  554. events_to_clear=events_cleared,
  555. events_to_set=events_set)
  556. def remove_handler(self, fileno):
  557. """Remove a file descriptor from the set
  558. :param int fileno: The file descriptor
  559. """
  560. try:
  561. del self._processing_fd_event_map[fileno]
  562. except KeyError:
  563. pass
  564. events_cleared, _ = self._set_handler_events(fileno, 0)
  565. del self._fd_handlers[fileno]
  566. # Inform the derived class
  567. self._unregister_fd(fileno, events_to_clear=events_cleared)
  568. def _set_handler_events(self, fileno, events):
  569. """Set the handler's events to the given events; internal to
  570. `_PollerBase`.
  571. :param int fileno: The file descriptor
  572. :param int events: The event mask (READ, WRITE, ERROR)
  573. :returns: a 2-tuple (events_cleared, events_set)
  574. :rtype: tuple
  575. """
  576. events_cleared = 0
  577. events_set = 0
  578. for evt in (PollEvents.READ, PollEvents.WRITE, PollEvents.ERROR):
  579. if events & evt:
  580. if fileno not in self._fd_events[evt]:
  581. self._fd_events[evt].add(fileno)
  582. events_set |= evt
  583. else:
  584. if fileno in self._fd_events[evt]:
  585. self._fd_events[evt].discard(fileno)
  586. events_cleared |= evt
  587. return events_cleared, events_set
  588. def activate_poller(self):
  589. """Activate the poller
  590. """
  591. # Activate the underlying poller and register current events
  592. self._init_poller()
  593. fd_to_events = collections.defaultdict(int)
  594. for event, file_descriptors in self._fd_events.items():
  595. for fileno in file_descriptors:
  596. fd_to_events[fileno] |= event
  597. for fileno, events in fd_to_events.items():
  598. self._register_fd(fileno, events)
  599. def deactivate_poller(self):
  600. """Deactivate the poller
  601. """
  602. self._uninit_poller()
  603. def start(self):
  604. """Start the main poller loop. It will loop until requested to exit.
  605. This method is not reentrant and will raise an error if called
  606. recursively (pika/pika#1095)
  607. :raises: RuntimeError
  608. """
  609. if self._running:
  610. raise RuntimeError('IOLoop is not reentrant and is already running')
  611. else:
  612. LOGGER.debug('Entering IOLoop')
  613. self._running = True
  614. self.activate_poller()
  615. try:
  616. # Run event loop
  617. while not self._stopping:
  618. self.poll()
  619. self._process_timeouts()
  620. finally:
  621. try:
  622. LOGGER.debug('Deactivating poller')
  623. self.deactivate_poller()
  624. finally:
  625. self._stopping = False
  626. self._running = False
  627. def stop(self):
  628. """Request exit from the ioloop. The loop is NOT guaranteed to stop
  629. before this method returns.
  630. """
  631. LOGGER.debug('Stopping IOLoop')
  632. self._stopping = True
  633. self.wake_threadsafe()
  634. @abc.abstractmethod
  635. def poll(self):
  636. """Wait for events on interested filedescriptors.
  637. """
  638. raise NotImplementedError
  639. @abc.abstractmethod
  640. def _init_poller(self):
  641. """Notify the implementation to allocate the poller resource"""
  642. raise NotImplementedError
  643. @abc.abstractmethod
  644. def _uninit_poller(self):
  645. """Notify the implementation to release the poller resource"""
  646. raise NotImplementedError
  647. @abc.abstractmethod
  648. def _register_fd(self, fileno, events):
  649. """The base class invokes this method to notify the implementation to
  650. register the file descriptor with the polling object. The request must
  651. be ignored if the poller is not activated.
  652. :param int fileno: The file descriptor
  653. :param int events: The event mask (READ, WRITE, ERROR)
  654. """
  655. raise NotImplementedError
  656. @abc.abstractmethod
  657. def _modify_fd_events(self, fileno, events, events_to_clear, events_to_set):
  658. """The base class invoikes this method to notify the implementation to
  659. modify an already registered file descriptor. The request must be
  660. ignored if the poller is not activated.
  661. :param int fileno: The file descriptor
  662. :param int events: absolute events (READ, WRITE, ERROR)
  663. :param int events_to_clear: The events to clear (READ, WRITE, ERROR)
  664. :param int events_to_set: The events to set (READ, WRITE, ERROR)
  665. """
  666. raise NotImplementedError
  667. @abc.abstractmethod
  668. def _unregister_fd(self, fileno, events_to_clear):
  669. """The base class invokes this method to notify the implementation to
  670. unregister the file descriptor being tracked by the polling object. The
  671. request must be ignored if the poller is not activated.
  672. :param int fileno: The file descriptor
  673. :param int events_to_clear: The events to clear (READ, WRITE, ERROR)
  674. """
  675. raise NotImplementedError
  def _dispatch_fd_events(self, fd_event_map):
      """Invoke the registered handler of each file descriptor that
      received events.

      Before every callback the received mask is re-computed against the
      currently registered interest sets, in case an earlier callback
      changed them. A reference to `fd_event_map` is also stored on the
      instance, and a fileno that is no longer present in the map when its
      turn comes is skipped, so that no spurious callback is generated for
      a descriptor removed during the processing of another callback.

      :param dict fd_event_map: Map of fds to events received on them.

      """
      # Reset the prior map; if the call is nested, this will suppress the
      # remaining dispatch in the earlier call.
      self._processing_fd_event_map.clear()
      self._processing_fd_event_map = fd_event_map

      known_events = (PollEvents.READ, PollEvents.WRITE, PollEvents.ERROR)

      for fd in pika.compat.dictkeys(fd_event_map):
          if fd not in fd_event_map:
              # The fd has been removed from the map under our feet.
              continue

          # Keep only the events that are still of interest for this fd.
          pending = fd_event_map[fd]
          for event_bit in known_events:
              if fd not in self._fd_events[event_bit]:
                  pending &= ~event_bit

          if not pending:
              continue

          callback = self._fd_handlers[fd]
          callback(fd, pending)
  @staticmethod
  def _get_interrupt_pair():
      """Create the socket pair used to interrupt the ioloop from another
      thread.

      socketpair() is not supported on some OS (Win), so a pair of simple
      TCP sockets is used instead. The sockets will be closed and garbage
      collected by python when the ioloop itself is.

      :returns: whatever `pika.compat._nonblocking_socketpair()` returns

      """
      interrupt_pair = pika.compat._nonblocking_socketpair()  # pylint: disable=W0212
      return interrupt_pair
  710. def _read_interrupt(self, _interrupt_fd, _events):
  711. """ Read the interrupt byte(s). We ignore the event mask as we can ony
  712. get here if there's data to be read on our fd.
  713. :param int _interrupt_fd: (unused) The file descriptor to read from
  714. :param int _events: (unused) The events generated for this fd
  715. """
  716. try:
  717. # NOTE Use recv instead of os.read for windows compatibility
  718. self._r_interrupt.recv(512) # pylint: disable=E1101
  719. except pika.compat.SOCKET_ERROR as err:
  720. if err.errno != errno.EAGAIN:
  721. raise
  class SelectPoller(_PollerBase):
      """Poller built on `select.select`, the most widely supported
      polling mechanism.

      The interest sets are handed to `select.select` on every call to
      `poll`, so none of the poller-resource hooks has anything to do here.

      """
      # if the poller uses MS specify 1000
      POLL_TIMEOUT_MULT = 1

      def poll(self):
          """Wait for events of interest on registered file descriptors until
          an event of interest occurs or next timer deadline or
          _MAX_POLL_TIMEOUT, whichever is sooner, and dispatch the
          corresponding event handlers.

          """
          while True:
              try:
                  read_set = self._fd_events[PollEvents.READ]
                  write_set = self._fd_events[PollEvents.WRITE]
                  error_set = self._fd_events[PollEvents.ERROR]

                  if read_set or write_set or error_set:
                      readable, writable, errored = select.select(
                          read_set, write_set, error_set,
                          self._get_max_wait())
                  else:
                      # NOTE When called without any FDs, select fails on
                      # Windows with error 10022, 'An invalid argument was
                      # supplied'.
                      time.sleep(self._get_max_wait())
                      readable, writable, errored = [], [], []
                  break
              except _SELECT_ERRORS as exc:
                  # Retry on resumable errors; anything else propagates.
                  if not _is_resumable(exc):
                      raise

          # Build an event bit mask for each fileno we've received an event
          # for
          fd_event_map = collections.defaultdict(int)
          ready_by_event = (
              (PollEvents.READ, readable),
              (PollEvents.WRITE, writable),
              (PollEvents.ERROR, errored),
          )
          for event_bit, ready_fds in ready_by_event:
              for fileno in ready_fds:
                  fd_event_map[fileno] |= event_bit

          self._dispatch_fd_events(fd_event_map)

      def _init_poller(self):
          """Notify the implementation to allocate the poller resource"""
          # Nothing to allocate for SelectPoller

      def _uninit_poller(self):
          """Notify the implementation to release the poller resource"""
          # Nothing to release for SelectPoller

      def _register_fd(self, fileno, events):
          """Hook invoked by the base class to register the file descriptor
          with the polling object. The request must be ignored if the poller
          is not activated.

          :param int fileno: The file descriptor
          :param int events: The event mask using READ, WRITE, ERROR

          """
          # Nothing to do for SelectPoller

      def _modify_fd_events(self, fileno, events, events_to_clear,
                            events_to_set):
          """Hook invoked by the base class to modify an already registered
          file descriptor. The request must be ignored if the poller is not
          activated.

          :param int fileno: The file descriptor
          :param int events: absolute events (READ, WRITE, ERROR)
          :param int events_to_clear: The events to clear (READ, WRITE, ERROR)
          :param int events_to_set: The events to set (READ, WRITE, ERROR)

          """
          # Nothing to do for SelectPoller

      def _unregister_fd(self, fileno, events_to_clear):
          """Hook invoked by the base class to unregister the file descriptor
          being tracked by the polling object. The request must be ignored if
          the poller is not activated.

          :param int fileno: The file descriptor
          :param int events_to_clear: The events to clear (READ, WRITE, ERROR)

          """
          # Nothing to do for SelectPoller
  class KQueuePoller(_PollerBase):
      # pylint: disable=E1101
      """KQueuePoller works on BSD based systems and is faster than select"""

      def __init__(self, get_wait_seconds, process_timeouts):
          """Create an instance of the KQueuePoller

          Both arguments are passed through unchanged to the base class
          initializer.

          """
          # Must exist before the base initializer runs.
          self._kqueue = None
          super(KQueuePoller, self).__init__(get_wait_seconds, process_timeouts)

      @staticmethod
      def _map_event(kevent):
          """Return the event type associated with a kevent object

          :param kevent kevent: a kevent object as returned by
              kqueue.control()
          :returns: the matching PollEvents mask, or 0 for an unexpected
              kevent (which is also logged as critical)

          """
          if kevent.filter == select.KQ_FILTER_READ:
              return PollEvents.READ

          if kevent.filter == select.KQ_FILTER_WRITE:
              if kevent.flags & select.KQ_EV_EOF:
                  # May be set when the peer reader disconnects. We don't
                  # check KQ_EV_EOF for KQ_FILTER_READ because in that case
                  # it may be set before the remaining data is consumed from
                  # sockbuf.
                  return PollEvents.WRITE | PollEvents.ERROR
              return PollEvents.WRITE

          if kevent.flags & select.KQ_EV_ERROR:
              return PollEvents.ERROR

          LOGGER.critical('Unexpected kevent: %s', kevent)
          return 0

      def poll(self):
          """Wait for events of interest on registered file descriptors until
          an event of interest occurs or next timer deadline or
          _MAX_POLL_TIMEOUT, whichever is sooner, and dispatch the
          corresponding event handlers.

          """
          while True:
              try:
                  # No changes to submit; fetch at most 1000 events.
                  received = self._kqueue.control(None, 1000,
                                                  self._get_max_wait())
                  break
              except _SELECT_ERRORS as exc:
                  # Retry on resumable errors; anything else propagates.
                  if not _is_resumable(exc):
                      raise

          fd_event_map = collections.defaultdict(int)
          for kevent in received:
              fd_event_map[kevent.ident] |= self._map_event(kevent)

          self._dispatch_fd_events(fd_event_map)

      def _init_poller(self):
          """Notify the implementation to allocate the poller resource"""
          assert self._kqueue is None

          self._kqueue = select.kqueue()

      def _uninit_poller(self):
          """Notify the implementation to release the poller resource"""
          if self._kqueue is None:
              return

          self._kqueue.close()
          self._kqueue = None

      def _register_fd(self, fileno, events):
          """Hook invoked by the base class to register the file descriptor
          with the polling object. The request must be ignored if the poller
          is not activated.

          :param int fileno: The file descriptor
          :param int events: The event mask using READ, WRITE, ERROR

          """
          # Registering is a modification that only sets events.
          self._modify_fd_events(
              fileno, events=events, events_to_clear=0, events_to_set=events)

      def _modify_fd_events(self, fileno, events, events_to_clear,
                            events_to_set):
          """Hook invoked by the base class to modify an already registered
          file descriptor. The request must be ignored if the poller is not
          activated.

          :param int fileno: The file descriptor
          :param int events: absolute events (READ, WRITE, ERROR); not used
              by this implementation
          :param int events_to_clear: The events to clear (READ, WRITE, ERROR)
          :param int events_to_set: The events to set (READ, WRITE, ERROR)

          """
          if self._kqueue is None:
              return

          filter_by_event = (
              (PollEvents.READ, select.KQ_FILTER_READ),
              (PollEvents.WRITE, select.KQ_FILTER_WRITE),
          )

          changes = []
          for event_bit, kq_filter in filter_by_event:
              # For each filter the deletion is queued ahead of the addition.
              if events_to_clear & event_bit:
                  changes.append(
                      select.kevent(
                          fileno,
                          filter=kq_filter,
                          flags=select.KQ_EV_DELETE))
              if events_to_set & event_bit:
                  changes.append(
                      select.kevent(
                          fileno,
                          filter=kq_filter,
                          flags=select.KQ_EV_ADD))

          # max_events=0: apply the changes without fetching any events.
          self._kqueue.control(changes, 0)

      def _unregister_fd(self, fileno, events_to_clear):
          """Hook invoked by the base class to unregister the file descriptor
          being tracked by the polling object. The request must be ignored if
          the poller is not activated.

          :param int fileno: The file descriptor
          :param int events_to_clear: The events to clear (READ, WRITE, ERROR)

          """
          # Unregistering is a modification that only clears events.
          self._modify_fd_events(
              fileno, events=0, events_to_clear=events_to_clear, events_to_set=0)
  class PollPoller(_PollerBase):
      """Poll works on Linux and can have better performance than EPoll in
      certain scenarios. Both are faster than select.

      """
      # select.poll() takes its timeout in milliseconds
      POLL_TIMEOUT_MULT = 1000

      def __init__(self, get_wait_seconds, process_timeouts):
          """Create an instance of the PollPoller

          Both arguments are passed through unchanged to the base class
          initializer.

          """
          # Must exist before the base initializer runs.
          self._poll = None
          super(PollPoller, self).__init__(get_wait_seconds, process_timeouts)

      @staticmethod
      def _create_poller():
          """
          :rtype: `select.poll`
          """
          poller = select.poll()  # pylint: disable=E1101
          return poller

      def poll(self):
          """Wait for events of interest on registered file descriptors until
          an event of interest occurs or next timer deadline or
          _MAX_POLL_TIMEOUT, whichever is sooner, and dispatch the
          corresponding event handlers.

          """
          while True:
              try:
                  ready = self._poll.poll(self._get_max_wait())
                  break
              except _SELECT_ERRORS as exc:
                  # Retry on resumable errors; anything else propagates.
                  if not _is_resumable(exc):
                      raise

          fd_event_map = collections.defaultdict(int)
          for fileno, revents in ready:
              # NOTE: On OS X, when poll() sets POLLHUP, it's
              # mutually-exclusive with POLLOUT and it doesn't seem to set
              # POLLERR along with POLLHUP when socket connection fails, for
              # example. So, we need to at least add POLLERR when we see
              # POLLHUP
              if (revents & select.POLLHUP) and pika.compat.ON_OSX:
                  revents |= select.POLLERR

              fd_event_map[fileno] |= revents

          self._dispatch_fd_events(fd_event_map)

      def _init_poller(self):
          """Notify the implementation to allocate the poller resource"""
          assert self._poll is None

          self._poll = self._create_poller()

      def _uninit_poller(self):
          """Notify the implementation to release the poller resource"""
          if self._poll is None:
              return

          # Only close the polling object when it offers a close() method.
          if hasattr(self._poll, "close"):
              self._poll.close()

          self._poll = None

      def _register_fd(self, fileno, events):
          """Hook invoked by the base class to register the file descriptor
          with the polling object. The request must be ignored if the poller
          is not activated.

          :param int fileno: The file descriptor
          :param int events: The event mask using READ, WRITE, ERROR

          """
          if self._poll is None:
              return

          self._poll.register(fileno, events)

      def _modify_fd_events(self, fileno, events, events_to_clear,
                            events_to_set):
          """Hook invoked by the base class to modify an already registered
          file descriptor. The request must be ignored if the poller is not
          activated.

          :param int fileno: The file descriptor
          :param int events: absolute events (READ, WRITE, ERROR)
          :param int events_to_clear: The events to clear (READ, WRITE, ERROR);
              not used by this implementation
          :param int events_to_set: The events to set (READ, WRITE, ERROR);
              not used by this implementation

          """
          if self._poll is None:
              return

          self._poll.modify(fileno, events)

      def _unregister_fd(self, fileno, events_to_clear):
          """Hook invoked by the base class to unregister the file descriptor
          being tracked by the polling object. The request must be ignored if
          the poller is not activated.

          :param int fileno: The file descriptor
          :param int events_to_clear: The events to clear (READ, WRITE, ERROR);
              not used by this implementation

          """
          if self._poll is None:
              return

          self._poll.unregister(fileno)
  class EPollPoller(PollPoller):
      """EPoll works on Linux and can have better performance than Poll in
      certain scenarios. Both are faster than select.

      Everything except the creation of the polling object and the timeout
      multiplier is inherited from `PollPoller`.

      """
      # select.epoll.poll() takes its timeout in seconds
      POLL_TIMEOUT_MULT = 1

      @staticmethod
      def _create_poller():
          """
          :rtype: `select.epoll`
          """
          poller = select.epoll()  # pylint: disable=E1101
          return poller