# Standard library.
from datetime import datetime, timedelta
import logging
import re
import socket
import struct
import threading
try:
    # Python 2 location.
    from urllib import splitport
except ImportError:
    # Python 3 location.
    from urllib.parse import splitport
import zlib
from io import BytesIO

# Third party.
import six
from six import binary_type, text_type

# Project local.
from bmemcached.compat import long
from bmemcached.exceptions import AuthenticationNotSupported, InvalidCredentials, MemcachedException
from bmemcached.utils import str_to_bytes

logger = logging.getLogger(__name__)
class Protocol(threading.local):
    """
    This class is used by Client class to communicate with server.

    It implements the client side of the memcached binary protocol for a
    single server. Because it derives from ``threading.local``, every thread
    sees its own attribute values, including its own ``connection`` socket.

    Network failures are not raised to callers of the command methods: the
    dead connection is dropped and the command reports a miss/failure value
    (see the individual methods). Reconnection attempts are throttled by
    ``retry_delay`` seconds.
    """
    # Layout of the 24-byte packet header, in network byte order:
    # magic, opcode, key length, extras length, data type, status,
    # total body length, opaque, CAS.
    HEADER_STRUCT = '!BBHBBHLLQ'
    HEADER_SIZE = 24

    MAGIC = {
        'request': 0x80,
        'response': 0x81
    }

    # All structures will be appended to HEADER_STRUCT
    COMMANDS = {
        'get': {'command': 0x00, 'struct': '%ds'},
        'getk': {'command': 0x0C, 'struct': '%ds'},
        'getkq': {'command': 0x0D, 'struct': '%ds'},
        'set': {'command': 0x01, 'struct': 'LL%ds%ds'},
        'setq': {'command': 0x11, 'struct': 'LL%ds%ds'},
        'add': {'command': 0x02, 'struct': 'LL%ds%ds'},
        'addq': {'command': 0x12, 'struct': 'LL%ds%ds'},
        'replace': {'command': 0x03, 'struct': 'LL%ds%ds'},
        'delete': {'command': 0x04, 'struct': '%ds'},
        'incr': {'command': 0x05, 'struct': 'QQL%ds'},
        'decr': {'command': 0x06, 'struct': 'QQL%ds'},
        'flush': {'command': 0x08, 'struct': 'I'},
        'noop': {'command': 0x0a, 'struct': ''},
        'stat': {'command': 0x10},
        'auth_negotiation': {'command': 0x20},
        'auth_request': {'command': 0x21, 'struct': '%ds%ds'},
    }

    STATUS = {
        'success': 0x00,
        'key_not_found': 0x01,
        'key_exists': 0x02,
        'auth_error': 0x08,
        'unknown_command': 0x81,

        # This is used internally, and is never returned by the server.  (The server returns a 16-bit
        # value, so it's not capable of returning this value.)
        'server_disconnected': 0xFFFFFFFF,
    }

    # Bits stored in the item's flags field to remember how a value was serialized.
    FLAGS = {
        'object': 1 << 0,
        'integer': 1 << 1,
        'long': 1 << 2,
        'compressed': 1 << 3,
        'binary': 1 << 4,
    }

    # Expiration sent to the server when the caller passes a negative time.
    MAXIMUM_EXPIRE_TIME = 0xfffffffe

    # Serialized values longer than this many bytes are candidates for compression.
    COMPRESSION_THRESHOLD = 128

    def __init__(self, server, username=None, password=None, compression=None, socket_timeout=None,
                 pickle_protocol=None, pickler=None, unpickler=None):
        """
        :param server: ``host[:port]`` for a TCP server, or a path starting
            with ``/`` for a unix socket.
        :type server: six.string_types
        :param username: Username for authentication, if any.
        :param password: Password for authentication, if any.
        :param compression: Object exposing ``compress``/``decompress``;
            defaults to the ``zlib`` module.
        :param socket_timeout: Timeout applied to the socket (``None`` means blocking).
        :param pickle_protocol: Protocol number handed to ``pickler``.
        :param pickler: Callable ``pickler(file, protocol)`` returning an object with ``dump``.
        :param unpickler: Callable ``unpickler(file)`` returning an object with ``load``.
        """
        super(Protocol, self).__init__()
        self.server = server
        self._username = username
        self._password = password
        self.compression = zlib if compression is None else compression
        self.connection = None
        self.authenticated = False
        self.socket_timeout = socket_timeout
        self.pickle_protocol = pickle_protocol
        self.pickler = pickler
        self.unpickler = unpickler

        self.reconnects_deferred_until = None

        if not server.startswith('/'):
            self.host, self.port = self.split_host_port(self.server)
            self.set_retry_delay(5)
        else:
            # Unix socket: there is no host/port, and retries are not delayed.
            self.host = self.port = None
            self.set_retry_delay(0)

    @property
    def server_uses_unix_socket(self):
        """True when ``server`` was given as a unix socket path."""
        return self.host is None

    def set_retry_delay(self, value):
        """
        Set how long to wait after a failed connection before trying again.

        :param value: Delay in seconds.
        :type value: int
        """
        self.retry_delay = value

    def _open_connection(self):
        """
        Open (and authenticate) the connection if there is none.

        Returns without connecting while a reconnection attempt is being
        deferred; callers detect that by ``self.connection`` still being None.

        :raises: socket.error if the connection attempt fails.
        """
        if self.connection:
            return

        self.authenticated = False

        # If we're deferring a reconnection attempt, wait.
        if self.reconnects_deferred_until and self.reconnects_deferred_until > datetime.now():
            return

        try:
            if self.host:
                self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.connection.settimeout(self.socket_timeout)
                self.connection.connect((self.host, self.port))
            else:
                self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                # Honour the configured timeout for unix sockets as well.
                self.connection.settimeout(self.socket_timeout)
                self.connection.connect(self.server)

            self._send_authentication()
        except socket.error:
            # If the connection attempt fails, start delaying retries.
            self.reconnects_deferred_until = datetime.now() + timedelta(seconds=self.retry_delay)
            # Don't keep a half-open socket around; it would be mistaken for a
            # live connection by the next call.
            self.disconnect()
            raise

    def _connection_error(self, exception):
        """Handle a socket error raised while talking to the server."""
        # On error, clear our dead connection.
        self.disconnect()

    @classmethod
    def split_host_port(cls, server):
        """
        Return (host, port) from server.

        Port defaults to 11211. Anything after the first ``:`` is dropped
        from the host.

        >>> split_host_port('127.0.0.1:11211')
        ('127.0.0.1', 11211)
        >>> split_host_port('127.0.0.1')
        ('127.0.0.1', 11211)
        """
        # Equivalent to the deprecated urllib ``splitport``: the port is the
        # run of digits after the last colon, if there is one.
        match = re.match(r'^(.*):([0-9]+)$', server)
        port = int(match.group(2)) if match else 11211
        host = server.partition(':')[0]
        return host, port

    def _read_socket(self, size):
        """
        Reads data from socket.

        :param size: Size in bytes to be read.
        :type size: int
        :return: Exactly ``size`` bytes from the socket.
        :rtype: bytes
        :raises: socket.error if the server closes the connection first.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            data = self.connection.recv(remaining)
            if not data:
                # If we got less data than we requested, the server disconnected.
                raise socket.error('Server disconnected')
            chunks.append(data)
            remaining -= len(data)
        return b''.join(chunks)

    def _get_response(self):
        """
        Get memcached response from socket.

        On a socket error the connection is dropped and a synthetic response
        with status ``server_disconnected`` is returned instead of raising.

        :return: A tuple (magic, opcode, keylen, extlen, datatype, status,
            bodylen, opaque, cas, extra_content).
        :rtype: tuple
        :raises: MemcachedException if the response header is malformed.
        """
        try:
            self._open_connection()
            if self.connection is None:
                # The connection wasn't opened, which means we're deferring a reconnection attempt.
                # Raise a socket.error, so we'll return the same server_disconnected message as we
                # do below.
                raise socket.error('Delaying reconnection attempt')

            header = self._read_socket(self.HEADER_SIZE)
            (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
             cas) = struct.unpack(self.HEADER_STRUCT, header)

            if magic != self.MAGIC['response']:
                # The stream is out of sync; nothing read from it can be
                # trusted any more, so drop the connection. (An explicit check
                # rather than ``assert``, which is stripped under ``-O``.)
                self.disconnect()
                raise MemcachedException('Unexpected magic byte 0x%02x in response header' % magic)

            extra_content = None
            if bodylen:
                extra_content = self._read_socket(bodylen)

            return (magic, opcode, keylen, extlen, datatype, status, bodylen,
                    opaque, cas, extra_content)
        except socket.error as e:
            self._connection_error(e)

            # (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque, cas, extra_content)
            message = str(e)
            return (self.MAGIC['response'], -1, 0, 0, 0, self.STATUS['server_disconnected'], 0, 0, 0, message)

    def _send(self, data):
        """
        Send raw bytes to the server.

        Socket errors are swallowed after dropping the connection; the
        following ``_get_response`` call reports the disconnection.

        :param data: Packed request(s).
        :type data: bytes
        """
        try:
            self._open_connection()
            if self.connection is None:
                # Reconnection is being deferred.
                return

            self.connection.sendall(data)
        except socket.error as e:
            self._connection_error(e)

    def authenticate(self, username, password):
        """
        Authenticate user on server.

        :param username: Username used to be authenticated.
        :type username: six.string_types
        :param password: Password used to be authenticated.
        :type password: six.string_types
        :return: True if successful.
        :raises: InvalidCredentials, AuthenticationNotSupported, MemcachedException
        :rtype: bool
        """
        self._username = username
        self._password = password

        # Reopen the connection with the new credentials.
        self.disconnect()
        self._open_connection()
        return self.authenticated

    def _send_authentication(self):
        """
        Run the SASL PLAIN handshake on the current connection.

        :return: True if authenticated (or the server does not support
            authentication), False if no credentials are set or the server
            disconnected.
        :raises: InvalidCredentials, AuthenticationNotSupported, MemcachedException
        :rtype: bool
        """
        if not self._username or not self._password:
            return False

        logger.info('Authenticating as %s', self._username)
        self._send(struct.pack(self.HEADER_STRUCT,
                               self.MAGIC['request'],
                               self.COMMANDS['auth_negotiation']['command'],
                               0, 0, 0, 0, 0, 0, 0))

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        if status == self.STATUS['server_disconnected']:
            return False

        if status == self.STATUS['unknown_command']:
            logger.debug('Server does not requires authentication.')
            self.authenticated = True
            return True

        # The body lists the supported mechanisms; it is None when empty.
        methods = extra_content or b''

        if b'PLAIN' not in methods:
            raise AuthenticationNotSupported('This module only supports '
                                             'PLAIN auth for now.')

        method = b'PLAIN'
        auth = '\x00%s\x00%s' % (self._username, self._password)
        if isinstance(auth, text_type):
            auth = auth.encode()

        self._send(struct.pack(self.HEADER_STRUCT +
                               self.COMMANDS['auth_request']['struct'] % (len(method), len(auth)),
                               self.MAGIC['request'], self.COMMANDS['auth_request']['command'],
                               len(method), 0, 0, 0, len(method) + len(auth), 0, 0, method, auth))

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        if status == self.STATUS['server_disconnected']:
            return False

        if status == self.STATUS['auth_error']:
            raise InvalidCredentials("Incorrect username or password")

        if status != self.STATUS['success']:
            raise MemcachedException('Code: %d Message: %s' % (status, extra_content))

        logger.debug('Auth OK. Code: %d Message: %s', status, extra_content)

        self.authenticated = True
        return True

    def serialize(self, value, compress_level=-1):
        """
        Serializes a value based on its type.

        Bytes are stored as-is, text is UTF-8 encoded, integers are stored as
        their decimal representation and anything else is pickled.

        :param value: Something to be serialized
        :type value: six.string_types, int, long, object
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: A ``(flags, data)`` tuple; ``data`` is always bytes.
        :rtype: tuple
        """
        flags = 0
        if isinstance(value, binary_type):
            flags |= self.FLAGS['binary']
        elif isinstance(value, text_type):
            value = value.encode('utf8')
        elif isinstance(value, int) and not isinstance(value, bool):
            flags |= self.FLAGS['integer']
            # Encode so every branch yields bytes, ready for struct and compression.
            value = str(value).encode('ascii')
        elif isinstance(value, long) and not isinstance(value, bool):
            flags |= self.FLAGS['long']
            value = str(value).encode('ascii')
        else:
            flags |= self.FLAGS['object']
            buf = BytesIO()
            pickler = self.pickler(buf, self.pickle_protocol)
            pickler.dump(value)
            value = buf.getvalue()

        if compress_level != 0 and len(value) > self.COMPRESSION_THRESHOLD:
            if compress_level is not None and compress_level > 0:
                # Use the specified compression level.
                compressed_value = self.compression.compress(value, compress_level)
            else:
                # Use the default compression level.
                compressed_value = self.compression.compress(value)
            # Use the compressed value only if it is actually smaller.
            if compressed_value and len(compressed_value) < len(value):
                value = compressed_value
                flags |= self.FLAGS['compressed']

        return flags, value

    def deserialize(self, value, flags):
        """
        Deserialized values based on flags or just return it if it is not serialized.

        :param value: Serialized or not value.
        :type value: six.string_types, int
        :param flags: Value flags
        :type flags: int
        :return: Deserialized value
        :rtype: six.string_types|int
        """
        FLAGS = self.FLAGS

        if flags & FLAGS['compressed']:  # pragma: no branch
            value = self.compression.decompress(value)

        if flags & FLAGS['binary']:
            return value

        if flags & FLAGS['integer']:
            return int(value)
        elif flags & FLAGS['long']:
            return long(value)
        elif flags & FLAGS['object']:
            # NOTE(security): this runs the configured unpickler on data read
            # from the server; only use it with servers you trust.
            buf = BytesIO(value)
            unpickler = self.unpickler(buf)
            return unpickler.load()

        if six.PY3:
            return value.decode('utf8')

        # In Python 2, mimic the behavior of the json library: return a str
        # unless the value contains unicode characters.
        try:
            value.decode('ascii')
        except UnicodeDecodeError:
            return value.decode('utf8')
        else:
            return value

    def get(self, key):
        """
        Get a key and its CAS value from server.  If the value isn't cached, return
        (None, None).

        :param key: Key's name
        :type key: six.string_types
        :return: Returns (value, cas).
        :rtype: object
        :raises: MemcachedException on an unexpected status.
        """
        logger.debug('Getting key %s', key)
        # Lengths in the header must describe the encoded key, not the text.
        raw_key = str_to_bytes(key)
        key_size = len(raw_key)
        data = struct.pack(self.HEADER_STRUCT +
                           self.COMMANDS['get']['struct'] % key_size,
                           self.MAGIC['request'],
                           self.COMMANDS['get']['command'],
                           key_size, 0, 0, 0, key_size, 0, 0, raw_key)
        self._send(data)

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        logger.debug('Value Length: %d. Body length: %d. Data type: %d',
                     extlen, bodylen, datatype)

        if status != self.STATUS['success']:
            if status == self.STATUS['key_not_found']:
                logger.debug('Key not found. Message: %s', extra_content)
                return None, None

            if status == self.STATUS['server_disconnected']:
                return None, None

            raise MemcachedException('Code: %d Message: %s' % (status, extra_content))

        # The body is 4 bytes of flags followed by the value.
        flags, value = struct.unpack('!L%ds' % (bodylen - 4, ), extra_content)

        return self.deserialize(value, flags), cas

    def get_multi(self, keys):
        """
        Get multiple keys from server.

        :param keys: A list of keys to from server.
        :type keys: list
        :return: A dict mapping each found key to a ``(value, cas)`` tuple.
        :rtype: dict
        :raises: MemcachedException on an unexpected status.
        """
        raw_keys = [str_to_bytes(key) for key in keys]
        if not raw_keys:
            # Nothing to ask for; avoid sending a malformed request.
            return {}

        # pipeline N-1 getkq requests, followed by a regular getk to uncork the
        # server
        raw_keys, last = raw_keys[:-1], raw_keys[-1]

        packets = [
            struct.pack(self.HEADER_STRUCT +
                        self.COMMANDS['getkq']['struct'] % len(raw_key),
                        self.MAGIC['request'],
                        self.COMMANDS['getkq']['command'],
                        len(raw_key), 0, 0, 0, len(raw_key), 0, 0, raw_key)
            for raw_key in raw_keys]
        packets.append(struct.pack(self.HEADER_STRUCT +
                                   self.COMMANDS['getk']['struct'] % len(last),
                                   self.MAGIC['request'],
                                   self.COMMANDS['getk']['command'],
                                   len(last), 0, 0, 0, len(last), 0, 0, last))

        self._send(b''.join(packets))

        d = {}
        opcode = -1
        # Quiet gets only answer on a hit; the final getk always answers.
        while opcode != self.COMMANDS['getk']['command']:
            (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
             cas, extra_content) = self._get_response()

            if status == self.STATUS['success']:
                flags, key, value = struct.unpack('!L%ds%ds' %
                                                  (keylen, bodylen - keylen - 4),
                                                  extra_content)
                d[key.decode()] = self.deserialize(value, flags), cas

            elif status == self.STATUS['server_disconnected']:
                break
            elif status != self.STATUS['key_not_found']:
                raise MemcachedException('Code: %d Message: %s' % (status, extra_content))

        return d

    def _set_add_replace(self, command, key, value, time, cas=0, compress_level=-1):
        """
        Function to set/add/replace commands.

        :param command: One of ``'set'``, ``'add'`` or ``'replace'``.
        :type command: str
        :param key: Key's name
        :type key: six.string_types
        :param value: A value to be stored on server.
        :type value: object
        :param time: Time in seconds that your key will expire.
        :type time: int
        :param cas: The CAS value that must be matched for this operation to complete, or 0 for no CAS.
        :type cas: int
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: True in case of success and False in case of failure
        :rtype: bool
        :raises: MemcachedException on an unexpected status.
        """
        time = time if time >= 0 else self.MAXIMUM_EXPIRE_TIME
        logger.debug('Setting/adding/replacing key %s.', key)
        flags, value = self.serialize(value, compress_level=compress_level)
        logger.debug('Value bytes %s.', len(value))
        if isinstance(value, text_type):
            value = value.encode('utf8')

        # Lengths in the header must describe the encoded key, not the text.
        raw_key = str_to_bytes(key)
        self._send(struct.pack(self.HEADER_STRUCT +
                               self.COMMANDS[command]['struct'] % (len(raw_key), len(value)),
                               self.MAGIC['request'],
                               self.COMMANDS[command]['command'],
                               len(raw_key), 8, 0, 0, len(raw_key) + len(value) + 8, 0, cas, flags,
                               time, raw_key, value))

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        if status != self.STATUS['success']:
            if status in (self.STATUS['key_exists'],
                          self.STATUS['key_not_found'],
                          self.STATUS['server_disconnected']):
                return False
            raise MemcachedException('Code: %d Message: %s' % (status, extra_content))

        return True

    def set(self, key, value, time, compress_level=-1):
        """
        Set a value for a key on server.

        :param key: Key's name
        :type key: six.string_types
        :param value: A value to be stored on server.
        :type value: object
        :param time: Time in seconds that your key will expire.
        :type time: int
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: True in case of success and False in case of failure
        :rtype: bool
        """
        return self._set_add_replace('set', key, value, time, compress_level=compress_level)

    def cas(self, key, value, cas, time, compress_level=-1):
        """
        Set a value for a key on server only if its CAS value still matches.

        :param key: Key's name
        :type key: six.string_types
        :param value: A value to be stored on server.
        :type value: object
        :param cas: The CAS value previously read for this key, or None to
            store the value only if the key does not exist yet.
        :type cas: int
        :param time: Time in seconds that your key will expire.
        :type time: int
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: True if key is added False if key already exists and has a different CAS
        :rtype: bool
        :raises: AssertionError if ``cas`` is 0.
        """
        # The protocol CAS value 0 means "no cas".  Calling cas() with that value is
        # probably unintentional.  Don't allow it, since it would overwrite the value
        # without performing CAS at all.
        if cas == 0:
            # Raised explicitly so the check survives ``python -O``; the
            # exception type is kept for backward compatibility.
            raise AssertionError('0 is an invalid CAS value')

        # If we get a cas of None, interpret that as "compare against nonexistant and set",
        # which is simply Add.
        if cas is None:
            return self._set_add_replace('add', key, value, time, compress_level=compress_level)
        else:
            return self._set_add_replace('set', key, value, time, cas=cas, compress_level=compress_level)

    def add(self, key, value, time, compress_level=-1):
        """
        Add a key/value to server ony if it does not exist.

        :param key: Key's name
        :type key: six.string_types
        :param value: A value to be stored on server.
        :type value: object
        :param time: Time in seconds that your key will expire.
        :type time: int
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: True if key is added False if key already exists
        :rtype: bool
        """
        return self._set_add_replace('add', key, value, time, compress_level=compress_level)

    def replace(self, key, value, time, compress_level=-1):
        """
        Replace a key/value to server ony if it does exist.

        :param key: Key's name
        :type key: six.string_types
        :param value: A value to be stored on server.
        :type value: object
        :param time: Time in seconds that your key will expire.
        :type time: int
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: True if key is replace False if key does not exists
        :rtype: bool
        """
        return self._set_add_replace('replace', key, value, time, compress_level=compress_level)

    def set_multi(self, mappings, time=100, compress_level=-1):
        """
        Set multiple keys with its values on server.

        If a key is a (key, cas) tuple, the given CAS value is sent with the
        request; a CAS of 0 stores the value only if the key does not exist.

        :param mappings: A dict with keys/values
        :type mappings: dict
        :param time: Time in seconds that your key will expire.
        :type time: int
        :param compress_level: How much to compress.
            0 = no compression, 1 = fastest, 9 = slowest but best,
            -1 = default compression level.
        :type compress_level: int
        :return: True if every key was stored, False otherwise.
        :rtype: bool
        """
        # Same clamping as the single-key commands.
        time = time if time >= 0 else self.MAXIMUM_EXPIRE_TIME

        msg = []
        for key, value in mappings.items():
            if isinstance(key, tuple):
                key, cas = key
            else:
                cas = None

            if cas == 0:
                # A cas value of 0 is treated as compare-and-set against not
                # existing, i.e. a quiet add.
                command = 'addq'
            else:
                command = 'setq'

            flags, value = self.serialize(value, compress_level=compress_level)
            # Lengths in the header must describe the encoded key, not the text.
            raw_key = str_to_bytes(key)
            msg.append(struct.pack(self.HEADER_STRUCT +
                                   self.COMMANDS[command]['struct'] % (len(raw_key), len(value)),
                                   self.MAGIC['request'],
                                   self.COMMANDS[command]['command'],
                                   len(raw_key),
                                   8, 0, 0, len(raw_key) + len(value) + 8, 0, cas or 0,
                                   flags, time, raw_key, value))

        # The quiet commands only answer on failure; a trailing noop marks the
        # end of the responses.
        msg.append(struct.pack(self.HEADER_STRUCT +
                               self.COMMANDS['noop']['struct'],
                               self.MAGIC['request'],
                               self.COMMANDS['noop']['command'],
                               0, 0, 0, 0, 0, 0, 0))

        self._send(b''.join(msg))

        opcode = -1
        retval = True
        while opcode != self.COMMANDS['noop']['command']:
            (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
             cas, extra_content) = self._get_response()
            if status != self.STATUS['success']:
                retval = False
            if status == self.STATUS['server_disconnected']:
                break

        return retval

    def _incr_decr(self, command, key, value, default, time):
        """
        Function which increments and decrements.

        :param command: ``'incr'`` or ``'decr'``.
        :type command: str
        :param key: Key's name
        :type key: six.string_types
        :param value: Number to be (de|in)cremented
        :type value: int
        :param default: Default value if key does not exist.
        :type default: int
        :param time: Time in seconds to expire key.
        :type time: int
        :return: Actual value of the key on server, or 0 if the server is disconnected.
        :rtype: int
        :raises: MemcachedException on an unexpected status.
        """
        time = time if time >= 0 else self.MAXIMUM_EXPIRE_TIME
        # Lengths in the header must describe the encoded key, not the text.
        raw_key = str_to_bytes(key)
        self._send(struct.pack(self.HEADER_STRUCT +
                               self.COMMANDS[command]['struct'] % len(raw_key),
                               self.MAGIC['request'],
                               self.COMMANDS[command]['command'],
                               len(raw_key),
                               20, 0, 0, len(raw_key) + 20, 0, 0, value,
                               default, time, raw_key))

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        if status == self.STATUS['server_disconnected']:
            return 0

        if status != self.STATUS['success']:
            raise MemcachedException('Code: %d Message: %s' % (status, extra_content))

        return struct.unpack('!Q', extra_content)[0]

    def incr(self, key, value, default=0, time=1000000):
        """
        Increment a key, if it exists, returns its actual value, if it doesn't, return 0.

        :param key: Key's name
        :type key: six.string_types
        :param value: Number to be incremented
        :type value: int
        :param default: Default value if key does not exist.
        :type default: int
        :param time: Time in seconds to expire key.
        :type time: int
        :return: Actual value of the key on server
        :rtype: int
        """
        return self._incr_decr('incr', key, value, default, time)

    def decr(self, key, value, default=0, time=100):
        """
        Decrement a key, if it exists, returns its actual value, if it doesn't, return 0.
        Minimum value of decrement return is 0.

        :param key: Key's name
        :type key: six.string_types
        :param value: Number to be decremented
        :type value: int
        :param default: Default value if key does not exist.
        :type default: int
        :param time: Time in seconds to expire key.
        :type time: int
        :return: Actual value of the key on server
        :rtype: int
        """
        return self._incr_decr('decr', key, value, default, time)

    def delete(self, key, cas=0):
        """
        Delete a key/value from server.

        :param key: Key's name to be deleted
        :type key: six.string_types
        :param cas: If set, only delete the key if its CAS value matches.
        :type cas: int
        :return: True if the key was deleted or did not exist; False on a CAS
            mismatch or if the server is disconnected.
        :rtype: bool
        :raises: MemcachedException on an unexpected status.
        """
        logger.debug('Deleting key %s', key)
        # Lengths in the header must describe the encoded key, not the text.
        raw_key = str_to_bytes(key)
        self._send(struct.pack(self.HEADER_STRUCT +
                               self.COMMANDS['delete']['struct'] % len(raw_key),
                               self.MAGIC['request'],
                               self.COMMANDS['delete']['command'],
                               len(raw_key), 0, 0, 0, len(raw_key), 0, cas, raw_key))

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        if status == self.STATUS['server_disconnected']:
            return False
        if status not in (self.STATUS['success'],
                          self.STATUS['key_not_found'],
                          self.STATUS['key_exists']):
            raise MemcachedException('Code: %d message: %s' % (status, extra_content))

        logger.debug('Key deleted %s', key)
        # key_exists is what the server answers on a CAS mismatch.
        return status != self.STATUS['key_exists']

    def delete_multi(self, keys):
        """
        Delete multiple keys from server in one command.

        :param keys: A list of keys to be deleted
        :type keys: list
        :return: True in case of success and False in case of failure.
        :rtype: bool
        """
        logger.debug('Deleting keys %r', keys)

        packets = []
        for key in keys:
            # Lengths in the header must describe the encoded key, not the text.
            raw_key = str_to_bytes(key)
            packets.append(struct.pack(
                self.HEADER_STRUCT +
                self.COMMANDS['delete']['struct'] % len(raw_key),
                self.MAGIC['request'],
                self.COMMANDS['delete']['command'],
                len(raw_key), 0, 0, 0, len(raw_key), 0, 0, raw_key))

        # A trailing noop marks the end of the responses.
        packets.append(struct.pack(
            self.HEADER_STRUCT +
            self.COMMANDS['noop']['struct'],
            self.MAGIC['request'],
            self.COMMANDS['noop']['command'],
            0, 0, 0, 0, 0, 0, 0))

        self._send(b''.join(packets))

        opcode = -1
        retval = True
        while opcode != self.COMMANDS['noop']['command']:
            (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
             cas, extra_content) = self._get_response()
            if status != self.STATUS['success']:
                retval = False
            if status == self.STATUS['server_disconnected']:
                break

        return retval

    def flush_all(self, time):
        """
        Send a command to server flush|delete all keys.

        :param time: Time to wait until flush in seconds.
        :type time: int
        :return: True in case of success, False in case of failure
        :rtype: bool
        :raises: MemcachedException on an unexpected status.
        """
        logger.info('Flushing memcached')
        self._send(struct.pack(self.HEADER_STRUCT +
                               self.COMMANDS['flush']['struct'],
                               self.MAGIC['request'],
                               self.COMMANDS['flush']['command'],
                               0, 4, 0, 0, 4, 0, 0, time))

        (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
         cas, extra_content) = self._get_response()

        if status == self.STATUS['server_disconnected']:
            # Nothing was flushed; report the failure like the other commands do.
            return False

        if status != self.STATUS['success']:
            raise MemcachedException('Code: %d message: %s' % (status, extra_content))

        logger.debug('Memcached flushed')
        return True

    def stats(self, key=None):
        """
        Return server stats.

        :param key: Optional if you want status from a key.
        :type key: six.string_types
        :return: A dict with server stats (text names mapped to bytes values);
            it may be incomplete if the server disconnects.
        :rtype: dict
        :raises: MemcachedException if the server answers with an error status.
        """
        # TODO: Stats with key is not working.
        if key is not None:
            if isinstance(key, text_type):
                key = str_to_bytes(key)
            keylen = len(key)
            packed = struct.pack(
                self.HEADER_STRUCT + '%ds' % keylen,
                self.MAGIC['request'],
                self.COMMANDS['stat']['command'],
                keylen, 0, 0, 0, keylen, 0, 0, key)
        else:
            packed = struct.pack(
                self.HEADER_STRUCT,
                self.MAGIC['request'],
                self.COMMANDS['stat']['command'],
                0, 0, 0, 0, 0, 0, 0)

        self._send(packed)

        value = {}

        while True:
            response = self._get_response()

            status = response[5]
            if status == self.STATUS['server_disconnected']:
                break
            if status != self.STATUS['success']:
                # An error reply is not followed by a terminator packet, so
                # reading on would block until the socket times out.
                raise MemcachedException('Code: %d Message: %s' % (status, response[-1]))

            keylen = response[2]
            bodylen = response[6]

            # An empty packet terminates the list of stats.
            if keylen == 0 and bodylen == 0:
                break

            extra_content = response[-1]
            name = extra_content[:keylen]
            body = extra_content[keylen:bodylen]
            value[name.decode()] = body

        return value

    def disconnect(self):
        """
        Disconnects from server.  A new connection will be established the next time a request is made.

        :return: Nothing
        :rtype: None
        """
        if self.connection:
            self.connection.close()
            self.connection = None