# Tweepy
# Copyright 2009-2010 Joshua Roesslein
# See LICENSE for details.
- from __future__ import print_function
- import time
- import datetime
- import threading
- import os
- try:
- import cPickle as pickle
- except ImportError:
- import pickle
- try:
- import hashlib
- except ImportError:
- # python 2.4
- import md5 as hashlib
- try:
- import fcntl
- except ImportError:
- # Probably on a windows system
- # TODO: use win32file
- pass
class Cache(object):
    """Abstract cache interface.

    Concrete subclasses supply the storage backend; this base class only
    records the default lifetime of an entry.
    """

    def __init__(self, timeout=60):
        """Initialize the cache.

        timeout: default number of seconds to keep a cached entry
        """
        self.timeout = timeout

    def store(self, key, value):
        """Store *value* under *key*. Must be overridden."""
        raise NotImplementedError

    def get(self, key, timeout=None):
        """Return the entry for *key* if present and not expired.

        timeout: optional override of the instance timeout
        """
        raise NotImplementedError

    def count(self):
        """Return how many entries the cache currently holds."""
        raise NotImplementedError

    def cleanup(self):
        """Remove every expired entry from the cache."""
        raise NotImplementedError

    def flush(self):
        """Remove every entry from the cache."""
        raise NotImplementedError
class MemoryCache(Cache):
    """Thread-safe in-memory cache.

    Entries live in a plain dict as ``key -> (created_time, value)``,
    guarded by a threading.Lock.
    """

    def __init__(self, timeout=60):
        Cache.__init__(self, timeout)
        self._entries = {}
        self.lock = threading.Lock()

    def __getstate__(self):
        # pickle: the lock is not picklable, so persist only the data
        return {'entries': self._entries, 'timeout': self.timeout}

    def __setstate__(self, state):
        # unpickle: recreate a fresh lock alongside the restored data
        self.lock = threading.Lock()
        self._entries = state['entries']
        self.timeout = state['timeout']

    def _is_expired(self, entry, timeout):
        # A non-positive timeout disables expiration entirely.
        return timeout > 0 and (time.time() - entry[0]) >= timeout

    def store(self, key, value):
        """Add/overwrite *key* with *value*, timestamped now.

        Using ``with`` (instead of bare acquire/release) guarantees the
        lock is released even if the assignment raises.
        """
        with self.lock:
            self._entries[key] = (time.time(), value)

    def get(self, key, timeout=None):
        """Return the cached value for *key*, or None if missing/expired.

        timeout: overrides the instance timeout when not None
        """
        with self.lock:
            # check to see if we have this key
            entry = self._entries.get(key)
            if not entry:
                # no hit, return nothing
                return None
            # use provided timeout in arguments if provided
            # otherwise use the one provided during init.
            if timeout is None:
                timeout = self.timeout
            # make sure entry is not expired
            if self._is_expired(entry, timeout):
                # entry expired, delete and return nothing
                del self._entries[key]
                return None
            # entry found and not expired, return it
            return entry[1]

    def count(self):
        """Number of entries currently stored (expired ones included)."""
        return len(self._entries)

    def cleanup(self):
        """Evict every expired entry."""
        with self.lock:
            # iterate over a snapshot so deletion during the loop is safe
            for k, v in dict(self._entries).items():
                if self._is_expired(v, self.timeout):
                    del self._entries[k]

    def flush(self):
        """Drop all entries."""
        with self.lock:
            self._entries.clear()
