#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import shutil
import sys
import tempfile
import threading
import uuid
from contextlib import contextmanager
from typing import Union, TYPE_CHECKING

from django.core.cache import CacheHandler, DefaultCacheProxy, caches
from kombu.five import monotonic

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from contextlib import GeneratorContextManager


class ThreadLock(object):
    def __init__(self):
        self._lock = None

    def acquire_lock(self):
        """
        Acquire the lock for serializing access to shared data.

        This should be released with release_lock().
        """
        if self._lock is None:
            self._lock = threading.RLock()
        self._lock.acquire()

    def release_lock(self):
        """
        Release the lock acquired by calling acquire_lock().
        """
        if self._lock:
            self._lock.release()


@contextmanager
def memcache_lock(key, value, expire=60 * 10, mc=caches['lock']):
    # type: (str, Union[str, int], int, Union[DefaultCacheProxy, CacheHandler]) -> GeneratorContextManager
    """
    Example usage

    ```
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    logger.debug('Feed %s is already being imported by another worker', feed_url)
    ```

    :param key: cache key used as the lock name
    :param value: identifier of the lock holder, stored as the cache value
    :param expire: lock timeout in seconds
    :param mc: Django cache backend used to store the lock
    :return: yields True if the lock was acquired, False otherwise
    """
    value = str(value)
    timeout_at = monotonic() + expire - 3
    status = mc.add(key, value, expire)
    logger.debug('memcache_lock add result is: {}; key is: {}'.format(status, key))
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            result = mc.delete(key)
            logger.debug('memcache_lock delete result is: {}; key is: {}'.format(result, key))


class MemcachedLock(object):
    """
    Try to do the same as threading.Lock, but use Memcached to store the lock
    instance, so the lock is distributed across processes and hosts.
    """

    def __init__(self, key, value, expire=360, mc=caches['lock']):
        # type: (str, str, int, Union[DefaultCacheProxy, CacheHandler]) -> None
        self.key = key
        self.mc = mc
        self.timeout = expire
        self.instance_id = '{}:{}'.format(uuid.uuid1().hex, value)
        self.timeout_at = monotonic() + expire - 3

    def __repr__(self):
        return 'MemcachedLock(key={}, instance_id={})'.format(self.key, self.instance_id)

    def acquire(self):
        logger.debug('=== MemcachedLock === try to acquire memcache lock {}'.format(repr(self)))
        added = self.mc.add(self.key, self.instance_id, self.timeout)
        logger.debug('=== MemcachedLock === Added=%s' % repr(added))
        if added:
            logger.debug('=== MemcachedLock === acquired memcache lock {}'.format(repr(self)))
            return True
        # a raw 0 (as opposed to False) means the memcached call itself failed
        if added == 0 and added is not False:
            raise RuntimeError(
                u'=== MemcachedLock === Error calling memcached add! Is memcached up and configured? '
                u'memcached_client.add returns %s' % repr(added))
        return False

    def release(self):
        logger.debug('=== MemcachedLock === try to release memcache lock {}'.format(repr(self)))
        value = self.mc.get(self.key)
        if value == self.instance_id:
            # Avoid short timeouts: if the key expires right after the GET and another
            # process acquires the lock, the delete below removes that other lock!
            # There is no way to solve this race in memcached itself.
            result = self.mc.delete(self.key)
            logger.debug('=== MemcachedLock === delete result is: {}'.format(str(result)))
        else:
            logger.warning(
                '=== MemcachedLock === no lock to release {}. '
                'Increase TIMEOUT of lock operations'.format(repr(self)))

    @property
    def locked(self):
        return self.mc.get(self.key) is not None


PY3 = sys.version_info[0] == 3


@contextmanager
def MyStringIO():
    from six import StringIO
    fi = StringIO()
    try:
        yield fi
    finally:
        fi.close()


class TemporaryDirectory(object):
    def __enter__(self):
        self.name = tempfile.mkdtemp()
        return self.name

    def __exit__(self, exc_type, exc_value, traceback):
        shutil.rmtree(self.name)
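

# --- Usage sketch (illustrative only, not part of the module's API) ---------
# A minimal example of guarding a critical section, first with MemcachedLock and
# then with the memcache_lock() context manager. It assumes Django settings are
# loaded and define a cache alias named 'lock' (which the defaults above already
# require); the key name 'feeds:import' and the holder id 'worker-1' are
# hypothetical placeholders.
if __name__ == '__main__':
    lock = MemcachedLock(key='feeds:import', value='worker-1', expire=60)
    if lock.acquire():
        try:
            pass  # critical section: only one holder of 'feeds:import' gets here
        finally:
            lock.release()

    with memcache_lock('feeds:import', 'worker-1', expire=60) as acquired:
        if acquired:
            pass  # critical section
        else:
            logger.debug('lock is held by another worker')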