# hash.py

import collections
import socket
import time
import logging

import six

from library.pymemcache.client.base import (
    Client,
    PooledClient,
    check_key_helper,
    normalize_server_spec,
)
from library.pymemcache.client.rendezvous import RendezvousHash
from library.pymemcache.exceptions import MemcacheError

logger = logging.getLogger(__name__)


class HashClient(object):
    """
    A client for communicating with a cluster of memcached servers
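
    Example (a minimal usage sketch, not part of the original module; the
    server addresses below are hypothetical)::

        client = HashClient([
            ('cache-1.local', 11211),
            ('cache-2.local', 11211),
        ])
        client.set('user:42', b'payload')
        client.get('user:42')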
    """

    #: :class:`Client` class used to create new clients
    client_class = Client

    def __init__(
        self,
        servers,
        hasher=RendezvousHash,
        serde=None,
        serializer=None,
        deserializer=None,
        connect_timeout=None,
        timeout=None,
        no_delay=False,
        socket_module=socket,
        socket_keepalive=None,
        key_prefix=b'',
        max_pool_size=None,
        pool_idle_timeout=0,
        lock_generator=None,
        retry_attempts=2,
        retry_timeout=1,
        dead_timeout=60,
        use_pooling=False,
        # ignore_exc=False,
        allow_unicode_keys=False,
        default_noreply=True,
        encoding='ascii',
        tls_context=None
    ):
        """
        Constructor.

        Args:
          servers: list() of tuple(hostname, port) or string containing a UNIX
                   socket path.
          hasher: optional class with three functions ``get_node``, ``add_node``,
                  and ``remove_node``; defaults to Rendezvous (HRW) hash.
          use_pooling: use py:class:`.PooledClient` as the default underlying
                       class. ``max_pool_size`` and ``lock_generator`` can
                       be used with this. default: False
          retry_attempts: Number of times a client should be tried before it
                          is marked dead and removed from the pool.
          retry_timeout (float): Time in seconds that should pass between retry
                                 attempts.
          dead_timeout (float): Time in seconds before attempting to add a node
                                back into the pool.
          encoding: optional str, controls data encoding (defaults to 'ascii').

        Further arguments are interpreted as for the :py:class:`.Client`
        constructor.
        """
        self.clients = {}
        self.retry_attempts = retry_attempts
        self.retry_timeout = retry_timeout
        self.dead_timeout = dead_timeout
        self.use_pooling = use_pooling
        self.key_prefix = key_prefix
        # self.ignore_exc = ignore_exc
        self.allow_unicode_keys = allow_unicode_keys
        self._failed_clients = {}
        self._dead_clients = {}
        self._last_dead_check_time = time.time()

        self.hasher = hasher()

        self.default_kwargs = {
            'connect_timeout': connect_timeout,
            'timeout': timeout,
            'no_delay': no_delay,
            'socket_module': socket_module,
            'socket_keepalive': socket_keepalive,
            'key_prefix': key_prefix,
            'serde': serde,
            'serializer': serializer,
            'deserializer': deserializer,
            'allow_unicode_keys': allow_unicode_keys,
            'default_noreply': default_noreply,
            'encoding': encoding,
            'tls_context': tls_context,
        }

        if use_pooling is True:
            self.default_kwargs.update({
                'max_pool_size': max_pool_size,
                'pool_idle_timeout': pool_idle_timeout,
                'lock_generator': lock_generator,
            })

        for server in servers:
            self.add_server(normalize_server_spec(server))

        self.encoding = encoding
        self.tls_context = tls_context

    def _make_client_key(self, server):
        # Normalize a (host, port) pair into the 'host:port' string used as
        # the node name for both the hasher and the clients mapping.
        if isinstance(server, (list, tuple)) and len(server) == 2:
            return '%s:%s' % server
        return server

    def add_server(self, server, port=None):
        # To maintain backward compatibility, if a port is provided, assume
        # that server wasn't provided as a (host, port) tuple.
        if port is not None:
            if not isinstance(server, six.string_types):
                raise TypeError('Server must be a string when passing port.')
            server = (server, port)

        _class = PooledClient if self.use_pooling else self.client_class
        client = _class(server, **self.default_kwargs)
        if self.use_pooling:
            client.client_class = self.client_class

        key = self._make_client_key(server)
        self.clients[key] = client
        self.hasher.add_node(key)
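
    # Usage note (a sketch, not part of the original code; the address below
    # is hypothetical): add_server accepts either a (host, port) tuple or,
    # for backward compatibility, a host string plus a separate port:
    #
    #   client.add_server(('10.0.0.3', 11211))
    #   client.add_server('10.0.0.3', 11211)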

    def remove_server(self, server, port=None):
        # To maintain backward compatibility, if a port is provided, assume
        # that server wasn't provided as a (host, port) tuple.
        if port is not None:
            if not isinstance(server, six.string_types):
                raise TypeError('Server must be a string when passing port.')
            server = (server, port)

        key = self._make_client_key(server)
        dead_time = time.time()
        self._failed_clients.pop(server)
        self._dead_clients[server] = dead_time
        self.hasher.remove_node(key)

    def _retry_dead(self):
        # Periodically try to bring dead servers back into rotation once they
        # have been out of the pool for longer than dead_timeout.
        current_time = time.time()
        ldc = self._last_dead_check_time

        # Enough time has passed since the last dead-server check
        if current_time - ldc > self.dead_timeout:
            candidates = []
            for server, dead_time in self._dead_clients.items():
                if current_time - dead_time > self.dead_timeout:
                    candidates.append(server)
            for server in candidates:
                logger.debug(
                    'bringing server back into rotation %s',
                    server
                )
                self.add_server(server)
                del self._dead_clients[server]
            self._last_dead_check_time = current_time

    def _get_client(self, key, ignore_exc=True):
        check_key_helper(key, self.allow_unicode_keys, self.key_prefix)
        if self._dead_clients:
            self._retry_dead()

        server = self.hasher.get_node(key)
        # We've run out of servers to try
        if server is None:
            if ignore_exc is True:
                return
            raise MemcacheError('All servers seem to be down right now')

        return self.clients[server]

    def _safely_run_func(self, client, func, default_val, ignore_exc=True,
                         *args, **kwargs):
        try:
            if client.server in self._failed_clients:
                # This server is currently failing; check whether it is in
                # retry or marked as dead
                failed_metadata = self._failed_clients[client.server]

                # We haven't tried our max amount yet; if enough time has
                # passed, retry using it
                if failed_metadata['attempts'] < self.retry_attempts:
                    failed_time = failed_metadata['failed_time']
                    if time.time() - failed_time > self.retry_timeout:
                        logger.debug(
                            'retrying failed server: %s', client.server
                        )
                        result = func(*args, **kwargs)
                        # We were successful; remove it from the failed
                        # clients
                        self._failed_clients.pop(client.server)
                        return result
                    else:
                        if not ignore_exc:
                            raise MemcacheError('reached retry timeout.')
                        return default_val
                else:
                    # We've reached our max retry attempts; mark the server
                    # as dead
                    logger.debug('marking server as dead: %s', client.server)
                    self.remove_server(client.server)

            result = func(*args, **kwargs)
            return result

        # Connecting to the server failed; enter retry mode
        except socket.error:
            self._mark_failed_server(client.server)

            # If ignore_exc isn't enabled, don't move on gracefully; just
            # re-raise the exception
            if not ignore_exc:
                raise

            return default_val
        except Exception:
            # Any exceptions that aren't socket.error need to be handled
            # gracefully as well
            if not ignore_exc:
                raise

            return default_val

    def _safely_run_set_many(self, client, values, ignore_exc=True,
                             *args, **kwargs):
        failed = []
        succeeded = []
        try:
            if client.server in self._failed_clients:
                # This server is currently failing; check whether it is in
                # retry or marked as dead
                failed_metadata = self._failed_clients[client.server]

                # We haven't tried our max amount yet; if enough time has
                # passed, retry using it
                if failed_metadata['attempts'] < self.retry_attempts:
                    failed_time = failed_metadata['failed_time']
                    if time.time() - failed_time > self.retry_timeout:
                        logger.debug(
                            'retrying failed server: %s', client.server
                        )
                        succeeded, failed, err = self._set_many(
                            client, values, ignore_exc, *args, **kwargs)
                        if err is not None:
                            raise err

                        # We were successful; remove it from the failed
                        # clients
                        self._failed_clients.pop(client.server)
                        return failed
                    else:
                        if not ignore_exc:
                            raise MemcacheError('reached retry timeout.')
                        return values.keys()
                else:
                    # We've reached our max retry attempts; mark the server
                    # as dead
                    logger.debug('marking server as dead: %s', client.server)
                    self.remove_server(client.server)

            succeeded, failed, err = self._set_many(
                client, values, ignore_exc, *args, **kwargs
            )
            if err is not None:
                raise err
            return failed

        # Connecting to the server failed; enter retry mode
        except socket.error:
            self._mark_failed_server(client.server)

            # If ignore_exc isn't enabled, don't move on gracefully; just
            # re-raise the exception
            if not ignore_exc:
                raise

            return list(set(values.keys()) - set(succeeded))
        except Exception:
            # Any exceptions that aren't socket.error need to be handled
            # gracefully as well
            if not ignore_exc:
                raise

            return list(set(values.keys()) - set(succeeded))

    def _mark_failed_server(self, server):
        # This client has never failed; mark it for failure
        if (
            server not in self._failed_clients and
            self.retry_attempts > 0
        ):
            self._failed_clients[server] = {
                'failed_time': time.time(),
                'attempts': 0,
            }
        # We aren't allowing any retries; mark the server as dead immediately
        elif (
            server not in self._failed_clients and
            self.retry_attempts <= 0
        ):
            self._failed_clients[server] = {
                'failed_time': time.time(),
                'attempts': 0,
            }
            logger.debug("marking server as dead %s", server)
            self.remove_server(server)
        # This client has failed previously; update the metadata to reflect
        # that we have attempted it again
        else:
            failed_metadata = self._failed_clients[server]
            failed_metadata['attempts'] += 1
            failed_metadata['failed_time'] = time.time()
            self._failed_clients[server] = failed_metadata

    def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
        # Route a single-key command to the client chosen by the hasher and
        # run it, falling back to default_val if no server is available.
        ignore_exc = kwargs.pop('ignore_exc', True)
        client = self._get_client(key, ignore_exc=ignore_exc)
        if client is None:
            return default_val

        func = getattr(client, cmd)
        args = list(args)
        args.insert(0, key)
        return self._safely_run_func(client, func, default_val, ignore_exc,
                                     *args, **kwargs)

    def _set_many(self, client, values, ignore_exc=True, *args, **kwargs):
        failed = []
        succeeded = []
        try:
            failed = client.set_many(values, *args, **kwargs)
        except Exception as e:
            if not ignore_exc:
                return succeeded, failed, e

        succeeded = [key for key in six.iterkeys(values) if key not in failed]
        return succeeded, failed, None

    def close(self, ignore_exc=True):
        for client in self.clients.values():
            self._safely_run_func(client, client.close, False, ignore_exc)

    disconnect_all = close

    def set(self, key, *args, **kwargs):
        return self._run_cmd('set', key, 0, *args, **kwargs)

    def get(self, key, *args, **kwargs):
        return self._run_cmd('get', key, None, *args, **kwargs)

    def incr(self, key, *args, **kwargs):
        return self._run_cmd('incr', key, None, *args, **kwargs)

    def decr(self, key, *args, **kwargs):
        return self._run_cmd('decr', key, None, *args, **kwargs)

    def set_many(self, values, *args, **kwargs):
        ignore_exc = kwargs.pop('ignore_exc', True)
        client_batches = collections.defaultdict(dict)
        failed = []

        # Group the values by the server each key hashes to
        for key, value in six.iteritems(values):
            client = self._get_client(key, ignore_exc=ignore_exc)

            if client is None:
                failed.append(key)
                continue

            client_batches[client.server][key] = value

        # Issue one set_many per server and collect the keys that failed
        for server, values in client_batches.items():
            client = self.clients[self._make_client_key(server)]
            failed += self._safely_run_set_many(
                client, values, ignore_exc, *args, **kwargs
            )

        return failed

    set_multi = set_many

    def get_many(self, keys, gets=False, *args, **kwargs):
        ignore_exc = kwargs.pop('ignore_exc', True)
        client_batches = collections.defaultdict(list)
        end = {}

        # Group the keys by the server each one hashes to
        for key in keys:
            client = self._get_client(key, ignore_exc=ignore_exc)

            if client is None:
                continue

            client_batches[client.server].append(key)

        # Issue one get_many/gets_many per server and merge the results
        for server, keys in client_batches.items():
            client = self.clients[self._make_client_key(server)]

            new_args = list(args)
            new_args.insert(0, keys)

            if gets:
                get_func = client.gets_many
            else:
                get_func = client.get_many

            result = self._safely_run_func(
                client,
                get_func, {}, ignore_exc, *new_args, **kwargs
            )
            end.update(result)

        return end

    get_multi = get_many

    def gets(self, key, *args, **kwargs):
        return self._run_cmd('gets', key, None, *args, **kwargs)

    def gets_many(self, keys, ignore_exc=True, *args, **kwargs):
        # Pass ignore_exc as a keyword so get_many pops it instead of
        # forwarding it positionally to the underlying client call
        kwargs['ignore_exc'] = ignore_exc
        return self.get_many(keys, True, *args, **kwargs)

    gets_multi = gets_many

    def add(self, key, *args, **kwargs):
        return self._run_cmd('add', key, 0, *args, **kwargs)

    def prepend(self, key, *args, **kwargs):
        return self._run_cmd('prepend', key, 0, *args, **kwargs)

    def append(self, key, *args, **kwargs):
        return self._run_cmd('append', key, 0, *args, **kwargs)

    def delete(self, key, *args, **kwargs):
        return self._run_cmd('delete', key, False, *args, **kwargs)

    def delete_many(self, keys, *args, **kwargs):
        for key in keys:
            self._run_cmd('delete', key, False, *args, **kwargs)
        return True

    delete_multi = delete_many

    def cas(self, key, *args, **kwargs):
        return self._run_cmd('cas', key, 0, *args, **kwargs)

    def replace(self, key, *args, **kwargs):
        return self._run_cmd('replace', key, 0, *args, **kwargs)

    def touch(self, key, *args, **kwargs):
        return self._run_cmd('touch', key, False, *args, **kwargs)

    def flush_all(self, ignore_exc=True):
        for client in self.clients.values():
            self._safely_run_func(client, client.flush_all, False, ignore_exc)

    def quit(self, ignore_exc=True):
        for client in self.clients.values():
            self._safely_run_func(client, client.quit, False, ignore_exc)
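

# A minimal end-to-end sketch (not part of the original module). It assumes
# memcached instances at the hypothetical addresses below; with the default
# ignore_exc=True behaviour, commands against unreachable servers simply
# return their default values instead of raising.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    client = HashClient([
        ('127.0.0.1', 11211),
        ('127.0.0.1', 11212),
    ])

    # Single-key commands are routed to one server by the rendezvous hash.
    client.set('example:1', b'hello')
    print(client.get('example:1'))

    # Batched commands are split per server and the results merged back.
    not_stored = client.set_many({'example:2': b'a', 'example:3': b'b'})
    print(client.get_many(['example:2', 'example:3']), not_stored)

    client.close()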