lock.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. import threading
  2. import time as mod_time
  3. import uuid
  4. from redis.exceptions import LockError, WatchError
  5. from redis.utils import dummy
  6. from redis._compat import b
  7. class Lock(object):
  8. """
  9. A shared, distributed Lock. Using Redis for locking allows the Lock
  10. to be shared across processes and/or machines.
  11. It's left to the user to resolve deadlock issues and make sure
  12. multiple clients play nicely together.
  13. """
  14. def __init__(self, redis, name, timeout=None, sleep=0.1,
  15. blocking=True, blocking_timeout=None, thread_local=True):
  16. """
  17. Create a new Lock instance named ``name`` using the Redis client
  18. supplied by ``redis``.
  19. ``timeout`` indicates a maximum life for the lock.
  20. By default, it will remain locked until release() is called.
  21. ``timeout`` can be specified as a float or integer, both representing
  22. the number of seconds to wait.
  23. ``sleep`` indicates the amount of time to sleep per loop iteration
  24. when the lock is in blocking mode and another client is currently
  25. holding the lock.
  26. ``blocking`` indicates whether calling ``acquire`` should block until
  27. the lock has been acquired or to fail immediately, causing ``acquire``
  28. to return False and the lock not being acquired. Defaults to True.
  29. Note this value can be overridden by passing a ``blocking``
  30. argument to ``acquire``.
  31. ``blocking_timeout`` indicates the maximum amount of time in seconds to
  32. spend trying to acquire the lock. A value of ``None`` indicates
  33. continue trying forever. ``blocking_timeout`` can be specified as a
  34. float or integer, both representing the number of seconds to wait.
  35. ``thread_local`` indicates whether the lock token is placed in
  36. thread-local storage. By default, the token is placed in thread local
  37. storage so that a thread only sees its token, not a token set by
  38. another thread. Consider the following timeline:
  39. time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
  40. thread-1 sets the token to "abc"
  41. time: 1, thread-2 blocks trying to acquire `my-lock` using the
  42. Lock instance.
  43. time: 5, thread-1 has not yet completed. redis expires the lock
  44. key.
  45. time: 5, thread-2 acquired `my-lock` now that it's available.
  46. thread-2 sets the token to "xyz"
  47. time: 6, thread-1 finishes its work and calls release(). if the
  48. token is *not* stored in thread local storage, then
  49. thread-1 would see the token value as "xyz" and would be
  50. able to successfully release the thread-2's lock.
  51. In some use cases it's necessary to disable thread local storage. For
  52. example, if you have code where one thread acquires a lock and passes
  53. that lock instance to a worker thread to release later. If thread
  54. local storage isn't disabled in this case, the worker thread won't see
  55. the token set by the thread that acquired the lock. Our assumption
  56. is that these cases aren't common and as such default to using
  57. thread local storage.
  58. """
  59. self.redis = redis
  60. self.name = name
  61. self.timeout = timeout
  62. self.sleep = sleep
  63. self.blocking = blocking
  64. self.blocking_timeout = blocking_timeout
  65. self.thread_local = bool(thread_local)
  66. self.local = threading.local() if self.thread_local else dummy()
  67. self.local.token = None
  68. if self.timeout and self.sleep > self.timeout:
  69. raise LockError("'sleep' must be less than 'timeout'")
  70. def __enter__(self):
  71. # force blocking, as otherwise the user would have to check whether
  72. # the lock was actually acquired or not.
  73. self.acquire(blocking=True)
  74. return self
  75. def __exit__(self, exc_type, exc_value, traceback):
  76. self.release()
  77. def acquire(self, blocking=None, blocking_timeout=None):
  78. """
  79. Use Redis to hold a shared, distributed lock named ``name``.
  80. Returns True once the lock is acquired.
  81. If ``blocking`` is False, always return immediately. If the lock
  82. was acquired, return True, otherwise return False.
  83. ``blocking_timeout`` specifies the maximum number of seconds to
  84. wait trying to acquire the lock.
  85. """
  86. sleep = self.sleep
  87. token = b(uuid.uuid1().hex)
  88. if blocking is None:
  89. blocking = self.blocking
  90. if blocking_timeout is None:
  91. blocking_timeout = self.blocking_timeout
  92. stop_trying_at = None
  93. if blocking_timeout is not None:
  94. stop_trying_at = mod_time.time() + blocking_timeout
  95. while 1:
  96. if self.do_acquire(token):
  97. self.local.token = token
  98. return True
  99. if not blocking:
  100. return False
  101. if stop_trying_at is not None and mod_time.time() > stop_trying_at:
  102. return False
  103. mod_time.sleep(sleep)
  104. def do_acquire(self, token):
  105. if self.redis.setnx(self.name, token):
  106. if self.timeout:
  107. # convert to milliseconds
  108. timeout = int(self.timeout * 1000)
  109. self.redis.pexpire(self.name, timeout)
  110. return True
  111. return False
  112. def release(self):
  113. "Releases the already acquired lock"
  114. expected_token = self.local.token
  115. if expected_token is None:
  116. raise LockError("Cannot release an unlocked lock")
  117. self.local.token = None
  118. self.do_release(expected_token)
  119. def do_release(self, expected_token):
  120. name = self.name
  121. def execute_release(pipe):
  122. lock_value = pipe.get(name)
  123. if lock_value != expected_token:
  124. raise LockError("Cannot release a lock that's no longer owned")
  125. pipe.delete(name)
  126. self.redis.transaction(execute_release, name)
  127. def extend(self, additional_time):
  128. """
  129. Adds more time to an already acquired lock.
  130. ``additional_time`` can be specified as an integer or a float, both
  131. representing the number of seconds to add.
  132. """
  133. if self.local.token is None:
  134. raise LockError("Cannot extend an unlocked lock")
  135. if self.timeout is None:
  136. raise LockError("Cannot extend a lock with no timeout")
  137. return self.do_extend(additional_time)
  138. def do_extend(self, additional_time):
  139. pipe = self.redis.pipeline()
  140. pipe.watch(self.name)
  141. lock_value = pipe.get(self.name)
  142. if lock_value != self.local.token:
  143. raise LockError("Cannot extend a lock that's no longer owned")
  144. expiration = pipe.pttl(self.name)
  145. if expiration is None or expiration < 0:
  146. # Redis evicted the lock key between the previous get() and now
  147. # we'll handle this when we call pexpire()
  148. expiration = 0
  149. pipe.multi()
  150. pipe.pexpire(self.name, expiration + int(additional_time * 1000))
  151. try:
  152. response = pipe.execute()
  153. except WatchError:
  154. # someone else acquired the lock
  155. raise LockError("Cannot extend a lock that's no longer owned")
  156. if not response[0]:
  157. # pexpire returns False if the key doesn't exist
  158. raise LockError("Cannot extend a lock that's no longer owned")
  159. return True
  160. class LuaLock(Lock):
  161. """
  162. A lock implementation that uses Lua scripts rather than pipelines
  163. and watches.
  164. """
  165. lua_acquire = None
  166. lua_release = None
  167. lua_extend = None
  168. # KEYS[1] - lock name
  169. # ARGV[1] - token
  170. # ARGV[2] - timeout in milliseconds
  171. # return 1 if lock was acquired, otherwise 0
  172. LUA_ACQUIRE_SCRIPT = """
  173. if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then
  174. if ARGV[2] ~= '' then
  175. redis.call('pexpire', KEYS[1], ARGV[2])
  176. end
  177. return 1
  178. end
  179. return 0
  180. """
  181. # KEYS[1] - lock name
  182. # ARGS[1] - token
  183. # return 1 if the lock was released, otherwise 0
  184. LUA_RELEASE_SCRIPT = """
  185. local token = redis.call('get', KEYS[1])
  186. if not token or token ~= ARGV[1] then
  187. return 0
  188. end
  189. redis.call('del', KEYS[1])
  190. return 1
  191. """
  192. # KEYS[1] - lock name
  193. # ARGS[1] - token
  194. # ARGS[2] - additional milliseconds
  195. # return 1 if the locks time was extended, otherwise 0
  196. LUA_EXTEND_SCRIPT = """
  197. local token = redis.call('get', KEYS[1])
  198. if not token or token ~= ARGV[1] then
  199. return 0
  200. end
  201. local expiration = redis.call('pttl', KEYS[1])
  202. if not expiration then
  203. expiration = 0
  204. end
  205. if expiration < 0 then
  206. return 0
  207. end
  208. redis.call('pexpire', KEYS[1], expiration + ARGV[2])
  209. return 1
  210. """
  211. def __init__(self, *args, **kwargs):
  212. super(LuaLock, self).__init__(*args, **kwargs)
  213. LuaLock.register_scripts(self.redis)
  214. @classmethod
  215. def register_scripts(cls, redis):
  216. if cls.lua_acquire is None:
  217. cls.lua_acquire = redis.register_script(cls.LUA_ACQUIRE_SCRIPT)
  218. if cls.lua_release is None:
  219. cls.lua_release = redis.register_script(cls.LUA_RELEASE_SCRIPT)
  220. if cls.lua_extend is None:
  221. cls.lua_extend = redis.register_script(cls.LUA_EXTEND_SCRIPT)
  222. def do_acquire(self, token):
  223. timeout = self.timeout and int(self.timeout * 1000) or ''
  224. return bool(self.lua_acquire(keys=[self.name],
  225. args=[token, timeout],
  226. client=self.redis))
  227. def do_release(self, expected_token):
  228. if not bool(self.lua_release(keys=[self.name],
  229. args=[expected_token],
  230. client=self.redis)):
  231. raise LockError("Cannot release a lock that's no longer owned")
  232. def do_extend(self, additional_time):
  233. additional_time = int(additional_time * 1000)
  234. if not bool(self.lua_extend(keys=[self.name],
  235. args=[self.local.token, additional_time],
  236. client=self.redis)):
  237. raise LockError("Cannot extend a lock that's no longer owned")
  238. return True