#!/usr/bin/env python
  2. #
  3. # Copyright 2009 Facebook
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License. You may obtain
  7. # a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14. # License for the specific language governing permissions and limitations
  15. # under the License.
  16. """An I/O event loop for non-blocking sockets.
  17. Typical applications will use a single `IOLoop` object, in the
  18. `IOLoop.instance` singleton. The `IOLoop.start` method should usually
  19. be called at the end of the ``main()`` function. Atypical applications may
  20. use more than one `IOLoop`, such as one `IOLoop` per thread, or per `unittest`
  21. case.
  22. In addition to I/O events, the `IOLoop` can also schedule time-based events.
  23. `IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
  24. """
  25. from __future__ import absolute_import, division, print_function, with_statement
  26. import datetime
  27. import errno
  28. import functools
  29. import heapq
  30. import itertools
  31. import logging
  32. import numbers
  33. import os
  34. import select
  35. import sys
  36. import threading
  37. import time
  38. import traceback
  39. import math
  40. from .concurrent import TracebackFuture, is_future
  41. from .log import app_log, gen_log
  42. from . import stack_context
  43. from .util import Configurable, errno_from_exception, timedelta_to_seconds
  44. try:
  45. import signal
  46. except ImportError:
  47. signal = None
  48. try:
  49. import thread # py2
  50. except ImportError:
  51. import _thread as thread # py3
  52. from .platform.auto import set_close_exec, Waker
  53. _POLL_TIMEOUT = 3600.0
  54. class TimeoutError(Exception):
  55. pass
  56. class IOLoop(Configurable):
  57. """A level-triggered I/O loop.
  58. We use ``epoll`` (Linux) or ``kqueue`` (BSD and Mac OS X) if they
  59. are available, or else we fall back on select(). If you are
  60. implementing a system that needs to handle thousands of
  61. simultaneous connections, you should use a system that supports
  62. either ``epoll`` or ``kqueue``.
  63. Example usage for a simple TCP server:
  64. .. testcode::
  65. import errno
  66. import functools
  67. import tornado.ioloop
  68. import socket
  69. def connection_ready(sock, fd, events):
  70. while True:
  71. try:
  72. connection, address = sock.accept()
  73. except socket.error as e:
  74. if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
  75. raise
  76. return
  77. connection.setblocking(0)
  78. handle_connection(connection, address)
  79. if __name__ == '__main__':
  80. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  81. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  82. sock.setblocking(0)
  83. sock.bind(("", port))
  84. sock.listen(128)
  85. io_loop = tornado.ioloop.IOLoop.current()
  86. callback = functools.partial(connection_ready, sock)
  87. io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
  88. io_loop.start()
  89. .. testoutput::
  90. :hide:
  91. By default, a newly-constructed `IOLoop` becomes the thread's current
  92. `IOLoop`, unless there already is a current `IOLoop`. This behavior
  93. can be controlled with the ``make_current`` argument to the `IOLoop`
  94. constructor: if ``make_current=True``, the new `IOLoop` will always
  95. try to become current and it raises an error if there is already a
  96. current instance. If ``make_current=False``, the new `IOLoop` will
  97. not try to become current.
  98. .. versionchanged:: 4.2
  99. Added the ``make_current`` keyword argument to the `IOLoop`
  100. constructor.
  101. """
  102. # Constants from the epoll module
  103. _EPOLLIN = 0x001
  104. _EPOLLPRI = 0x002
  105. _EPOLLOUT = 0x004
  106. _EPOLLERR = 0x008
  107. _EPOLLHUP = 0x010
  108. _EPOLLRDHUP = 0x2000
  109. _EPOLLONESHOT = (1 << 30)
  110. _EPOLLET = (1 << 31)
  111. # Our events map exactly to the epoll events
  112. NONE = 0
  113. READ = _EPOLLIN
  114. WRITE = _EPOLLOUT
  115. ERROR = _EPOLLERR | _EPOLLHUP
  116. # Global lock for creating global IOLoop instance
  117. _instance_lock = threading.Lock()
  118. _current = threading.local()
  119. @staticmethod
  120. def instance():
  121. """Returns a global `IOLoop` instance.
  122. Most applications have a single, global `IOLoop` running on the
  123. main thread. Use this method to get this instance from
  124. another thread. In most other cases, it is better to use `current()`
  125. to get the current thread's `IOLoop`.
  126. """
  127. if not hasattr(IOLoop, "_instance"):
  128. with IOLoop._instance_lock:
  129. if not hasattr(IOLoop, "_instance"):
  130. # New instance after double check
  131. IOLoop._instance = IOLoop()
  132. return IOLoop._instance
  133. @staticmethod
  134. def initialized():
  135. """Returns true if the singleton instance has been created."""
  136. return hasattr(IOLoop, "_instance")
  137. def install(self):
  138. """Installs this `IOLoop` object as the singleton instance.
  139. This is normally not necessary as `instance()` will create
  140. an `IOLoop` on demand, but you may want to call `install` to use
  141. a custom subclass of `IOLoop`.
  142. """
  143. assert not IOLoop.initialized()
  144. IOLoop._instance = self
  145. @staticmethod
  146. def clear_instance():
  147. """Clear the global `IOLoop` instance.
  148. .. versionadded:: 4.0
  149. """
  150. if hasattr(IOLoop, "_instance"):
  151. del IOLoop._instance
  152. @staticmethod
  153. def current(instance=True):
  154. """Returns the current thread's `IOLoop`.
  155. If an `IOLoop` is currently running or has been marked as
  156. current by `make_current`, returns that instance. If there is
  157. no current `IOLoop`, returns `IOLoop.instance()` (i.e. the
  158. main thread's `IOLoop`, creating one if necessary) if ``instance``
  159. is true.
  160. In general you should use `IOLoop.current` as the default when
  161. constructing an asynchronous object, and use `IOLoop.instance`
  162. when you mean to communicate to the main thread from a different
  163. one.
  164. .. versionchanged:: 4.1
  165. Added ``instance`` argument to control the fallback to
  166. `IOLoop.instance()`.
  167. """
  168. current = getattr(IOLoop._current, "instance", None)
  169. if current is None and instance:
  170. return IOLoop.instance()
  171. return current
  172. def make_current(self):
  173. """Makes this the `IOLoop` for the current thread.
  174. An `IOLoop` automatically becomes current for its thread
  175. when it is started, but it is sometimes useful to call
  176. `make_current` explicitly before starting the `IOLoop`,
  177. so that code run at startup time can find the right
  178. instance.
  179. .. versionchanged:: 4.1
  180. An `IOLoop` created while there is no current `IOLoop`
  181. will automatically become current.
  182. """
  183. IOLoop._current.instance = self
  184. @staticmethod
  185. def clear_current():
  186. IOLoop._current.instance = None
  187. @classmethod
  188. def configurable_base(cls):
  189. return IOLoop
  190. @classmethod
  191. def configurable_default(cls):
  192. # this is the only patch to IOLoop:
  193. from zmq.eventloop.ioloop import ZMQIOLoop
  194. return ZMQIOLoop
  195. if hasattr(select, "epoll"):
  196. from tornado.platform.epoll import EPollIOLoop
  197. return EPollIOLoop
  198. if hasattr(select, "kqueue"):
  199. # Python 2.6+ on BSD or Mac
  200. from tornado.platform.kqueue import KQueueIOLoop
  201. return KQueueIOLoop
  202. from tornado.platform.select import SelectIOLoop
  203. return SelectIOLoop
  204. def initialize(self, make_current=None):
  205. if make_current is None:
  206. if IOLoop.current(instance=False) is None:
  207. self.make_current()
  208. elif make_current:
  209. if IOLoop.current(instance=False) is not None:
  210. raise RuntimeError("current IOLoop already exists")
  211. self.make_current()
  212. def close(self, all_fds=False):
  213. """Closes the `IOLoop`, freeing any resources used.
  214. If ``all_fds`` is true, all file descriptors registered on the
  215. IOLoop will be closed (not just the ones created by the
  216. `IOLoop` itself).
  217. Many applications will only use a single `IOLoop` that runs for the
  218. entire lifetime of the process. In that case closing the `IOLoop`
  219. is not necessary since everything will be cleaned up when the
  220. process exits. `IOLoop.close` is provided mainly for scenarios
  221. such as unit tests, which create and destroy a large number of
  222. ``IOLoops``.
  223. An `IOLoop` must be completely stopped before it can be closed. This
  224. means that `IOLoop.stop()` must be called *and* `IOLoop.start()` must
  225. be allowed to return before attempting to call `IOLoop.close()`.
  226. Therefore the call to `close` will usually appear just after
  227. the call to `start` rather than near the call to `stop`.
  228. .. versionchanged:: 3.1
  229. If the `IOLoop` implementation supports non-integer objects
  230. for "file descriptors", those objects will have their
  231. ``close`` method when ``all_fds`` is true.
  232. """
  233. raise NotImplementedError()
  234. def add_handler(self, fd, handler, events):
  235. """Registers the given handler to receive the given events for ``fd``.
  236. The ``fd`` argument may either be an integer file descriptor or
  237. a file-like object with a ``fileno()`` method (and optionally a
  238. ``close()`` method, which may be called when the `IOLoop` is shut
  239. down).
  240. The ``events`` argument is a bitwise or of the constants
  241. ``IOLoop.READ``, ``IOLoop.WRITE``, and ``IOLoop.ERROR``.
  242. When an event occurs, ``handler(fd, events)`` will be run.
  243. .. versionchanged:: 4.0
  244. Added the ability to pass file-like objects in addition to
  245. raw file descriptors.
  246. """
  247. raise NotImplementedError()
  248. def update_handler(self, fd, events):
  249. """Changes the events we listen for ``fd``.
  250. .. versionchanged:: 4.0
  251. Added the ability to pass file-like objects in addition to
  252. raw file descriptors.
  253. """
  254. raise NotImplementedError()
  255. def remove_handler(self, fd):
  256. """Stop listening for events on ``fd``.
  257. .. versionchanged:: 4.0
  258. Added the ability to pass file-like objects in addition to
  259. raw file descriptors.
  260. """
  261. raise NotImplementedError()
  262. def set_blocking_signal_threshold(self, seconds, action):
  263. """Sends a signal if the `IOLoop` is blocked for more than
  264. ``s`` seconds.
  265. Pass ``seconds=None`` to disable. Requires Python 2.6 on a unixy
  266. platform.
  267. The action parameter is a Python signal handler. Read the
  268. documentation for the `signal` module for more information.
  269. If ``action`` is None, the process will be killed if it is
  270. blocked for too long.
  271. """
  272. raise NotImplementedError()
  273. def set_blocking_log_threshold(self, seconds):
  274. """Logs a stack trace if the `IOLoop` is blocked for more than
  275. ``s`` seconds.
  276. Equivalent to ``set_blocking_signal_threshold(seconds,
  277. self.log_stack)``
  278. """
  279. self.set_blocking_signal_threshold(seconds, self.log_stack)
  280. def log_stack(self, signal, frame):
  281. """Signal handler to log the stack trace of the current thread.
  282. For use with `set_blocking_signal_threshold`.
  283. """
  284. gen_log.warning('IOLoop blocked for %f seconds in\n%s',
  285. self._blocking_signal_threshold,
  286. ''.join(traceback.format_stack(frame)))
  287. def start(self):
  288. """Starts the I/O loop.
  289. The loop will run until one of the callbacks calls `stop()`, which
  290. will make the loop stop after the current event iteration completes.
  291. """
  292. raise NotImplementedError()
  293. def _setup_logging(self):
  294. """The IOLoop catches and logs exceptions, so it's
  295. important that log output be visible. However, python's
  296. default behavior for non-root loggers (prior to python
  297. 3.2) is to print an unhelpful "no handlers could be
  298. found" message rather than the actual log entry, so we
  299. must explicitly configure logging if we've made it this
  300. far without anything.
  301. This method should be called from start() in subclasses.
  302. """
  303. if not any([logging.getLogger().handlers,
  304. logging.getLogger('tornado').handlers,
  305. logging.getLogger('tornado.application').handlers]):
  306. logging.basicConfig()
  307. def stop(self):
  308. """Stop the I/O loop.
  309. If the event loop is not currently running, the next call to `start()`
  310. will return immediately.
  311. To use asynchronous methods from otherwise-synchronous code (such as
  312. unit tests), you can start and stop the event loop like this::
  313. ioloop = IOLoop()
  314. async_method(ioloop=ioloop, callback=ioloop.stop)
  315. ioloop.start()
  316. ``ioloop.start()`` will return after ``async_method`` has run
  317. its callback, whether that callback was invoked before or
  318. after ``ioloop.start``.
  319. Note that even after `stop` has been called, the `IOLoop` is not
  320. completely stopped until `IOLoop.start` has also returned.
  321. Some work that was scheduled before the call to `stop` may still
  322. be run before the `IOLoop` shuts down.
  323. """
  324. raise NotImplementedError()
  325. def run_sync(self, func, timeout=None):
  326. """Starts the `IOLoop`, runs the given function, and stops the loop.
  327. The function must return either a yieldable object or
  328. ``None``. If the function returns a yieldable object, the
  329. `IOLoop` will run until the yieldable is resolved (and
  330. `run_sync()` will return the yieldable's result). If it raises
  331. an exception, the `IOLoop` will stop and the exception will be
  332. re-raised to the caller.
  333. The keyword-only argument ``timeout`` may be used to set
  334. a maximum duration for the function. If the timeout expires,
  335. a `TimeoutError` is raised.
  336. This method is useful in conjunction with `tornado.gen.coroutine`
  337. to allow asynchronous calls in a ``main()`` function::
  338. @gen.coroutine
  339. def main():
  340. # do stuff...
  341. if __name__ == '__main__':
  342. IOLoop.current().run_sync(main)
  343. .. versionchanged:: 4.3
  344. Returning a non-``None``, non-yieldable value is now an error.
  345. """
  346. future_cell = [None]
  347. def run():
  348. try:
  349. result = func()
  350. if result is not None:
  351. from tornado.gen import convert_yielded
  352. result = convert_yielded(result)
  353. except Exception:
  354. future_cell[0] = TracebackFuture()
  355. future_cell[0].set_exc_info(sys.exc_info())
  356. else:
  357. if is_future(result):
  358. future_cell[0] = result
  359. else:
  360. future_cell[0] = TracebackFuture()
  361. future_cell[0].set_result(result)
  362. self.add_future(future_cell[0], lambda future: self.stop())
  363. self.add_callback(run)
  364. if timeout is not None:
  365. timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
  366. self.start()
  367. if timeout is not None:
  368. self.remove_timeout(timeout_handle)
  369. if not future_cell[0].done():
  370. raise TimeoutError('Operation timed out after %s seconds' % timeout)
  371. return future_cell[0].result()
  372. def time(self):
  373. """Returns the current time according to the `IOLoop`'s clock.
  374. The return value is a floating-point number relative to an
  375. unspecified time in the past.
  376. By default, the `IOLoop`'s time function is `time.time`. However,
  377. it may be configured to use e.g. `time.monotonic` instead.
  378. Calls to `add_timeout` that pass a number instead of a
  379. `datetime.timedelta` should use this function to compute the
  380. appropriate time, so they can work no matter what time function
  381. is chosen.
  382. """
  383. return time.time()
  384. def add_timeout(self, deadline, callback, *args, **kwargs):
  385. """Runs the ``callback`` at the time ``deadline`` from the I/O loop.
  386. Returns an opaque handle that may be passed to
  387. `remove_timeout` to cancel.
  388. ``deadline`` may be a number denoting a time (on the same
  389. scale as `IOLoop.time`, normally `time.time`), or a
  390. `datetime.timedelta` object for a deadline relative to the
  391. current time. Since Tornado 4.0, `call_later` is a more
  392. convenient alternative for the relative case since it does not
  393. require a timedelta object.
  394. Note that it is not safe to call `add_timeout` from other threads.
  395. Instead, you must use `add_callback` to transfer control to the
  396. `IOLoop`'s thread, and then call `add_timeout` from there.
  397. Subclasses of IOLoop must implement either `add_timeout` or
  398. `call_at`; the default implementations of each will call
  399. the other. `call_at` is usually easier to implement, but
  400. subclasses that wish to maintain compatibility with Tornado
  401. versions prior to 4.0 must use `add_timeout` instead.
  402. .. versionchanged:: 4.0
  403. Now passes through ``*args`` and ``**kwargs`` to the callback.
  404. """
  405. if isinstance(deadline, numbers.Real):
  406. return self.call_at(deadline, callback, *args, **kwargs)
  407. elif isinstance(deadline, datetime.timedelta):
  408. return self.call_at(self.time() + timedelta_to_seconds(deadline),
  409. callback, *args, **kwargs)
  410. else:
  411. raise TypeError("Unsupported deadline %r" % deadline)
  412. def call_later(self, delay, callback, *args, **kwargs):
  413. """Runs the ``callback`` after ``delay`` seconds have passed.
  414. Returns an opaque handle that may be passed to `remove_timeout`
  415. to cancel. Note that unlike the `asyncio` method of the same
  416. name, the returned object does not have a ``cancel()`` method.
  417. See `add_timeout` for comments on thread-safety and subclassing.
  418. .. versionadded:: 4.0
  419. """
  420. return self.call_at(self.time() + delay, callback, *args, **kwargs)
  421. def call_at(self, when, callback, *args, **kwargs):
  422. """Runs the ``callback`` at the absolute time designated by ``when``.
  423. ``when`` must be a number using the same reference point as
  424. `IOLoop.time`.
  425. Returns an opaque handle that may be passed to `remove_timeout`
  426. to cancel. Note that unlike the `asyncio` method of the same
  427. name, the returned object does not have a ``cancel()`` method.
  428. See `add_timeout` for comments on thread-safety and subclassing.
  429. .. versionadded:: 4.0
  430. """
  431. return self.add_timeout(when, callback, *args, **kwargs)
  432. def remove_timeout(self, timeout):
  433. """Cancels a pending timeout.
  434. The argument is a handle as returned by `add_timeout`. It is
  435. safe to call `remove_timeout` even if the callback has already
  436. been run.
  437. """
  438. raise NotImplementedError()
  439. def add_callback(self, callback, *args, **kwargs):
  440. """Calls the given callback on the next I/O loop iteration.
  441. It is safe to call this method from any thread at any time,
  442. except from a signal handler. Note that this is the **only**
  443. method in `IOLoop` that makes this thread-safety guarantee; all
  444. other interaction with the `IOLoop` must be done from that
  445. `IOLoop`'s thread. `add_callback()` may be used to transfer
  446. control from other threads to the `IOLoop`'s thread.
  447. To add a callback from a signal handler, see
  448. `add_callback_from_signal`.
  449. """
  450. raise NotImplementedError()
  451. def add_callback_from_signal(self, callback, *args, **kwargs):
  452. """Calls the given callback on the next I/O loop iteration.
  453. Safe for use from a Python signal handler; should not be used
  454. otherwise.
  455. Callbacks added with this method will be run without any
  456. `.stack_context`, to avoid picking up the context of the function
  457. that was interrupted by the signal.
  458. """
  459. raise NotImplementedError()
  460. def spawn_callback(self, callback, *args, **kwargs):
  461. """Calls the given callback on the next IOLoop iteration.
  462. Unlike all other callback-related methods on IOLoop,
  463. ``spawn_callback`` does not associate the callback with its caller's
  464. ``stack_context``, so it is suitable for fire-and-forget callbacks
  465. that should not interfere with the caller.
  466. .. versionadded:: 4.0
  467. """
  468. with stack_context.NullContext():
  469. self.add_callback(callback, *args, **kwargs)
  470. def add_future(self, future, callback):
  471. """Schedules a callback on the ``IOLoop`` when the given
  472. `.Future` is finished.
  473. The callback is invoked with one argument, the
  474. `.Future`.
  475. """
  476. assert is_future(future)
  477. callback = stack_context.wrap(callback)
  478. future.add_done_callback(
  479. lambda future: self.add_callback(callback, future))
  480. def _run_callback(self, callback):
  481. """Runs a callback with error handling.
  482. For use in subclasses.
  483. """
  484. try:
  485. ret = callback()
  486. if ret is not None:
  487. from tornado import gen
  488. # Functions that return Futures typically swallow all
  489. # exceptions and store them in the Future. If a Future
  490. # makes it out to the IOLoop, ensure its exception (if any)
  491. # gets logged too.
  492. try:
  493. ret = gen.convert_yielded(ret)
  494. except gen.BadYieldError:
  495. # It's not unusual for add_callback to be used with
  496. # methods returning a non-None and non-yieldable
  497. # result, which should just be ignored.
  498. pass
  499. else:
  500. self.add_future(ret, lambda f: f.result())
  501. except Exception:
  502. self.handle_callback_exception(callback)
  503. def handle_callback_exception(self, callback):
  504. """This method is called whenever a callback run by the `IOLoop`
  505. throws an exception.
  506. By default simply logs the exception as an error. Subclasses
  507. may override this method to customize reporting of exceptions.
  508. The exception itself is not passed explicitly, but is available
  509. in `sys.exc_info`.
  510. """
  511. app_log.error("Exception in callback %r", callback, exc_info=True)
  512. def split_fd(self, fd):
  513. """Returns an (fd, obj) pair from an ``fd`` parameter.
  514. We accept both raw file descriptors and file-like objects as
  515. input to `add_handler` and related methods. When a file-like
  516. object is passed, we must retain the object itself so we can
  517. close it correctly when the `IOLoop` shuts down, but the
  518. poller interfaces favor file descriptors (they will accept
  519. file-like objects and call ``fileno()`` for you, but they
  520. always return the descriptor itself).
  521. This method is provided for use by `IOLoop` subclasses and should
  522. not generally be used by application code.
  523. .. versionadded:: 4.0
  524. """
  525. try:
  526. return fd.fileno(), fd
  527. except AttributeError:
  528. return fd, fd
  529. def close_fd(self, fd):
  530. """Utility method to close an ``fd``.
  531. If ``fd`` is a file-like object, we close it directly; otherwise
  532. we use `os.close`.
  533. This method is provided for use by `IOLoop` subclasses (in
  534. implementations of ``IOLoop.close(all_fds=True)`` and should
  535. not generally be used by application code.
  536. .. versionadded:: 4.0
  537. """
  538. try:
  539. try:
  540. fd.close()
  541. except AttributeError:
  542. os.close(fd)
  543. except OSError:
  544. pass
  545. class PollIOLoop(IOLoop):
  546. """Base class for IOLoops built around a select-like function.
  547. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
  548. (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
  549. `tornado.platform.select.SelectIOLoop` (all platforms).
  550. """
  551. def initialize(self, impl, time_func=None, **kwargs):
  552. super(PollIOLoop, self).initialize(**kwargs)
  553. self._impl = impl
  554. if hasattr(self._impl, 'fileno'):
  555. set_close_exec(self._impl.fileno())
  556. self.time_func = time_func or time.time
  557. self._handlers = {}
  558. self._events = {}
  559. self._callbacks = []
  560. self._callback_lock = threading.Lock()
  561. self._timeouts = []
  562. self._cancellations = 0
  563. self._running = False
  564. self._stopped = False
  565. self._closing = False
  566. self._thread_ident = None
  567. self._blocking_signal_threshold = None
  568. self._timeout_counter = itertools.count()
  569. # Create a pipe that we send bogus data to when we want to wake
  570. # the I/O loop when it is idle
  571. self._waker = Waker()
  572. self.add_handler(self._waker.fileno(),
  573. lambda fd, events: self._waker.consume(),
  574. self.READ)
  575. def close(self, all_fds=False):
  576. with self._callback_lock:
  577. self._closing = True
  578. self.remove_handler(self._waker.fileno())
  579. if all_fds:
  580. for fd, handler in self._handlers.values():
  581. self.close_fd(fd)
  582. self._waker.close()
  583. self._impl.close()
  584. self._callbacks = None
  585. self._timeouts = None
  586. def add_handler(self, fd, handler, events):
  587. fd, obj = self.split_fd(fd)
  588. self._handlers[fd] = (obj, stack_context.wrap(handler))
  589. self._impl.register(fd, events | self.ERROR)
  590. def update_handler(self, fd, events):
  591. fd, obj = self.split_fd(fd)
  592. self._impl.modify(fd, events | self.ERROR)
  593. def remove_handler(self, fd):
  594. fd, obj = self.split_fd(fd)
  595. self._handlers.pop(fd, None)
  596. self._events.pop(fd, None)
  597. try:
  598. self._impl.unregister(fd)
  599. except Exception:
  600. gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
  601. def set_blocking_signal_threshold(self, seconds, action):
  602. if not hasattr(signal, "setitimer"):
  603. gen_log.error("set_blocking_signal_threshold requires a signal module "
  604. "with the setitimer method")
  605. return
  606. self._blocking_signal_threshold = seconds
  607. if seconds is not None:
  608. signal.signal(signal.SIGALRM,
  609. action if action is not None else signal.SIG_DFL)
  610. def start(self):
  611. if self._running:
  612. raise RuntimeError("IOLoop is already running")
  613. self._setup_logging()
  614. if self._stopped:
  615. self._stopped = False
  616. return
  617. old_current = getattr(IOLoop._current, "instance", None)
  618. IOLoop._current.instance = self
  619. self._thread_ident = thread.get_ident()
  620. self._running = True
  621. # signal.set_wakeup_fd closes a race condition in event loops:
  622. # a signal may arrive at the beginning of select/poll/etc
  623. # before it goes into its interruptible sleep, so the signal
  624. # will be consumed without waking the select. The solution is
  625. # for the (C, synchronous) signal handler to write to a pipe,
  626. # which will then be seen by select.
  627. #
  628. # In python's signal handling semantics, this only matters on the
  629. # main thread (fortunately, set_wakeup_fd only works on the main
  630. # thread and will raise a ValueError otherwise).
  631. #
  632. # If someone has already set a wakeup fd, we don't want to
  633. # disturb it. This is an issue for twisted, which does its
  634. # SIGCHLD processing in response to its own wakeup fd being
  635. # written to. As long as the wakeup fd is registered on the IOLoop,
  636. # the loop will still wake up and everything should work.
  637. old_wakeup_fd = None
  638. if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
  639. # requires python 2.6+, unix. set_wakeup_fd exists but crashes
  640. # the python process on windows.
  641. try:
  642. old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
  643. if old_wakeup_fd != -1:
  644. # Already set, restore previous value. This is a little racy,
  645. # but there's no clean get_wakeup_fd and in real use the
  646. # IOLoop is just started once at the beginning.
  647. signal.set_wakeup_fd(old_wakeup_fd)
  648. old_wakeup_fd = None
  649. except ValueError:
  650. # Non-main thread, or the previous value of wakeup_fd
  651. # is no longer valid.
  652. old_wakeup_fd = None
  653. try:
  654. while True:
  655. # Prevent IO event starvation by delaying new callbacks
  656. # to the next iteration of the event loop.
  657. with self._callback_lock:
  658. callbacks = self._callbacks
  659. self._callbacks = []
  660. # Add any timeouts that have come due to the callback list.
  661. # Do not run anything until we have determined which ones
  662. # are ready, so timeouts that call add_timeout cannot
  663. # schedule anything in this iteration.
  664. due_timeouts = []
  665. if self._timeouts:
  666. now = self.time()
  667. while self._timeouts:
  668. if self._timeouts[0].callback is None:
  669. # The timeout was cancelled. Note that the
  670. # cancellation check is repeated below for timeouts
  671. # that are cancelled by another timeout or callback.
  672. heapq.heappop(self._timeouts)
  673. self._cancellations -= 1
  674. elif self._timeouts[0].deadline <= now:
  675. due_timeouts.append(heapq.heappop(self._timeouts))
  676. else:
  677. break
  678. if (self._cancellations > 512
  679. and self._cancellations > (len(self._timeouts) >> 1)):
  680. # Clean up the timeout queue when it gets large and it's
  681. # more than half cancellations.
  682. self._cancellations = 0
  683. self._timeouts = [x for x in self._timeouts
  684. if x.callback is not None]
  685. heapq.heapify(self._timeouts)
  686. for callback in callbacks:
  687. self._run_callback(callback)
  688. for timeout in due_timeouts:
  689. if timeout.callback is not None:
  690. self._run_callback(timeout.callback)
  691. # Closures may be holding on to a lot of memory, so allow
  692. # them to be freed before we go into our poll wait.
  693. callbacks = callback = due_timeouts = timeout = None
  694. if self._callbacks:
  695. # If any callbacks or timeouts called add_callback,
  696. # we don't want to wait in poll() before we run them.
  697. poll_timeout = 0.0
  698. elif self._timeouts:
  699. # If there are any timeouts, schedule the first one.
  700. # Use self.time() instead of 'now' to account for time
  701. # spent running callbacks.
  702. poll_timeout = self._timeouts[0].deadline - self.time()
  703. poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
  704. else:
  705. # No timeouts and no callbacks, so use the default.
  706. poll_timeout = _POLL_TIMEOUT
  707. if not self._running:
  708. break
  709. if self._blocking_signal_threshold is not None:
  710. # clear alarm so it doesn't fire while poll is waiting for
  711. # events.
  712. signal.setitimer(signal.ITIMER_REAL, 0, 0)
  713. try:
  714. event_pairs = self._impl.poll(poll_timeout)
  715. except Exception as e:
  716. # Depending on python version and IOLoop implementation,
  717. # different exception types may be thrown and there are
  718. # two ways EINTR might be signaled:
  719. # * e.errno == errno.EINTR
  720. # * e.args is like (errno.EINTR, 'Interrupted system call')
  721. if errno_from_exception(e) == errno.EINTR:
  722. continue
  723. else:
  724. raise
  725. if self._blocking_signal_threshold is not None:
  726. signal.setitimer(signal.ITIMER_REAL,
  727. self._blocking_signal_threshold, 0)
  728. # Pop one fd at a time from the set of pending fds and run
  729. # its handler. Since that handler may perform actions on
  730. # other file descriptors, there may be reentrant calls to
  731. # this IOLoop that update self._events
  732. self._events.update(event_pairs)
  733. while self._events:
  734. fd, events = self._events.popitem()
  735. try:
  736. fd_obj, handler_func = self._handlers[fd]
  737. handler_func(fd_obj, events)
  738. except (OSError, IOError) as e:
  739. if errno_from_exception(e) == errno.EPIPE:
  740. # Happens when the client closes the connection
  741. pass
  742. else:
  743. self.handle_callback_exception(self._handlers.get(fd))
  744. except Exception:
  745. self.handle_callback_exception(self._handlers.get(fd))
  746. fd_obj = handler_func = None
  747. finally:
  748. # reset the stopped flag so another start/stop pair can be issued
  749. self._stopped = False
  750. if self._blocking_signal_threshold is not None:
  751. signal.setitimer(signal.ITIMER_REAL, 0, 0)
  752. IOLoop._current.instance = old_current
  753. if old_wakeup_fd is not None:
  754. signal.set_wakeup_fd(old_wakeup_fd)
  def stop(self):
      """Request that the running loop exit.

      Clears ``_running`` (the loop in ``start`` checks it before each
      poll and breaks out when it is false), sets ``_stopped``, and then
      calls ``self._waker.wake()`` so a loop that is blocked in poll gets
      a chance to notice the change.
      """
      # Both flags are written (``_running`` first) before the waker is
      # poked, so the loop never wakes up to stale state.
      self._running, self._stopped = False, True
      self._waker.wake()
  def time(self):
      """Return the current time as reported by ``self.time_func``."""
      clock = self.time_func
      return clock()
  def call_at(self, deadline, callback, *args, **kwargs):
      """Schedule ``callback(*args, **kwargs)`` to run at ``deadline``.

      ``deadline`` is compared against ``self.time()`` by the loop, so it
      must be expressed on the same clock. The callback is wrapped with
      ``stack_context.wrap`` and bound to its arguments before being
      stored.

      Returns the ``_Timeout`` object that was pushed onto the timeout
      heap; pass it to ``remove_timeout`` to cancel.

      ``_Timeout`` raises ``TypeError`` if ``deadline`` is not a real
      number.
      """
      wrapped = stack_context.wrap(callback)
      bound = functools.partial(wrapped, *args, **kwargs)
      entry = _Timeout(deadline, bound, self)
      heapq.heappush(self._timeouts, entry)
      return entry
  def remove_timeout(self, timeout):
      """Cancel a timeout previously returned by ``call_at``.

      The entry is not removed from the heap: deleting from the middle
      of a heap is awkward (see the discussion in
      http://docs.python.org/library/heapq.html). Instead the entry is
      marked dead by clearing its callback, and the number of dead
      entries is tracked in ``_cancellations``. The loop in ``start``
      skips entries whose callback is ``None``.
      """
      timeout.callback = None
      self._cancellations = self._cancellations + 1
  def add_callback(self, callback, *args, **kwargs):
      """Queue ``callback(*args, **kwargs)`` for a later loop iteration.

      The callback is wrapped with ``stack_context.wrap`` and bound to
      its arguments. Nothing is queued if the loop is closing
      (``self._closing`` is true).

      May be called from any thread: calls from a thread other than the
      loop's take ``_callback_lock`` and wake the loop when needed, while
      calls from the loop's own thread skip the lock.
      """
      if thread.get_ident() == self._thread_ident:
          # On the loop's own thread there is nobody to wake, so the lock
          # is skipped and the callback is appended blindly. This stays
          # safe even from a signal handler because the GIL makes
          # list.append atomic. If the signal interrupted another thread
          # that was inside the _callback_lock block in IOLoop.start, the
          # append may land on either the old or the new list object;
          # both outcomes work.
          if self._closing:
              return
          self._callbacks.append(functools.partial(
              stack_context.wrap(callback), *args, **kwargs))
          return

      # Off the loop's thread: synchronize with other producers, or the
      # wake-up logic below would be racy.
      with self._callback_lock:
          if self._closing:
              return
          was_empty = not self._callbacks
          self._callbacks.append(functools.partial(
              stack_context.wrap(callback), *args, **kwargs))
          if was_empty:
              # Only the first callback added to an empty list can need
              # a wake-up. The loop may wake on its own anyway, and an
              # occasional extra wake is harmless, but waking a polling
              # loop is relatively expensive so it is avoided otherwise.
              self._waker.wake()
  def add_callback_from_signal(self, callback, *args, **kwargs):
      """Queue ``callback`` via ``add_callback`` under a ``NullContext``.

      The call to ``add_callback`` is made while a
      ``stack_context.NullContext`` is active.
      """
      # NOTE(review): presumably this keeps the callback from capturing
      # whatever stack context the signal happened to interrupt -- confirm
      # against the stack_context module.
      null_context = stack_context.NullContext()
      with null_context:
          self.add_callback(callback, *args, **kwargs)
  class _Timeout(object):
      """An IOLoop timeout: a numeric deadline plus the callback to run.

      Instances order by ``(deadline, tiebreaker)`` so they can be kept in
      a ``heapq`` heap. ``callback`` is set to ``None`` by
      ``remove_timeout`` to mark a cancelled entry.
      """

      # Reduce memory overhead when there are lots of pending callbacks
      __slots__ = ['deadline', 'callback', 'tiebreaker']

      def __init__(self, deadline, callback, io_loop):
          """Create a timeout entry.

          :arg deadline: a real number; anything else raises ``TypeError``.
          :arg callback: stored as-is on the instance.
          :arg io_loop: only used to draw the next value from
              ``io_loop._timeout_counter``; it is not stored.
          """
          if not isinstance(deadline, numbers.Real):
              # Wrap the operand in a 1-tuple: a bare tuple deadline would
              # otherwise be unpacked by ``%`` and produce a wrong or
              # unrelated error message.
              raise TypeError("Unsupported deadline %r" % (deadline,))
          self.deadline = deadline
          self.callback = callback
          self.tiebreaker = next(io_loop._timeout_counter)

      # Comparison methods to sort by deadline, with a per-loop counter
      # value as a tiebreaker to guarantee a consistent ordering between
      # entries that share a deadline. The heapq module uses __le__
      # in python2.5, and __lt__ in 2.6+ (sort() and most other comparisons
      # use __lt__).
      def __lt__(self, other):
          return ((self.deadline, self.tiebreaker) <
                  (other.deadline, other.tiebreaker))

      def __le__(self, other):
          return ((self.deadline, self.tiebreaker) <=
                  (other.deadline, other.tiebreaker))
  class PeriodicCallback(object):
      """Schedules the given callback to be called periodically.

      The callback is called every ``callback_time`` milliseconds.
      Note that the timeout is given in milliseconds, while most other
      time-related functions in Tornado use seconds.

      If the callback runs for longer than ``callback_time`` milliseconds,
      subsequent invocations will be skipped to get back on schedule.

      `start` must be called after the `PeriodicCallback` is created.

      .. versionchanged:: 4.1
         The ``io_loop`` argument is deprecated.
      """

      def __init__(self, callback, callback_time, io_loop=None):
          """Store the callback and its period; does not start the timer.

          :arg callback: zero-argument callable to invoke each period.
          :arg callback_time: period in milliseconds; must be positive,
              otherwise ``ValueError`` is raised.
          :arg io_loop: loop to schedule on; defaults to
              ``IOLoop.current()`` when falsy.
          """
          self.callback = callback
          if callback_time <= 0:
              raise ValueError("Periodic callback must have a positive callback_time")
          self.callback_time = callback_time
          self.io_loop = io_loop or IOLoop.current()
          self._running = False
          self._timeout = None

      def start(self):
          """Starts the timer."""
          self._running = True
          self._next_timeout = self.io_loop.time()
          self._schedule_next()

      def stop(self):
          """Stops the timer."""
          self._running = False
          if self._timeout is not None:
              self.io_loop.remove_timeout(self._timeout)
              self._timeout = None

      def is_running(self):
          """Return True if this `.PeriodicCallback` has been started.

          .. versionadded:: 4.1
          """
          return self._running

      def _run(self):
          """Invoke the callback once and arm the next timeout.

          Exceptions raised by the callback are passed to
          ``io_loop.handle_callback_exception``; the next run is scheduled
          either way.
          """
          if not self._running:
              return
          try:
              return self.callback()
          except Exception:
              self.io_loop.handle_callback_exception(self.callback)
          finally:
              self._schedule_next()

      def _schedule_next(self):
          """Advance ``_next_timeout`` and register it with the loop."""
          if not self._running:
              return
          current_time = self.io_loop.time()
          callback_time_sec = self.callback_time / 1000.0
          if self._next_timeout <= current_time:
              # The period is measured from the start of one call to the
              # start of the next. If a call overran, skip whole periods
              # so the deadline lands back on the original schedule.
              missed_periods = math.floor(
                  (current_time - self._next_timeout) / callback_time_sec)
              self._next_timeout += (missed_periods + 1) * callback_time_sec
          else:
              # The clock reads earlier than the deadline that just fired
              # (for example after a backwards clock adjustment). Advance
              # by one period anyway; re-arming the same deadline would
              # run the callback twice for a single slot.
              self._next_timeout += callback_time_sec
          self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)