# hash.py
  1. import collections
  2. import socket
  3. import time
  4. import logging
  5. import six
  6. from pymemcache.client.base import (
  7. Client,
  8. PooledClient,
  9. check_key_helper,
  10. normalize_server_spec,
  11. )
  12. from pymemcache.client.rendezvous import RendezvousHash
  13. from pymemcache.exceptions import MemcacheError
  14. logger = logging.getLogger(__name__)
  15. class HashClient(object):
  16. """
  17. A client for communicating with a cluster of memcached servers
  18. """
  19. #: :class:`Client` class used to create new clients
  20. client_class = Client
  21. def __init__(
  22. self,
  23. servers,
  24. hasher=RendezvousHash,
  25. serde=None,
  26. serializer=None,
  27. deserializer=None,
  28. connect_timeout=None,
  29. timeout=None,
  30. no_delay=False,
  31. socket_module=socket,
  32. socket_keepalive=None,
  33. key_prefix=b'',
  34. max_pool_size=None,
  35. pool_idle_timeout=0,
  36. lock_generator=None,
  37. retry_attempts=2,
  38. retry_timeout=1,
  39. dead_timeout=60,
  40. use_pooling=False,
  41. ignore_exc=False,
  42. allow_unicode_keys=False,
  43. default_noreply=True,
  44. encoding='ascii',
  45. tls_context=None
  46. ):
  47. """
  48. Constructor.
  49. Args:
  50. servers: list() of tuple(hostname, port) or string containing a UNIX
  51. socket path.
  52. hasher: optional class three functions ``get_node``, ``add_node``,
  53. and ``remove_node``
  54. defaults to Rendezvous (HRW) hash.
  55. use_pooling: use py:class:`.PooledClient` as the default underlying
  56. class. ``max_pool_size`` and ``lock_generator`` can
  57. be used with this. default: False
  58. retry_attempts: Amount of times a client should be tried before it
  59. is marked dead and removed from the pool.
  60. retry_timeout (float): Time in seconds that should pass between retry
  61. attempts.
  62. dead_timeout (float): Time in seconds before attempting to add a node
  63. back in the pool.
  64. encoding: optional str, controls data encoding (defaults to 'ascii').
  65. Further arguments are interpreted as for :py:class:`.Client`
  66. constructor.
  67. """
  68. self.clients = {}
  69. self.retry_attempts = retry_attempts
  70. self.retry_timeout = retry_timeout
  71. self.dead_timeout = dead_timeout
  72. self.use_pooling = use_pooling
  73. self.key_prefix = key_prefix
  74. self.ignore_exc = ignore_exc
  75. self.allow_unicode_keys = allow_unicode_keys
  76. self._failed_clients = {}
  77. self._dead_clients = {}
  78. self._last_dead_check_time = time.time()
  79. self.hasher = hasher()
  80. self.default_kwargs = {
  81. 'connect_timeout': connect_timeout,
  82. 'timeout': timeout,
  83. 'no_delay': no_delay,
  84. 'socket_module': socket_module,
  85. 'socket_keepalive': socket_keepalive,
  86. 'key_prefix': key_prefix,
  87. 'serde': serde,
  88. 'serializer': serializer,
  89. 'deserializer': deserializer,
  90. 'allow_unicode_keys': allow_unicode_keys,
  91. 'default_noreply': default_noreply,
  92. 'encoding': encoding,
  93. 'tls_context': tls_context,
  94. }
  95. if use_pooling is True:
  96. self.default_kwargs.update({
  97. 'max_pool_size': max_pool_size,
  98. 'pool_idle_timeout': pool_idle_timeout,
  99. 'lock_generator': lock_generator
  100. })
  101. for server in servers:
  102. self.add_server(normalize_server_spec(server))
  103. self.encoding = encoding
  104. self.tls_context = tls_context
  105. def _make_client_key(self, server):
  106. if isinstance(server, (list, tuple)) and len(server) == 2:
  107. return '%s:%s' % server
  108. return server
  109. def add_server(self, server, port=None):
  110. # To maintain backward compatibility, if a port is provided, assume
  111. # that server wasn't provided as a (host, port) tuple.
  112. if port is not None:
  113. if not isinstance(server, six.string_types):
  114. raise TypeError('Server must be a string when passing port.')
  115. server = (server, port)
  116. _class = PooledClient if self.use_pooling else self.client_class
  117. client = _class(server, **self.default_kwargs)
  118. if self.use_pooling:
  119. client.client_class = self.client_class
  120. key = self._make_client_key(server)
  121. self.clients[key] = client
  122. self.hasher.add_node(key)
  123. def remove_server(self, server, port=None):
  124. # To maintain backward compatibility, if a port is provided, assume
  125. # that server wasn't provided as a (host, port) tuple.
  126. if port is not None:
  127. if not isinstance(server, six.string_types):
  128. raise TypeError('Server must be a string when passing port.')
  129. server = (server, port)
  130. key = self._make_client_key(server)
  131. dead_time = time.time()
  132. self._failed_clients.pop(server)
  133. self._dead_clients[server] = dead_time
  134. self.hasher.remove_node(key)
  135. def _retry_dead(self):
  136. current_time = time.time()
  137. ldc = self._last_dead_check_time
  138. # We have reached the retry timeout
  139. if current_time - ldc > self.dead_timeout:
  140. candidates = []
  141. for server, dead_time in self._dead_clients.items():
  142. if current_time - dead_time > self.dead_timeout:
  143. candidates.append(server)
  144. for server in candidates:
  145. logger.debug(
  146. 'bringing server back into rotation %s',
  147. server
  148. )
  149. self.add_server(server)
  150. del self._dead_clients[server]
  151. self._last_dead_check_time = current_time
  152. def _get_client(self, key):
  153. check_key_helper(key, self.allow_unicode_keys, self.key_prefix)
  154. if self._dead_clients:
  155. self._retry_dead()
  156. server = self.hasher.get_node(key)
  157. # We've ran out of servers to try
  158. if server is None:
  159. if self.ignore_exc is True:
  160. return
  161. raise MemcacheError('All servers seem to be down right now')
  162. return self.clients[server]
  163. def _safely_run_func(self, client, func, default_val, *args, **kwargs):
  164. try:
  165. if client.server in self._failed_clients:
  166. # This server is currently failing, lets check if it is in
  167. # retry or marked as dead
  168. failed_metadata = self._failed_clients[client.server]
  169. # we haven't tried our max amount yet, if it has been enough
  170. # time lets just retry using it
  171. if failed_metadata['attempts'] < self.retry_attempts:
  172. failed_time = failed_metadata['failed_time']
  173. if time.time() - failed_time > self.retry_timeout:
  174. logger.debug(
  175. 'retrying failed server: %s', client.server
  176. )
  177. result = func(*args, **kwargs)
  178. # we were successful, lets remove it from the failed
  179. # clients
  180. self._failed_clients.pop(client.server)
  181. return result
  182. return default_val
  183. else:
  184. # We've reached our max retry attempts, we need to mark
  185. # the sever as dead
  186. logger.debug('marking server as dead: %s', client.server)
  187. self.remove_server(client.server)
  188. result = func(*args, **kwargs)
  189. return result
  190. # Connecting to the server fail, we should enter
  191. # retry mode
  192. except socket.error:
  193. self._mark_failed_server(client.server)
  194. # if we haven't enabled ignore_exc, don't move on gracefully, just
  195. # raise the exception
  196. if not self.ignore_exc:
  197. raise
  198. return default_val
  199. except Exception:
  200. # any exceptions that aren't socket.error we need to handle
  201. # gracefully as well
  202. if not self.ignore_exc:
  203. raise
  204. return default_val
  205. def _safely_run_set_many(self, client, values, *args, **kwargs):
  206. failed = []
  207. succeeded = []
  208. try:
  209. if client.server in self._failed_clients:
  210. # This server is currently failing, lets check if it is in
  211. # retry or marked as dead
  212. failed_metadata = self._failed_clients[client.server]
  213. # we haven't tried our max amount yet, if it has been enough
  214. # time lets just retry using it
  215. if failed_metadata['attempts'] < self.retry_attempts:
  216. failed_time = failed_metadata['failed_time']
  217. if time.time() - failed_time > self.retry_timeout:
  218. logger.debug(
  219. 'retrying failed server: %s', client.server
  220. )
  221. succeeded, failed, err = self._set_many(
  222. client, values, *args, **kwargs)
  223. if err is not None:
  224. raise err
  225. # we were successful, lets remove it from the failed
  226. # clients
  227. self._failed_clients.pop(client.server)
  228. return failed
  229. return values.keys()
  230. else:
  231. # We've reached our max retry attempts, we need to mark
  232. # the sever as dead
  233. logger.debug('marking server as dead: %s', client.server)
  234. self.remove_server(client.server)
  235. succeeded, failed, err = self._set_many(
  236. client, values, *args, **kwargs
  237. )
  238. if err is not None:
  239. raise err
  240. return failed
  241. # Connecting to the server fail, we should enter
  242. # retry mode
  243. except socket.error:
  244. self._mark_failed_server(client.server)
  245. # if we haven't enabled ignore_exc, don't move on gracefully, just
  246. # raise the exception
  247. if not self.ignore_exc:
  248. raise
  249. return list(set(values.keys()) - set(succeeded))
  250. except Exception:
  251. # any exceptions that aren't socket.error we need to handle
  252. # gracefully as well
  253. if not self.ignore_exc:
  254. raise
  255. return list(set(values.keys()) - set(succeeded))
  256. def _mark_failed_server(self, server):
  257. # This client has never failed, lets mark it for failure
  258. if (
  259. server not in self._failed_clients and
  260. self.retry_attempts > 0
  261. ):
  262. self._failed_clients[server] = {
  263. 'failed_time': time.time(),
  264. 'attempts': 0,
  265. }
  266. # We aren't allowing any retries, we should mark the server as
  267. # dead immediately
  268. elif (
  269. server not in self._failed_clients and
  270. self.retry_attempts <= 0
  271. ):
  272. self._failed_clients[server] = {
  273. 'failed_time': time.time(),
  274. 'attempts': 0,
  275. }
  276. logger.debug("marking server as dead %s", server)
  277. self.remove_server(server)
  278. # This client has failed previously, we need to update the metadata
  279. # to reflect that we have attempted it again
  280. else:
  281. failed_metadata = self._failed_clients[server]
  282. failed_metadata['attempts'] += 1
  283. failed_metadata['failed_time'] = time.time()
  284. self._failed_clients[server] = failed_metadata
  285. def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
  286. client = self._get_client(key)
  287. if client is None:
  288. return default_val
  289. func = getattr(client, cmd)
  290. args = list(args)
  291. args.insert(0, key)
  292. return self._safely_run_func(
  293. client, func, default_val, *args, **kwargs
  294. )
  295. def _set_many(self, client, values, *args, **kwargs):
  296. failed = []
  297. succeeded = []
  298. try:
  299. failed = client.set_many(values, *args, **kwargs)
  300. except Exception as e:
  301. if not self.ignore_exc:
  302. return succeeded, failed, e
  303. succeeded = [key for key in six.iterkeys(values) if key not in failed]
  304. return succeeded, failed, None
  305. def close(self):
  306. for client in self.clients.values():
  307. self._safely_run_func(client, client.close, False)
  308. disconnect_all = close
  309. def set(self, key, *args, **kwargs):
  310. return self._run_cmd('set', key, False, *args, **kwargs)
  311. def get(self, key, *args, **kwargs):
  312. return self._run_cmd('get', key, None, *args, **kwargs)
  313. def incr(self, key, *args, **kwargs):
  314. return self._run_cmd('incr', key, False, *args, **kwargs)
  315. def decr(self, key, *args, **kwargs):
  316. return self._run_cmd('decr', key, False, *args, **kwargs)
  317. def set_many(self, values, *args, **kwargs):
  318. client_batches = collections.defaultdict(dict)
  319. failed = []
  320. for key, value in six.iteritems(values):
  321. client = self._get_client(key)
  322. if client is None:
  323. failed.append(key)
  324. continue
  325. client_batches[client.server][key] = value
  326. for server, values in client_batches.items():
  327. client = self.clients[self._make_client_key(server)]
  328. failed += self._safely_run_set_many(
  329. client, values, *args, **kwargs
  330. )
  331. return failed
  332. set_multi = set_many
  333. def get_many(self, keys, gets=False, *args, **kwargs):
  334. client_batches = collections.defaultdict(list)
  335. end = {}
  336. for key in keys:
  337. client = self._get_client(key)
  338. if client is None:
  339. continue
  340. client_batches[client.server].append(key)
  341. for server, keys in client_batches.items():
  342. client = self.clients[self._make_client_key(server)]
  343. new_args = list(args)
  344. new_args.insert(0, keys)
  345. if gets:
  346. get_func = client.gets_many
  347. else:
  348. get_func = client.get_many
  349. result = self._safely_run_func(
  350. client,
  351. get_func, {}, *new_args, **kwargs
  352. )
  353. end.update(result)
  354. return end
  355. get_multi = get_many
  356. def gets(self, key, *args, **kwargs):
  357. return self._run_cmd('gets', key, None, *args, **kwargs)
  358. def gets_many(self, keys, *args, **kwargs):
  359. return self.get_many(keys, gets=True, *args, **kwargs)
  360. gets_multi = gets_many
  361. def add(self, key, *args, **kwargs):
  362. return self._run_cmd('add', key, False, *args, **kwargs)
  363. def prepend(self, key, *args, **kwargs):
  364. return self._run_cmd('prepend', key, False, *args, **kwargs)
  365. def append(self, key, *args, **kwargs):
  366. return self._run_cmd('append', key, False, *args, **kwargs)
  367. def delete(self, key, *args, **kwargs):
  368. return self._run_cmd('delete', key, False, *args, **kwargs)
  369. def delete_many(self, keys, *args, **kwargs):
  370. for key in keys:
  371. self._run_cmd('delete', key, False, *args, **kwargs)
  372. return True
  373. delete_multi = delete_many
  374. def cas(self, key, *args, **kwargs):
  375. return self._run_cmd('cas', key, False, *args, **kwargs)
  376. def replace(self, key, *args, **kwargs):
  377. return self._run_cmd('replace', key, False, *args, **kwargs)
  378. def touch(self, key, *args, **kwargs):
  379. return self._run_cmd('touch', key, False, *args, **kwargs)
  380. def flush_all(self):
  381. for client in self.clients.values():
  382. self._safely_run_func(client, client.flush_all, False)
  383. def quit(self):
  384. for client in self.clients.values():
  385. self._safely_run_func(client, client.quit, False)