twisted.py 22 KB


  1. # Author: Ovidiu Predescu
  2. # Date: July 2011
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Bridges between the Twisted reactor and Tornado IOLoop.
  16. This module lets you run applications and libraries written for
  17. Twisted in a Tornado application. It can be used in two modes,
  18. depending on which library's underlying event loop you want to use.
  19. This module has been tested with Twisted versions 11.0.0 and newer.
  20. """
  21. from __future__ import absolute_import, division, print_function
  22. import datetime
  23. import functools
  24. import numbers
  25. import socket
  26. import sys
  27. import twisted.internet.abstract # type: ignore
  28. from twisted.internet.defer import Deferred # type: ignore
  29. from twisted.internet.posixbase import PosixReactorBase # type: ignore
  30. from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime, IReadDescriptor, IWriteDescriptor # type: ignore # noqa: E501
  31. from twisted.python import failure, log # type: ignore
  32. from twisted.internet import error # type: ignore
  33. import twisted.names.cache # type: ignore
  34. import twisted.names.client # type: ignore
  35. import twisted.names.hosts # type: ignore
  36. import twisted.names.resolve # type: ignore
  37. from zope.interface import implementer # type: ignore
  38. from tornado.concurrent import Future, future_set_exc_info
  39. from tornado.escape import utf8
  40. from tornado import gen
  41. import tornado.ioloop
  42. from tornado.log import app_log
  43. from tornado.netutil import Resolver
  44. from tornado.stack_context import NullContext, wrap
  45. from tornado.ioloop import IOLoop
  46. from tornado.util import timedelta_to_seconds
  47. @implementer(IDelayedCall)
  48. class TornadoDelayedCall(object):
  49. """DelayedCall object for Tornado."""
  50. def __init__(self, reactor, seconds, f, *args, **kw):
  51. self._reactor = reactor
  52. self._func = functools.partial(f, *args, **kw)
  53. self._time = self._reactor.seconds() + seconds
  54. self._timeout = self._reactor._io_loop.add_timeout(self._time,
  55. self._called)
  56. self._active = True
  57. def _called(self):
  58. self._active = False
  59. self._reactor._removeDelayedCall(self)
  60. try:
  61. self._func()
  62. except:
  63. app_log.error("_called caught exception", exc_info=True)
  64. def getTime(self):
  65. return self._time
  66. def cancel(self):
  67. self._active = False
  68. self._reactor._io_loop.remove_timeout(self._timeout)
  69. self._reactor._removeDelayedCall(self)
  70. def delay(self, seconds):
  71. self._reactor._io_loop.remove_timeout(self._timeout)
  72. self._time += seconds
  73. self._timeout = self._reactor._io_loop.add_timeout(self._time,
  74. self._called)
  75. def reset(self, seconds):
  76. self._reactor._io_loop.remove_timeout(self._timeout)
  77. self._time = self._reactor.seconds() + seconds
  78. self._timeout = self._reactor._io_loop.add_timeout(self._time,
  79. self._called)
  80. def active(self):
  81. return self._active
  82. @implementer(IReactorTime, IReactorFDSet)
  83. class TornadoReactor(PosixReactorBase):
  84. """Twisted reactor built on the Tornado IOLoop.
  85. `TornadoReactor` implements the Twisted reactor interface on top of
  86. the Tornado IOLoop. To use it, simply call `install` at the beginning
  87. of the application::
  88. import tornado.platform.twisted
  89. tornado.platform.twisted.install()
  90. from twisted.internet import reactor
  91. When the app is ready to start, call ``IOLoop.current().start()``
  92. instead of ``reactor.run()``.
  93. It is also possible to create a non-global reactor by calling
  94. ``tornado.platform.twisted.TornadoReactor()``. However, if
  95. the `.IOLoop` and reactor are to be short-lived (such as those used in
  96. unit tests), additional cleanup may be required. Specifically, it is
  97. recommended to call::
  98. reactor.fireSystemEvent('shutdown')
  99. reactor.disconnectAll()
  100. before closing the `.IOLoop`.
  101. .. versionchanged:: 5.0
  102. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  103. .. deprecated:: 5.1
  104. This class will be removed in Tornado 6.0. Use
  105. ``twisted.internet.asyncioreactor.AsyncioSelectorReactor``
  106. instead.
  107. """
  108. def __init__(self):
  109. self._io_loop = tornado.ioloop.IOLoop.current()
  110. self._readers = {} # map of reader objects to fd
  111. self._writers = {} # map of writer objects to fd
  112. self._fds = {} # a map of fd to a (reader, writer) tuple
  113. self._delayedCalls = {}
  114. PosixReactorBase.__init__(self)
  115. self.addSystemEventTrigger('during', 'shutdown', self.crash)
  116. # IOLoop.start() bypasses some of the reactor initialization.
  117. # Fire off the necessary events if they weren't already triggered
  118. # by reactor.run().
  119. def start_if_necessary():
  120. if not self._started:
  121. self.fireSystemEvent('startup')
  122. self._io_loop.add_callback(start_if_necessary)
  123. # IReactorTime
  124. def seconds(self):
  125. return self._io_loop.time()
  126. def callLater(self, seconds, f, *args, **kw):
  127. dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
  128. self._delayedCalls[dc] = True
  129. return dc
  130. def getDelayedCalls(self):
  131. return [x for x in self._delayedCalls if x._active]
  132. def _removeDelayedCall(self, dc):
  133. if dc in self._delayedCalls:
  134. del self._delayedCalls[dc]
  135. # IReactorThreads
  136. def callFromThread(self, f, *args, **kw):
  137. assert callable(f), "%s is not callable" % f
  138. with NullContext():
  139. # This NullContext is mainly for an edge case when running
  140. # TwistedIOLoop on top of a TornadoReactor.
  141. # TwistedIOLoop.add_callback uses reactor.callFromThread and
  142. # should not pick up additional StackContexts along the way.
  143. self._io_loop.add_callback(f, *args, **kw)
  144. # We don't need the waker code from the super class, Tornado uses
  145. # its own waker.
  146. def installWaker(self):
  147. pass
  148. def wakeUp(self):
  149. pass
  150. # IReactorFDSet
  151. def _invoke_callback(self, fd, events):
  152. if fd not in self._fds:
  153. return
  154. (reader, writer) = self._fds[fd]
  155. if reader:
  156. err = None
  157. if reader.fileno() == -1:
  158. err = error.ConnectionLost()
  159. elif events & IOLoop.READ:
  160. err = log.callWithLogger(reader, reader.doRead)
  161. if err is None and events & IOLoop.ERROR:
  162. err = error.ConnectionLost()
  163. if err is not None:
  164. self.removeReader(reader)
  165. reader.readConnectionLost(failure.Failure(err))
  166. if writer:
  167. err = None
  168. if writer.fileno() == -1:
  169. err = error.ConnectionLost()
  170. elif events & IOLoop.WRITE:
  171. err = log.callWithLogger(writer, writer.doWrite)
  172. if err is None and events & IOLoop.ERROR:
  173. err = error.ConnectionLost()
  174. if err is not None:
  175. self.removeWriter(writer)
  176. writer.writeConnectionLost(failure.Failure(err))
  177. def addReader(self, reader):
  178. if reader in self._readers:
  179. # Don't add the reader if it's already there
  180. return
  181. fd = reader.fileno()
  182. self._readers[reader] = fd
  183. if fd in self._fds:
  184. (_, writer) = self._fds[fd]
  185. self._fds[fd] = (reader, writer)
  186. if writer:
  187. # We already registered this fd for write events,
  188. # update it for read events as well.
  189. self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
  190. else:
  191. with NullContext():
  192. self._fds[fd] = (reader, None)
  193. self._io_loop.add_handler(fd, self._invoke_callback,
  194. IOLoop.READ)
  195. def addWriter(self, writer):
  196. if writer in self._writers:
  197. return
  198. fd = writer.fileno()
  199. self._writers[writer] = fd
  200. if fd in self._fds:
  201. (reader, _) = self._fds[fd]
  202. self._fds[fd] = (reader, writer)
  203. if reader:
  204. # We already registered this fd for read events,
  205. # update it for write events as well.
  206. self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
  207. else:
  208. with NullContext():
  209. self._fds[fd] = (None, writer)
  210. self._io_loop.add_handler(fd, self._invoke_callback,
  211. IOLoop.WRITE)
  212. def removeReader(self, reader):
  213. if reader in self._readers:
  214. fd = self._readers.pop(reader)
  215. (_, writer) = self._fds[fd]
  216. if writer:
  217. # We have a writer so we need to update the IOLoop for
  218. # write events only.
  219. self._fds[fd] = (None, writer)
  220. self._io_loop.update_handler(fd, IOLoop.WRITE)
  221. else:
  222. # Since we have no writer registered, we remove the
  223. # entry from _fds and unregister the handler from the
  224. # IOLoop
  225. del self._fds[fd]
  226. self._io_loop.remove_handler(fd)
  227. def removeWriter(self, writer):
  228. if writer in self._writers:
  229. fd = self._writers.pop(writer)
  230. (reader, _) = self._fds[fd]
  231. if reader:
  232. # We have a reader so we need to update the IOLoop for
  233. # read events only.
  234. self._fds[fd] = (reader, None)
  235. self._io_loop.update_handler(fd, IOLoop.READ)
  236. else:
  237. # Since we have no reader registered, we remove the
  238. # entry from the _fds and unregister the handler from
  239. # the IOLoop.
  240. del self._fds[fd]
  241. self._io_loop.remove_handler(fd)
  242. def removeAll(self):
  243. return self._removeAll(self._readers, self._writers)
  244. def getReaders(self):
  245. return self._readers.keys()
  246. def getWriters(self):
  247. return self._writers.keys()
  248. # The following functions are mainly used in twisted-style test cases;
  249. # it is expected that most users of the TornadoReactor will call
  250. # IOLoop.start() instead of Reactor.run().
  251. def stop(self):
  252. PosixReactorBase.stop(self)
  253. fire_shutdown = functools.partial(self.fireSystemEvent, "shutdown")
  254. self._io_loop.add_callback(fire_shutdown)
  255. def crash(self):
  256. PosixReactorBase.crash(self)
  257. self._io_loop.stop()
  258. def doIteration(self, delay):
  259. raise NotImplementedError("doIteration")
  260. def mainLoop(self):
  261. # Since this class is intended to be used in applications
  262. # where the top-level event loop is ``io_loop.start()`` rather
  263. # than ``reactor.run()``, it is implemented a little
  264. # differently than other Twisted reactors. We override
  265. # ``mainLoop`` instead of ``doIteration`` and must implement
  266. # timed call functionality on top of `.IOLoop.add_timeout`
  267. # rather than using the implementation in
  268. # ``PosixReactorBase``.
  269. self._io_loop.start()
  270. class _TestReactor(TornadoReactor):
  271. """Subclass of TornadoReactor for use in unittests.
  272. This can't go in the test.py file because of import-order dependencies
  273. with the Twisted reactor test builder.
  274. """
  275. def __init__(self):
  276. # always use a new ioloop
  277. IOLoop.clear_current()
  278. IOLoop(make_current=True)
  279. super(_TestReactor, self).__init__()
  280. IOLoop.clear_current()
  281. def listenTCP(self, port, factory, backlog=50, interface=''):
  282. # default to localhost to avoid firewall prompts on the mac
  283. if not interface:
  284. interface = '127.0.0.1'
  285. return super(_TestReactor, self).listenTCP(
  286. port, factory, backlog=backlog, interface=interface)
  287. def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
  288. if not interface:
  289. interface = '127.0.0.1'
  290. return super(_TestReactor, self).listenUDP(
  291. port, protocol, interface=interface, maxPacketSize=maxPacketSize)
  292. def install():
  293. """Install this package as the default Twisted reactor.
  294. ``install()`` must be called very early in the startup process,
  295. before most other twisted-related imports. Conversely, because it
  296. initializes the `.IOLoop`, it cannot be called before
  297. `.fork_processes` or multi-process `~.TCPServer.start`. These
  298. conflicting requirements make it difficult to use `.TornadoReactor`
  299. in multi-process mode, and an external process manager such as
  300. ``supervisord`` is recommended instead.
  301. .. versionchanged:: 5.0
  302. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  303. .. deprecated:: 5.1
  304. This functio will be removed in Tornado 6.0. Use
  305. ``twisted.internet.asyncioreactor.install`` instead.
  306. """
  307. reactor = TornadoReactor()
  308. from twisted.internet.main import installReactor # type: ignore
  309. installReactor(reactor)
  310. return reactor
  311. @implementer(IReadDescriptor, IWriteDescriptor)
  312. class _FD(object):
  313. def __init__(self, fd, fileobj, handler):
  314. self.fd = fd
  315. self.fileobj = fileobj
  316. self.handler = handler
  317. self.reading = False
  318. self.writing = False
  319. self.lost = False
  320. def fileno(self):
  321. return self.fd
  322. def doRead(self):
  323. if not self.lost:
  324. self.handler(self.fileobj, tornado.ioloop.IOLoop.READ)
  325. def doWrite(self):
  326. if not self.lost:
  327. self.handler(self.fileobj, tornado.ioloop.IOLoop.WRITE)
  328. def connectionLost(self, reason):
  329. if not self.lost:
  330. self.handler(self.fileobj, tornado.ioloop.IOLoop.ERROR)
  331. self.lost = True
  332. writeConnectionLost = readConnectionLost = connectionLost
  333. def logPrefix(self):
  334. return ''
  335. class TwistedIOLoop(tornado.ioloop.IOLoop):
  336. """IOLoop implementation that runs on Twisted.
  337. `TwistedIOLoop` implements the Tornado IOLoop interface on top of
  338. the Twisted reactor. Recommended usage::
  339. from tornado.platform.twisted import TwistedIOLoop
  340. from twisted.internet import reactor
  341. TwistedIOLoop().install()
  342. # Set up your tornado application as usual using `IOLoop.instance`
  343. reactor.run()
  344. Uses the global Twisted reactor by default. To create multiple
  345. ``TwistedIOLoops`` in the same process, you must pass a unique reactor
  346. when constructing each one.
  347. Not compatible with `tornado.process.Subprocess.set_exit_callback`
  348. because the ``SIGCHLD`` handlers used by Tornado and Twisted conflict
  349. with each other.
  350. See also :meth:`tornado.ioloop.IOLoop.install` for general notes on
  351. installing alternative IOLoops.
  352. .. deprecated:: 5.1
  353. The `asyncio` event loop will be the only available implementation in
  354. Tornado 6.0.
  355. """
  356. def initialize(self, reactor=None, **kwargs):
  357. super(TwistedIOLoop, self).initialize(**kwargs)
  358. if reactor is None:
  359. import twisted.internet.reactor # type: ignore
  360. reactor = twisted.internet.reactor
  361. self.reactor = reactor
  362. self.fds = {}
  363. def close(self, all_fds=False):
  364. fds = self.fds
  365. self.reactor.removeAll()
  366. for c in self.reactor.getDelayedCalls():
  367. c.cancel()
  368. if all_fds:
  369. for fd in fds.values():
  370. self.close_fd(fd.fileobj)
  371. def add_handler(self, fd, handler, events):
  372. if fd in self.fds:
  373. raise ValueError('fd %s added twice' % fd)
  374. fd, fileobj = self.split_fd(fd)
  375. self.fds[fd] = _FD(fd, fileobj, wrap(handler))
  376. if events & tornado.ioloop.IOLoop.READ:
  377. self.fds[fd].reading = True
  378. self.reactor.addReader(self.fds[fd])
  379. if events & tornado.ioloop.IOLoop.WRITE:
  380. self.fds[fd].writing = True
  381. self.reactor.addWriter(self.fds[fd])
  382. def update_handler(self, fd, events):
  383. fd, fileobj = self.split_fd(fd)
  384. if events & tornado.ioloop.IOLoop.READ:
  385. if not self.fds[fd].reading:
  386. self.fds[fd].reading = True
  387. self.reactor.addReader(self.fds[fd])
  388. else:
  389. if self.fds[fd].reading:
  390. self.fds[fd].reading = False
  391. self.reactor.removeReader(self.fds[fd])
  392. if events & tornado.ioloop.IOLoop.WRITE:
  393. if not self.fds[fd].writing:
  394. self.fds[fd].writing = True
  395. self.reactor.addWriter(self.fds[fd])
  396. else:
  397. if self.fds[fd].writing:
  398. self.fds[fd].writing = False
  399. self.reactor.removeWriter(self.fds[fd])
  400. def remove_handler(self, fd):
  401. fd, fileobj = self.split_fd(fd)
  402. if fd not in self.fds:
  403. return
  404. self.fds[fd].lost = True
  405. if self.fds[fd].reading:
  406. self.reactor.removeReader(self.fds[fd])
  407. if self.fds[fd].writing:
  408. self.reactor.removeWriter(self.fds[fd])
  409. del self.fds[fd]
  410. def start(self):
  411. old_current = IOLoop.current(instance=False)
  412. try:
  413. self._setup_logging()
  414. self.make_current()
  415. self.reactor.run()
  416. finally:
  417. if old_current is None:
  418. IOLoop.clear_current()
  419. else:
  420. old_current.make_current()
  421. def stop(self):
  422. self.reactor.crash()
  423. def add_timeout(self, deadline, callback, *args, **kwargs):
  424. # This method could be simplified (since tornado 4.0) by
  425. # overriding call_at instead of add_timeout, but we leave it
  426. # for now as a test of backwards-compatibility.
  427. if isinstance(deadline, numbers.Real):
  428. delay = max(deadline - self.time(), 0)
  429. elif isinstance(deadline, datetime.timedelta):
  430. delay = timedelta_to_seconds(deadline)
  431. else:
  432. raise TypeError("Unsupported deadline %r")
  433. return self.reactor.callLater(
  434. delay, self._run_callback,
  435. functools.partial(wrap(callback), *args, **kwargs))
  436. def remove_timeout(self, timeout):
  437. if timeout.active():
  438. timeout.cancel()
  439. def add_callback(self, callback, *args, **kwargs):
  440. self.reactor.callFromThread(
  441. self._run_callback,
  442. functools.partial(wrap(callback), *args, **kwargs))
  443. def add_callback_from_signal(self, callback, *args, **kwargs):
  444. self.add_callback(callback, *args, **kwargs)
  445. class TwistedResolver(Resolver):
  446. """Twisted-based asynchronous resolver.
  447. This is a non-blocking and non-threaded resolver. It is
  448. recommended only when threads cannot be used, since it has
  449. limitations compared to the standard ``getaddrinfo``-based
  450. `~tornado.netutil.Resolver` and
  451. `~tornado.netutil.DefaultExecutorResolver`. Specifically, it returns at
  452. most one result, and arguments other than ``host`` and ``family``
  453. are ignored. It may fail to resolve when ``family`` is not
  454. ``socket.AF_UNSPEC``.
  455. Requires Twisted 12.1 or newer.
  456. .. versionchanged:: 5.0
  457. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  458. """
  459. def initialize(self):
  460. # partial copy of twisted.names.client.createResolver, which doesn't
  461. # allow for a reactor to be passed in.
  462. self.reactor = tornado.platform.twisted.TornadoReactor()
  463. host_resolver = twisted.names.hosts.Resolver('/etc/hosts')
  464. cache_resolver = twisted.names.cache.CacheResolver(reactor=self.reactor)
  465. real_resolver = twisted.names.client.Resolver('/etc/resolv.conf',
  466. reactor=self.reactor)
  467. self.resolver = twisted.names.resolve.ResolverChain(
  468. [host_resolver, cache_resolver, real_resolver])
  469. @gen.coroutine
  470. def resolve(self, host, port, family=0):
  471. # getHostByName doesn't accept IP addresses, so if the input
  472. # looks like an IP address just return it immediately.
  473. if twisted.internet.abstract.isIPAddress(host):
  474. resolved = host
  475. resolved_family = socket.AF_INET
  476. elif twisted.internet.abstract.isIPv6Address(host):
  477. resolved = host
  478. resolved_family = socket.AF_INET6
  479. else:
  480. deferred = self.resolver.getHostByName(utf8(host))
  481. fut = Future()
  482. deferred.addBoth(fut.set_result)
  483. resolved = yield fut
  484. if isinstance(resolved, failure.Failure):
  485. try:
  486. resolved.raiseException()
  487. except twisted.names.error.DomainError as e:
  488. raise IOError(e)
  489. elif twisted.internet.abstract.isIPAddress(resolved):
  490. resolved_family = socket.AF_INET
  491. elif twisted.internet.abstract.isIPv6Address(resolved):
  492. resolved_family = socket.AF_INET6
  493. else:
  494. resolved_family = socket.AF_UNSPEC
  495. if family != socket.AF_UNSPEC and family != resolved_family:
  496. raise Exception('Requested socket family %d but got %d' %
  497. (family, resolved_family))
  498. result = [
  499. (resolved_family, (resolved, port)),
  500. ]
  501. raise gen.Return(result)
  502. if hasattr(gen.convert_yielded, 'register'):
  503. @gen.convert_yielded.register(Deferred) # type: ignore
  504. def _(d):
  505. f = Future()
  506. def errback(failure):
  507. try:
  508. failure.raiseException()
  509. # Should never happen, but just in case
  510. raise Exception("errback called without error")
  511. except:
  512. future_set_exc_info(f, sys.exc_info())
  513. d.addCallbacks(f.set_result, errback)
  514. return f