_semaphore.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. import sys
  2. from gevent.hub import get_hub, getcurrent
  3. from gevent.timeout import Timeout
# Public API of this module: the two semaphore classes defined below.
__all__ = ['Semaphore', 'BoundedSemaphore']
  5. class Semaphore(object):
  6. """
  7. Semaphore(value=1) -> Semaphore
  8. A semaphore manages a counter representing the number of release()
  9. calls minus the number of acquire() calls, plus an initial value.
  10. The acquire() method blocks if necessary until it can return
  11. without making the counter negative.
  12. If not given, ``value`` defaults to 1.
  13. The semaphore is a context manager and can be used in ``with`` statements.
  14. This Semaphore's ``__exit__`` method does not call the trace function
  15. on CPython, but does under PyPy.
  16. .. seealso:: :class:`BoundedSemaphore` for a safer version that prevents
  17. some classes of bugs.
  18. """
  19. def __init__(self, value=1):
  20. if value < 0:
  21. raise ValueError("semaphore initial value must be >= 0")
  22. self.counter = value
  23. self._dirty = False
  24. # In PyPy 2.6.1 with Cython 0.23, `cdef public` or `cdef
  25. # readonly` or simply `cdef` attributes of type `object` can appear to leak if
  26. # a Python subclass is used (this is visible simply
  27. # instantiating this subclass if _links=[]). Our _links and
  28. # _notifier are such attributes, and gevent.thread subclasses
  29. # this class. Thus, we carefully manage the lifetime of the
  30. # objects we put in these attributes so that, in the normal
  31. # case of a semaphore used correctly (deallocated when it's not
  32. # locked and no one is waiting), the leak goes away (because
  33. # these objects are back to None). This can also be solved on PyPy
  34. # by simply not declaring these objects in the pxd file, but that doesn't work for
  35. # CPython ("No attribute...")
  36. # See https://github.com/gevent/gevent/issues/660
  37. self._links = None
  38. self._notifier = None
  39. # we don't want to do get_hub() here to allow defining module-level locks
  40. # without initializing the hub
  41. def __str__(self):
  42. params = (self.__class__.__name__, self.counter, len(self._links) if self._links else 0)
  43. return '<%s counter=%s _links[%s]>' % params
  44. def locked(self):
  45. """Return a boolean indicating whether the semaphore can be acquired.
  46. Most useful with binary semaphores."""
  47. return self.counter <= 0
  48. def release(self):
  49. """
  50. Release the semaphore, notifying any waiters if needed.
  51. """
  52. self.counter += 1
  53. self._start_notify()
  54. return self.counter
  55. def _start_notify(self):
  56. if self._links and self.counter > 0 and not self._notifier:
  57. # We create a new self._notifier each time through the loop,
  58. # if needed. (it has a __bool__ method that tells whether it has
  59. # been run; once it's run once---at the end of the loop---it becomes
  60. # false.)
  61. # NOTE: Passing the bound method will cause a memory leak on PyPy
  62. # with Cython <= 0.23.3. You must use >= 0.23.4.
  63. # See https://bitbucket.org/pypy/pypy/issues/2149/memory-leak-for-python-subclass-of-cpyext#comment-22371546
  64. self._notifier = get_hub().loop.run_callback(self._notify_links)
  65. def _notify_links(self):
  66. # Subclasses CANNOT override. This is a cdef method.
  67. # We release self._notifier here. We are called by it
  68. # at the end of the loop, and it is now false in a boolean way (as soon
  69. # as this method returns).
  70. # If we get acquired/released again, we will create a new one, but there's
  71. # no need to keep it around until that point (making it potentially climb
  72. # into older GC generations, notably on PyPy)
  73. notifier = self._notifier
  74. try:
  75. while True:
  76. self._dirty = False
  77. if not self._links:
  78. # In case we were manually unlinked before
  79. # the callback. Which shouldn't happen
  80. return
  81. for link in self._links:
  82. if self.counter <= 0:
  83. return
  84. try:
  85. link(self) # Must use Cython >= 0.23.4 on PyPy else this leaks memory
  86. except: # pylint:disable=bare-except
  87. getcurrent().handle_error((link, self), *sys.exc_info())
  88. if self._dirty:
  89. # We mutated self._links so we need to start over
  90. break
  91. if not self._dirty:
  92. return
  93. finally:
  94. # We should not have created a new notifier even if callbacks
  95. # released us because we loop through *all* of our links on the
  96. # same callback while self._notifier is still true.
  97. assert self._notifier is notifier
  98. self._notifier = None
  99. def rawlink(self, callback):
  100. """
  101. rawlink(callback) -> None
  102. Register a callback to call when a counter is more than zero.
  103. *callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
  104. *callback* will be passed one argument: this instance.
  105. This method is normally called automatically by :meth:`acquire` and :meth:`wait`; most code
  106. will not need to use it.
  107. """
  108. if not callable(callback):
  109. raise TypeError('Expected callable:', callback)
  110. if self._links is None:
  111. self._links = [callback]
  112. else:
  113. self._links.append(callback)
  114. self._dirty = True
  115. def unlink(self, callback):
  116. """
  117. unlink(callback) -> None
  118. Remove the callback set by :meth:`rawlink`.
  119. This method is normally called automatically by :meth:`acquire` and :meth:`wait`; most
  120. code will not need to use it.
  121. """
  122. try:
  123. self._links.remove(callback)
  124. self._dirty = True
  125. except (ValueError, AttributeError):
  126. pass
  127. if not self._links:
  128. self._links = None
  129. # TODO: Cancel a notifier if there are no links?
  130. def _do_wait(self, timeout):
  131. """
  132. Wait for up to *timeout* seconds to expire. If timeout
  133. elapses, return the exception. Otherwise, return None.
  134. Raises timeout if a different timer expires.
  135. """
  136. switch = getcurrent().switch
  137. self.rawlink(switch)
  138. try:
  139. timer = Timeout._start_new_or_dummy(timeout)
  140. try:
  141. try:
  142. result = get_hub().switch()
  143. assert result is self, 'Invalid switch into Semaphore.wait/acquire(): %r' % (result, )
  144. except Timeout as ex:
  145. if ex is not timer:
  146. raise
  147. return ex
  148. finally:
  149. timer.cancel()
  150. finally:
  151. self.unlink(switch)
  152. def wait(self, timeout=None):
  153. """
  154. wait(timeout=None) -> int
  155. Wait until it is possible to acquire this semaphore, or until the optional
  156. *timeout* elapses.
  157. .. caution:: If this semaphore was initialized with a size of 0,
  158. this method will block forever if no timeout is given.
  159. :keyword float timeout: If given, specifies the maximum amount of seconds
  160. this method will block.
  161. :return: A number indicating how many times the semaphore can be acquired
  162. before blocking.
  163. """
  164. if self.counter > 0:
  165. return self.counter
  166. self._do_wait(timeout) # return value irrelevant, whether we got it or got a timeout
  167. return self.counter
  168. def acquire(self, blocking=True, timeout=None):
  169. """
  170. acquire(blocking=True, timeout=None) -> bool
  171. Acquire the semaphore.
  172. .. caution:: If this semaphore was initialized with a size of 0,
  173. this method will block forever (unless a timeout is given or blocking is
  174. set to false).
  175. :keyword bool blocking: If True (the default), this function will block
  176. until the semaphore is acquired.
  177. :keyword float timeout: If given, specifies the maximum amount of seconds
  178. this method will block.
  179. :return: A boolean indicating whether the semaphore was acquired.
  180. If ``blocking`` is True and ``timeout`` is None (the default), then
  181. (so long as this semaphore was initialized with a size greater than 0)
  182. this will always return True. If a timeout was given, and it expired before
  183. the semaphore was acquired, False will be returned. (Note that this can still
  184. raise a ``Timeout`` exception, if some other caller had already started a timer.)
  185. """
  186. if self.counter > 0:
  187. self.counter -= 1
  188. return True
  189. if not blocking:
  190. return False
  191. timeout = self._do_wait(timeout)
  192. if timeout is not None:
  193. # Our timer expired.
  194. return False
  195. # Neither our timer no another one expired, so we blocked until
  196. # awoke. Therefore, the counter is ours
  197. self.counter -= 1
  198. assert self.counter >= 0
  199. return True
  200. _py3k_acquire = acquire # PyPy needs this; it must be static for Cython
  201. def __enter__(self):
  202. self.acquire()
  203. def __exit__(self, t, v, tb):
  204. self.release()
  205. class BoundedSemaphore(Semaphore):
  206. """
  207. BoundedSemaphore(value=1) -> BoundedSemaphore
  208. A bounded semaphore checks to make sure its current value doesn't
  209. exceed its initial value. If it does, :class:`ValueError` is
  210. raised. In most situations semaphores are used to guard resources
  211. with limited capacity. If the semaphore is released too many times
  212. it's a sign of a bug.
  213. If not given, *value* defaults to 1.
  214. """
  215. #: For monkey-patching, allow changing the class of error we raise
  216. _OVER_RELEASE_ERROR = ValueError
  217. def __init__(self, *args, **kwargs):
  218. Semaphore.__init__(self, *args, **kwargs)
  219. self._initial_value = self.counter
  220. def release(self):
  221. if self.counter >= self._initial_value:
  222. raise self._OVER_RELEASE_ERROR("Semaphore released too many times")
  223. return Semaphore.release(self)