
#!/usr/bin/env python
"""client module for memcached (memory cache daemon)

Overview
========

See U{the MemCached homepage<http://www.danga.com/memcached>} for more
about memcached.

Usage summary
=============

This should give you a feel for how this module operates::

    import memcache
    mc = memcache.Client(['127.0.0.1:11211'], debug=0)

    mc.set("some_key", "Some value")
    value = mc.get("some_key")

    mc.set("another_key", 3)
    mc.delete("another_key")

    mc.set("key", "1")  # note that the key used for incr/decr must be
                        # a string.
    mc.incr("key")
    mc.decr("key")

The standard way to use memcache with a database is like this:

    key = derive_key(obj)
    obj = mc.get(key)
    if not obj:
        obj = backend_api.get(...)
        mc.set(key, obj)

    # we now have obj, and future passes through this code
    # will use the object from the cache.

Detailed Documentation
======================

More detailed documentation is available in the L{Client} class.

"""
from __future__ import print_function

import binascii
import os
import re
import socket
import sys
import threading
import time
import zlib

import six

if six.PY2:
    # With Python 2, the faster C implementation has to be imported explicitly.
    import cPickle as pickle
else:
    import pickle


def cmemcache_hash(key):
    return (
        (((binascii.crc32(key) & 0xffffffff)
          >> 16) & 0x7fff) or 1)


serverHashFunction = cmemcache_hash


def useOldServerHashFunction():
    """Use the old python-memcache server hash function."""
    global serverHashFunction
    serverHashFunction = binascii.crc32
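
# Illustrative note: the server-selection hash can be swapped at module level.
# For example, to fall back to the old python-memcache behaviour (a plain
# binascii.crc32, compatible with caches populated by older clients), one
# could call:
#
#     import memcache
#     memcache.useOldServerHashFunction()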

from io import BytesIO

if six.PY2:
    try:
        unicode
    except NameError:
        _has_unicode = False
    else:
        _has_unicode = True
else:
    _has_unicode = True

_str_cls = six.string_types

valid_key_chars_re = re.compile(b'[\x21-\x7e\x80-\xff]+$')


# Original author: Evan Martin of Danga Interactive
__author__ = "Sean Reifschneider <jafo-memcached@tummy.com>"
__version__ = "1.58"
__copyright__ = "Copyright (C) 2003 Danga Interactive"
# http://en.wikipedia.org/wiki/Python_Software_Foundation_License
__license__ = "Python Software Foundation License"

SERVER_MAX_KEY_LENGTH = 250
# Storing values larger than 1MB requires starting memcached with -I <size> for
# memcached >= 1.4.2 or recompiling for < 1.4.2. If you do, this value can be
# changed by doing "memcache.SERVER_MAX_VALUE_LENGTH = N" after importing this
# module.
SERVER_MAX_VALUE_LENGTH = 1024 * 1024


class _Error(Exception):
    pass


class _ConnectionDeadError(Exception):
    pass


_DEAD_RETRY = 30  # number of seconds before retrying a dead server.
_SOCKET_TIMEOUT = 3  # number of seconds before sockets timeout.


class Client(threading.local):
    """Object representing a pool of memcache servers.

    See L{memcache} for an overview.

    In all cases where a key is used, the key can be either:
        1. A simple hashable type (string, integer, etc.).
        2. A tuple of C{(hashvalue, key)}. This is useful if you want
           to avoid making this module calculate a hash value. You may
           prefer, for example, to keep all of a given user's objects on
           the same memcache server, so you could use the user's unique
           id as the hash value.

    @group Setup: __init__, set_servers, forget_dead_hosts,
                  disconnect_all, debuglog
    @group Insertion: set, add, replace, set_multi
    @group Retrieval: get, get_multi
    @group Integers: incr, decr
    @group Removal: delete, delete_multi
    @sort: __init__, set_servers, forget_dead_hosts, disconnect_all,
           debuglog,\ set, set_multi, add, replace, get, get_multi,
           incr, decr, delete, delete_multi
    """
    _FLAG_PICKLE = 1 << 0
    _FLAG_INTEGER = 1 << 1
    _FLAG_LONG = 1 << 2
    _FLAG_COMPRESSED = 1 << 3

    _SERVER_RETRIES = 10  # how many times to try finding a free server.

    # exceptions for Client
    class MemcachedKeyError(Exception):
        pass

    class MemcachedKeyLengthError(MemcachedKeyError):
        pass

    class MemcachedKeyCharacterError(MemcachedKeyError):
        pass

    class MemcachedKeyNoneError(MemcachedKeyError):
        pass

    class MemcachedKeyTypeError(MemcachedKeyError):
        pass

    class MemcachedStringEncodingError(Exception):
        pass

    def __init__(self, servers, debug=0, pickleProtocol=0,
                 pickler=pickle.Pickler, unpickler=pickle.Unpickler,
                 compressor=zlib.compress, decompressor=zlib.decompress,
                 pload=None, pid=None,
                 server_max_key_length=None, server_max_value_length=None,
                 dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT,
                 cache_cas=False, flush_on_reconnect=0, check_keys=True):
        """Create a new Client object with the given list of servers.

        @param servers: C{servers} is passed to L{set_servers}.
        @param debug: whether to display error messages when a server
        can't be contacted.
        @param pickleProtocol: number to mandate protocol used by
        (c)Pickle.
        @param pickler: optional override of default Pickler to allow
        subclassing.
        @param unpickler: optional override of default Unpickler to
        allow subclassing.
        @param pload: optional persistent_load function to call on
        pickle loading. Useful for cPickle since subclassing isn't
        allowed.
        @param pid: optional persistent_id function to call on pickle
        storing. Useful for cPickle since subclassing isn't allowed.
        @param dead_retry: number of seconds before retrying a
        blacklisted server. Defaults to 30 seconds.
        @param socket_timeout: timeout in seconds for all calls to a
        server. Defaults to 3 seconds.
        @param cache_cas: (default False) If true, cas operations will
        be cached. WARNING: This cache is not expired internally; if
        you have a long-running process you will need to expire it
        manually via client.reset_cas(), or the cache can grow
        unbounded.
        @param server_max_key_length: (default SERVER_MAX_KEY_LENGTH)
        Data that is larger than this will not be sent to the server.
        @param server_max_value_length: (default
        SERVER_MAX_VALUE_LENGTH) Data that is larger than this will
        not be sent to the server.
        @param flush_on_reconnect: optional flag which prevents a
        scenario that can cause stale data to be read: If there's more
        than one memcached server and the connection to one is
        interrupted, keys that mapped to that server will get
        reassigned to another. If the first server comes back, those
        keys will map to it again. If it still has its data, get()s
        can read stale data that was overwritten on another
        server. This flag is off by default for backwards
        compatibility.
        @param check_keys: (default True) If True, the key is checked
        to ensure it is the correct length and composed of the right
        characters.
        """
        super(Client, self).__init__()
        self.debug = debug
        self.dead_retry = dead_retry
        self.socket_timeout = socket_timeout
        self.flush_on_reconnect = flush_on_reconnect
        self.set_servers(servers)
        self.stats = {}
        self.cache_cas = cache_cas
        self.reset_cas()
        self.do_check_key = check_keys

        # Allow users to modify pickling/unpickling behavior
        self.pickleProtocol = pickleProtocol
        self.pickler = pickler
        self.unpickler = unpickler
        self.compressor = compressor
        self.decompressor = decompressor
        self.persistent_load = pload
        self.persistent_id = pid
        self.server_max_key_length = server_max_key_length
        if self.server_max_key_length is None:
            self.server_max_key_length = SERVER_MAX_KEY_LENGTH
        self.server_max_value_length = server_max_value_length
        if self.server_max_value_length is None:
            self.server_max_value_length = SERVER_MAX_VALUE_LENGTH

        # figure out the pickler style
        file = BytesIO()
        try:
            pickler = self.pickler(file, protocol=self.pickleProtocol)
            self.picklerIsKeyword = True
        except TypeError:
            self.picklerIsKeyword = False
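
    # Illustrative sketch (not part of the library): constructing a client
    # with some of the tuning knobs documented above. Host addresses are
    # examples only.
    #
    #     mc = Client(
    #         ['10.0.0.1:11211', '10.0.0.2:11211'],
    #         debug=1,
    #         dead_retry=10,            # retry a blacklisted server after 10s
    #         socket_timeout=2,         # per-call socket timeout in seconds
    #         server_max_value_length=4 * 1024 * 1024,  # memcached run with -I 4m
    #         flush_on_reconnect=1,     # avoid stale reads after a server returns
    #     )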

    def _encode_key(self, key):
        if isinstance(key, tuple):
            if isinstance(key[1], six.text_type):
                return (key[0], key[1].encode('utf8'))
        elif isinstance(key, six.text_type):
            return key.encode('utf8')
        return key

    def _encode_cmd(self, cmd, key, headers, noreply, *args):
        cmd_bytes = cmd.encode('utf-8') if six.PY3 else cmd
        fullcmd = [cmd_bytes, b' ', key]

        if headers:
            if six.PY3:
                headers = headers.encode('utf-8')
            fullcmd.append(b' ')
            fullcmd.append(headers)

        if noreply:
            fullcmd.append(b' noreply')

        if args:
            fullcmd.append(b' ')
            fullcmd.extend(args)
        return b''.join(fullcmd)

    def reset_cas(self):
        """Reset the cas cache.

        This is only used if the Client() object was created with
        "cache_cas=True". If used, this cache does not expire
        internally, so it can grow unbounded if you do not clear it
        yourself.
        """
        self.cas_ids = {}

    def set_servers(self, servers):
        """Set the pool of servers used by this client.

        @param servers: an array of servers.
        Servers can be passed in two forms:
            1. Strings of the form C{"host:port"}, which implies a
               default weight of 1.
            2. Tuples of the form C{("host:port", weight)}, where
               C{weight} is an integer weight value.
        """
        self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry,
                              socket_timeout=self.socket_timeout,
                              flush_on_reconnect=self.flush_on_reconnect)
                        for s in servers]
        self._init_buckets()
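
    # Illustrative sketch (not part of the library): mixing the two server
    # forms accepted by set_servers(). A weight of 3 gives that host three
    # hash buckets, so roughly three times the share of keys. Addresses are
    # examples only.
    #
    #     mc.set_servers([
    #         '127.0.0.1:11211',          # weight 1
    #         ('10.0.0.2:11211', 3),      # weight 3
    #     ])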

    def get_stats(self, stat_args=None):
        """Get statistics from each of the servers.

        @param stat_args: Additional arguments to pass to the memcache
            "stats" command.

        @return: A list of tuples ( server_identifier,
            stats_dictionary ). The dictionary contains a number of
            name/value pairs specifying the name of the status field
            and the string value associated with it. The values are
            not converted from strings.
        """
        data = []
        for s in self.servers:
            if not s.connect():
                continue
            if s.family == socket.AF_INET:
                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
                name = 'unix:%s (%s)' % (s.address, s.weight)

            if not stat_args:
                s.send_cmd('stats')
            else:
                s.send_cmd('stats ' + stat_args)
            serverData = {}
            data.append((name, serverData))
            readline = s.readline
            while 1:
                line = readline()
                if not line or line.decode('ascii').strip() == 'END':
                    break
                stats = line.decode('ascii').split(' ', 2)
                serverData[stats[1]] = stats[2]

        return data

    def get_slab_stats(self):
        data = []
        for s in self.servers:
            if not s.connect():
                continue
            if s.family == socket.AF_INET:
                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
                name = 'unix:%s (%s)' % (s.address, s.weight)

            serverData = {}
            data.append((name, serverData))
            s.send_cmd('stats slabs')
            readline = s.readline
            while 1:
                line = readline()
                if not line or line.strip() == 'END':
                    break
                item = line.split(' ', 2)
                if (line.startswith('STAT active_slabs') or
                        line.startswith('STAT total_malloced')):
                    serverData[item[1]] = item[2]
                else:
                    # 0 = STAT, 1 = ITEM, 2 = Value
                    slab = item[1].split(':', 2)
                    # 0 = Slab #, 1 = Name
                    if slab[0] not in serverData:
                        serverData[slab[0]] = {}
                    serverData[slab[0]][slab[1]] = item[2]
        return data

    def get_slabs(self):
        data = []
        for s in self.servers:
            if not s.connect():
                continue
            if s.family == socket.AF_INET:
                name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
            elif s.family == socket.AF_INET6:
                name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
            else:
                name = 'unix:%s (%s)' % (s.address, s.weight)

            serverData = {}
            data.append((name, serverData))
            s.send_cmd('stats items')
            readline = s.readline
            while 1:
                line = readline()
                if not line or line.strip() == 'END':
                    break
                item = line.split(' ', 2)
                # 0 = STAT, 1 = ITEM, 2 = Value
                slab = item[1].split(':', 2)
                # 0 = items, 1 = Slab #, 2 = Name
                if slab[1] not in serverData:
                    serverData[slab[1]] = {}
                serverData[slab[1]][slab[2]] = item[2]
        return data

    def flush_all(self):
        """Expire all data in memcache servers that are reachable."""
        for s in self.servers:
            if not s.connect():
                continue
            s.flush()

    def debuglog(self, str):
        if self.debug:
            sys.stderr.write("MemCached: %s\n" % str)

    def _statlog(self, func):
        if func not in self.stats:
            self.stats[func] = 1
        else:
            self.stats[func] += 1

    def forget_dead_hosts(self):
        """Reset every host in the pool to an "alive" state."""
        for s in self.servers:
            s.deaduntil = 0

    def _init_buckets(self):
        self.buckets = []
        for server in self.servers:
            for i in range(server.weight):
                self.buckets.append(server)

    def _get_server(self, key):
        if isinstance(key, tuple):
            serverhash, key = key
        else:
            serverhash = serverHashFunction(key)

        if not self.buckets:
            return None, None

        for i in range(Client._SERVER_RETRIES):
            server = self.buckets[serverhash % len(self.buckets)]
            if server.connect():
                # print("(using server %s)" % server,)
                return server, key
            serverhash = str(serverhash) + str(i)
            if isinstance(serverhash, six.text_type):
                serverhash = serverhash.encode('ascii')
            serverhash = serverHashFunction(serverhash)
        return None, None

    def disconnect_all(self):
        for s in self.servers:
            s.close_socket()

    def delete_multi(self, keys, time=None, key_prefix='', noreply=False):
        """Delete multiple keys in the memcache doing just one query.

        >>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'})
        >>> mc.get_multi(['a1', 'a2']) == {'a1' : 'val1', 'a2' : 'val2'}
        1
        >>> mc.delete_multi(['key1', 'key2'])
        1
        >>> mc.get_multi(['key1', 'key2']) == {}
        1

        This method is recommended over iterated regular L{delete}s as
        it reduces total latency, since your app doesn't have to wait
        for each round-trip of L{delete} before sending the next one.

        @param keys: An iterable of keys to clear
        @param time: number of seconds any subsequent set / update
        commands should fail. Defaults to 0 for no delay.
        @param key_prefix: Optional string to prepend to each key when
            sending to memcache. See docs for L{get_multi} and
            L{set_multi}.
        @param noreply: optional parameter instructs the server to not send the
            reply.
        @return: 1 if no failure in communication with any memcacheds.
        @rtype: int
        """
        self._statlog('delete_multi')

        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
            keys, key_prefix)

        # send out all requests on each server before reading anything
        dead_servers = []

        rc = 1
        for server in six.iterkeys(server_keys):
            bigcmd = []
            write = bigcmd.append
            if time is not None:
                headers = str(time)
            else:
                headers = None
            for key in server_keys[server]:  # These are mangled keys
                cmd = self._encode_cmd('delete', key, headers, noreply, b'\r\n')
                write(cmd)
            try:
                server.send_cmds(b''.join(bigcmd))
            except socket.error as msg:
                rc = 0
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                dead_servers.append(server)

        # if noreply, just return
        if noreply:
            return rc

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        for server, keys in six.iteritems(server_keys):
            try:
                for key in keys:
                    server.expect(b"DELETED")
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                rc = 0
        return rc

    def delete(self, key, time=None, noreply=False):
        '''Deletes a key from the memcache.

        @return: Nonzero on success.
        @param time: number of seconds any subsequent set / update commands
        should fail. Defaults to None for no delay.
        @param noreply: optional parameter instructs the server to not send the
            reply.
        @rtype: int
        '''
        return self._deletetouch([b'DELETED', b'NOT_FOUND'], "delete", key,
                                 time, noreply)

    def touch(self, key, time=0, noreply=False):
        '''Updates the expiration time of a key in memcache.

        @return: Nonzero on success.
        @param time: Tells memcached the time which this value should
            expire, either as a delta number of seconds, or an absolute
            unix time-since-the-epoch value. See the memcached protocol
            docs section "Storage Commands" for more info on <exptime>. We
            default to 0 == cache forever.
        @param noreply: optional parameter instructs the server to not send the
            reply.
        @rtype: int
        '''
        return self._deletetouch([b'TOUCHED'], "touch", key, time, noreply)

    def _deletetouch(self, expected, cmd, key, time=0, noreply=False):
        key = self._encode_key(key)
        if self.do_check_key:
            self.check_key(key)
        server, key = self._get_server(key)
        if not server:
            return 0
        self._statlog(cmd)
        if time is not None and time != 0:
            headers = str(time)
        else:
            headers = None
        fullcmd = self._encode_cmd(cmd, key, headers, noreply)

        try:
            server.send_cmd(fullcmd)
            if noreply:
                return 1
            line = server.readline()
            if line and line.strip() in expected:
                return 1
            self.debuglog('%s expected %s, got: %r'
                          % (cmd, ' or '.join(expected), line))
        except socket.error as msg:
            if isinstance(msg, tuple):
                msg = msg[1]
            server.mark_dead(msg)
        return 0

    def incr(self, key, delta=1, noreply=False):
        """Increment value for C{key} by C{delta}

        Sends a command to the server to atomically increment the
        value for C{key} by C{delta}, or by 1 if C{delta} is
        unspecified. Returns None if C{key} doesn't exist on server,
        otherwise it returns the new value after incrementing.

        Note that the value for C{key} must already exist in the
        memcache, and it must be the string representation of an
        integer.

        >>> mc.set("counter", "20")  # returns 1, indicating success
        1
        >>> mc.incr("counter")
        21
        >>> mc.incr("counter")
        22

        Overflow on server is not checked. Be aware of values
        approaching 2**32. See L{decr}.

        @param delta: Integer amount to increment by (should be zero
        or greater).
        @param noreply: optional parameter instructs the server to not send the
            reply.
        @return: New value after incrementing, or None for noreply or error.
        @rtype: int
        """
        return self._incrdecr("incr", key, delta, noreply)

    def decr(self, key, delta=1, noreply=False):
        """Decrement value for C{key} by C{delta}

        Like L{incr}, but decrements. Unlike L{incr}, underflow is
        checked and new values are capped at 0. If server value is 1,
        a decrement of 2 returns 0, not -1.

        @param delta: Integer amount to decrement by (should be zero
        or greater).
        @param noreply: optional parameter instructs the server to not send the
            reply.
        @return: New value after decrementing, or None for noreply or error.
        @rtype: int
        """
        return self._incrdecr("decr", key, delta, noreply)
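
    # Illustrative sketch (not part of the library): a simple counter. The
    # value must already exist as the string form of an integer, so it is
    # seeded with add() (which is a no-op if the key is already present).
    # "page_hits" is an example key name.
    #
    #     mc.add("page_hits", "0")
    #     hits = mc.incr("page_hits")   # 1, 2, 3, ... on each call
    #     mc.decr("page_hits", 2)       # capped at 0, never negative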

    def _incrdecr(self, cmd, key, delta, noreply=False):
        key = self._encode_key(key)
        if self.do_check_key:
            self.check_key(key)
        server, key = self._get_server(key)
        if not server:
            return None
        self._statlog(cmd)
        fullcmd = self._encode_cmd(cmd, key, str(delta), noreply)
        try:
            server.send_cmd(fullcmd)
            if noreply:
                return
            line = server.readline()
            if line is None or line.strip() == b'NOT_FOUND':
                return None
            return int(line)
        except socket.error as msg:
            if isinstance(msg, tuple):
                msg = msg[1]
            server.mark_dead(msg)
            return None

    def add(self, key, val, time=0, min_compress_len=0, noreply=False):
        '''Add new key with value.

        Like L{set}, but only stores in memcache if the key doesn't
        already exist.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("add", key, val, time, min_compress_len, noreply)

    def append(self, key, val, time=0, min_compress_len=0, noreply=False):
        '''Append the value to the end of the existing key's value.

        Only stores in memcache if key already exists.
        Also see L{prepend}.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("append", key, val, time, min_compress_len, noreply)

    def prepend(self, key, val, time=0, min_compress_len=0, noreply=False):
        '''Prepend the value to the beginning of the existing key's value.

        Only stores in memcache if key already exists.
        Also see L{append}.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("prepend", key, val, time, min_compress_len, noreply)

    def replace(self, key, val, time=0, min_compress_len=0, noreply=False):
        '''Replace existing key with value.

        Like L{set}, but only stores in memcache if the key already exists.
        The opposite of L{add}.

        @return: Nonzero on success.
        @rtype: int
        '''
        return self._set("replace", key, val, time, min_compress_len, noreply)

    def set(self, key, val, time=0, min_compress_len=0, noreply=False):
        '''Unconditionally sets a key to a given value in the memcache.

        The C{key} can optionally be a tuple, with the first element
        being the server hash value and the second being the key. If
        you want to avoid making this module calculate a hash value,
        you may prefer, for example, to keep all of a given user's
        objects on the same memcache server, so you could use the
        user's unique id as the hash value.

        @return: Nonzero on success.
        @rtype: int

        @param time: Tells memcached the time which this value should
            expire, either as a delta number of seconds, or an absolute
            unix time-since-the-epoch value. See the memcached protocol
            docs section "Storage Commands" for more info on <exptime>. We
            default to 0 == cache forever.
        @param min_compress_len: The threshold length to kick in
            auto-compression of the value using the compressor
            routine. If the value being cached is a string, then the
            length of the string is measured, else if the value is an
            object, then the length of the pickle result is measured. If
            the resulting attempt at compression yields a larger string
            than the input, then it is discarded. For backwards
            compatibility, this parameter defaults to 0, indicating don't
            ever try to compress.
        @param noreply: optional parameter instructs the server to not
            send the reply.
        '''
        return self._set("set", key, val, time, min_compress_len, noreply)

    def cas(self, key, val, time=0, min_compress_len=0, noreply=False):
        '''Check and set (CAS)

        Sets a key to a given value in the memcache if it hasn't been
        altered since last fetched. (See L{gets}).

        The C{key} can optionally be a tuple, with the first element
        being the server hash value and the second being the key. If
        you want to avoid making this module calculate a hash value,
        you may prefer, for example, to keep all of a given user's
        objects on the same memcache server, so you could use the
        user's unique id as the hash value.

        @return: Nonzero on success.
        @rtype: int

        @param time: Tells memcached the time which this value should
            expire, either as a delta number of seconds, or an absolute
            unix time-since-the-epoch value. See the memcached protocol
            docs section "Storage Commands" for more info on <exptime>. We
            default to 0 == cache forever.
        @param min_compress_len: The threshold length to kick in
            auto-compression of the value using the compressor
            routine. If the value being cached is a string, then the
            length of the string is measured, else if the value is an
            object, then the length of the pickle result is measured. If
            the resulting attempt at compression yields a larger string
            than the input, then it is discarded. For backwards
            compatibility, this parameter defaults to 0, indicating don't
            ever try to compress.
        @param noreply: optional parameter instructs the server to not
            send the reply.
        '''
        return self._set("cas", key, val, time, min_compress_len, noreply)
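
    # Illustrative sketch (not part of the library): optimistic update with
    # gets()/cas(). The client must be created with cache_cas=True so the
    # cas id returned by the server is remembered. "visits" is an example key
    # and is assumed to already hold an integer value.
    #
    #     mc = Client(['127.0.0.1:11211'], cache_cas=True)
    #     while True:
    #         value = mc.gets("visits")        # records the cas id
    #         if mc.cas("visits", value + 1):  # stores only if unchanged
    #             break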

    def _map_and_prefix_keys(self, key_iterable, key_prefix):
        """Compute the mapping of server (_Host instance) -> list of keys to
        stuff onto that server, as well as the mapping of prefixed key
        -> original key.
        """
        key_prefix = self._encode_key(key_prefix)
        # Check it just once ...
        key_extra_len = len(key_prefix)
        if key_prefix and self.do_check_key:
            self.check_key(key_prefix)

        # server (_Host) -> list of unprefixed server keys in mapping
        server_keys = {}

        prefixed_to_orig_key = {}
        # build up a list for each server of all the keys we want.
        for orig_key in key_iterable:
            if isinstance(orig_key, tuple):
                # Tuple of hashvalue, key ala _get_server(). Caller is
                # essentially telling us what server to stuff this on.
                # Ensure call to _get_server gets a Tuple as well.
                serverhash, key = orig_key

                key = self._encode_key(key)
                if not isinstance(key, six.binary_type):
                    # set_multi supports int / long keys.
                    key = str(key)
                    if six.PY3:
                        key = key.encode('utf8')
                bytes_orig_key = key

                # Gotta pre-mangle key before hashing to a
                # server. Returns the mangled key.
                server, key = self._get_server(
                    (serverhash, key_prefix + key))

                orig_key = orig_key[1]
            else:
                key = self._encode_key(orig_key)
                if not isinstance(key, six.binary_type):
                    # set_multi supports int / long keys.
                    key = str(key)
                    if six.PY3:
                        key = key.encode('utf8')
                bytes_orig_key = key
                server, key = self._get_server(key_prefix + key)

            # alert when passed in key is None
            if orig_key is None:
                self.check_key(orig_key, key_extra_len=key_extra_len)

            # Now check to make sure key length is proper ...
            if self.do_check_key:
                self.check_key(bytes_orig_key, key_extra_len=key_extra_len)

            if not server:
                continue

            if server not in server_keys:
                server_keys[server] = []
            server_keys[server].append(key)
            prefixed_to_orig_key[key] = orig_key

        return (server_keys, prefixed_to_orig_key)

    def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0,
                  noreply=False):
        '''Sets multiple keys in the memcache doing just one query.

        >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
        >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1',
        ...                                    'key2' : 'val2'}
        1

        This method is recommended over regular L{set} as it lowers
        the number of total packets flying around your network,
        reducing total latency, since your app doesn't have to wait
        for each round-trip of L{set} before sending the next one.

        @param mapping: A dict of key/value pairs to set.
        @param time: Tells memcached the time which this value should
            expire, either as a delta number of seconds, or an
            absolute unix time-since-the-epoch value. See the
            memcached protocol docs section "Storage Commands" for
            more info on <exptime>. We default to 0 == cache forever.
        @param key_prefix: Optional string to prepend to each key when
            sending to memcache. Allows you to efficiently stuff these
            keys into a pseudo-namespace in memcache:

            >>> notset_keys = mc.set_multi(
            ...     {'key1' : 'val1', 'key2' : 'val2'},
            ...     key_prefix='subspace_')
            >>> len(notset_keys) == 0
            True
            >>> mc.get_multi(['subspace_key1',
            ...               'subspace_key2']) == {'subspace_key1': 'val1',
            ...                                     'subspace_key2' : 'val2'}
            True

            Causes key 'subspace_key1' and 'subspace_key2' to be
            set. Useful in conjunction with a higher-level layer which
            applies namespaces to data in memcache. In this case, the
            return result would be the list of notset original keys,
            prefix not applied.

        @param min_compress_len: The threshold length to kick in
            auto-compression of the value using the compressor
            routine. If the value being cached is a string, then the
            length of the string is measured, else if the value is an
            object, then the length of the pickle result is
            measured. If the resulting attempt at compression yields a
            larger string than the input, then it is discarded. For
            backwards compatibility, this parameter defaults to 0,
            indicating don't ever try to compress.
        @param noreply: optional parameter instructs the server to not
            send the reply.
        @return: List of keys which failed to be stored [ memcache out
            of memory, etc. ].
        @rtype: list
        '''
        self._statlog('set_multi')

        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
            six.iterkeys(mapping), key_prefix)

        # send out all requests on each server before reading anything
        dead_servers = []
        notstored = []  # original keys.

        for server in six.iterkeys(server_keys):
            bigcmd = []
            write = bigcmd.append
            try:
                for key in server_keys[server]:  # These are mangled keys
                    store_info = self._val_to_store_info(
                        mapping[prefixed_to_orig_key[key]],
                        min_compress_len)
                    if store_info:
                        flags, len_val, val = store_info
                        headers = "%d %d %d" % (flags, time, len_val)
                        fullcmd = self._encode_cmd('set', key, headers,
                                                   noreply,
                                                   b'\r\n', val, b'\r\n')
                        write(fullcmd)
                    else:
                        notstored.append(prefixed_to_orig_key[key])
                server.send_cmds(b''.join(bigcmd))
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                dead_servers.append(server)

        # if noreply, just return early
        if noreply:
            return notstored

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        # short-circuit if there are no servers, just return all keys
        if not server_keys:
            return list(mapping.keys())

        for server, keys in six.iteritems(server_keys):
            try:
                for key in keys:
                    if server.readline() == b'STORED':
                        continue
                    else:
                        # un-mangle.
                        notstored.append(prefixed_to_orig_key[key])
            except (_Error, socket.error) as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
        return notstored

    def _val_to_store_info(self, val, min_compress_len):
        """Transform val to a storable representation.

        Returns a tuple of the flags, the length of the new value, and
        the new value itself.
        """
        flags = 0
        if isinstance(val, six.binary_type):
            pass
        elif isinstance(val, six.text_type):
            val = val.encode('utf-8')
        elif isinstance(val, int):
            flags |= Client._FLAG_INTEGER
            val = '%d' % val
            if six.PY3:
                val = val.encode('ascii')
            # force no attempt to compress this silly string.
            min_compress_len = 0
        elif six.PY2 and isinstance(val, long):
            flags |= Client._FLAG_LONG
            val = str(val)
            if six.PY3:
                val = val.encode('ascii')
            # force no attempt to compress this silly string.
            min_compress_len = 0
        else:
            flags |= Client._FLAG_PICKLE
            file = BytesIO()
            if self.picklerIsKeyword:
                pickler = self.pickler(file, protocol=self.pickleProtocol)
            else:
                pickler = self.pickler(file, self.pickleProtocol)
            if self.persistent_id:
                pickler.persistent_id = self.persistent_id
            pickler.dump(val)
            val = file.getvalue()

        lv = len(val)
        # We should try to compress if min_compress_len > 0
        # and this string is longer than our min threshold.
        if min_compress_len and lv > min_compress_len:
            comp_val = self.compressor(val)
            # Only retain the result if the compression result is smaller
            # than the original.
            if len(comp_val) < lv:
                flags |= Client._FLAG_COMPRESSED
                val = comp_val

        # silently do not store if value length exceeds maximum
        if (self.server_max_value_length != 0 and
                len(val) > self.server_max_value_length):
            return 0

        return (flags, len(val), val)
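
    # Illustrative sketch (not part of the library): opting in to compression.
    # Values whose serialized form is longer than min_compress_len bytes are
    # run through the compressor (zlib.compress by default) and kept only if
    # the result is smaller. "big_blob" and the 1 KB threshold are example
    # values.
    #
    #     mc.set("big_blob", "x" * 100000, min_compress_len=1024)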

    def _set(self, cmd, key, val, time, min_compress_len=0, noreply=False):
        key = self._encode_key(key)
        if self.do_check_key:
            self.check_key(key)
        server, key = self._get_server(key)
        if not server:
            return 0

        def _unsafe_set():
            self._statlog(cmd)

            if cmd == 'cas' and key not in self.cas_ids:
                return self._set('set', key, val, time, min_compress_len,
                                 noreply)

            store_info = self._val_to_store_info(val, min_compress_len)
            if not store_info:
                return 0
            flags, len_val, encoded_val = store_info

            if cmd == 'cas':
                headers = ("%d %d %d %d"
                           % (flags, time, len_val, self.cas_ids[key]))
            else:
                headers = "%d %d %d" % (flags, time, len_val)
            fullcmd = self._encode_cmd(cmd, key, headers, noreply,
                                       b'\r\n', encoded_val)

            try:
                server.send_cmd(fullcmd)
                if noreply:
                    return True
                return (server.expect(b"STORED", raise_exception=True)
                        == b"STORED")
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
            return 0

        try:
            return _unsafe_set()
        except _ConnectionDeadError:
            # retry once
            try:
                if server._get_socket():
                    return _unsafe_set()
            except (_ConnectionDeadError, socket.error) as msg:
                server.mark_dead(msg)
            return 0

    def _get(self, cmd, key):
        key = self._encode_key(key)
        if self.do_check_key:
            self.check_key(key)
        server, key = self._get_server(key)
        if not server:
            return None

        def _unsafe_get():
            self._statlog(cmd)

            try:
                cmd_bytes = cmd.encode('utf-8') if six.PY3 else cmd
                fullcmd = b''.join((cmd_bytes, b' ', key))
                server.send_cmd(fullcmd)
                rkey = flags = rlen = cas_id = None

                if cmd == 'gets':
                    rkey, flags, rlen, cas_id, = self._expect_cas_value(
                        server, raise_exception=True
                    )
                    if rkey and self.cache_cas:
                        self.cas_ids[rkey] = cas_id
                else:
                    rkey, flags, rlen, = self._expectvalue(
                        server, raise_exception=True
                    )

                if not rkey:
                    return None
                try:
                    value = self._recv_value(server, flags, rlen)
                finally:
                    server.expect(b"END", raise_exception=True)
            except (_Error, socket.error) as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                return None

            return value

        try:
            return _unsafe_get()
        except _ConnectionDeadError:
            # retry once
            try:
                if server.connect():
                    return _unsafe_get()
                return None
            except (_ConnectionDeadError, socket.error) as msg:
                server.mark_dead(msg)
                return None

    def get(self, key):
        '''Retrieves a key from the memcache.

        @return: The value or None.
        '''
        return self._get('get', key)

    def gets(self, key):
        '''Retrieves a key from the memcache. Used in conjunction with 'cas'.

        @return: The value or None.
        '''
        return self._get('gets', key)

    def get_multi(self, keys, key_prefix=''):
        '''Retrieves multiple keys from the memcache doing just one query.

        >>> success = mc.set("foo", "bar")
        >>> success = mc.set("baz", 42)
        >>> mc.get_multi(["foo", "baz", "foobar"]) == {
        ...     "foo": "bar", "baz": 42
        ... }
        1
        >>> mc.set_multi({'k1' : 1, 'k2' : 2}, key_prefix='pfx_') == []
        1

        This looks up keys 'pfx_k1', 'pfx_k2', ... . Returned dict
        will just have unprefixed keys 'k1', 'k2'.

        >>> mc.get_multi(['k1', 'k2', 'nonexist'],
        ...              key_prefix='pfx_') == {'k1' : 1, 'k2' : 2}
        1

        get_multi [ and L{set_multi} ] can take str()-ables like ints /
        longs as keys too. Such as your db pri key fields. They're
        rotored through str() before being passed off to memcache,
        with or without the use of a key_prefix. In this mode, the
        key_prefix could be a table name, and the key itself a db
        primary key number.

        >>> mc.set_multi({42: 'douglass adams',
        ...               46: 'and 2 just ahead of me'},
        ...              key_prefix='numkeys_') == []
        1
        >>> mc.get_multi([46, 42], key_prefix='numkeys_') == {
        ...     42: 'douglass adams',
        ...     46: 'and 2 just ahead of me'
        ... }
        1

        This method is recommended over regular L{get} as it lowers
        the number of total packets flying around your network,
        reducing total latency, since your app doesn't have to wait
        for each round-trip of L{get} before sending the next one.

        See also L{set_multi}.

        @param keys: An array of keys.
        @param key_prefix: A string to prefix each key when we
        communicate with memcache. Facilitates pseudo-namespaces
        within memcache. Returned dictionary keys will not have this
        prefix.

        @return: A dictionary of key/value pairs that were
        available. If key_prefix was provided, the keys in the returned
        dictionary will not have it present.
        '''
        self._statlog('get_multi')

        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
            keys, key_prefix)

        # send out all requests on each server before reading anything
        dead_servers = []
        for server in six.iterkeys(server_keys):
            try:
                fullcmd = b"get " + b" ".join(server_keys[server])
                server.send_cmd(fullcmd)
            except socket.error as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
                dead_servers.append(server)

        # if any servers died on the way, don't expect them to respond.
        for server in dead_servers:
            del server_keys[server]

        retvals = {}
        for server in six.iterkeys(server_keys):
            try:
                line = server.readline()
                while line and line != b'END':
                    rkey, flags, rlen = self._expectvalue(server, line)
                    # Bo Yang reports that this can sometimes be None
                    if rkey is not None:
                        val = self._recv_value(server, flags, rlen)
                        # un-prefix returned key.
                        retvals[prefixed_to_orig_key[rkey]] = val
                    line = server.readline()
            except (_Error, socket.error) as msg:
                if isinstance(msg, tuple):
                    msg = msg[1]
                server.mark_dead(msg)
        return retvals

    def _expect_cas_value(self, server, line=None, raise_exception=False):
        if not line:
            line = server.readline(raise_exception)

        if line and line[:5] == b'VALUE':
            resp, rkey, flags, len, cas_id = line.split()
            return (rkey, int(flags), int(len), int(cas_id))
        else:
            return (None, None, None, None)

    def _expectvalue(self, server, line=None, raise_exception=False):
        if not line:
            line = server.readline(raise_exception)

        if line and line[:5] == b'VALUE':
            resp, rkey, flags, len = line.split()
            flags = int(flags)
            rlen = int(len)
            return (rkey, flags, rlen)
        else:
            return (None, None, None)

    def _recv_value(self, server, flags, rlen):
        rlen += 2  # include \r\n
        buf = server.recv(rlen)
        if len(buf) != rlen:
            raise _Error("received %d bytes when expecting %d"
                         % (len(buf), rlen))

        if len(buf) == rlen:
            buf = buf[:-2]  # strip \r\n

        if flags & Client._FLAG_COMPRESSED:
            buf = self.decompressor(buf)
            flags &= ~Client._FLAG_COMPRESSED

        if flags == 0:
            # Bare string
            if six.PY3:
                val = buf.decode('utf8')
            else:
                val = buf
        elif flags & Client._FLAG_INTEGER:
            val = int(buf)
        elif flags & Client._FLAG_LONG:
            if six.PY3:
                val = int(buf)
            else:
                val = long(buf)
        elif flags & Client._FLAG_PICKLE:
            try:
                file = BytesIO(buf)
                unpickler = self.unpickler(file)
                if self.persistent_load:
                    unpickler.persistent_load = self.persistent_load
                val = unpickler.load()
            except Exception as e:
                self.debuglog('Pickle error: %s\n' % e)
                return None
        else:
            self.debuglog("unknown flags on get: %x\n" % flags)
            raise ValueError('Unknown flags on get: %x' % flags)

        return val

    def check_key(self, key, key_extra_len=0):
        """Checks sanity of key.

        Fails if:

        Key length is > SERVER_MAX_KEY_LENGTH (Raises MemcachedKeyLength).
        Contains control characters (Raises MemcachedKeyCharacterError).
        Is not a string (Raises MemcachedStringEncodingError)
        Is a unicode string (Raises MemcachedStringEncodingError)
        Is not a string (Raises MemcachedKeyError)
        Is None (Raises MemcachedKeyError)
        """
        if isinstance(key, tuple):
            key = key[1]

        if key is None:
            raise Client.MemcachedKeyNoneError("Key is None")
        if key == '':
            if key_extra_len == 0:
                raise Client.MemcachedKeyNoneError("Key is empty")

            # key is empty but there is some other component to key
            return

        if not isinstance(key, six.binary_type):
            raise Client.MemcachedKeyTypeError("Key must be a binary string")

        if (self.server_max_key_length != 0 and
                len(key) + key_extra_len > self.server_max_key_length):
            raise Client.MemcachedKeyLengthError(
                "Key length is > %s" % self.server_max_key_length
            )
        if not valid_key_chars_re.match(key):
            raise Client.MemcachedKeyCharacterError(
                "Control/space characters not allowed (key=%r)" % key)


class _Host(object):

    def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY,
                 socket_timeout=_SOCKET_TIMEOUT, flush_on_reconnect=0):
        self.dead_retry = dead_retry
        self.socket_timeout = socket_timeout
        self.debug = debug
        self.flush_on_reconnect = flush_on_reconnect
        if isinstance(host, tuple):
            host, self.weight = host
        else:
            self.weight = 1

        # parse the connection string
        m = re.match(r'^(?P<proto>unix):(?P<path>.*)$', host)
        if not m:
            m = re.match(r'^(?P<proto>inet6):'
                         r'\[(?P<host>[^\[\]]+)\](:(?P<port>[0-9]+))?$', host)
        if not m:
            m = re.match(r'^(?P<proto>inet):'
                         r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
        if not m:
            m = re.match(r'^(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
        if not m:
            raise ValueError('Unable to parse connection string: "%s"' % host)

        hostData = m.groupdict()
        if hostData.get('proto') == 'unix':
            self.family = socket.AF_UNIX
            self.address = hostData['path']
        elif hostData.get('proto') == 'inet6':
            self.family = socket.AF_INET6
            self.ip = hostData['host']
            self.port = int(hostData.get('port') or 11211)
            self.address = (self.ip, self.port)
        else:
            self.family = socket.AF_INET
            self.ip = hostData['host']
            self.port = int(hostData.get('port') or 11211)
            self.address = (self.ip, self.port)

        self.deaduntil = 0
        self.socket = None
        self.flush_on_next_connect = 0

        self.buffer = b''
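
    # Illustrative note: connection strings accepted by the parser above
    # (addresses are examples; the port is optional and defaults to 11211).
    #
    #     '127.0.0.1:11211'               # plain host:port
    #     'inet:example.com:11211'        # explicit IPv4/hostname form
    #     'inet6:[::1]:11211'             # IPv6, brackets required
    #     'unix:/var/run/memcached.sock'  # unix domain socket
    #
    # Any of these can also be wrapped in a ("connection string", weight)
    # tuple, as described in Client.set_servers().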

    def debuglog(self, str):
        if self.debug:
            sys.stderr.write("MemCached: %s\n" % str)

    def _check_dead(self):
        if self.deaduntil and self.deaduntil > time.time():
            return 1
        self.deaduntil = 0
        return 0

    def connect(self):
        if self._get_socket():
            return 1
        return 0

    def mark_dead(self, reason):
        self.debuglog("MemCache: %s: %s. Marking dead." % (self, reason))
        self.deaduntil = time.time() + self.dead_retry
        if self.flush_on_reconnect:
            self.flush_on_next_connect = 1
        self.close_socket()

    def _get_socket(self):
        if self._check_dead():
            return None
        if self.socket:
            return self.socket
        s = socket.socket(self.family, socket.SOCK_STREAM)
        if hasattr(s, 'settimeout'):
            s.settimeout(self.socket_timeout)
        try:
            s.connect(self.address)
        except socket.timeout as msg:
            self.mark_dead("connect: %s" % msg)
            return None
        except socket.error as msg:
            if isinstance(msg, tuple):
                msg = msg[1]
            self.mark_dead("connect: %s" % msg)
            return None
        self.socket = s
        self.buffer = b''
        if self.flush_on_next_connect:
            self.flush()
            self.flush_on_next_connect = 0
        return s

    def close_socket(self):
        if self.socket:
            self.socket.close()
            self.socket = None

    def send_cmd(self, cmd):
        if isinstance(cmd, six.text_type):
            cmd = cmd.encode('utf8')
        self.socket.sendall(cmd + b'\r\n')

    def send_cmds(self, cmds):
        """cmds already has trailing \r\n's applied."""
        if isinstance(cmds, six.text_type):
            cmds = cmds.encode('utf8')
        self.socket.sendall(cmds)

    def readline(self, raise_exception=False):
        """Read a line and return it.

        If "raise_exception" is set, raise _ConnectionDeadError if the
        read fails, otherwise return an empty string.
        """
        buf = self.buffer
        if self.socket:
            recv = self.socket.recv
        else:
            recv = lambda bufsize: b''

        while True:
            index = buf.find(b'\r\n')
            if index >= 0:
                break
            data = recv(4096)
            if not data:
                # connection close, let's kill it and raise
                self.mark_dead('connection closed in readline()')
                if raise_exception:
                    raise _ConnectionDeadError()
                else:
                    return ''
            buf += data
        self.buffer = buf[index + 2:]
        return buf[:index]

    def expect(self, text, raise_exception=False):
        line = self.readline(raise_exception)
        if self.debug and line != text:
            if six.PY3:
                text = text.decode('utf8')
                log_line = line.decode('utf8', 'replace')
            else:
                log_line = line
            self.debuglog("while expecting %r, got unexpected response %r"
                          % (text, log_line))
        return line

    def recv(self, rlen):
        self_socket_recv = self.socket.recv
        buf = self.buffer
        while len(buf) < rlen:
            foo = self_socket_recv(max(rlen - len(buf), 4096))
            buf += foo
            if not foo:
                raise _Error('Read %d bytes, expecting %d, '
                             'read returned 0 length bytes' % (len(buf), rlen))
        self.buffer = buf[rlen:]
        return buf[:rlen]

    def flush(self):
        self.send_cmd('flush_all')
        self.expect(b'OK')

    def __str__(self):
        d = ''
        if self.deaduntil:
            d = " (dead until %d)" % self.deaduntil

        if self.family == socket.AF_INET:
            return "inet:%s:%d%s" % (self.address[0], self.address[1], d)
        elif self.family == socket.AF_INET6:
            return "inet6:[%s]:%d%s" % (self.address[0], self.address[1], d)
        else:
            return "unix:%s%s" % (self.address, d)


def _doctest():
    import doctest
    import memcache
    servers = ["127.0.0.1:11211"]
    mc = memcache.Client(servers, debug=1)
    globs = {"mc": mc}
    results = doctest.testmod(memcache, globs=globs)
    mc.disconnect_all()
    print("Doctests: %s" % (results,))
    if results.failed:
        sys.exit(1)


# vim: ts=4 sw=4 et :