123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import logging
- import shutil
- import sys
- import tempfile
- import threading
- import uuid
- from typing import Union, TYPE_CHECKING, Optional
- from contextlib import contextmanager
- from django.core.cache import CacheHandler, DefaultCacheProxy, caches
- from kombu.five import monotonic
- logger = logging.getLogger(__name__)
- if TYPE_CHECKING:
- from contextlib import GeneratorContextManager
- class ThreadLock(object):
- def __init__(self):
- self._lock = None
- def acquire_lock(self):
- """
- Acquire the module-level lock for serializing access to shared data.
- This should be released with _releaseLock().
- """
- if not self._lock:
- self._lock = threading.RLock()
- if self._lock:
- self._lock.acquire()
- def release_lock(self):
- """
- Release the module-level lock acquired by calling _acquireLock().
- """
- if self._lock:
- self._lock.release()
- @contextmanager
- def memcache_lock(key, value, expire = 60 * 10, mc = caches['lock']):
- # type: (str, Optional[str,int], int, Union[DefaultCacheProxy, CacheHandler])->GeneratorContextManager
- """
- Example usage
- ```
- with memcache_lock(cache, lock_id, self.app.oid) as acquired:
- if acquired:
- return Feed.objects.import_feed(feed_url).url
- logger.debug(
- 'Feed %s is already being imported by another worker', feed_url)
- ```
- :param mc:
- :param key:
- :param value:
- :param expire
- :return:
- """
- value = str(value)
- timeout_at = monotonic() + expire - 3
- status = mc.add(key, value, expire)
- logger.debug('memcache_lock add result is: {}; key is: {}'.format(status, key))
- try:
- yield status
- finally:
- # memcache delete is very slow, but we have to use it to take
- # advantage of using add() for atomic locking
- if monotonic() < timeout_at and status:
- # don't release the lock if we exceeded the timeout
- # to lessen the chance of releasing an expired lock
- # owned by someone else
- # also don't release the lock if we didn't acquire it
- result = mc.delete(key)
- logger.debug('memcache_lock delete result is: {}; key is: {}'.format(result, key))
- class MemcachedLock(object):
- """
- Try to do same as threading.Lock, but using Memcached to store lock instance to do a distributed lock
- """
- def __init__(self, key, value, expire = 360, mc = caches['lock']):
- # type: (str, str, int, Union[DefaultCacheProxy, CacheHandler])->MemcachedLock
- self.key = key
- self.mc = mc
- self.timeout = expire
- self.instance_id = '{}:{}'.format(uuid.uuid1().hex, value)
- self.timeout_at = monotonic() + expire - 3
- def __repr__(self):
- return 'MemcachedLock<key={}, id={}>'.format(self.key, self.instance_id)
- def acquire(self):
- logger.debug('=== MemcachedLock === try to acquire memcache lock {}'.format(repr(self)))
- added = self.mc.add(self.key, self.instance_id, self.timeout)
- logger.debug("=== MemcachedLock === Added=%s" % repr(added))
- if added:
- logger.debug('=== MemcachedLock === acquired memcache lock {}'.format(repr(self)))
- return True
- if added == 0 and not (added is False):
- raise RuntimeError(
- u"=== MemcachedLock === Error calling memcached add! Is memcached up and configured? memcached_client.add returns %s" % repr(
- added))
- return False
- def release(self):
- logger.debug('=== MemcachedLock === try to release memcache lock {}'.format(repr(self)))
- value = self.mc.get(self.key)
- if value == self.instance_id:
- # Avoid short timeout, because if key expires, after GET, and another lock occurs, memcached remove
- # below can delete another lock! There is no way to solve this in memcached
- result = self.mc.delete(self.key)
- logger.debug('=== MemcachedLock === delete result is: {}'.format(str(result)))
- else:
- logger.warn(
- "=== MemcachedLock === no lock to release {}. Increase TIMEOUT of lock operations".format(repr(self)))
- @property
- def locked(self):
- return True if self.mc.get(self.key) is not None else False
- PY3 = sys.version_info[0] == 3
- @contextmanager
- def MyStringIO():
- from six import StringIO
- try:
- fi = StringIO()
- yield fi
- finally:
- fi.close()
- class TemporaryDirectory(object):
- def __enter__(self):
- self.name = tempfile.mkdtemp()
- return self.name
- def __exit__(self, exc_type, exc_value, traceback):
- shutil.rmtree(self.name)
|