123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- # Copyright 2015 The Tornado Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- from __future__ import absolute_import, division, print_function
- import collections
- from concurrent.futures import CancelledError
- from tornado import gen, ioloop
- from tornado.concurrent import Future, future_set_result_unless_cancelled
- __all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
- class _TimeoutGarbageCollector(object):
- """Base class for objects that periodically clean up timed-out waiters.
- Avoids memory leak in a common pattern like:
- while True:
- yield condition.wait(short_timeout)
- print('looping....')
- """
- def __init__(self):
- self._waiters = collections.deque() # Futures.
- self._timeouts = 0
- def _garbage_collect(self):
- # Occasionally clear timed-out waiters.
- self._timeouts += 1
- if self._timeouts > 100:
- self._timeouts = 0
- self._waiters = collections.deque(
- w for w in self._waiters if not w.done())
- class Condition(_TimeoutGarbageCollector):
- """A condition allows one or more coroutines to wait until notified.
- Like a standard `threading.Condition`, but does not need an underlying lock
- that is acquired and released.
- With a `Condition`, coroutines can wait to be notified by other coroutines:
- .. testcode::
- from tornado import gen
- from tornado.ioloop import IOLoop
- from tornado.locks import Condition
- condition = Condition()
- async def waiter():
- print("I'll wait right here")
- await condition.wait()
- print("I'm done waiting")
- async def notifier():
- print("About to notify")
- condition.notify()
- print("Done notifying")
- async def runner():
- # Wait for waiter() and notifier() in parallel
- await gen.multi([waiter(), notifier()])
- IOLoop.current().run_sync(runner)
- .. testoutput::
- I'll wait right here
- About to notify
- Done notifying
- I'm done waiting
- `wait` takes an optional ``timeout`` argument, which is either an absolute
- timestamp::
- io_loop = IOLoop.current()
- # Wait up to 1 second for a notification.
- await condition.wait(timeout=io_loop.time() + 1)
- ...or a `datetime.timedelta` for a timeout relative to the current time::
- # Wait up to 1 second.
- await condition.wait(timeout=datetime.timedelta(seconds=1))
- The method returns False if there's no notification before the deadline.
- .. versionchanged:: 5.0
- Previously, waiters could be notified synchronously from within
- `notify`. Now, the notification will always be received on the
- next iteration of the `.IOLoop`.
- """
- def __init__(self):
- super(Condition, self).__init__()
- self.io_loop = ioloop.IOLoop.current()
- def __repr__(self):
- result = '<%s' % (self.__class__.__name__, )
- if self._waiters:
- result += ' waiters[%s]' % len(self._waiters)
- return result + '>'
- def wait(self, timeout=None):
- """Wait for `.notify`.
- Returns a `.Future` that resolves ``True`` if the condition is notified,
- or ``False`` after a timeout.
- """
- waiter = Future()
- self._waiters.append(waiter)
- if timeout:
- def on_timeout():
- if not waiter.done():
- future_set_result_unless_cancelled(waiter, False)
- self._garbage_collect()
- io_loop = ioloop.IOLoop.current()
- timeout_handle = io_loop.add_timeout(timeout, on_timeout)
- waiter.add_done_callback(
- lambda _: io_loop.remove_timeout(timeout_handle))
- return waiter
- def notify(self, n=1):
- """Wake ``n`` waiters."""
- waiters = [] # Waiters we plan to run right now.
- while n and self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.done(): # Might have timed out.
- n -= 1
- waiters.append(waiter)
- for waiter in waiters:
- future_set_result_unless_cancelled(waiter, True)
- def notify_all(self):
- """Wake all waiters."""
- self.notify(len(self._waiters))
- class Event(object):
- """An event blocks coroutines until its internal flag is set to True.
- Similar to `threading.Event`.
- A coroutine can wait for an event to be set. Once it is set, calls to
- ``yield event.wait()`` will not block unless the event has been cleared:
- .. testcode::
- from tornado import gen
- from tornado.ioloop import IOLoop
- from tornado.locks import Event
- event = Event()
- async def waiter():
- print("Waiting for event")
- await event.wait()
- print("Not waiting this time")
- await event.wait()
- print("Done")
- async def setter():
- print("About to set the event")
- event.set()
- async def runner():
- await gen.multi([waiter(), setter()])
- IOLoop.current().run_sync(runner)
- .. testoutput::
- Waiting for event
- About to set the event
- Not waiting this time
- Done
- """
- def __init__(self):
- self._value = False
- self._waiters = set()
- def __repr__(self):
- return '<%s %s>' % (
- self.__class__.__name__, 'set' if self.is_set() else 'clear')
- def is_set(self):
- """Return ``True`` if the internal flag is true."""
- return self._value
- def set(self):
- """Set the internal flag to ``True``. All waiters are awakened.
- Calling `.wait` once the flag is set will not block.
- """
- if not self._value:
- self._value = True
- for fut in self._waiters:
- if not fut.done():
- fut.set_result(None)
- def clear(self):
- """Reset the internal flag to ``False``.
- Calls to `.wait` will block until `.set` is called.
- """
- self._value = False
- def wait(self, timeout=None):
- """Block until the internal flag is true.
- Returns a Future, which raises `tornado.util.TimeoutError` after a
- timeout.
- """
- fut = Future()
- if self._value:
- fut.set_result(None)
- return fut
- self._waiters.add(fut)
- fut.add_done_callback(lambda fut: self._waiters.remove(fut))
- if timeout is None:
- return fut
- else:
- timeout_fut = gen.with_timeout(timeout, fut, quiet_exceptions=(CancelledError,))
- # This is a slightly clumsy workaround for the fact that
- # gen.with_timeout doesn't cancel its futures. Cancelling
- # fut will remove it from the waiters list.
- timeout_fut.add_done_callback(lambda tf: fut.cancel() if not fut.done() else None)
- return timeout_fut
- class _ReleasingContextManager(object):
- """Releases a Lock or Semaphore at the end of a "with" statement.
- with (yield semaphore.acquire()):
- pass
- # Now semaphore.release() has been called.
- """
- def __init__(self, obj):
- self._obj = obj
- def __enter__(self):
- pass
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._obj.release()
- class Semaphore(_TimeoutGarbageCollector):
- """A lock that can be acquired a fixed number of times before blocking.
- A Semaphore manages a counter representing the number of `.release` calls
- minus the number of `.acquire` calls, plus an initial value. The `.acquire`
- method blocks if necessary until it can return without making the counter
- negative.
- Semaphores limit access to a shared resource. To allow access for two
- workers at a time:
- .. testsetup:: semaphore
- from collections import deque
- from tornado import gen
- from tornado.ioloop import IOLoop
- from tornado.concurrent import Future
- # Ensure reliable doctest output: resolve Futures one at a time.
- futures_q = deque([Future() for _ in range(3)])
- async def simulator(futures):
- for f in futures:
- # simulate the asynchronous passage of time
- await gen.sleep(0)
- await gen.sleep(0)
- f.set_result(None)
- IOLoop.current().add_callback(simulator, list(futures_q))
- def use_some_resource():
- return futures_q.popleft()
- .. testcode:: semaphore
- from tornado import gen
- from tornado.ioloop import IOLoop
- from tornado.locks import Semaphore
- sem = Semaphore(2)
- async def worker(worker_id):
- await sem.acquire()
- try:
- print("Worker %d is working" % worker_id)
- await use_some_resource()
- finally:
- print("Worker %d is done" % worker_id)
- sem.release()
- async def runner():
- # Join all workers.
- await gen.multi([worker(i) for i in range(3)])
- IOLoop.current().run_sync(runner)
- .. testoutput:: semaphore
- Worker 0 is working
- Worker 1 is working
- Worker 0 is done
- Worker 2 is working
- Worker 1 is done
- Worker 2 is done
- Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
- the semaphore has been released once, by worker 0.
- The semaphore can be used as an async context manager::
- async def worker(worker_id):
- async with sem:
- print("Worker %d is working" % worker_id)
- await use_some_resource()
- # Now the semaphore has been released.
- print("Worker %d is done" % worker_id)
- For compatibility with older versions of Python, `.acquire` is a
- context manager, so ``worker`` could also be written as::
- @gen.coroutine
- def worker(worker_id):
- with (yield sem.acquire()):
- print("Worker %d is working" % worker_id)
- yield use_some_resource()
- # Now the semaphore has been released.
- print("Worker %d is done" % worker_id)
- .. versionchanged:: 4.3
- Added ``async with`` support in Python 3.5.
- """
- def __init__(self, value=1):
- super(Semaphore, self).__init__()
- if value < 0:
- raise ValueError('semaphore initial value must be >= 0')
- self._value = value
- def __repr__(self):
- res = super(Semaphore, self).__repr__()
- extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
- self._value)
- if self._waiters:
- extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
- return '<{0} [{1}]>'.format(res[1:-1], extra)
- def release(self):
- """Increment the counter and wake one waiter."""
- self._value += 1
- while self._waiters:
- waiter = self._waiters.popleft()
- if not waiter.done():
- self._value -= 1
- # If the waiter is a coroutine paused at
- #
- # with (yield semaphore.acquire()):
- #
- # then the context manager's __exit__ calls release() at the end
- # of the "with" block.
- waiter.set_result(_ReleasingContextManager(self))
- break
- def acquire(self, timeout=None):
- """Decrement the counter. Returns a Future.
- Block if the counter is zero and wait for a `.release`. The Future
- raises `.TimeoutError` after the deadline.
- """
- waiter = Future()
- if self._value > 0:
- self._value -= 1
- waiter.set_result(_ReleasingContextManager(self))
- else:
- self._waiters.append(waiter)
- if timeout:
- def on_timeout():
- if not waiter.done():
- waiter.set_exception(gen.TimeoutError())
- self._garbage_collect()
- io_loop = ioloop.IOLoop.current()
- timeout_handle = io_loop.add_timeout(timeout, on_timeout)
- waiter.add_done_callback(
- lambda _: io_loop.remove_timeout(timeout_handle))
- return waiter
- def __enter__(self):
- raise RuntimeError(
- "Use Semaphore like 'with (yield semaphore.acquire())', not like"
- " 'with semaphore'")
- __exit__ = __enter__
- @gen.coroutine
- def __aenter__(self):
- yield self.acquire()
- @gen.coroutine
- def __aexit__(self, typ, value, tb):
- self.release()
- class BoundedSemaphore(Semaphore):
- """A semaphore that prevents release() being called too many times.
- If `.release` would increment the semaphore's value past the initial
- value, it raises `ValueError`. Semaphores are mostly used to guard
- resources with limited capacity, so a semaphore released too many times
- is a sign of a bug.
- """
- def __init__(self, value=1):
- super(BoundedSemaphore, self).__init__(value=value)
- self._initial_value = value
- def release(self):
- """Increment the counter and wake one waiter."""
- if self._value >= self._initial_value:
- raise ValueError("Semaphore released too many times")
- super(BoundedSemaphore, self).release()
- class Lock(object):
- """A lock for coroutines.
- A Lock begins unlocked, and `acquire` locks it immediately. While it is
- locked, a coroutine that yields `acquire` waits until another coroutine
- calls `release`.
- Releasing an unlocked lock raises `RuntimeError`.
- A Lock can be used as an async context manager with the ``async
- with`` statement:
- >>> from tornado import locks
- >>> lock = locks.Lock()
- >>>
- >>> async def f():
- ... async with lock:
- ... # Do something holding the lock.
- ... pass
- ...
- ... # Now the lock is released.
- For compatibility with older versions of Python, the `.acquire`
- method asynchronously returns a regular context manager:
- >>> async def f2():
- ... with (yield lock.acquire()):
- ... # Do something holding the lock.
- ... pass
- ...
- ... # Now the lock is released.
- .. versionchanged:: 4.3
- Added ``async with`` support in Python 3.5.
- """
- def __init__(self):
- self._block = BoundedSemaphore(value=1)
- def __repr__(self):
- return "<%s _block=%s>" % (
- self.__class__.__name__,
- self._block)
- def acquire(self, timeout=None):
- """Attempt to lock. Returns a Future.
- Returns a Future, which raises `tornado.util.TimeoutError` after a
- timeout.
- """
- return self._block.acquire(timeout)
- def release(self):
- """Unlock.
- The first coroutine in line waiting for `acquire` gets the lock.
- If not locked, raise a `RuntimeError`.
- """
- try:
- self._block.release()
- except ValueError:
- raise RuntimeError('release unlocked lock')
- def __enter__(self):
- raise RuntimeError(
- "Use Lock like 'with (yield lock)', not like 'with lock'")
- __exit__ = __enter__
- @gen.coroutine
- def __aenter__(self):
- yield self.acquire()
- @gen.coroutine
- def __aexit__(self, typ, value, tb):
- self.release()
|