thread_util.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # Copyright 2012-2015 MongoDB, Inc.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Utilities for multi-threading support."""
  15. import threading
  16. try:
  17. from time import monotonic as _time
  18. except ImportError:
  19. from time import time as _time
  20. from pymongo.monotonic import time as _time
  21. from pymongo.errors import ExceededMaxWaiters
  22. ### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
  23. class Semaphore:
  24. # After Tim Peters' semaphore class, but not quite the same (no maximum)
  25. def __init__(self, value=1):
  26. if value < 0:
  27. raise ValueError("semaphore initial value must be >= 0")
  28. self._cond = threading.Condition(threading.Lock())
  29. self._value = value
  30. def acquire(self, blocking=True, timeout=None):
  31. if not blocking and timeout is not None:
  32. raise ValueError("can't specify timeout for non-blocking acquire")
  33. rc = False
  34. endtime = None
  35. with self._cond:
  36. while self._value == 0:
  37. if not blocking:
  38. break
  39. if timeout is not None:
  40. if endtime is None:
  41. endtime = _time() + timeout
  42. else:
  43. timeout = endtime - _time()
  44. if timeout <= 0:
  45. break
  46. self._cond.wait(timeout)
  47. else:
  48. self._value = self._value - 1
  49. rc = True
  50. return rc
  51. __enter__ = acquire
  52. def release(self):
  53. with self._cond:
  54. self._value = self._value + 1
  55. self._cond.notify()
  56. def __exit__(self, t, v, tb):
  57. self.release()
  58. @property
  59. def counter(self):
  60. return self._value
  61. class BoundedSemaphore(Semaphore):
  62. """Semaphore that checks that # releases is <= # acquires"""
  63. def __init__(self, value=1):
  64. Semaphore.__init__(self, value)
  65. self._initial_value = value
  66. def release(self):
  67. if self._value >= self._initial_value:
  68. raise ValueError("Semaphore released too many times")
  69. return Semaphore.release(self)
  70. ### End backport from CPython 3.2
  71. class DummySemaphore(object):
  72. def __init__(self, value=None):
  73. pass
  74. def acquire(self, blocking=True, timeout=None):
  75. return True
  76. def release(self):
  77. pass
  78. class MaxWaitersBoundedSemaphore(object):
  79. def __init__(self, semaphore_class, value=1, max_waiters=1):
  80. self.waiter_semaphore = semaphore_class(max_waiters)
  81. self.semaphore = semaphore_class(value)
  82. def acquire(self, blocking=True, timeout=None):
  83. if not self.waiter_semaphore.acquire(False):
  84. raise ExceededMaxWaiters()
  85. try:
  86. return self.semaphore.acquire(blocking, timeout)
  87. finally:
  88. self.waiter_semaphore.release()
  89. def __getattr__(self, name):
  90. return getattr(self.semaphore, name)
  91. class MaxWaitersBoundedSemaphoreThread(MaxWaitersBoundedSemaphore):
  92. def __init__(self, value=1, max_waiters=1):
  93. MaxWaitersBoundedSemaphore.__init__(
  94. self, BoundedSemaphore, value, max_waiters)
  95. def create_semaphore(max_size, max_waiters):
  96. if max_size is None:
  97. return DummySemaphore()
  98. else:
  99. if max_waiters is None:
  100. return BoundedSemaphore(max_size)
  101. else:
  102. return MaxWaitersBoundedSemaphoreThread(max_size, max_waiters)