cache.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. # Tweepy
  2. # Copyright 2009-2010 Joshua Roesslein
  3. # See LICENSE for details.
  4. from __future__ import print_function
  5. import time
  6. import datetime
  7. import threading
  8. import os
  9. try:
  10. import cPickle as pickle
  11. except ImportError:
  12. import pickle
  13. try:
  14. import hashlib
  15. except ImportError:
  16. # python 2.4
  17. import md5 as hashlib
  18. try:
  19. import fcntl
  20. except ImportError:
  21. # Probably on a windows system
  22. # TODO: use win32file
  23. pass
  24. class Cache(object):
  25. """Cache interface"""
  26. def __init__(self, timeout=60):
  27. """Initialize the cache
  28. timeout: number of seconds to keep a cached entry
  29. """
  30. self.timeout = timeout
  31. def store(self, key, value):
  32. """Add new record to cache
  33. key: entry key
  34. value: data of entry
  35. """
  36. raise NotImplementedError
  37. def get(self, key, timeout=None):
  38. """Get cached entry if exists and not expired
  39. key: which entry to get
  40. timeout: override timeout with this value [optional]
  41. """
  42. raise NotImplementedError
  43. def count(self):
  44. """Get count of entries currently stored in cache"""
  45. raise NotImplementedError
  46. def cleanup(self):
  47. """Delete any expired entries in cache."""
  48. raise NotImplementedError
  49. def flush(self):
  50. """Delete all cached entries"""
  51. raise NotImplementedError
  52. class MemoryCache(Cache):
  53. """In-memory cache"""
  54. def __init__(self, timeout=60):
  55. Cache.__init__(self, timeout)
  56. self._entries = {}
  57. self.lock = threading.Lock()
  58. def __getstate__(self):
  59. # pickle
  60. return {'entries': self._entries, 'timeout': self.timeout}
  61. def __setstate__(self, state):
  62. # unpickle
  63. self.lock = threading.Lock()
  64. self._entries = state['entries']
  65. self.timeout = state['timeout']
  66. def _is_expired(self, entry, timeout):
  67. return timeout > 0 and (time.time() - entry[0]) >= timeout
  68. def store(self, key, value):
  69. self.lock.acquire()
  70. self._entries[key] = (time.time(), value)
  71. self.lock.release()
  72. def get(self, key, timeout=None):
  73. self.lock.acquire()
  74. try:
  75. # check to see if we have this key
  76. entry = self._entries.get(key)
  77. if not entry:
  78. # no hit, return nothing
  79. return None
  80. # use provided timeout in arguments if provided
  81. # otherwise use the one provided during init.
  82. if timeout is None:
  83. timeout = self.timeout
  84. # make sure entry is not expired
  85. if self._is_expired(entry, timeout):
  86. # entry expired, delete and return nothing
  87. del self._entries[key]
  88. return None
  89. # entry found and not expired, return it
  90. return entry[1]
  91. finally:
  92. self.lock.release()
  93. def count(self):
  94. return len(self._entries)
  95. def cleanup(self):
  96. self.lock.acquire()
  97. try:
  98. for k, v in dict(self._entries).items():
  99. if self._is_expired(v, self.timeout):
  100. del self._entries[k]
  101. finally:
  102. self.lock.release()
  103. def flush(self):
  104. self.lock.acquire()
  105. self._entries.clear()
  106. self.lock.release()
  107. class FileCache(Cache):
  108. """File-based cache"""
  109. # locks used to make cache thread-safe
  110. cache_locks = {}
  111. def __init__(self, cache_dir, timeout=60):
  112. Cache.__init__(self, timeout)
  113. if os.path.exists(cache_dir) is False:
  114. os.mkdir(cache_dir)
  115. self.cache_dir = cache_dir
  116. if cache_dir in FileCache.cache_locks:
  117. self.lock = FileCache.cache_locks[cache_dir]
  118. else:
  119. self.lock = threading.Lock()
  120. FileCache.cache_locks[cache_dir] = self.lock
  121. if os.name == 'posix':
  122. self._lock_file = self._lock_file_posix
  123. self._unlock_file = self._unlock_file_posix
  124. elif os.name == 'nt':
  125. self._lock_file = self._lock_file_win32
  126. self._unlock_file = self._unlock_file_win32
  127. else:
  128. print('Warning! FileCache locking not supported on this system!')
  129. self._lock_file = self._lock_file_dummy
  130. self._unlock_file = self._unlock_file_dummy
  131. def _get_path(self, key):
  132. md5 = hashlib.md5()
  133. md5.update(key.encode('utf-8'))
  134. return os.path.join(self.cache_dir, md5.hexdigest())
  135. def _lock_file_dummy(self, path, exclusive=True):
  136. return None
  137. def _unlock_file_dummy(self, lock):
  138. return
  139. def _lock_file_posix(self, path, exclusive=True):
  140. lock_path = path + '.lock'
  141. if exclusive is True:
  142. f_lock = open(lock_path, 'w')
  143. fcntl.lockf(f_lock, fcntl.LOCK_EX)
  144. else:
  145. f_lock = open(lock_path, 'r')
  146. fcntl.lockf(f_lock, fcntl.LOCK_SH)
  147. if os.path.exists(lock_path) is False:
  148. f_lock.close()
  149. return None
  150. return f_lock
  151. def _unlock_file_posix(self, lock):
  152. lock.close()
  153. def _lock_file_win32(self, path, exclusive=True):
  154. # TODO: implement
  155. return None
  156. def _unlock_file_win32(self, lock):
  157. # TODO: implement
  158. return
  159. def _delete_file(self, path):
  160. os.remove(path)
  161. if os.path.exists(path + '.lock'):
  162. os.remove(path + '.lock')
  163. def store(self, key, value):
  164. path = self._get_path(key)
  165. self.lock.acquire()
  166. try:
  167. # acquire lock and open file
  168. f_lock = self._lock_file(path)
  169. datafile = open(path, 'wb')
  170. # write data
  171. pickle.dump((time.time(), value), datafile)
  172. # close and unlock file
  173. datafile.close()
  174. self._unlock_file(f_lock)
  175. finally:
  176. self.lock.release()
  177. def get(self, key, timeout=None):
  178. return self._get(self._get_path(key), timeout)
  179. def _get(self, path, timeout):
  180. if os.path.exists(path) is False:
  181. # no record
  182. return None
  183. self.lock.acquire()
  184. try:
  185. # acquire lock and open
  186. f_lock = self._lock_file(path, False)
  187. datafile = open(path, 'rb')
  188. # read pickled object
  189. created_time, value = pickle.load(datafile)
  190. datafile.close()
  191. # check if value is expired
  192. if timeout is None:
  193. timeout = self.timeout
  194. if timeout > 0:
  195. if (time.time() - created_time) >= timeout:
  196. # expired! delete from cache
  197. value = None
  198. self._delete_file(path)
  199. # unlock and return result
  200. self._unlock_file(f_lock)
  201. return value
  202. finally:
  203. self.lock.release()
  204. def count(self):
  205. c = 0
  206. for entry in os.listdir(self.cache_dir):
  207. if entry.endswith('.lock'):
  208. continue
  209. c += 1
  210. return c
  211. def cleanup(self):
  212. for entry in os.listdir(self.cache_dir):
  213. if entry.endswith('.lock'):
  214. continue
  215. self._get(os.path.join(self.cache_dir, entry), None)
  216. def flush(self):
  217. for entry in os.listdir(self.cache_dir):
  218. if entry.endswith('.lock'):
  219. continue
  220. self._delete_file(os.path.join(self.cache_dir, entry))
  221. class MemCacheCache(Cache):
  222. """Cache interface"""
  223. def __init__(self, client, timeout=60):
  224. """Initialize the cache
  225. client: The memcache client
  226. timeout: number of seconds to keep a cached entry
  227. """
  228. self.client = client
  229. self.timeout = timeout
  230. def store(self, key, value):
  231. """Add new record to cache
  232. key: entry key
  233. value: data of entry
  234. """
  235. self.client.set(key, value, time=self.timeout)
  236. def get(self, key, timeout=None):
  237. """Get cached entry if exists and not expired
  238. key: which entry to get
  239. timeout: override timeout with this value [optional].
  240. DOES NOT WORK HERE
  241. """
  242. return self.client.get(key)
  243. def count(self):
  244. """Get count of entries currently stored in cache. RETURN 0"""
  245. raise NotImplementedError
  246. def cleanup(self):
  247. """Delete any expired entries in cache. NO-OP"""
  248. raise NotImplementedError
  249. def flush(self):
  250. """Delete all cached entries. NO-OP"""
  251. raise NotImplementedError
  252. class RedisCache(Cache):
  253. """Cache running in a redis server"""
  254. def __init__(self, client,
  255. timeout=60,
  256. keys_container='tweepy:keys',
  257. pre_identifier='tweepy:'):
  258. Cache.__init__(self, timeout)
  259. self.client = client
  260. self.keys_container = keys_container
  261. self.pre_identifier = pre_identifier
  262. def _is_expired(self, entry, timeout):
  263. # Returns true if the entry has expired
  264. return timeout > 0 and (time.time() - entry[0]) >= timeout
  265. def store(self, key, value):
  266. """Store the key, value pair in our redis server"""
  267. # Prepend tweepy to our key,
  268. # this makes it easier to identify tweepy keys in our redis server
  269. key = self.pre_identifier + key
  270. # Get a pipe (to execute several redis commands in one step)
  271. pipe = self.client.pipeline()
  272. # Set our values in a redis hash (similar to python dict)
  273. pipe.set(key, pickle.dumps((time.time(), value)))
  274. # Set the expiration
  275. pipe.expire(key, self.timeout)
  276. # Add the key to a set containing all the keys
  277. pipe.sadd(self.keys_container, key)
  278. # Execute the instructions in the redis server
  279. pipe.execute()
  280. def get(self, key, timeout=None):
  281. """Given a key, returns an element from the redis table"""
  282. key = self.pre_identifier + key
  283. # Check to see if we have this key
  284. unpickled_entry = self.client.get(key)
  285. if not unpickled_entry:
  286. # No hit, return nothing
  287. return None
  288. entry = pickle.loads(unpickled_entry)
  289. # Use provided timeout in arguments if provided
  290. # otherwise use the one provided during init.
  291. if timeout is None:
  292. timeout = self.timeout
  293. # Make sure entry is not expired
  294. if self._is_expired(entry, timeout):
  295. # entry expired, delete and return nothing
  296. self.delete_entry(key)
  297. return None
  298. # entry found and not expired, return it
  299. return entry[1]
  300. def count(self):
  301. """Note: This is not very efficient,
  302. since it retreives all the keys from the redis
  303. server to know how many keys we have"""
  304. return len(self.client.smembers(self.keys_container))
  305. def delete_entry(self, key):
  306. """Delete an object from the redis table"""
  307. pipe = self.client.pipeline()
  308. pipe.srem(self.keys_container, key)
  309. pipe.delete(key)
  310. pipe.execute()
  311. def cleanup(self):
  312. """Cleanup all the expired keys"""
  313. keys = self.client.smembers(self.keys_container)
  314. for key in keys:
  315. entry = self.client.get(key)
  316. if entry:
  317. entry = pickle.loads(entry)
  318. if self._is_expired(entry, self.timeout):
  319. self.delete_entry(key)
  320. def flush(self):
  321. """Delete all entries from the cache"""
  322. keys = self.client.smembers(self.keys_container)
  323. for key in keys:
  324. self.delete_entry(key)
