import collections
import logging
import socket
import time

import six

from library.pymemcache.client.base import (
    Client,
    PooledClient,
    check_key_helper,
    normalize_server_spec,
)
from library.pymemcache.client.rendezvous import RendezvousHash
from library.pymemcache.exceptions import MemcacheError

logger = logging.getLogger(__name__)


class HashClient(object):
    """
    A client for communicating with a cluster of memcached servers
    """

    #: :class:`Client` class used to create new clients
    client_class = Client
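
    # A minimal usage sketch (illustrative only; the addresses, keys, and
    # values below are placeholders, not part of this module):
    #
    #     client = HashClient([('10.0.0.1', 11211), ('10.0.0.2', 11211)])
    #     client.set('some_key', 'some value')
    #     client.get('some_key')  # -> the stored value (bytes unless a
    #                             #    deserializer/serde is configured)
    #
    # Keys are routed to servers by the hasher, so per-key commands
    # transparently pick the right underlying :class:`Client`.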

    def __init__(
        self,
        servers,
        hasher=RendezvousHash,
        serde=None,
        serializer=None,
        deserializer=None,
        connect_timeout=None,
        timeout=None,
        no_delay=False,
        socket_module=socket,
        socket_keepalive=None,
        key_prefix=b'',
        max_pool_size=None,
        pool_idle_timeout=0,
        lock_generator=None,
        retry_attempts=2,
        retry_timeout=1,
        dead_timeout=60,
        use_pooling=False,
        # ignore_exc=False,
        allow_unicode_keys=False,
        default_noreply=True,
        encoding='ascii',
        tls_context=None,
    ):
- """
- Constructor.
- Args:
- servers: list() of tuple(hostname, port) or string containing a UNIX
- socket path.
- hasher: optional class three functions ``get_node``, ``add_node``,
- and ``remove_node``
- defaults to Rendezvous (HRW) hash.
- use_pooling: use py:class:`.PooledClient` as the default underlying
- class. ``max_pool_size`` and ``lock_generator`` can
- be used with this. default: False
- retry_attempts: Amount of times a client should be tried before it
- is marked dead and removed from the pool.
- retry_timeout (float): Time in seconds that should pass between retry
- attempts.
- dead_timeout (float): Time in seconds before attempting to add a node
- back in the pool.
- encoding: optional str, controls data encoding (defaults to 'ascii').
- Further arguments are interpreted as for :py:class:`.Client`
- constructor.
- """
        self.clients = {}
        self.retry_attempts = retry_attempts
        self.retry_timeout = retry_timeout
        self.dead_timeout = dead_timeout
        self.use_pooling = use_pooling
        self.key_prefix = key_prefix
        # self.ignore_exc = ignore_exc
        self.allow_unicode_keys = allow_unicode_keys
        self._failed_clients = {}
        self._dead_clients = {}
        self._last_dead_check_time = time.time()

        self.hasher = hasher()

        self.default_kwargs = {
            'connect_timeout': connect_timeout,
            'timeout': timeout,
            'no_delay': no_delay,
            'socket_module': socket_module,
            'socket_keepalive': socket_keepalive,
            'key_prefix': key_prefix,
            'serde': serde,
            'serializer': serializer,
            'deserializer': deserializer,
            'allow_unicode_keys': allow_unicode_keys,
            'default_noreply': default_noreply,
            'encoding': encoding,
            'tls_context': tls_context,
        }

        if use_pooling is True:
            self.default_kwargs.update({
                'max_pool_size': max_pool_size,
                'pool_idle_timeout': pool_idle_timeout,
                'lock_generator': lock_generator,
            })

        for server in servers:
            self.add_server(normalize_server_spec(server))

        self.encoding = encoding
        self.tls_context = tls_context
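
    # A minimal sketch of the hasher interface described in the constructor
    # docstring (an assumption for illustration only; ``NaiveHash`` is not
    # part of this module). The default ``RendezvousHash`` exposes the same
    # three methods:
    #
    #     class NaiveHash(object):
    #         def __init__(self):
    #             self.nodes = []
    #
    #         def add_node(self, node):
    #             self.nodes.append(node)
    #
    #         def remove_node(self, node):
    #             self.nodes.remove(node)
    #
    #         def get_node(self, key):
    #             # must return None when no nodes are available
    #             if not self.nodes:
    #                 return None
    #             return self.nodes[hash(key) % len(self.nodes)]
    #
    #     client = HashClient(servers, hasher=NaiveHash)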

    def _make_client_key(self, server):
        if isinstance(server, (list, tuple)) and len(server) == 2:
            return '%s:%s' % server
        return server

    def add_server(self, server, port=None):
        # To maintain backward compatibility, if a port is provided, assume
        # that server wasn't provided as a (host, port) tuple.
        if port is not None:
            if not isinstance(server, six.string_types):
                raise TypeError('Server must be a string when passing port.')
            server = (server, port)

        _class = PooledClient if self.use_pooling else self.client_class
        client = _class(server, **self.default_kwargs)
        if self.use_pooling:
            client.client_class = self.client_class

        key = self._make_client_key(server)
        self.clients[key] = client
        self.hasher.add_node(key)

    def remove_server(self, server, port=None):
        # To maintain backward compatibility, if a port is provided, assume
        # that server wasn't provided as a (host, port) tuple.
        if port is not None:
            if not isinstance(server, six.string_types):
                raise TypeError('Server must be a string when passing port.')
            server = (server, port)

        key = self._make_client_key(server)
        dead_time = time.time()
        # The server may never have failed (e.g. when removed directly by
        # the caller), so don't require it to be in the failed list.
        self._failed_clients.pop(server, None)
        self._dead_clients[server] = dead_time
        self.hasher.remove_node(key)

    def _retry_dead(self):
        current_time = time.time()
        ldc = self._last_dead_check_time
        # Enough time has passed since the last check of dead servers
        if current_time - ldc > self.dead_timeout:
            candidates = []
            for server, dead_time in self._dead_clients.items():
                if current_time - dead_time > self.dead_timeout:
                    candidates.append(server)
            for server in candidates:
                logger.debug(
                    'bringing server back into rotation %s',
                    server
                )
                self.add_server(server)
                del self._dead_clients[server]
            self._last_dead_check_time = current_time

    def _get_client(self, key, ignore_exc=True):
        check_key_helper(key, self.allow_unicode_keys, self.key_prefix)
        if self._dead_clients:
            self._retry_dead()

        server = self.hasher.get_node(key)
        # We've run out of servers to try
        if server is None:
            if ignore_exc is True:
                return None
            raise MemcacheError('All servers seem to be down right now')

        return self.clients[server]

    def _safely_run_func(self, client, func, default_val, ignore_exc=True,
                         *args, **kwargs):
        try:
            if client.server in self._failed_clients:
                # This server is currently failing, let's check if it is in
                # retry or marked as dead
                failed_metadata = self._failed_clients[client.server]

                # we haven't tried our max amount yet; if enough time has
                # passed, let's just retry using it
                if failed_metadata['attempts'] < self.retry_attempts:
                    failed_time = failed_metadata['failed_time']
                    if time.time() - failed_time > self.retry_timeout:
                        logger.debug(
                            'retrying failed server: %s', client.server
                        )
                        result = func(*args, **kwargs)
                        # we were successful, let's remove it from the failed
                        # clients
                        self._failed_clients.pop(client.server)
                        return result
                    else:
                        if not ignore_exc:
                            raise MemcacheError(
                                'Server is failing and the retry timeout '
                                'has not elapsed yet.'
                            )
                        return default_val
                else:
                    # We've reached our max retry attempts, we need to mark
                    # the server as dead
                    logger.debug('marking server as dead: %s', client.server)
                    self.remove_server(client.server)

            result = func(*args, **kwargs)
            return result

        # Connecting to the server failed, we should enter
        # retry mode
        except socket.error:
            self._mark_failed_server(client.server)

            # if we haven't enabled ignore_exc, don't move on gracefully,
            # just raise the exception
            if not ignore_exc:
                raise

            return default_val

        except Exception:
            # any exceptions that aren't socket.error we need to handle
            # gracefully as well
            if not ignore_exc:
                raise

            return default_val

    def _safely_run_set_many(self, client, values, ignore_exc=True,
                             *args, **kwargs):
        failed = []
        succeeded = []
        try:
            if client.server in self._failed_clients:
                # This server is currently failing, let's check if it is in
                # retry or marked as dead
                failed_metadata = self._failed_clients[client.server]

                # we haven't tried our max amount yet; if enough time has
                # passed, let's just retry using it
                if failed_metadata['attempts'] < self.retry_attempts:
                    failed_time = failed_metadata['failed_time']
                    if time.time() - failed_time > self.retry_timeout:
                        logger.debug(
                            'retrying failed server: %s', client.server
                        )
                        succeeded, failed, err = self._set_many(
                            client, values, ignore_exc, *args, **kwargs)
                        if err is not None:
                            raise err

                        # we were successful, let's remove it from the failed
                        # clients
                        self._failed_clients.pop(client.server)
                        return failed
                    else:
                        if not ignore_exc:
                            raise MemcacheError(
                                'Server is failing and the retry timeout '
                                'has not elapsed yet.'
                            )
                        return list(values.keys())
                else:
                    # We've reached our max retry attempts, we need to mark
                    # the server as dead
                    logger.debug('marking server as dead: %s', client.server)
                    self.remove_server(client.server)

            succeeded, failed, err = self._set_many(
                client, values, ignore_exc, *args, **kwargs
            )
            if err is not None:
                raise err

            return failed

        # Connecting to the server failed, we should enter
        # retry mode
        except socket.error:
            self._mark_failed_server(client.server)

            # if we haven't enabled ignore_exc, don't move on gracefully,
            # just raise the exception
            if not ignore_exc:
                raise

            return list(set(values.keys()) - set(succeeded))

        except Exception:
            # any exceptions that aren't socket.error we need to handle
            # gracefully as well
            if not ignore_exc:
                raise

            return list(set(values.keys()) - set(succeeded))

    def _mark_failed_server(self, server):
        # This client has never failed, let's mark it for failure
        if (
            server not in self._failed_clients and
            self.retry_attempts > 0
        ):
            self._failed_clients[server] = {
                'failed_time': time.time(),
                'attempts': 0,
            }
        # We aren't allowing any retries, we should mark the server as
        # dead immediately
        elif (
            server not in self._failed_clients and
            self.retry_attempts <= 0
        ):
            self._failed_clients[server] = {
                'failed_time': time.time(),
                'attempts': 0,
            }
            logger.debug("marking server as dead %s", server)
            self.remove_server(server)
        # This client has failed previously, we need to update the metadata
        # to reflect that we have attempted it again
        else:
            failed_metadata = self._failed_clients[server]
            failed_metadata['attempts'] += 1
            failed_metadata['failed_time'] = time.time()
            self._failed_clients[server] = failed_metadata

    def _run_cmd(self, cmd, key, default_val, *args, **kwargs):
        ignore_exc = kwargs.pop('ignore_exc', True)
        client = self._get_client(key, ignore_exc=ignore_exc)
        if client is None:
            return default_val

        func = getattr(client, cmd)
        args = list(args)
        args.insert(0, key)
        return self._safely_run_func(
            client, func, default_val, ignore_exc, *args, **kwargs
        )

    def _set_many(self, client, values, ignore_exc=True, *args, **kwargs):
        failed = []
        succeeded = []
        try:
            failed = client.set_many(values, *args, **kwargs)
        except Exception as e:
            if not ignore_exc:
                return succeeded, failed, e

        succeeded = [key for key in six.iterkeys(values) if key not in failed]
        return succeeded, failed, None

    def close(self, ignore_exc=True):
        for client in self.clients.values():
            self._safely_run_func(client, client.close, False, ignore_exc)

    disconnect_all = close

    def set(self, key, *args, **kwargs):
        return self._run_cmd('set', key, 0, *args, **kwargs)

    def get(self, key, *args, **kwargs):
        return self._run_cmd('get', key, None, *args, **kwargs)

    def incr(self, key, *args, **kwargs):
        return self._run_cmd('incr', key, None, *args, **kwargs)

    def decr(self, key, *args, **kwargs):
        return self._run_cmd('decr', key, None, *args, **kwargs)

    def set_many(self, values, *args, **kwargs):
        ignore_exc = kwargs.pop('ignore_exc', True)
        client_batches = collections.defaultdict(dict)
        failed = []

        for key, value in six.iteritems(values):
            client = self._get_client(key, ignore_exc=ignore_exc)

            if client is None:
                failed.append(key)
                continue

            client_batches[client.server][key] = value

        for server, values in client_batches.items():
            client = self.clients[self._make_client_key(server)]

            failed += self._safely_run_set_many(
                client, values, ignore_exc, *args, **kwargs
            )

        return failed

    set_multi = set_many

    def get_many(self, keys, gets=False, *args, **kwargs):
        ignore_exc = kwargs.pop('ignore_exc', True)
        client_batches = collections.defaultdict(list)
        end = {}

        for key in keys:
            client = self._get_client(key, ignore_exc=ignore_exc)

            if client is None:
                continue

            client_batches[client.server].append(key)

        for server, keys in client_batches.items():
            client = self.clients[self._make_client_key(server)]
            new_args = list(args)
            new_args.insert(0, keys)

            if gets:
                get_func = client.gets_many
            else:
                get_func = client.get_many

            result = self._safely_run_func(
                client,
                get_func, {}, ignore_exc, *new_args, **kwargs
            )
            end.update(result)

        return end

    get_multi = get_many
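
    # Batched usage sketch (illustrative only; the keys and values below are
    # placeholders). ``set_many`` returns the keys that could not be stored,
    # while ``get_many`` returns a dict containing only the keys it found:
    #
    #     failed = client.set_many({'k1': 'v1', 'k2': 'v2'})
    #     found = client.get_many(['k1', 'k2', 'missing'])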

    def gets(self, key, *args, **kwargs):
        return self._run_cmd('gets', key, None, *args, **kwargs)

    def gets_many(self, keys, ignore_exc=True, *args, **kwargs):
        # ``get_many`` consumes ``ignore_exc`` from kwargs, so pass it as a
        # keyword rather than positionally.
        return self.get_many(keys, True, *args, ignore_exc=ignore_exc,
                             **kwargs)

    gets_multi = gets_many

    def add(self, key, *args, **kwargs):
        return self._run_cmd('add', key, 0, *args, **kwargs)

    def prepend(self, key, *args, **kwargs):
        return self._run_cmd('prepend', key, 0, *args, **kwargs)

    def append(self, key, *args, **kwargs):
        return self._run_cmd('append', key, 0, *args, **kwargs)

    def delete(self, key, *args, **kwargs):
        return self._run_cmd('delete', key, False, *args, **kwargs)

    def delete_many(self, keys, *args, **kwargs):
        for key in keys:
            self._run_cmd('delete', key, False, *args, **kwargs)
        return True

    delete_multi = delete_many

    def cas(self, key, *args, **kwargs):
        return self._run_cmd('cas', key, 0, *args, **kwargs)

    def replace(self, key, *args, **kwargs):
        return self._run_cmd('replace', key, 0, *args, **kwargs)

    def touch(self, key, *args, **kwargs):
        return self._run_cmd('touch', key, False, *args, **kwargs)

    def flush_all(self, ignore_exc=True):
        for client in self.clients.values():
            self._safely_run_func(client, client.flush_all, False, ignore_exc)

    def quit(self, ignore_exc=True):
        for client in self.clients.values():
            self._safely_run_func(client, client.quit, False, ignore_exc)