utils_sys.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import logging
  4. import shutil
  5. import sys
  6. import tempfile
  7. import threading
  8. import uuid
  9. from typing import Union, TYPE_CHECKING, Optional
  10. from contextlib import contextmanager
  11. from django.core.cache import CacheHandler, DefaultCacheProxy, caches
  12. from kombu.five import monotonic
  13. logger = logging.getLogger(__name__)
  14. if TYPE_CHECKING:
  15. from contextlib import GeneratorContextManager
  16. class ThreadLock(object):
  17. def __init__(self):
  18. self._lock = None
  19. def acquire_lock(self):
  20. """
  21. Acquire the module-level lock for serializing access to shared data.
  22. This should be released with _releaseLock().
  23. """
  24. if not self._lock:
  25. self._lock = threading.RLock()
  26. if self._lock:
  27. self._lock.acquire()
  28. def release_lock(self):
  29. """
  30. Release the module-level lock acquired by calling _acquireLock().
  31. """
  32. if self._lock:
  33. self._lock.release()
  34. @contextmanager
  35. def memcache_lock(key, value, expire = 60 * 10, mc = caches['lock']):
  36. # type: (str, Optional[str,int], int, Union[DefaultCacheProxy, CacheHandler])->GeneratorContextManager
  37. """
  38. Example usage
  39. ```
  40. with memcache_lock(cache, lock_id, self.app.oid) as acquired:
  41. if acquired:
  42. return Feed.objects.import_feed(feed_url).url
  43. logger.debug(
  44. 'Feed %s is already being imported by another worker', feed_url)
  45. ```
  46. :param mc:
  47. :param key:
  48. :param value:
  49. :param expire
  50. :return:
  51. """
  52. value = str(value)
  53. timeout_at = monotonic() + expire - 3
  54. status = mc.add(key, value, expire)
  55. logger.debug('memcache_lock add result is: {}; key is: {}'.format(status, key))
  56. try:
  57. yield status
  58. finally:
  59. # memcache delete is very slow, but we have to use it to take
  60. # advantage of using add() for atomic locking
  61. if monotonic() < timeout_at and status:
  62. # don't release the lock if we exceeded the timeout
  63. # to lessen the chance of releasing an expired lock
  64. # owned by someone else
  65. # also don't release the lock if we didn't acquire it
  66. result = mc.delete(key)
  67. logger.debug('memcache_lock delete result is: {}; key is: {}'.format(result, key))
  68. class MemcachedLock(object):
  69. """
  70. Try to do same as threading.Lock, but using Memcached to store lock instance to do a distributed lock
  71. """
  72. def __init__(self, key, value, expire = 360, mc = caches['lock']):
  73. # type: (str, str, int, Union[DefaultCacheProxy, CacheHandler])->MemcachedLock
  74. self.key = key
  75. self.mc = mc
  76. self.timeout = expire
  77. self.instance_id = '{}:{}'.format(uuid.uuid1().hex, value)
  78. self.timeout_at = monotonic() + expire - 3
  79. def __repr__(self):
  80. return 'MemcachedLock<key={}, id={}>'.format(self.key, self.instance_id)
  81. def acquire(self):
  82. logger.debug('=== MemcachedLock === try to acquire memcache lock {}'.format(repr(self)))
  83. added = self.mc.add(self.key, self.instance_id, self.timeout)
  84. logger.debug("=== MemcachedLock === Added=%s" % repr(added))
  85. if added:
  86. logger.debug('=== MemcachedLock === acquired memcache lock {}'.format(repr(self)))
  87. return True
  88. if added == 0 and not (added is False):
  89. raise RuntimeError(
  90. u"=== MemcachedLock === Error calling memcached add! Is memcached up and configured? memcached_client.add returns %s" % repr(
  91. added))
  92. return False
  93. def release(self):
  94. logger.debug('=== MemcachedLock === try to release memcache lock {}'.format(repr(self)))
  95. value = self.mc.get(self.key)
  96. if value == self.instance_id:
  97. # Avoid short timeout, because if key expires, after GET, and another lock occurs, memcached remove
  98. # below can delete another lock! There is no way to solve this in memcached
  99. result = self.mc.delete(self.key)
  100. logger.debug('=== MemcachedLock === delete result is: {}'.format(str(result)))
  101. else:
  102. logger.warn(
  103. "=== MemcachedLock === no lock to release {}. Increase TIMEOUT of lock operations".format(repr(self)))
  104. @property
  105. def locked(self):
  106. return True if self.mc.get(self.key) is not None else False
  107. PY3 = sys.version_info[0] == 3
  108. @contextmanager
  109. def MyStringIO():
  110. from six import StringIO
  111. try:
  112. fi = StringIO()
  113. yield fi
  114. finally:
  115. fi.close()
  116. class TemporaryDirectory(object):
  117. def __enter__(self):
  118. self.name = tempfile.mkdtemp()
  119. return self.name
  120. def __exit__(self, exc_type, exc_value, traceback):
  121. shutil.rmtree(self.name)