class MongodbCache(Cache):
    """A simple pickle-based MongoDB cache system.

    Values are pickled into BSON Binary blobs; expiry is delegated to a
    MongoDB TTL index on the 'created' field rather than checked here.
    """

    def __init__(self, db, timeout=3600, collection='tweepy_cache'):
        """Should receive a "database" cursor from pymongo.

        db: pymongo database object
        timeout: number of seconds to keep a cached entry
        collection: name of the collection used for cache documents
        """
        Cache.__init__(self, timeout)
        self.timeout = timeout
        self.col = db[collection]
        # TTL index: the server evicts documents once 'created' is older
        # than 'timeout' seconds, independently of this process.
        self.col.create_index('created', expireAfterSeconds=timeout)

    def store(self, key, value):
        """Add new record to cache.

        key: entry key (used as the document _id)
        value: data of entry (pickled into a BSON Binary blob)
        """
        from bson.binary import Binary
        now = datetime.datetime.utcnow()
        blob = Binary(pickle.dumps(value))
        # NOTE(review): insert() with a fixed _id presumably raises a
        # duplicate-key error when the same key is stored twice -- confirm
        # against the pymongo version this project uses.
        self.col.insert({'created': now, '_id': key, 'value': blob})

    def get(self, key, timeout=None):
        """Get cached entry if it exists; expiry is handled by MongoDB.

        key: which entry to get
        timeout: per-call override is not supported by this backend
        Returns the unpickled value, or None (implicitly) on a miss.
        """
        if timeout:
            raise NotImplementedError
        obj = self.col.find_one({'_id': key})
        if obj:
            return pickle.loads(obj['value'])

    def count(self):
        """Get count of entries currently stored in cache."""
        # NOTE(review): find().count() is a legacy pymongo API -- verify it
        # still exists in the driver version the project pins.
        return self.col.find({}).count()

    def delete_entry(self, key):
        """Delete the document whose _id matches key."""
        return self.col.remove({'_id': key})

    def cleanup(self):
        """MongoDB will automatically clear expired keys."""
        pass

    def flush(self):
        """Delete all cached entries by dropping the collection."""
        self.col.drop()
        # Recreate the TTL index that was dropped with the collection.
        self.col.create_index('created', expireAfterSeconds=self.timeout)