locks.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. # Copyright 2015 The Tornado Authors
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  4. # not use this file except in compliance with the License. You may obtain
  5. # a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  11. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  12. # License for the specific language governing permissions and limitations
  13. # under the License.
  14. from __future__ import absolute_import, division, print_function
  15. import collections
  16. from concurrent.futures import CancelledError
  17. from tornado import gen, ioloop
  18. from tornado.concurrent import Future, future_set_result_unless_cancelled
  19. __all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
  20. class _TimeoutGarbageCollector(object):
  21. """Base class for objects that periodically clean up timed-out waiters.
  22. Avoids memory leak in a common pattern like:
  23. while True:
  24. yield condition.wait(short_timeout)
  25. print('looping....')
  26. """
  27. def __init__(self):
  28. self._waiters = collections.deque() # Futures.
  29. self._timeouts = 0
  30. def _garbage_collect(self):
  31. # Occasionally clear timed-out waiters.
  32. self._timeouts += 1
  33. if self._timeouts > 100:
  34. self._timeouts = 0
  35. self._waiters = collections.deque(
  36. w for w in self._waiters if not w.done())
  37. class Condition(_TimeoutGarbageCollector):
  38. """A condition allows one or more coroutines to wait until notified.
  39. Like a standard `threading.Condition`, but does not need an underlying lock
  40. that is acquired and released.
  41. With a `Condition`, coroutines can wait to be notified by other coroutines:
  42. .. testcode::
  43. from tornado import gen
  44. from tornado.ioloop import IOLoop
  45. from tornado.locks import Condition
  46. condition = Condition()
  47. async def waiter():
  48. print("I'll wait right here")
  49. await condition.wait()
  50. print("I'm done waiting")
  51. async def notifier():
  52. print("About to notify")
  53. condition.notify()
  54. print("Done notifying")
  55. async def runner():
  56. # Wait for waiter() and notifier() in parallel
  57. await gen.multi([waiter(), notifier()])
  58. IOLoop.current().run_sync(runner)
  59. .. testoutput::
  60. I'll wait right here
  61. About to notify
  62. Done notifying
  63. I'm done waiting
  64. `wait` takes an optional ``timeout`` argument, which is either an absolute
  65. timestamp::
  66. io_loop = IOLoop.current()
  67. # Wait up to 1 second for a notification.
  68. await condition.wait(timeout=io_loop.time() + 1)
  69. ...or a `datetime.timedelta` for a timeout relative to the current time::
  70. # Wait up to 1 second.
  71. await condition.wait(timeout=datetime.timedelta(seconds=1))
  72. The method returns False if there's no notification before the deadline.
  73. .. versionchanged:: 5.0
  74. Previously, waiters could be notified synchronously from within
  75. `notify`. Now, the notification will always be received on the
  76. next iteration of the `.IOLoop`.
  77. """
  78. def __init__(self):
  79. super(Condition, self).__init__()
  80. self.io_loop = ioloop.IOLoop.current()
  81. def __repr__(self):
  82. result = '<%s' % (self.__class__.__name__, )
  83. if self._waiters:
  84. result += ' waiters[%s]' % len(self._waiters)
  85. return result + '>'
  86. def wait(self, timeout=None):
  87. """Wait for `.notify`.
  88. Returns a `.Future` that resolves ``True`` if the condition is notified,
  89. or ``False`` after a timeout.
  90. """
  91. waiter = Future()
  92. self._waiters.append(waiter)
  93. if timeout:
  94. def on_timeout():
  95. if not waiter.done():
  96. future_set_result_unless_cancelled(waiter, False)
  97. self._garbage_collect()
  98. io_loop = ioloop.IOLoop.current()
  99. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  100. waiter.add_done_callback(
  101. lambda _: io_loop.remove_timeout(timeout_handle))
  102. return waiter
  103. def notify(self, n=1):
  104. """Wake ``n`` waiters."""
  105. waiters = [] # Waiters we plan to run right now.
  106. while n and self._waiters:
  107. waiter = self._waiters.popleft()
  108. if not waiter.done(): # Might have timed out.
  109. n -= 1
  110. waiters.append(waiter)
  111. for waiter in waiters:
  112. future_set_result_unless_cancelled(waiter, True)
  113. def notify_all(self):
  114. """Wake all waiters."""
  115. self.notify(len(self._waiters))
  116. class Event(object):
  117. """An event blocks coroutines until its internal flag is set to True.
  118. Similar to `threading.Event`.
  119. A coroutine can wait for an event to be set. Once it is set, calls to
  120. ``yield event.wait()`` will not block unless the event has been cleared:
  121. .. testcode::
  122. from tornado import gen
  123. from tornado.ioloop import IOLoop
  124. from tornado.locks import Event
  125. event = Event()
  126. async def waiter():
  127. print("Waiting for event")
  128. await event.wait()
  129. print("Not waiting this time")
  130. await event.wait()
  131. print("Done")
  132. async def setter():
  133. print("About to set the event")
  134. event.set()
  135. async def runner():
  136. await gen.multi([waiter(), setter()])
  137. IOLoop.current().run_sync(runner)
  138. .. testoutput::
  139. Waiting for event
  140. About to set the event
  141. Not waiting this time
  142. Done
  143. """
  144. def __init__(self):
  145. self._value = False
  146. self._waiters = set()
  147. def __repr__(self):
  148. return '<%s %s>' % (
  149. self.__class__.__name__, 'set' if self.is_set() else 'clear')
  150. def is_set(self):
  151. """Return ``True`` if the internal flag is true."""
  152. return self._value
  153. def set(self):
  154. """Set the internal flag to ``True``. All waiters are awakened.
  155. Calling `.wait` once the flag is set will not block.
  156. """
  157. if not self._value:
  158. self._value = True
  159. for fut in self._waiters:
  160. if not fut.done():
  161. fut.set_result(None)
  162. def clear(self):
  163. """Reset the internal flag to ``False``.
  164. Calls to `.wait` will block until `.set` is called.
  165. """
  166. self._value = False
  167. def wait(self, timeout=None):
  168. """Block until the internal flag is true.
  169. Returns a Future, which raises `tornado.util.TimeoutError` after a
  170. timeout.
  171. """
  172. fut = Future()
  173. if self._value:
  174. fut.set_result(None)
  175. return fut
  176. self._waiters.add(fut)
  177. fut.add_done_callback(lambda fut: self._waiters.remove(fut))
  178. if timeout is None:
  179. return fut
  180. else:
  181. timeout_fut = gen.with_timeout(timeout, fut, quiet_exceptions=(CancelledError,))
  182. # This is a slightly clumsy workaround for the fact that
  183. # gen.with_timeout doesn't cancel its futures. Cancelling
  184. # fut will remove it from the waiters list.
  185. timeout_fut.add_done_callback(lambda tf: fut.cancel() if not fut.done() else None)
  186. return timeout_fut
  187. class _ReleasingContextManager(object):
  188. """Releases a Lock or Semaphore at the end of a "with" statement.
  189. with (yield semaphore.acquire()):
  190. pass
  191. # Now semaphore.release() has been called.
  192. """
  193. def __init__(self, obj):
  194. self._obj = obj
  195. def __enter__(self):
  196. pass
  197. def __exit__(self, exc_type, exc_val, exc_tb):
  198. self._obj.release()
  199. class Semaphore(_TimeoutGarbageCollector):
  200. """A lock that can be acquired a fixed number of times before blocking.
  201. A Semaphore manages a counter representing the number of `.release` calls
  202. minus the number of `.acquire` calls, plus an initial value. The `.acquire`
  203. method blocks if necessary until it can return without making the counter
  204. negative.
  205. Semaphores limit access to a shared resource. To allow access for two
  206. workers at a time:
  207. .. testsetup:: semaphore
  208. from collections import deque
  209. from tornado import gen
  210. from tornado.ioloop import IOLoop
  211. from tornado.concurrent import Future
  212. # Ensure reliable doctest output: resolve Futures one at a time.
  213. futures_q = deque([Future() for _ in range(3)])
  214. async def simulator(futures):
  215. for f in futures:
  216. # simulate the asynchronous passage of time
  217. await gen.sleep(0)
  218. await gen.sleep(0)
  219. f.set_result(None)
  220. IOLoop.current().add_callback(simulator, list(futures_q))
  221. def use_some_resource():
  222. return futures_q.popleft()
  223. .. testcode:: semaphore
  224. from tornado import gen
  225. from tornado.ioloop import IOLoop
  226. from tornado.locks import Semaphore
  227. sem = Semaphore(2)
  228. async def worker(worker_id):
  229. await sem.acquire()
  230. try:
  231. print("Worker %d is working" % worker_id)
  232. await use_some_resource()
  233. finally:
  234. print("Worker %d is done" % worker_id)
  235. sem.release()
  236. async def runner():
  237. # Join all workers.
  238. await gen.multi([worker(i) for i in range(3)])
  239. IOLoop.current().run_sync(runner)
  240. .. testoutput:: semaphore
  241. Worker 0 is working
  242. Worker 1 is working
  243. Worker 0 is done
  244. Worker 2 is working
  245. Worker 1 is done
  246. Worker 2 is done
  247. Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
  248. the semaphore has been released once, by worker 0.
  249. The semaphore can be used as an async context manager::
  250. async def worker(worker_id):
  251. async with sem:
  252. print("Worker %d is working" % worker_id)
  253. await use_some_resource()
  254. # Now the semaphore has been released.
  255. print("Worker %d is done" % worker_id)
  256. For compatibility with older versions of Python, `.acquire` is a
  257. context manager, so ``worker`` could also be written as::
  258. @gen.coroutine
  259. def worker(worker_id):
  260. with (yield sem.acquire()):
  261. print("Worker %d is working" % worker_id)
  262. yield use_some_resource()
  263. # Now the semaphore has been released.
  264. print("Worker %d is done" % worker_id)
  265. .. versionchanged:: 4.3
  266. Added ``async with`` support in Python 3.5.
  267. """
  268. def __init__(self, value=1):
  269. super(Semaphore, self).__init__()
  270. if value < 0:
  271. raise ValueError('semaphore initial value must be >= 0')
  272. self._value = value
  273. def __repr__(self):
  274. res = super(Semaphore, self).__repr__()
  275. extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
  276. self._value)
  277. if self._waiters:
  278. extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
  279. return '<{0} [{1}]>'.format(res[1:-1], extra)
  280. def release(self):
  281. """Increment the counter and wake one waiter."""
  282. self._value += 1
  283. while self._waiters:
  284. waiter = self._waiters.popleft()
  285. if not waiter.done():
  286. self._value -= 1
  287. # If the waiter is a coroutine paused at
  288. #
  289. # with (yield semaphore.acquire()):
  290. #
  291. # then the context manager's __exit__ calls release() at the end
  292. # of the "with" block.
  293. waiter.set_result(_ReleasingContextManager(self))
  294. break
  295. def acquire(self, timeout=None):
  296. """Decrement the counter. Returns a Future.
  297. Block if the counter is zero and wait for a `.release`. The Future
  298. raises `.TimeoutError` after the deadline.
  299. """
  300. waiter = Future()
  301. if self._value > 0:
  302. self._value -= 1
  303. waiter.set_result(_ReleasingContextManager(self))
  304. else:
  305. self._waiters.append(waiter)
  306. if timeout:
  307. def on_timeout():
  308. if not waiter.done():
  309. waiter.set_exception(gen.TimeoutError())
  310. self._garbage_collect()
  311. io_loop = ioloop.IOLoop.current()
  312. timeout_handle = io_loop.add_timeout(timeout, on_timeout)
  313. waiter.add_done_callback(
  314. lambda _: io_loop.remove_timeout(timeout_handle))
  315. return waiter
  316. def __enter__(self):
  317. raise RuntimeError(
  318. "Use Semaphore like 'with (yield semaphore.acquire())', not like"
  319. " 'with semaphore'")
  320. __exit__ = __enter__
  321. @gen.coroutine
  322. def __aenter__(self):
  323. yield self.acquire()
  324. @gen.coroutine
  325. def __aexit__(self, typ, value, tb):
  326. self.release()
  327. class BoundedSemaphore(Semaphore):
  328. """A semaphore that prevents release() being called too many times.
  329. If `.release` would increment the semaphore's value past the initial
  330. value, it raises `ValueError`. Semaphores are mostly used to guard
  331. resources with limited capacity, so a semaphore released too many times
  332. is a sign of a bug.
  333. """
  334. def __init__(self, value=1):
  335. super(BoundedSemaphore, self).__init__(value=value)
  336. self._initial_value = value
  337. def release(self):
  338. """Increment the counter and wake one waiter."""
  339. if self._value >= self._initial_value:
  340. raise ValueError("Semaphore released too many times")
  341. super(BoundedSemaphore, self).release()
  342. class Lock(object):
  343. """A lock for coroutines.
  344. A Lock begins unlocked, and `acquire` locks it immediately. While it is
  345. locked, a coroutine that yields `acquire` waits until another coroutine
  346. calls `release`.
  347. Releasing an unlocked lock raises `RuntimeError`.
  348. A Lock can be used as an async context manager with the ``async
  349. with`` statement:
  350. >>> from tornado import locks
  351. >>> lock = locks.Lock()
  352. >>>
  353. >>> async def f():
  354. ... async with lock:
  355. ... # Do something holding the lock.
  356. ... pass
  357. ...
  358. ... # Now the lock is released.
  359. For compatibility with older versions of Python, the `.acquire`
  360. method asynchronously returns a regular context manager:
  361. >>> async def f2():
  362. ... with (yield lock.acquire()):
  363. ... # Do something holding the lock.
  364. ... pass
  365. ...
  366. ... # Now the lock is released.
  367. .. versionchanged:: 4.3
  368. Added ``async with`` support in Python 3.5.
  369. """
  370. def __init__(self):
  371. self._block = BoundedSemaphore(value=1)
  372. def __repr__(self):
  373. return "<%s _block=%s>" % (
  374. self.__class__.__name__,
  375. self._block)
  376. def acquire(self, timeout=None):
  377. """Attempt to lock. Returns a Future.
  378. Returns a Future, which raises `tornado.util.TimeoutError` after a
  379. timeout.
  380. """
  381. return self._block.acquire(timeout)
  382. def release(self):
  383. """Unlock.
  384. The first coroutine in line waiting for `acquire` gets the lock.
  385. If not locked, raise a `RuntimeError`.
  386. """
  387. try:
  388. self._block.release()
  389. except ValueError:
  390. raise RuntimeError('release unlocked lock')
  391. def __enter__(self):
  392. raise RuntimeError(
  393. "Use Lock like 'with (yield lock)', not like 'with lock'")
  394. __exit__ = __enter__
  395. @gen.coroutine
  396. def __aenter__(self):
  397. yield self.acquire()
  398. @gen.coroutine
  399. def __aexit__(self, typ, value, tb):
  400. self.release()