123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- """A clone of threading module (version 2.7.2) that always
- targets real OS threads. (Unlike 'threading' which flips between
- green and OS threads based on whether the monkey patching is in effect
- or not).
- This module is missing 'Thread' class, but includes 'Queue'.
- """
- from __future__ import absolute_import
- try:
- from Queue import Full, Empty
- except ImportError:
- from queue import Full, Empty # pylint:disable=import-error
- from collections import deque
- import heapq
- from time import time as _time, sleep as _sleep
- from gevent import monkey
- from gevent._compat import PY3
# Names exported by ``from <this module> import *``.
__all__ = [
    'Condition',
    'Event',
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Queue',
    'local',
    'stack_size',
]

# The low-level thread module is looked up as '_thread' when PY3 is true
# and as 'thread' otherwise.
thread_name = 'thread' if not PY3 else '_thread'

# Pull the primitives this module builds on out of gevent.monkey.
# NOTE(review): per the module docstring these are meant to be the real
# OS-thread versions; presumably get_original() returns the un-patched
# objects -- confirm against gevent.monkey.
start_new_thread, Lock, get_ident, local, stack_size = monkey.get_original(
    thread_name,
    ['start_new_thread', 'allocate_lock', 'get_ident', '_local', 'stack_size'])
class RLock(object):
    """Reentrant lock layered on top of a plain ``Lock``.

    The thread that currently holds the lock may acquire it again without
    blocking; the underlying lock is only freed once ``release()`` has been
    called as many times as ``acquire()``.
    """

    def __init__(self):
        self.__block = Lock()  # the underlying non-reentrant lock
        self.__owner = None    # get_ident() of the holder, or None when free
        self.__count = 0       # how many times the owner has acquired it

    def __repr__(self):
        return "<%s owner=%r count=%d>" % (
            self.__class__.__name__, self.__owner, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock; return a true value on success.

        A re-entrant acquire by the owning thread only bumps the counter
        and returns 1. Otherwise the result of acquiring the underlying
        lock (with the given *blocking* flag) is returned.
        """
        me = get_ident()
        if self.__owner != me:
            acquired = self.__block.acquire(blocking)
            if acquired:
                # First acquisition by this thread: record ownership.
                self.__owner = me
                self.__count = 1
            return acquired
        self.__count += 1
        return 1

    __enter__ = acquire

    def release(self):
        """Undo one ``acquire()``; free the underlying lock at depth zero.

        Raises RuntimeError when the calling thread is not the owner.
        """
        if get_ident() != self.__owner:
            raise RuntimeError("cannot release un-acquired lock")
        depth = self.__count - 1
        self.__count = depth
        if depth == 0:
            self.__owner = None
            self.__block.release()

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        """Re-acquire the lock and reinstate a (count, owner) pair."""
        saved_count, saved_owner = count_owner
        self.__block.acquire()
        self.__count = saved_count
        self.__owner = saved_owner

    def _release_save(self):
        """Fully release the lock and return its (count, owner) state."""
        state = (self.__count, self.__owner)
        self.__count = 0
        self.__owner = None
        self.__block.release()
        return state

    def _is_owned(self):
        """Return True when the calling thread is the current owner."""
        return get_ident() == self.__owner
class Condition(object):
    """Condition variable bound to a lock.

    If no lock is supplied a fresh ``RLock`` is created. The lock's
    ``acquire`` and ``release`` are re-exported on the condition, so
    acquiring the condition acquires the lock.
    """
    # pylint:disable=method-hidden

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # When the lock provides its own _release_save(), _acquire_restore()
        # or _is_owned(), those replace the fallback implementations defined
        # further down (which only use acquire() and release()).
        for hook in ('_release_save', '_acquire_restore', '_is_owned'):
            try:
                setattr(self, hook, getattr(lock, hook))
            except AttributeError:
                pass
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        # Fallback: a plain lock has no state worth saving.
        self.__lock.release()

    def _acquire_restore(self, x):  # pylint:disable=unused-argument
        # Fallback: the saved state is ignored.
        self.__lock.acquire()

    def _is_owned(self):
        # Fallback used only when the lock has no _is_owned() of its own:
        # if a non-blocking acquire succeeds, nobody held the lock.
        if not self.__lock.acquire(0):
            return True
        self.__lock.release()
        return False

    def wait(self, timeout=None):
        """Release the lock, wait to be notified (or time out), re-acquire.

        Raises RuntimeError if the lock is not held on entry. With a
        *timeout* the waiter lock is polled with short sleeps instead of
        blocking.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # Each waiter parks on its own, already-held lock; notify()
        # releases it.
        waiter = Lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:  # the lock state is restored no matter what happens below
            if timeout is None:
                waiter.acquire()
                return
            # Poll with an exponentially growing sleep: the first sleep is
            # 1 ms, each later one doubles, capped at 50 ms and at the time
            # remaining until the deadline.
            deadline = _time() + timeout
            pause = 0.0005  # doubled before first use -> 1 ms
            signalled = waiter.acquire(0)
            while not signalled:
                left = deadline - _time()
                if left <= 0:
                    break
                pause = min(pause * 2, left, .05)
                _sleep(pause)
                signalled = waiter.acquire(0)
            if not signalled:
                # Timed out: take ourselves off the list, unless a
                # concurrent notify() already removed us.
                try:
                    self.__waiters.remove(waiter)
                except ValueError:
                    pass
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up to *n* waiters; the lock must be held by the caller.

        Raises RuntimeError if the lock is not held.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        pending = self.__waiters
        # Iterate over a snapshot because the list is mutated in the loop.
        for waiter in pending[:n]:
            waiter.release()
            try:
                pending.remove(waiter)
            except ValueError:
                pass

    def notify_all(self):
        """Wake every thread currently waiting on this condition."""
        self.notify(len(self.__waiters))
class Semaphore(object):
    """Counting semaphore guarded by a ``Condition`` over a plain ``Lock``.

    ``acquire()`` decrements the counter, waiting while it is zero;
    ``release()`` increments it and notifies one waiter.
    """
    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        """Create a semaphore with the given initial counter.

        Raises ValueError if *value* is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Decrement the counter; return True on success.

        With a false *blocking* the call returns False at once when the
        counter is zero instead of waiting.
        """
        # 'with' guarantees the condition's lock is released even when
        # wait() is interrupted by an exception (e.g. KeyboardInterrupt).
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    return False
                self.__cond.wait()
            self.__value -= 1
            return True

    __enter__ = acquire

    def release(self):
        """Increment the counter and wake one waiting thread, if any."""
        with self.__cond:
            self.__value += 1
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()


class BoundedSemaphore(Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1):
        Semaphore.__init__(self, value)
        self._initial_value = value

    def release(self):
        """Release the semaphore.

        Raises ValueError if that would push the counter above its
        initial value.
        """
        # Semaphore's private attributes are name-mangled with a leading
        # underscore (``_Semaphore__value``); spell them out in full here.
        cond = self._Semaphore__cond  # pylint:disable=no-member
        # Check and increment under the same lock so that two concurrent
        # releases cannot both pass the bound check.
        with cond:
            if self._Semaphore__value >= self._initial_value:  # pylint:disable=no-member
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value += 1
            cond.notify()
class Event(object):
    """A boolean flag that threads can wait on.

    ``set()`` raises the flag and wakes all waiters, ``clear()`` lowers it,
    and ``wait()`` blocks (optionally with a timeout) while it is down.
    """
    # After Tim Peters' event class (without is_posted())

    def __init__(self):
        self.__cond = Condition(Lock())
        self.__flag = False

    def _reset_internal_locks(self):
        # private! called by Thread._reset_internal_locks by _after_fork()
        self.__cond.__init__()

    def is_set(self):
        """Return the current value of the flag."""
        return self.__flag

    def set(self):
        """Raise the flag and notify every thread waiting on it."""
        with self.__cond:
            self.__flag = True
            self.__cond.notify_all()

    def clear(self):
        """Lower the flag."""
        with self.__cond:
            self.__flag = False

    def wait(self, timeout=None):
        """Wait while the flag is down, then return the flag's value.

        When the flag is already up no waiting happens. *timeout* is
        passed straight through to the condition's ``wait()``.
        """
        with self.__cond:
            if not self.__flag:
                self.__cond.wait(timeout)
            return self.__flag
class Queue:  # pylint:disable=old-style-class
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    """

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = Lock()
        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = Condition(self.mutex)
        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = Condition(self.mutex)
        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes
                    # negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue.  The count goes down whenever a consumer thread calls
        task_done() to indicate the item was retrieved and all work on it is
        complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!).

        An unbounded queue (maxsize <= 0) is never full.
        """
        with self.mutex:
            # Full means a positive bound that the current size has reached.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available.  If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ValueError for a negative 'timeout' on a bounded queue.
        """
        with self.not_full:
            # Only a bounded queue can ever make the caller wait.
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                else:
                    endtime = _time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available.  If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError for a negative 'timeout'.
        """
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a positive number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        # pylint:disable=unused-argument
        self.queue = deque()

    def _qsize(self, len=len):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()
class PriorityQueue(Queue):
    """Queue variant that hands entries back in priority order, lowest first.

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        # Storage is a plain list kept in heap order by heapq.
        self.queue = list()

    def _qsize(self, len=len):
        heap = self.queue
        return len(heap)

    def _put(self, item, heappush=heapq.heappush):
        # pylint:disable=arguments-differ
        heap = self.queue
        heappush(heap, item)

    def _get(self, heappop=heapq.heappop):
        # pylint:disable=arguments-differ
        # Pops the smallest entry off the heap.
        heap = self.queue
        return heappop(heap)
class LifoQueue(Queue):
    """Queue variant that hands back the most recently added entry first."""

    def _init(self, maxsize):
        # A plain list used as a stack: push and pop both work at the tail.
        self.queue = list()

    def _qsize(self, len=len):
        stack = self.queue
        return len(stack)

    def _put(self, item):
        stack = self.queue
        stack.append(item)

    def _get(self):
        stack = self.queue
        return stack.pop()
|