utils_sys.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import logging
  4. import sys
  5. import threading
  6. import uuid
  7. from typing import Union, TYPE_CHECKING, Optional
  8. from contextlib import contextmanager
  9. from django.core.cache import CacheHandler, DefaultCacheProxy, caches
  10. from kombu.five import monotonic
  11. logger = logging.getLogger(__name__)
  12. if TYPE_CHECKING:
  13. from contextlib import GeneratorContextManager
  14. class ThreadLock(object):
  15. def __init__(self):
  16. self._lock = None
  17. def acquire_lock(self):
  18. """
  19. Acquire the module-level lock for serializing access to shared data.
  20. This should be released with _releaseLock().
  21. """
  22. if not self._lock:
  23. self._lock = threading.RLock()
  24. if self._lock:
  25. self._lock.acquire()
  26. def release_lock(self):
  27. """
  28. Release the module-level lock acquired by calling _acquireLock().
  29. """
  30. if self._lock:
  31. self._lock.release()
  32. @contextmanager
  33. def memcache_lock(key, value, expire = 60 * 10, mc = caches['lock']):
  34. # type: (str, Optional[str,int], int, Union[DefaultCacheProxy, CacheHandler])->GeneratorContextManager
  35. """
  36. Example usage
  37. ```
  38. with memcache_lock(cache, lock_id, self.app.oid) as acquired:
  39. if acquired:
  40. return Feed.objects.import_feed(feed_url).url
  41. logger.debug(
  42. 'Feed %s is already being imported by another worker', feed_url)
  43. ```
  44. :param mc:
  45. :param key:
  46. :param value:
  47. :param expire
  48. :return:
  49. """
  50. value = str(value)
  51. timeout_at = monotonic() + expire - 3
  52. status = mc.add(key, value, expire)
  53. logger.debug('memcache_lock add result is: {}; key is: {}'.format(status, key))
  54. try:
  55. yield status
  56. finally:
  57. # memcache delete is very slow, but we have to use it to take
  58. # advantage of using add() for atomic locking
  59. if monotonic() < timeout_at and status:
  60. # don't release the lock if we exceeded the timeout
  61. # to lessen the chance of releasing an expired lock
  62. # owned by someone else
  63. # also don't release the lock if we didn't acquire it
  64. result = mc.delete(key)
  65. logger.debug('memcache_lock delete result is: {}; key is: {}'.format(result, key))
  66. class MemcachedLock(object):
  67. """
  68. Try to do same as threading.Lock, but using Memcached to store lock instance to do a distributed lock
  69. """
  70. def __init__(self, key, value, expire = 360, mc = caches['lock']):
  71. # type: (str, str, int, Union[DefaultCacheProxy, CacheHandler])->MemcachedLock
  72. self.key = key
  73. self.mc = mc
  74. self.timeout = expire
  75. self.instance_id = '{}:{}'.format(uuid.uuid1().hex, value)
  76. self.timeout_at = monotonic() + expire - 3
  77. def __repr__(self):
  78. return 'MemcachedLock<key={}, id={}>'.format(self.key, self.instance_id)
  79. def acquire(self):
  80. logger.debug('=== MemcachedLock === try to acquire memcache lock {}'.format(repr(self)))
  81. added = self.mc.add(self.key, self.instance_id, self.timeout)
  82. logger.debug("=== MemcachedLock === Added=%s" % repr(added))
  83. if added:
  84. logger.debug('=== MemcachedLock === acquired memcache lock {}'.format(repr(self)))
  85. return True
  86. if added == 0 and not (added is False):
  87. raise RuntimeError(
  88. u"=== MemcachedLock === Error calling memcached add! Is memcached up and configured? memcached_client.add returns %s" % repr(
  89. added))
  90. return False
  91. def release(self):
  92. logger.debug('=== MemcachedLock === try to release memcache lock {}'.format(repr(self)))
  93. value = self.mc.get(self.key)
  94. if value == self.instance_id:
  95. # Avoid short timeout, because if key expires, after GET, and another lock occurs, memcached remove
  96. # below can delete another lock! There is no way to solve this in memcached
  97. result = self.mc.delete(self.key)
  98. logger.debug('=== MemcachedLock === delete result is: {}'.format(str(result)))
  99. else:
  100. logger.warn("=== MemcachedLock === no lock to release {}. Increase TIMEOUT of lock operations".format(repr(self)))
  101. @property
  102. def locked(self):
  103. return True if self.mc.get(self.key) is not None else False
  104. PY3 = sys.version_info[0] == 3
  105. @contextmanager
  106. def MyStringIO():
  107. from six import StringIO
  108. try:
  109. fi = StringIO()
  110. yield fi
  111. finally:
  112. fi.close()