protocol.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. from datetime import datetime, timedelta
  2. import logging
  3. import re
  4. import socket
  5. import struct
  6. import threading
  7. try:
  8. from urllib import splitport
  9. except ImportError:
  10. from urllib.parse import splitport
  11. import zlib
  12. from io import BytesIO
  13. import six
  14. from six import binary_type, text_type
  15. from bmemcached.compat import long
  16. from bmemcached.exceptions import AuthenticationNotSupported, InvalidCredentials, MemcachedException
  17. from bmemcached.utils import str_to_bytes
  18. logger = logging.getLogger(__name__)
  19. class Protocol(threading.local):
  20. """
  21. This class is used by Client class to communicate with server.
  22. """
  23. HEADER_STRUCT = '!BBHBBHLLQ'
  24. HEADER_SIZE = 24
  25. MAGIC = {
  26. 'request': 0x80,
  27. 'response': 0x81
  28. }
  29. # All structures will be appended to HEADER_STRUCT
  30. COMMANDS = {
  31. 'get': {'command': 0x00, 'struct': '%ds'},
  32. 'getk': {'command': 0x0C, 'struct': '%ds'},
  33. 'getkq': {'command': 0x0D, 'struct': '%ds'},
  34. 'set': {'command': 0x01, 'struct': 'LL%ds%ds'},
  35. 'setq': {'command': 0x11, 'struct': 'LL%ds%ds'},
  36. 'add': {'command': 0x02, 'struct': 'LL%ds%ds'},
  37. 'addq': {'command': 0x12, 'struct': 'LL%ds%ds'},
  38. 'replace': {'command': 0x03, 'struct': 'LL%ds%ds'},
  39. 'delete': {'command': 0x04, 'struct': '%ds'},
  40. 'incr': {'command': 0x05, 'struct': 'QQL%ds'},
  41. 'decr': {'command': 0x06, 'struct': 'QQL%ds'},
  42. 'flush': {'command': 0x08, 'struct': 'I'},
  43. 'noop': {'command': 0x0a, 'struct': ''},
  44. 'stat': {'command': 0x10},
  45. 'auth_negotiation': {'command': 0x20},
  46. 'auth_request': {'command': 0x21, 'struct': '%ds%ds'},
  47. }
  48. STATUS = {
  49. 'success': 0x00,
  50. 'key_not_found': 0x01,
  51. 'key_exists': 0x02,
  52. 'auth_error': 0x08,
  53. 'unknown_command': 0x81,
  54. # This is used internally, and is never returned by the server. (The server returns a 16-bit
  55. # value, so it's not capable of returning this value.)
  56. 'server_disconnected': 0xFFFFFFFF,
  57. }
  58. FLAGS = {
  59. 'object': 1 << 0,
  60. 'integer': 1 << 1,
  61. 'long': 1 << 2,
  62. 'compressed': 1 << 3,
  63. 'binary': 1 << 4,
  64. }
  65. MAXIMUM_EXPIRE_TIME = 0xfffffffe
  66. COMPRESSION_THRESHOLD = 128
  67. def __init__(self, server, username=None, password=None, compression=None, socket_timeout=None,
  68. pickle_protocol=None, pickler=None, unpickler=None):
  69. super(Protocol, self).__init__()
  70. self.server = server
  71. self._username = username
  72. self._password = password
  73. self.compression = zlib if compression is None else compression
  74. self.connection = None
  75. self.authenticated = False
  76. self.socket_timeout = socket_timeout
  77. self.pickle_protocol = pickle_protocol
  78. self.pickler = pickler
  79. self.unpickler = unpickler
  80. self.reconnects_deferred_until = None
  81. if not server.startswith('/'):
  82. self.host, self.port = self.split_host_port(self.server)
  83. self.set_retry_delay(5)
  84. else:
  85. self.host = self.port = None
  86. self.set_retry_delay(0)
  87. @property
  88. def server_uses_unix_socket(self):
  89. return self.host is None
  90. def set_retry_delay(self, value):
  91. self.retry_delay = value
  92. def _open_connection(self):
  93. if self.connection:
  94. return
  95. self.authenticated = False
  96. # If we're deferring a reconnection attempt, wait.
  97. if self.reconnects_deferred_until and self.reconnects_deferred_until > datetime.now():
  98. return
  99. try:
  100. if self.host:
  101. self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  102. self.connection.settimeout(self.socket_timeout)
  103. self.connection.connect((self.host, self.port))
  104. else:
  105. self.connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  106. self.connection.connect(self.server)
  107. self._send_authentication()
  108. except socket.error:
  109. # If the connection attempt fails, start delaying retries.
  110. self.reconnects_deferred_until = datetime.now() + timedelta(seconds=self.retry_delay)
  111. raise
  112. def _connection_error(self, exception):
  113. # On error, clear our dead connection.
  114. self.disconnect()
  115. @classmethod
  116. def split_host_port(cls, server):
  117. """
  118. Return (host, port) from server.
  119. Port defaults to 11211.
  120. >>> split_host_port('127.0.0.1:11211')
  121. ('127.0.0.1', 11211)
  122. >>> split_host_port('127.0.0.1')
  123. ('127.0.0.1', 11211)
  124. """
  125. host, port = splitport(server)
  126. if port is None:
  127. port = 11211
  128. port = int(port)
  129. if re.search(':.*$', host):
  130. host = re.sub(':.*$', '', host)
  131. return host, port
  132. def _read_socket(self, size):
  133. """
  134. Reads data from socket.
  135. :param size: Size in bytes to be read.
  136. :type size: int
  137. :return: Data from socket
  138. :rtype: six.string_types
  139. """
  140. value = b''
  141. while len(value) < size:
  142. data = self.connection.recv(size - len(value))
  143. if not data:
  144. break
  145. value += data
  146. # If we got less data than we requested, the server disconnected.
  147. if len(value) < size:
  148. raise socket.error()
  149. return value
  150. def _get_response(self):
  151. """
  152. Get memcached response from socket.
  153. :return: A tuple with binary values from memcached.
  154. :rtype: tuple
  155. """
  156. try:
  157. self._open_connection()
  158. if self.connection is None:
  159. # The connection wasn't opened, which means we're deferring a reconnection attempt.
  160. # Raise a socket.error, so we'll return the same server_disconnected message as we
  161. # do below.
  162. raise socket.error('Delaying reconnection attempt')
  163. header = self._read_socket(self.HEADER_SIZE)
  164. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  165. cas) = struct.unpack(self.HEADER_STRUCT, header)
  166. assert magic == self.MAGIC['response']
  167. extra_content = None
  168. if bodylen:
  169. extra_content = self._read_socket(bodylen)
  170. return (magic, opcode, keylen, extlen, datatype, status, bodylen,
  171. opaque, cas, extra_content)
  172. except socket.error as e:
  173. self._connection_error(e)
  174. # (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque, cas, extra_content)
  175. message = str(e)
  176. return (self.MAGIC['response'], -1, 0, 0, 0, self.STATUS['server_disconnected'], 0, 0, 0, message)
  177. def _send(self, data):
  178. try:
  179. self._open_connection()
  180. if self.connection is None:
  181. return
  182. self.connection.sendall(data)
  183. except socket.error as e:
  184. self._connection_error(e)
  185. def authenticate(self, username, password):
  186. """
  187. Authenticate user on server.
  188. :param username: Username used to be authenticated.
  189. :type username: six.string_types
  190. :param password: Password used to be authenticated.
  191. :type password: six.string_types
  192. :return: True if successful.
  193. :raises: InvalidCredentials, AuthenticationNotSupported, MemcachedException
  194. :rtype: bool
  195. """
  196. self._username = username
  197. self._password = password
  198. # Reopen the connection with the new credentials.
  199. self.disconnect()
  200. self._open_connection()
  201. return self.authenticated
  202. def _send_authentication(self):
  203. if not self._username or not self._password:
  204. return False
  205. logger.info('Authenticating as %s', self._username)
  206. self._send(struct.pack(self.HEADER_STRUCT,
  207. self.MAGIC['request'],
  208. self.COMMANDS['auth_negotiation']['command'],
  209. 0, 0, 0, 0, 0, 0, 0))
  210. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  211. cas, extra_content) = self._get_response()
  212. if status == self.STATUS['server_disconnected']:
  213. return False
  214. if status == self.STATUS['unknown_command']:
  215. logger.debug('Server does not requires authentication.')
  216. self.authenticated = True
  217. return True
  218. methods = extra_content
  219. if b'PLAIN' not in methods:
  220. raise AuthenticationNotSupported('This module only supports '
  221. 'PLAIN auth for now.')
  222. method = b'PLAIN'
  223. auth = '\x00%s\x00%s' % (self._username, self._password)
  224. if isinstance(auth, text_type):
  225. auth = auth.encode()
  226. self._send(struct.pack(self.HEADER_STRUCT +
  227. self.COMMANDS['auth_request']['struct'] % (len(method), len(auth)),
  228. self.MAGIC['request'], self.COMMANDS['auth_request']['command'],
  229. len(method), 0, 0, 0, len(method) + len(auth), 0, 0, method, auth))
  230. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  231. cas, extra_content) = self._get_response()
  232. if status == self.STATUS['server_disconnected']:
  233. return False
  234. if status == self.STATUS['auth_error']:
  235. raise InvalidCredentials("Incorrect username or password")
  236. if status != self.STATUS['success']:
  237. raise MemcachedException('Code: %d Message: %s' % (status, extra_content))
  238. logger.debug('Auth OK. Code: %d Message: %s', status, extra_content)
  239. self.authenticated = True
  240. return True
  241. def serialize(self, value, compress_level=-1):
  242. """
  243. Serializes a value based on its type.
  244. :param value: Something to be serialized
  245. :type value: six.string_types, int, long, object
  246. :param compress_level: How much to compress.
  247. 0 = no compression, 1 = fastest, 9 = slowest but best,
  248. -1 = default compression level.
  249. :type compress_level: int
  250. :return: Serialized type
  251. :rtype: str
  252. """
  253. flags = 0
  254. if isinstance(value, binary_type):
  255. flags |= self.FLAGS['binary']
  256. elif isinstance(value, text_type):
  257. value = value.encode('utf8')
  258. elif isinstance(value, int) and isinstance(value, bool) is False:
  259. flags |= self.FLAGS['integer']
  260. value = str(value)
  261. elif isinstance(value, long) and isinstance(value, bool) is False:
  262. flags |= self.FLAGS['long']
  263. value = str(value)
  264. else:
  265. flags |= self.FLAGS['object']
  266. buf = BytesIO()
  267. pickler = self.pickler(buf, self.pickle_protocol)
  268. pickler.dump(value)
  269. value = buf.getvalue()
  270. if compress_level != 0 and len(value) > self.COMPRESSION_THRESHOLD:
  271. if compress_level is not None and compress_level > 0:
  272. # Use the specified compression level.
  273. compressed_value = self.compression.compress(value, compress_level)
  274. else:
  275. # Use the default compression level.
  276. compressed_value = self.compression.compress(value)
  277. # Use the compressed value only if it is actually smaller.
  278. if compressed_value and len(compressed_value) < len(value):
  279. value = compressed_value
  280. flags |= self.FLAGS['compressed']
  281. return flags, value
  282. def deserialize(self, value, flags):
  283. """
  284. Deserialized values based on flags or just return it if it is not serialized.
  285. :param value: Serialized or not value.
  286. :type value: six.string_types, int
  287. :param flags: Value flags
  288. :type flags: int
  289. :return: Deserialized value
  290. :rtype: six.string_types|int
  291. """
  292. FLAGS = self.FLAGS
  293. if flags & FLAGS['compressed']: # pragma: no branch
  294. value = self.compression.decompress(value)
  295. if flags & FLAGS['binary']:
  296. return value
  297. if flags & FLAGS['integer']:
  298. return int(value)
  299. elif flags & FLAGS['long']:
  300. return long(value)
  301. elif flags & FLAGS['object']:
  302. buf = BytesIO(value)
  303. unpickler = self.unpickler(buf)
  304. return unpickler.load()
  305. if six.PY3:
  306. return value.decode('utf8')
  307. # In Python 2, mimic the behavior of the json library: return a str
  308. # unless the value contains unicode characters.
  309. try:
  310. value.decode('ascii')
  311. except UnicodeDecodeError:
  312. return value.decode('utf8')
  313. else:
  314. return value
  315. def get(self, key):
  316. """
  317. Get a key and its CAS value from server. If the value isn't cached, return
  318. (None, None).
  319. :param key: Key's name
  320. :type key: six.string_types
  321. :return: Returns (value, cas).
  322. :rtype: object
  323. """
  324. logger.debug('Getting key %s', key)
  325. data = struct.pack(self.HEADER_STRUCT +
  326. self.COMMANDS['get']['struct'] % (len(key)),
  327. self.MAGIC['request'],
  328. self.COMMANDS['get']['command'],
  329. len(key), 0, 0, 0, len(key), 0, 0, str_to_bytes(key))
  330. self._send(data)
  331. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  332. cas, extra_content) = self._get_response()
  333. logger.debug('Value Length: %d. Body length: %d. Data type: %d',
  334. extlen, bodylen, datatype)
  335. if status != self.STATUS['success']:
  336. if status == self.STATUS['key_not_found']:
  337. logger.debug('Key not found. Message: %s', extra_content)
  338. return None, None
  339. if status == self.STATUS['server_disconnected']:
  340. return None, None
  341. raise MemcachedException('Code: %d Message: %s' % (status, extra_content))
  342. flags, value = struct.unpack('!L%ds' % (bodylen - 4, ), extra_content)
  343. return self.deserialize(value, flags), cas
  344. def get_multi(self, keys):
  345. """
  346. Get multiple keys from server.
  347. :param keys: A list of keys to from server.
  348. :type keys: list
  349. :return: A dict with all requested keys.
  350. :rtype: dict
  351. """
  352. # pipeline N-1 getkq requests, followed by a regular getk to uncork the
  353. # server
  354. keys, last = keys[:-1], keys[-1]
  355. if six.PY2:
  356. msg = ''
  357. else:
  358. msg = b''
  359. msg = msg.join([
  360. struct.pack(self.HEADER_STRUCT +
  361. self.COMMANDS['getkq']['struct'] % (len(key)),
  362. self.MAGIC['request'],
  363. self.COMMANDS['getkq']['command'],
  364. len(key), 0, 0, 0, len(key), 0, 0, str_to_bytes(key))
  365. for key in keys])
  366. msg += struct.pack(self.HEADER_STRUCT +
  367. self.COMMANDS['getk']['struct'] % (len(last)),
  368. self.MAGIC['request'],
  369. self.COMMANDS['getk']['command'],
  370. len(last), 0, 0, 0, len(last), 0, 0, last.encode())
  371. self._send(msg)
  372. d = {}
  373. opcode = -1
  374. while opcode != self.COMMANDS['getk']['command']:
  375. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  376. cas, extra_content) = self._get_response()
  377. if status == self.STATUS['success']:
  378. flags, key, value = struct.unpack('!L%ds%ds' %
  379. (keylen, bodylen - keylen - 4),
  380. extra_content)
  381. d[key.decode()] = self.deserialize(value, flags), cas
  382. elif status == self.STATUS['server_disconnected']:
  383. break
  384. elif status != self.STATUS['key_not_found']:
  385. raise MemcachedException('Code: %d Message: %s' % (status, extra_content))
  386. return d
  387. def _set_add_replace(self, command, key, value, time, cas=0, compress_level=-1):
  388. """
  389. Function to set/add/replace commands.
  390. :param key: Key's name
  391. :type key: six.string_types
  392. :param value: A value to be stored on server.
  393. :type value: object
  394. :param time: Time in seconds that your key will expire.
  395. :type time: int
  396. :param cas: The CAS value that must be matched for this operation to complete, or 0 for no CAS.
  397. :type cas: int
  398. :param compress_level: How much to compress.
  399. 0 = no compression, 1 = fastest, 9 = slowest but best,
  400. -1 = default compression level.
  401. :type compress_level: int
  402. :return: True in case of success and False in case of failure
  403. :rtype: bool
  404. """
  405. time = time if time >= 0 else self.MAXIMUM_EXPIRE_TIME
  406. logger.debug('Setting/adding/replacing key %s.', key)
  407. flags, value = self.serialize(value, compress_level=compress_level)
  408. logger.debug('Value bytes %s.', len(value))
  409. if isinstance(value, text_type):
  410. value = value.encode('utf8')
  411. self._send(struct.pack(self.HEADER_STRUCT +
  412. self.COMMANDS[command]['struct'] % (len(key), len(value)),
  413. self.MAGIC['request'],
  414. self.COMMANDS[command]['command'],
  415. len(key), 8, 0, 0, len(key) + len(value) + 8, 0, cas, flags,
  416. time, str_to_bytes(key), value))
  417. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  418. cas, extra_content) = self._get_response()
  419. if status != self.STATUS['success']:
  420. if status == self.STATUS['key_exists']:
  421. return False
  422. elif status == self.STATUS['key_not_found']:
  423. return False
  424. elif status == self.STATUS['server_disconnected']:
  425. return False
  426. raise MemcachedException('Code: %d Message: %s' % (status, extra_content))
  427. return True
  428. def set(self, key, value, time, compress_level=-1):
  429. """
  430. Set a value for a key on server.
  431. :param key: Key's name
  432. :type key: six.string_types
  433. :param value: A value to be stored on server.
  434. :type value: object
  435. :param time: Time in seconds that your key will expire.
  436. :type time: int
  437. :param compress_level: How much to compress.
  438. 0 = no compression, 1 = fastest, 9 = slowest but best,
  439. -1 = default compression level.
  440. :type compress_level: int
  441. :return: True in case of success and False in case of failure
  442. :rtype: bool
  443. """
  444. return self._set_add_replace('set', key, value, time, compress_level=compress_level)
  445. def cas(self, key, value, cas, time, compress_level=-1):
  446. """
  447. Add a key/value to server ony if it does not exist.
  448. :param key: Key's name
  449. :type key: six.string_types
  450. :param value: A value to be stored on server.
  451. :type value: object
  452. :param time: Time in seconds that your key will expire.
  453. :type time: int
  454. :param compress_level: How much to compress.
  455. 0 = no compression, 1 = fastest, 9 = slowest but best,
  456. -1 = default compression level.
  457. :type compress_level: int
  458. :return: True if key is added False if key already exists and has a different CAS
  459. :rtype: bool
  460. """
  461. # The protocol CAS value 0 means "no cas". Calling cas() with that value is
  462. # probably unintentional. Don't allow it, since it would overwrite the value
  463. # without performing CAS at all.
  464. assert cas != 0, '0 is an invalid CAS value'
  465. # If we get a cas of None, interpret that as "compare against nonexistant and set",
  466. # which is simply Add.
  467. if cas is None:
  468. return self._set_add_replace('add', key, value, time, compress_level=compress_level)
  469. else:
  470. return self._set_add_replace('set', key, value, time, cas=cas, compress_level=compress_level)
  471. def add(self, key, value, time, compress_level=-1):
  472. """
  473. Add a key/value to server ony if it does not exist.
  474. :param key: Key's name
  475. :type key: six.string_types
  476. :param value: A value to be stored on server.
  477. :type value: object
  478. :param time: Time in seconds that your key will expire.
  479. :type time: int
  480. :param compress_level: How much to compress.
  481. 0 = no compression, 1 = fastest, 9 = slowest but best,
  482. -1 = default compression level.
  483. :type compress_level: int
  484. :return: True if key is added False if key already exists
  485. :rtype: bool
  486. """
  487. return self._set_add_replace('add', key, value, time, compress_level=compress_level)
  488. def replace(self, key, value, time, compress_level=-1):
  489. """
  490. Replace a key/value to server ony if it does exist.
  491. :param key: Key's name
  492. :type key: six.string_types
  493. :param value: A value to be stored on server.
  494. :type value: object
  495. :param time: Time in seconds that your key will expire.
  496. :type time: int
  497. :param compress_level: How much to compress.
  498. 0 = no compression, 1 = fastest, 9 = slowest but best,
  499. -1 = default compression level.
  500. :type compress_level: int
  501. :return: True if key is replace False if key does not exists
  502. :rtype: bool
  503. """
  504. return self._set_add_replace('replace', key, value, time, compress_level=compress_level)
  505. def set_multi(self, mappings, time=100, compress_level=-1):
  506. """
  507. Set multiple keys with its values on server.
  508. If a key is a (key, cas) tuple, insert as if cas(key, value, cas) had
  509. been called.
  510. :param mappings: A dict with keys/values
  511. :type mappings: dict
  512. :param time: Time in seconds that your key will expire.
  513. :type time: int
  514. :param compress_level: How much to compress.
  515. 0 = no compression, 1 = fastest, 9 = slowest but best,
  516. -1 = default compression level.
  517. :type compress_level: int
  518. :return: True
  519. :rtype: bool
  520. """
  521. mappings = mappings.items()
  522. msg = []
  523. for key, value in mappings:
  524. if isinstance(key, tuple):
  525. key, cas = key
  526. else:
  527. cas = None
  528. if cas == 0:
  529. # Like cas(), if the cas value is 0, treat it as compare-and-set against not
  530. # existing.
  531. command = 'addq'
  532. else:
  533. command = 'setq'
  534. flags, value = self.serialize(value, compress_level=compress_level)
  535. m = struct.pack(self.HEADER_STRUCT +
  536. self.COMMANDS[command]['struct'] % (len(key), len(value)),
  537. self.MAGIC['request'],
  538. self.COMMANDS[command]['command'],
  539. len(key),
  540. 8, 0, 0, len(key) + len(value) + 8, 0, cas or 0,
  541. flags, time, str_to_bytes(key), value)
  542. msg.append(m)
  543. m = struct.pack(self.HEADER_STRUCT +
  544. self.COMMANDS['noop']['struct'],
  545. self.MAGIC['request'],
  546. self.COMMANDS['noop']['command'],
  547. 0, 0, 0, 0, 0, 0, 0)
  548. msg.append(m)
  549. if six.PY2:
  550. msg = ''.join(msg)
  551. else:
  552. msg = b''.join(msg)
  553. self._send(msg)
  554. opcode = -1
  555. retval = True
  556. while opcode != self.COMMANDS['noop']['command']:
  557. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  558. cas, extra_content) = self._get_response()
  559. if status != self.STATUS['success']:
  560. retval = False
  561. if status == self.STATUS['server_disconnected']:
  562. break
  563. return retval
  564. def _incr_decr(self, command, key, value, default, time):
  565. """
  566. Function which increments and decrements.
  567. :param key: Key's name
  568. :type key: six.string_types
  569. :param value: Number to be (de|in)cremented
  570. :type value: int
  571. :param default: Default value if key does not exist.
  572. :type default: int
  573. :param time: Time in seconds to expire key.
  574. :type time: int
  575. :return: Actual value of the key on server
  576. :rtype: int
  577. """
  578. time = time if time >= 0 else self.MAXIMUM_EXPIRE_TIME
  579. self._send(struct.pack(self.HEADER_STRUCT +
  580. self.COMMANDS[command]['struct'] % len(key),
  581. self.MAGIC['request'],
  582. self.COMMANDS[command]['command'],
  583. len(key),
  584. 20, 0, 0, len(key) + 20, 0, 0, value,
  585. default, time, str_to_bytes(key)))
  586. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  587. cas, extra_content) = self._get_response()
  588. if status not in (self.STATUS['success'], self.STATUS['server_disconnected']):
  589. raise MemcachedException('Code: %d Message: %s' % (status, extra_content))
  590. if status == self.STATUS['server_disconnected']:
  591. return 0
  592. return struct.unpack('!Q', extra_content)[0]
  593. def incr(self, key, value, default=0, time=1000000):
  594. """
  595. Increment a key, if it exists, returns its actual value, if it doesn't, return 0.
  596. :param key: Key's name
  597. :type key: six.string_types
  598. :param value: Number to be incremented
  599. :type value: int
  600. :param default: Default value if key does not exist.
  601. :type default: int
  602. :param time: Time in seconds to expire key.
  603. :type time: int
  604. :return: Actual value of the key on server
  605. :rtype: int
  606. """
  607. return self._incr_decr('incr', key, value, default, time)
  608. def decr(self, key, value, default=0, time=100):
  609. """
  610. Decrement a key, if it exists, returns its actual value, if it doesn't, return 0.
  611. Minimum value of decrement return is 0.
  612. :param key: Key's name
  613. :type key: six.string_types
  614. :param value: Number to be decremented
  615. :type value: int
  616. :param default: Default value if key does not exist.
  617. :type default: int
  618. :param time: Time in seconds to expire key.
  619. :type time: int
  620. :return: Actual value of the key on server
  621. :rtype: int
  622. """
  623. return self._incr_decr('decr', key, value, default, time)
  624. def delete(self, key, cas=0):
  625. """
  626. Delete a key/value from server. If key existed and was deleted, return True.
  627. :param key: Key's name to be deleted
  628. :type key: six.string_types
  629. :param cas: If set, only delete the key if its CAS value matches.
  630. :type cas: int
  631. :return: True in case o success and False in case of failure.
  632. :rtype: bool
  633. """
  634. logger.debug('Deleting key %s', key)
  635. self._send(struct.pack(self.HEADER_STRUCT +
  636. self.COMMANDS['delete']['struct'] % len(key),
  637. self.MAGIC['request'],
  638. self.COMMANDS['delete']['command'],
  639. len(key), 0, 0, 0, len(key), 0, cas, str_to_bytes(key)))
  640. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  641. cas, extra_content) = self._get_response()
  642. if status == self.STATUS['server_disconnected']:
  643. return False
  644. if status != self.STATUS['success'] and status not in (self.STATUS['key_not_found'], self.STATUS['key_exists']):
  645. raise MemcachedException('Code: %d message: %s' % (status, extra_content))
  646. logger.debug('Key deleted %s', key)
  647. return status != self.STATUS['key_exists']
  648. def delete_multi(self, keys):
  649. """
  650. Delete multiple keys from server in one command.
  651. :param keys: A list of keys to be deleted
  652. :type keys: list
  653. :return: True in case of success and False in case of failure.
  654. :rtype: bool
  655. """
  656. logger.debug('Deleting keys %r', keys)
  657. if six.PY2:
  658. msg = ''
  659. else:
  660. msg = b''
  661. for key in keys:
  662. msg += struct.pack(
  663. self.HEADER_STRUCT +
  664. self.COMMANDS['delete']['struct'] % len(key),
  665. self.MAGIC['request'],
  666. self.COMMANDS['delete']['command'],
  667. len(key), 0, 0, 0, len(key), 0, 0, str_to_bytes(key))
  668. msg += struct.pack(
  669. self.HEADER_STRUCT +
  670. self.COMMANDS['noop']['struct'],
  671. self.MAGIC['request'],
  672. self.COMMANDS['noop']['command'],
  673. 0, 0, 0, 0, 0, 0, 0)
  674. self._send(msg)
  675. opcode = -1
  676. retval = True
  677. while opcode != self.COMMANDS['noop']['command']:
  678. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  679. cas, extra_content) = self._get_response()
  680. if status != self.STATUS['success']:
  681. retval = False
  682. if status == self.STATUS['server_disconnected']:
  683. break
  684. return retval
  685. def flush_all(self, time):
  686. """
  687. Send a command to server flush|delete all keys.
  688. :param time: Time to wait until flush in seconds.
  689. :type time: int
  690. :return: True in case of success, False in case of failure
  691. :rtype: bool
  692. """
  693. logger.info('Flushing memcached')
  694. self._send(struct.pack(self.HEADER_STRUCT +
  695. self.COMMANDS['flush']['struct'],
  696. self.MAGIC['request'],
  697. self.COMMANDS['flush']['command'],
  698. 0, 4, 0, 0, 4, 0, 0, time))
  699. (magic, opcode, keylen, extlen, datatype, status, bodylen, opaque,
  700. cas, extra_content) = self._get_response()
  701. if status not in (self.STATUS['success'], self.STATUS['server_disconnected']):
  702. raise MemcachedException('Code: %d message: %s' % (status, extra_content))
  703. logger.debug('Memcached flushed')
  704. return True
  705. def stats(self, key=None):
  706. """
  707. Return server stats.
  708. :param key: Optional if you want status from a key.
  709. :type key: six.string_types
  710. :return: A dict with server stats
  711. :rtype: dict
  712. """
  713. # TODO: Stats with key is not working.
  714. if key is not None:
  715. if isinstance(key, text_type):
  716. key = str_to_bytes(key)
  717. keylen = len(key)
  718. packed = struct.pack(
  719. self.HEADER_STRUCT + '%ds' % keylen,
  720. self.MAGIC['request'],
  721. self.COMMANDS['stat']['command'],
  722. keylen, 0, 0, 0, keylen, 0, 0, key)
  723. else:
  724. packed = struct.pack(
  725. self.HEADER_STRUCT,
  726. self.MAGIC['request'],
  727. self.COMMANDS['stat']['command'],
  728. 0, 0, 0, 0, 0, 0, 0)
  729. self._send(packed)
  730. value = {}
  731. while True:
  732. response = self._get_response()
  733. status = response[5]
  734. if status == self.STATUS['server_disconnected']:
  735. break
  736. keylen = response[2]
  737. bodylen = response[6]
  738. if keylen == 0 and bodylen == 0:
  739. break
  740. extra_content = response[-1]
  741. key = extra_content[:keylen]
  742. body = extra_content[keylen:bodylen]
  743. value[key.decode() if isinstance(key, bytes) else key] = body
  744. return value
  745. def disconnect(self):
  746. """
  747. Disconnects from server. A new connection will be established the next time a request is made.
  748. :return: Nothing
  749. :rtype: None
  750. """
  751. if self.connection:
  752. self.connection.close()
  753. self.connection = None