class FileCache(Cache):
    """File-based cache.

    Each entry is pickled as ``(created_time, value)`` into a file named
    after the MD5 hex digest of its key inside ``cache_dir``.  A
    per-directory threading lock plus (on posix) fcntl file locks make
    access safe across threads and cooperating processes.
    """

    # locks used to make cache thread-safe, shared per cache directory
    cache_locks = {}

    def __init__(self, cache_dir, timeout=60):
        Cache.__init__(self, timeout)
        if os.path.exists(cache_dir) is False:
            os.mkdir(cache_dir)
        self.cache_dir = cache_dir
        # share one lock per directory so every FileCache pointing at the
        # same path serializes against the others
        if cache_dir in FileCache.cache_locks:
            self.lock = FileCache.cache_locks[cache_dir]
        else:
            self.lock = threading.Lock()
            FileCache.cache_locks[cache_dir] = self.lock

        # pick a file-locking strategy for this platform
        if os.name == 'posix':
            self._lock_file = self._lock_file_posix
            self._unlock_file = self._unlock_file_posix
        elif os.name == 'nt':
            self._lock_file = self._lock_file_win32
            self._unlock_file = self._unlock_file_win32
        else:
            print('Warning! FileCache locking not supported on this system!')
            self._lock_file = self._lock_file_dummy
            self._unlock_file = self._unlock_file_dummy

    def _get_path(self, key):
        """Map *key* to its on-disk path (MD5 hex digest filename)."""
        md5 = hashlib.md5()
        md5.update(key.encode('utf-8'))
        return os.path.join(self.cache_dir, md5.hexdigest())

    def _lock_file_dummy(self, path, exclusive=True):
        # no-op locking for unsupported platforms
        return None

    def _unlock_file_dummy(self, lock):
        return

    def _lock_file_posix(self, path, exclusive=True):
        """Acquire an fcntl lock on ``path + '.lock'``.

        Returns the open lock-file handle, or None if the lock file
        vanished while we waited (another process deleted the entry).
        """
        lock_path = path + '.lock'
        if exclusive is True:
            f_lock = open(lock_path, 'w')
            fcntl.lockf(f_lock, fcntl.LOCK_EX)
        else:
            f_lock = open(lock_path, 'r')
            fcntl.lockf(f_lock, fcntl.LOCK_SH)
        if os.path.exists(lock_path) is False:
            f_lock.close()
            return None
        return f_lock

    def _unlock_file_posix(self, lock):
        # closing the handle releases the fcntl lock
        lock.close()

    def _lock_file_win32(self, path, exclusive=True):
        # TODO: implement
        return None

    def _unlock_file_win32(self, lock):
        # TODO: implement
        return

    def _delete_file(self, path):
        """Remove a cache entry file and its companion lock file."""
        os.remove(path)
        if os.path.exists(path + '.lock'):
            os.remove(path + '.lock')

    def store(self, key, value):
        """Pickle ``(now, value)`` into the file for *key*.

        ``with``/``finally`` ensure the data file is closed and the file
        lock released even when pickling or IO raises (the original
        leaked both on error).
        """
        path = self._get_path(key)
        with self.lock:
            f_lock = self._lock_file(path)
            try:
                with open(path, 'wb') as datafile:
                    pickle.dump((time.time(), value), datafile)
            finally:
                self._unlock_file(f_lock)

    def get(self, key, timeout=None):
        """Return the cached value for *key*, or None if missing/expired."""
        return self._get(self._get_path(key), timeout)

    def _get(self, path, timeout):
        if os.path.exists(path) is False:
            # no record
            return None
        with self.lock:
            # shared (read) lock is enough here
            f_lock = self._lock_file(path, False)
            try:
                with open(path, 'rb') as datafile:
                    created_time, value = pickle.load(datafile)
                # fall back to the instance-wide timeout
                if timeout is None:
                    timeout = self.timeout
                if timeout > 0 and (time.time() - created_time) >= timeout:
                    # expired! delete from cache
                    value = None
                    self._delete_file(path)
            finally:
                self._unlock_file(f_lock)
            return value

    def count(self):
        """Number of entry files (lock files excluded)."""
        c = 0
        for entry in os.listdir(self.cache_dir):
            if entry.endswith('.lock'):
                continue
            c += 1
        return c

    def cleanup(self):
        """Touch every entry so _get evicts the expired ones."""
        for entry in os.listdir(self.cache_dir):
            if entry.endswith('.lock'):
                continue
            self._get(os.path.join(self.cache_dir, entry), None)

    def flush(self):
        """Delete every entry file."""
        for entry in os.listdir(self.cache_dir):
            if entry.endswith('.lock'):
                continue
            self._delete_file(os.path.join(self.cache_dir, entry))
class MemCacheCache(Cache):
    """Cache backed by a memcache client.

    Expiration is enforced server-side by memcache, so the local
    bookkeeping methods are intentionally trivial.
    """

    def __init__(self, client, timeout=60):
        """Initialize the cache
        client: The memcache client
        timeout: number of seconds to keep a cached entry
        """
        self.client = client
        self.timeout = timeout

    def store(self, key, value):
        """Add new record to cache
        key: entry key
        value: data of entry
        """
        # memcache expires the entry itself after `time` seconds
        self.client.set(key, value, time=self.timeout)

    def get(self, key, timeout=None):
        """Get cached entry if exists and not expired
        key: which entry to get
        timeout: override timeout with this value [optional].
        DOES NOT WORK HERE - expiration is handled by the server.
        """
        return self.client.get(key)

    def count(self):
        """Get count of entries currently stored in cache. RETURN 0"""
        # memcache exposes no cheap per-namespace count; the documented
        # contract is to return 0 (the old NotImplementedError
        # contradicted this docstring).
        return 0

    def cleanup(self):
        """Delete any expired entries in cache. NO-OP"""
        # memcache evicts expired entries server-side
        pass

    def flush(self):
        """Delete all cached entries. NO-OP"""
        pass
class RedisCache(Cache):
    """Cache whose entries live in a redis server."""

    def __init__(self, client,
                 timeout=60,
                 keys_container='tweepy:keys',
                 pre_identifier='tweepy:'):
        Cache.__init__(self, timeout)
        self.client = client
        self.keys_container = keys_container
        self.pre_identifier = pre_identifier

    def _is_expired(self, entry, timeout):
        # An entry is stale once its age reaches the timeout; a
        # non-positive timeout disables expiration entirely.
        if timeout <= 0:
            return False
        age = time.time() - entry[0]
        return age >= timeout

    def store(self, key, value):
        """Write ``(timestamp, value)`` for *key* into the redis server."""
        # Namespace the key so tweepy entries are easy to spot in redis.
        key = self.pre_identifier + key
        payload = pickle.dumps((time.time(), value))
        # Batch the commands through a pipeline: set the value, arm the
        # server-side expiration, and record the key in the key set.
        pipe = self.client.pipeline()
        pipe.set(key, payload)
        pipe.expire(key, self.timeout)
        pipe.sadd(self.keys_container, key)
        pipe.execute()

    def get(self, key, timeout=None):
        """Return the cached value for *key*, or None if absent/expired."""
        key = self.pre_identifier + key
        raw = self.client.get(key)
        if not raw:
            # miss
            return None
        entry = pickle.loads(raw)
        # a caller-supplied timeout overrides the instance default
        effective_timeout = self.timeout if timeout is None else timeout
        if self._is_expired(entry, effective_timeout):
            # stale entry: evict it and report a miss
            self.delete_entry(key)
            return None
        return entry[1]

    def count(self):
        """Number of tracked keys.

        Note: not cheap - it pulls the entire key set from the server
        just to measure its size.
        """
        return len(self.client.smembers(self.keys_container))

    def delete_entry(self, key):
        """Remove *key* (already namespaced) and its key-set membership."""
        pipe = self.client.pipeline()
        pipe.srem(self.keys_container, key)
        pipe.delete(key)
        pipe.execute()

    def cleanup(self):
        """Purge every tracked key whose entry has expired."""
        for key in self.client.smembers(self.keys_container):
            raw = self.client.get(key)
            if not raw:
                continue
            if self._is_expired(pickle.loads(raw), self.timeout):
                self.delete_entry(key)

    def flush(self):
        """Delete every entry tracked in the key container."""
        for key in self.client.smembers(self.keys_container):
            self.delete_entry(key)
class MongodbCache(Cache):
    """A simple pickle-based MongoDB cache system."""

    def __init__(self, db, timeout=3600, collection='tweepy_cache'):
        """Should receive a "database" cursor from pymongo."""
        Cache.__init__(self, timeout)
        self.timeout = timeout
        self.col = db[collection]
        # Let MongoDB expire documents itself via a TTL index on 'created'.
        self.col.create_index('created', expireAfterSeconds=timeout)

    def store(self, key, value):
        """Insert *value* (pickled into a BSON blob) under *key*."""
        from bson.binary import Binary
        blob = Binary(pickle.dumps(value))
        self.col.insert({'created': datetime.datetime.utcnow(),
                         '_id': key,
                         'value': blob})

    def get(self, key, timeout=None):
        """Return the value stored under *key*, or None if absent."""
        # per-call timeout overrides are not supported by this backend
        if timeout:
            raise NotImplementedError
        doc = self.col.find_one({'_id': key})
        if doc:
            return pickle.loads(doc['value'])

    def count(self):
        """Number of documents in the cache collection."""
        return self.col.find({}).count()

    def delete_entry(self, key):
        """Remove the document stored under *key*."""
        return self.col.remove({'_id': key})

    def cleanup(self):
        """MongoDB will automatically clear expired keys."""
        pass

    def flush(self):
        """Drop the collection and rebuild its TTL index."""
        self.col.drop()
        self.col.create_index('created', expireAfterSeconds=self.timeout)
|