123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539 |
- # Copyright 2012 Pinterest.com
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import errno
- import platform
- import socket
- import sys
- import six
- from library.pymemcache import pool
- from library.pymemcache.serde import LegacyWrappingSerde
- from library.pymemcache.exceptions import (
- MemcacheClientError,
- MemcacheUnknownCommandError,
- MemcacheIllegalInputError,
- MemcacheServerError,
- MemcacheUnknownError,
- MemcacheUnexpectedCloseError
- )
- RECV_SIZE = 4096
- VALID_STORE_RESULTS = {
- b'set': (b'STORED', b'NOT_STORED'),
- b'add': (b'STORED', b'NOT_STORED'),
- b'replace': (b'STORED', b'NOT_STORED'),
- b'append': (b'STORED', b'NOT_STORED'),
- b'prepend': (b'STORED', b'NOT_STORED'),
- b'cas': (b'STORED', b'EXISTS', b'NOT_FOUND'),
- }
- SOCKET_KEEPALIVE_SUPPORTED_SYSTEM = {
- 'Linux',
- }
- STORE_RESULTS_VALUE = {
- b'STORED': True,
- b'NOT_STORED': False,
- b'NOT_FOUND': None,
- b'EXISTS': False
- }
- # Some of the values returned by the "stats" command
- # need mapping into native Python types
- def _parse_bool_int(value):
- return int(value) != 0
- def _parse_bool_string_is_yes(value):
- return value == b'yes'
- def _parse_float(value):
- return float(value.replace(b':', b'.'))
- def _parse_hex(value):
- return int(value, 8)
- STAT_TYPES = {
- # General stats
- b'version': six.binary_type,
- b'rusage_user': _parse_float,
- b'rusage_system': _parse_float,
- b'hash_is_expanding': _parse_bool_int,
- b'slab_reassign_running': _parse_bool_int,
- # Settings stats
- b'inter': six.binary_type,
- b'growth_factor': float,
- b'stat_key_prefix': six.binary_type,
- b'umask': _parse_hex,
- b'detail_enabled': _parse_bool_int,
- b'cas_enabled': _parse_bool_int,
- b'auth_enabled_sasl': _parse_bool_string_is_yes,
- b'maxconns_fast': _parse_bool_int,
- b'slab_reassign': _parse_bool_int,
- b'slab_automove': _parse_bool_int,
- }
- # Common helper functions.
- def check_key_helper(key, allow_unicode_keys, key_prefix=b''):
- """Checks key and add key_prefix."""
- if allow_unicode_keys:
- if isinstance(key, six.text_type):
- key = key.encode('utf8')
- elif isinstance(key, six.string_types):
- try:
- if isinstance(key, six.binary_type):
- key = key.decode().encode('ascii')
- else:
- key = key.encode('ascii')
- except (UnicodeEncodeError, UnicodeDecodeError):
- raise MemcacheIllegalInputError("Non-ASCII key: %r" % key)
- key = key_prefix + key
- parts = key.split()
- if len(key) > 250:
- raise MemcacheIllegalInputError("Key is too long: %r" % key)
- # second statement catches leading or trailing whitespace
- elif len(parts) > 1 or (parts and parts[0] != key):
- raise MemcacheIllegalInputError("Key contains whitespace: %r" % key)
- elif b'\00' in key:
- raise MemcacheIllegalInputError("Key contains null: %r" % key)
- return key
- def normalize_server_spec(server):
- if isinstance(server, tuple) or server is None:
- return server
- if isinstance(server, list):
- return tuple(server) # Assume [host, port] provided.
- if not isinstance(server, six.string_types):
- raise ValueError('Unknown server provided: %r' % server)
- if server.startswith('unix:'):
- return server[5:]
- if server.startswith('/'):
- return server
- if ':' not in server or server.endswith(']'):
- host, port = server, 11211
- else:
- host, port = server.rsplit(':', 1)
- port = int(port)
- if host.startswith('['):
- host = host.strip('[]')
- return (host, port)
- class KeepaliveOpts(object):
- """
- A configuration structure to define the socket keepalive.
- This structure must be passed to a client. The client will configure
- its socket keepalive by using the elements of the structure.
- Args:
- idle: The time (in seconds) the connection needs to remain idle
- before TCP starts sending keepalive probes. Should be a positive
- integer most greater than zero.
- intvl: The time (in seconds) between individual keepalive probes.
- Should be a positive integer most greater than zero.
- cnt: The maximum number of keepalive probes TCP should send before
- dropping the connection. Should be a positive integer most greater
- than zero.
- """
- __slots__ = ('idle', 'intvl', 'cnt')
- def __init__(self, idle=1, intvl=1, cnt=5):
- if idle < 1:
- raise ValueError(
- "The idle parameter must be greater or equal to 1.")
- self.idle = idle
- if intvl < 1:
- raise ValueError(
- "The intvl parameter must be greater or equal to 1.")
- self.intvl = intvl
- if cnt < 1:
- raise ValueError(
- "The cnt parameter must be greater or equal to 1.")
- self.cnt = cnt
- class Client(object):
- """
- A client for a single memcached server.
- *Server Connection*
- The ``server`` parameter controls how the client connects to the memcached
- server. You can either use a (host, port) tuple for a TCP connection or a
- string containing the path to a UNIX domain socket.
- The ``connect_timeout`` and ``timeout`` parameters can be used to set
- socket timeout values. By default, timeouts are disabled.
- When the ``no_delay`` flag is set, the ``TCP_NODELAY`` socket option will
- also be set. This only applies to TCP-based connections.
- Lastly, the ``socket_module`` allows you to specify an alternate socket
- implementation (such as `gevent.socket`_).
- .. _gevent.socket: http://www.gevent.org/api/gevent.socket.html
- *Keys and Values*
- Keys must have a __str__() method which should return a str with no more
- than 250 ASCII characters and no whitespace or control characters. Unicode
- strings must be encoded (as UTF-8, for example) unless they consist only
- of ASCII characters that are neither whitespace nor control characters.
- Values must have a __str__() method to convert themselves to a byte
- string. Unicode objects can be a problem since str() on a Unicode object
- will attempt to encode it as ASCII (which will fail if the value contains
- code points larger than U+127). You can fix this with a serializer or by
- just calling encode on the string (using UTF-8, for instance).
- If you intend to use anything but str as a value, it is a good idea to use
- a serializer. The pymemcache.serde library has an already implemented
- serializer which pickles and unpickles data.
- *Serialization and Deserialization*
- The constructor takes an optional object, the "serializer/deserializer"
- ("serde"), which is responsible for both serialization and deserialization
- of objects. That object must satisfy the serializer interface by providing
- two methods: `serialize` and `deserialize`. `serialize` takes two
- arguments, a key and a value, and returns a tuple of two elements, the
- serialized value, and an integer in the range 0-65535 (the "flags").
- `deserialize` takes three parameters, a key, value, and flags, and returns
- the deserialized value.
- Here is an example using JSON for non-str values:
- .. code-block:: python
- class JSONSerde(object):
- def serialize(self, key, value):
- if isinstance(value, str):
- return value, 1
- return json.dumps(value), 2
- def deserialize(self, key, value, flags):
- if flags == 1:
- return value
- if flags == 2:
- return json.loads(value)
- raise Exception("Unknown flags for value: {1}".format(flags))
- .. note::
- Most write operations allow the caller to provide a ``flags`` value to
- support advanced interaction with the server. This will **override** the
- "flags" value returned by the serializer and should therefore only be
- used when you have a complete understanding of how the value should be
- serialized, stored, and deserialized.
- *Error Handling*
- All of the methods in this class that talk to memcached can throw one of
- the following exceptions:
- * :class:`pymemcache.exceptions.MemcacheUnknownCommandError`
- * :class:`pymemcache.exceptions.MemcacheClientError`
- * :class:`pymemcache.exceptions.MemcacheServerError`
- * :class:`pymemcache.exceptions.MemcacheUnknownError`
- * :class:`pymemcache.exceptions.MemcacheUnexpectedCloseError`
- * :class:`pymemcache.exceptions.MemcacheIllegalInputError`
- * :class:`socket.timeout`
- * :class:`socket.error`
- Instances of this class maintain a persistent connection to memcached
- which is terminated when any of these exceptions are raised. The next
- call to a method on the object will result in a new connection being made
- to memcached.
- """
- def __init__(self,
- server,
- serde=None,
- serializer=None,
- deserializer=None,
- connect_timeout=None,
- timeout=None,
- no_delay=False,
- ignore_exc=False,
- socket_module=socket,
- socket_keepalive=None,
- key_prefix=b'',
- default_noreply=True,
- allow_unicode_keys=False,
- encoding='ascii',
- tls_context=None):
- """
- Constructor.
- Args:
- server: tuple(hostname, port) or string containing a UNIX socket path.
- serde: optional seralizer object, see notes in the class docs.
- serializer: deprecated serialization function
- deserializer: deprecated deserialization function
- connect_timeout: optional float, seconds to wait for a connection to
- the memcached server. Defaults to "forever" (uses the underlying
- default socket timeout, which can be very long).
- timeout: optional float, seconds to wait for send or recv calls on
- the socket connected to memcached. Defaults to "forever" (uses the
- underlying default socket timeout, which can be very long).
- no_delay: optional bool, set the TCP_NODELAY flag, which may help
- with performance in some cases. Defaults to False.
- ignore_exc: optional bool, True to cause the "get", "gets",
- "get_many" and "gets_many" calls to treat any errors as cache
- misses. Defaults to False.
- socket_module: socket module to use, e.g. gevent.socket. Defaults to
- the standard library's socket module.
- socket_keepalive: Activate the socket keepalive feature by passing
- a KeepaliveOpts structure in this parameter. Disabled by default
- (None). This feature is only supported on Linux platforms.
- key_prefix: Prefix of key. You can use this as namespace. Defaults
- to b''.
- default_noreply: bool, the default value for 'noreply' as passed to
- store commands (except from cas, incr, and decr, which default to
- False).
- allow_unicode_keys: bool, support unicode (utf8) keys
- encoding: optional str, controls data encoding (defaults to 'ascii').
- Notes:
- The constructor does not make a connection to memcached. The first
- call to a method on the object will do that.
- """
- self.server = normalize_server_spec(server)
- self.serde = serde or LegacyWrappingSerde(serializer, deserializer)
- self.connect_timeout = connect_timeout
- self.timeout = timeout
- self.no_delay = no_delay
- self.ignore_exc = ignore_exc
- self.socket_module = socket_module
- self.socket_keepalive = socket_keepalive
- user_system = platform.system()
- if self.socket_keepalive is not None:
- if user_system not in SOCKET_KEEPALIVE_SUPPORTED_SYSTEM:
- raise SystemError(
- "Pymemcache's socket keepalive mechaniss doesn't "
- "support your system ({user_system}). If "
- "you see this message it mean that you tried to "
- "configure your socket keepalive on an unsupported "
- "system. To fix the problem pass `socket_"
- "keepalive=False` or use a supported system. "
- "Supported systems are: {systems}".format(
- user_system=user_system,
- systems=", ".join(sorted(
- SOCKET_KEEPALIVE_SUPPORTED_SYSTEM))
- )
- )
- if not isinstance(self.socket_keepalive, KeepaliveOpts):
- raise ValueError(
- "Unsupported keepalive options. If you see this message "
- "it means that you passed an unsupported object within "
- "the param `socket_keepalive`. To fix it "
- "please instantiate and pass to socket_keepalive a "
- "KeepaliveOpts object. That's the only supported type "
- "of structure."
- )
- self.sock = None
- if isinstance(key_prefix, six.text_type):
- key_prefix = key_prefix.encode('ascii')
- if not isinstance(key_prefix, six.binary_type):
- raise TypeError("key_prefix should be bytes.")
- self.key_prefix = key_prefix
- self.default_noreply = default_noreply
- self.allow_unicode_keys = allow_unicode_keys
- self.encoding = encoding
- self.tls_context = tls_context
- self.debug = False
- def debuglog(self, str):
- if self.debug:
- sys.stderr.write("PyMemCached: %s\n" % str)
- def check_key(self, key):
- """Checks key and add key_prefix."""
- return check_key_helper(key, allow_unicode_keys=self.allow_unicode_keys,
- key_prefix=self.key_prefix)
- def _connect(self):
- self.close()
- s = self.socket_module
- if not isinstance(self.server, tuple):
- sockaddr = self.server
- sock = s.socket(s.AF_UNIX, s.SOCK_STREAM)
- else:
- sock = None
- error = None
- host, port = self.server
- info = s.getaddrinfo(host, port, s.AF_UNSPEC, s.SOCK_STREAM,
- s.IPPROTO_TCP)
- for family, socktype, proto, _, sockaddr in info:
- try:
- sock = s.socket(family, socktype, proto)
- if self.no_delay:
- sock.setsockopt(s.IPPROTO_TCP, s.TCP_NODELAY, 1)
- if self.tls_context:
- context = self.tls_context
- sock = context.wrap_socket(sock, server_hostname=host)
- except Exception as e:
- error = e
- if sock is not None:
- sock.close()
- sock = None
- else:
- break
- if error is not None:
- raise error
- try:
- sock.settimeout(self.connect_timeout)
- # sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- except:
- pass
- if self.socket_keepalive is not None:
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
- self.socket_keepalive.idle)
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL,
- self.socket_keepalive.intvl)
- sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT,
- self.socket_keepalive.cnt)
- sock.connect(sockaddr)
- sock.settimeout(self.timeout)
- except Exception:
- sock.close()
- raise
- self.sock = sock
- def close(self):
- """Close the connection to memcached, if it is open. The next call to a
- method that requires a connection will re-open it."""
- if self.sock is not None:
- try:
- self.sock.close()
- except Exception:
- pass
- finally:
- self.sock = None
- disconnect_all = close
- def set(self, key, value, expire=0, noreply=None, flags=None):
- """
- The memcached "set" command.
- Args:
- key: str, see class docs for details.
- value: str, see class docs for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- If no exception is raised, always returns True. If an exception is
- raised, the set may or may not have occurred. If noreply is True,
- then a successful return does not guarantee a successful set.
- """
- if noreply is None:
- noreply = self.default_noreply
- return self._store_cmd(b'set', {key: value}, expire, noreply,
- flags=flags)[key]
- def set_many(self, values, expire=0, noreply=None, flags=None):
- """
- A convenience function for setting multiple values.
- Args:
- values: dict(str, str), a dict of keys and values, see class docs
- for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- Returns a list of keys that failed to be inserted.
- If noreply is True, always returns empty list.
- """
- if noreply is None:
- noreply = self.default_noreply
- result = self._store_cmd(b'set', values, expire, noreply, flags=flags)
- return [k for k, v in six.iteritems(result) if not v]
- set_multi = set_many
- def add(self, key, value, expire=0, noreply=None, flags=None):
- """
- The memcached "add" command.
- Args:
- key: str, see class docs for details.
- value: str, see class docs for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- If ``noreply`` is True (or if it is unset and ``self.default_noreply``
- is True), the return value is always True. Otherwise the return value
- is True if the value was stored, and False if it was not (because
- the key already existed).
- """
- if noreply is None:
- noreply = self.default_noreply
- return self._store_cmd(b'add', {key: value}, expire, noreply,
- flags=flags)[key]
- def replace(self, key, value, expire=0, noreply=None, flags=None):
- """
- The memcached "replace" command.
- Args:
- key: str, see class docs for details.
- value: str, see class docs for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- If ``noreply`` is True (or if it is unset and ``self.default_noreply``
- is True), the return value is always True. Otherwise returns True if
- the value was stored and False if it wasn't (because the key didn't
- already exist).
- """
- if noreply is None:
- noreply = self.default_noreply
- return self._store_cmd(b'replace', {key: value}, expire, noreply,
- flags=flags)[key]
- def append(self, key, value, expire=0, noreply=None, flags=None):
- """
- The memcached "append" command.
- Args:
- key: str, see class docs for details.
- value: str, see class docs for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- True.
- """
- if noreply is None:
- noreply = self.default_noreply
- return self._store_cmd(b'append', {key: value}, expire, noreply,
- flags=flags)[key]
- def prepend(self, key, value, expire=0, noreply=None, flags=None):
- """
- The memcached "prepend" command.
- Args:
- key: str, see class docs for details.
- value: str, see class docs for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- True.
- """
- if noreply is None:
- noreply = self.default_noreply
- return self._store_cmd(b'prepend', {key: value}, expire, noreply,
- flags=flags)[key]
- def cas(self, key, value, cas, expire=0, noreply=False, flags=None):
- """
- The memcached "cas" command.
- Args:
- key: str, see class docs for details.
- value: str, see class docs for details.
- cas: int or str that only contains the characters '0'-'9'.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, False to wait for the reply (the default).
- flags: optional int, arbitrary bit field used for server-specific
- flags
- Returns:
- If ``noreply`` is True (or if it is unset and ``self.default_noreply``
- is True), the return value is always True. Otherwise returns None if
- the key didn't exist, False if it existed but had a different cas
- value and True if it existed and was changed.
- """
- cas = self._check_cas(cas)
- return self._store_cmd(b'cas', {key: value}, expire, noreply,
- flags=flags, cas=cas)[key]
- def get(self, key, default=None):
- """
- The memcached "get" command, but only for one key, as a convenience.
- Args:
- key: str, see class docs for details.
- default: value that will be returned if the key was not found.
- Returns:
- The value for the key, or default if the key wasn't found.
- """
- return self._fetch_cmd(b'get', [key], False).get(key, default)
- def get_many(self, keys):
- """
- The memcached "get" command.
- Args:
- keys: list(str), see class docs for details.
- Returns:
- A dict in which the keys are elements of the "keys" argument list
- and the values are values from the cache. The dict may contain all,
- some or none of the given keys.
- """
- if not keys:
- return {}
- return self._fetch_cmd(b'get', keys, False)
- get_multi = get_many
- def gets(self, key, default=None, cas_default=None):
- """
- The memcached "gets" command for one key, as a convenience.
- Args:
- key: str, see class docs for details.
- default: value that will be returned if the key was not found.
- cas_default: same behaviour as default argument.
- Returns:
- A tuple of (value, cas)
- or (default, cas_defaults) if the key was not found.
- """
- defaults = (default, cas_default)
- return self._fetch_cmd(b'gets', [key], True).get(key, defaults)
- def gets_many(self, keys):
- """
- The memcached "gets" command.
- Args:
- keys: list(str), see class docs for details.
- Returns:
- A dict in which the keys are elements of the "keys" argument list and
- the values are tuples of (value, cas) from the cache. The dict may
- contain all, some or none of the given keys.
- """
- if not keys:
- return {}
- return self._fetch_cmd(b'gets', keys, True)
- def delete(self, key, noreply=None):
- """
- The memcached "delete" command.
- Args:
- key: str, see class docs for details.
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- Returns:
- If ``noreply`` is True (or if it is unset and ``self.default_noreply``
- is True), the return value is always True. Otherwise returns True if
- the key was deleted, and False if it wasn't found.
- """
- if noreply is None:
- noreply = self.default_noreply
- cmd = b'delete ' + self.check_key(key)
- if noreply:
- cmd += b' noreply'
- cmd += b'\r\n'
- results = self._misc_cmd([cmd], b'delete', noreply)
- if noreply:
- return True
- return results[0] == b'DELETED'
- def delete_many(self, keys, noreply=None):
- """
- A convenience function to delete multiple keys.
- Args:
- keys: list(str), the list of keys to delete.
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- Returns:
- True. If an exception is raised then all, some or none of the keys
- may have been deleted. Otherwise all the keys have been sent to
- memcache for deletion and if noreply is False, they have been
- acknowledged by memcache.
- """
- if not keys:
- return True
- if noreply is None:
- noreply = self.default_noreply
- cmds = []
- for key in keys:
- cmds.append(
- b'delete ' + self.check_key(key) +
- (b' noreply' if noreply else b'') +
- b'\r\n')
- self._misc_cmd(cmds, b'delete', noreply)
- return True
- delete_multi = delete_many
- def incr(self, key, value, noreply=False):
- """
- The memcached "incr" command.
- Args:
- key: str, see class docs for details.
- value: int, the amount by which to increment the value.
- noreply: optional bool, False to wait for the reply (the default).
- Returns:
- If noreply is True, always returns None. Otherwise returns the new
- value of the key, or None if the key wasn't found.
- """
- key = self.check_key(key)
- value = self._check_integer(value, "value")
- cmd = b'incr ' + key + b' ' + value
- if noreply:
- cmd += b' noreply'
- cmd += b'\r\n'
- results = self._misc_cmd([cmd], b'incr', noreply)
- if noreply:
- return None
- if results[0] == b'NOT_FOUND':
- return None
- return int(results[0])
- def decr(self, key, value, noreply=False):
- """
- The memcached "decr" command.
- Args:
- key: str, see class docs for details.
- value: int, the amount by which to decrement the value.
- noreply: optional bool, False to wait for the reply (the default).
- Returns:
- If noreply is True, always returns None. Otherwise returns the new
- value of the key, or None if the key wasn't found.
- """
- key = self.check_key(key)
- value = self._check_integer(value, "value")
- cmd = b'decr ' + key + b' ' + value
- if noreply:
- cmd += b' noreply'
- cmd += b'\r\n'
- results = self._misc_cmd([cmd], b'decr', noreply)
- if noreply:
- return None
- if results[0] == b'NOT_FOUND':
- return None
- return int(results[0])
- def touch(self, key, expire=0, noreply=None):
- """
- The memcached "touch" command.
- Args:
- key: str, see class docs for details.
- expire: optional int, number of seconds until the item is expired
- from the cache, or zero for no expiry (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- Returns:
- True if the expiration time was updated, False if the key wasn't
- found.
- """
- if noreply is None:
- noreply = self.default_noreply
- key = self.check_key(key)
- expire = self._check_integer(expire, "expire")
- cmd = b'touch ' + key + b' ' + expire
- if noreply:
- cmd += b' noreply'
- cmd += b'\r\n'
- results = self._misc_cmd([cmd], b'touch', noreply)
- if noreply:
- return True
- return results[0] == b'TOUCHED'
- def stats(self, *args):
- """
- The memcached "stats" command.
- The returned keys depend on what the "stats" command returns.
- A best effort is made to convert values to appropriate Python
- types, defaulting to strings when a conversion cannot be made.
- Args:
- *arg: extra string arguments to the "stats" command. See the
- memcached protocol documentation for more information.
- Returns:
- A dict of the returned stats.
- """
- result = self._fetch_cmd(b'stats', args, False)
- for key, value in six.iteritems(result):
- converter = STAT_TYPES.get(key, int)
- try:
- result[key] = converter(value)
- except Exception:
- pass
- return result
- def cache_memlimit(self, memlimit):
- """
- The memcached "cache_memlimit" command.
- Args:
- memlimit: int, the number of megabytes to set as the new cache memory
- limit.
- Returns:
- If no exception is raised, always returns True.
- """
- memlimit = self._check_integer(memlimit, "memlimit")
- self._fetch_cmd(b'cache_memlimit', [memlimit], False)
- return True
- def version(self):
- """
- The memcached "version" command.
- Returns:
- A string of the memcached version.
- """
- cmd = b"version\r\n"
- results = self._misc_cmd([cmd], b'version', False)
- before, _, after = results[0].partition(b' ')
- if before != b'VERSION':
- raise MemcacheUnknownError(
- "Received unexpected response: %s" % results[0])
- return after
- def flush_all(self, delay=0, noreply=None):
- """
- The memcached "flush_all" command.
- Args:
- delay: optional int, the number of seconds to wait before flushing,
- or zero to flush immediately (the default).
- noreply: optional bool, True to not wait for the reply (defaults to
- self.default_noreply).
- Returns:
- True.
- """
- if noreply is None:
- noreply = self.default_noreply
- delay = self._check_integer(delay, "delay")
- cmd = b'flush_all ' + delay
- if noreply:
- cmd += b' noreply'
- cmd += b'\r\n'
- results = self._misc_cmd([cmd], b'flush_all', noreply)
- if noreply:
- return True
- return results[0] == b'OK'
- def quit(self):
- """
- The memcached "quit" command.
- This will close the connection with memcached. Calling any other
- method on this object will re-open the connection, so this object can
- be re-used after quit.
- """
- cmd = b"quit\r\n"
- self._misc_cmd([cmd], b'quit', True)
- self.close()
- def shutdown(self, graceful=False):
- """
- The memcached "shutdown" command.
- This will request shutdown and eventual termination of the server,
- optionally preceded by a graceful stop of memcached's internal state
- machine. Note that the server needs to have been started with the
- shutdown protocol command enabled with the --enable-shutdown flag.
- Args:
- graceful: optional bool, True to request a graceful shutdown with
- SIGUSR1 (defaults to False, i.e. SIGINT shutdown).
- """
- cmd = b'shutdown'
- if graceful:
- cmd += b' graceful'
- cmd += b'\r\n'
- # The shutdown command raises a server-side error if the shutdown
- # protocol command is not enabled. Otherwise, a successful shutdown
- # is expected to close the remote end of the transport.
- try:
- self._misc_cmd([cmd], b'shutdown', False)
- except MemcacheUnexpectedCloseError:
- pass
- def _raise_errors(self, line, name):
- if line.startswith(b'ERROR'):
- raise MemcacheUnknownCommandError(name)
- if line.startswith(b'CLIENT_ERROR'):
- error = line[line.find(b' ') + 1:]
- raise MemcacheClientError(error)
- if line.startswith(b'SERVER_ERROR'):
- error = line[line.find(b' ') + 1:]
- raise MemcacheServerError(error)
- def _check_integer(self, value, name):
- """Check that a value is an integer and encode it as a binary string"""
- if not isinstance(value, six.integer_types):
- raise MemcacheIllegalInputError(
- '%s must be integer, got bad value: %r' % (name, value)
- )
- return six.text_type(value).encode(self.encoding)
- def _check_cas(self, cas):
- """Check that a value is a valid input for 'cas' -- either an int or a
- string containing only 0-9
- The value will be (re)encoded so that we can accept strings or bytes.
- """
- # convert non-binary values to binary
- if isinstance(cas, (six.integer_types, six.string_types)):
- try:
- cas = six.text_type(cas).encode(self.encoding)
- except UnicodeEncodeError:
- raise MemcacheIllegalInputError(
- 'non-ASCII cas value: %r' % cas)
- elif not isinstance(cas, six.binary_type):
- raise MemcacheIllegalInputError(
- 'cas must be integer, string, or bytes, got bad value: %r' % cas
- )
- if not cas.isdigit():
- raise MemcacheIllegalInputError(
- 'cas must only contain values in 0-9, got bad value: %r'
- % cas
- )
- return cas
- def _extract_value(self, expect_cas, line, buf, remapped_keys,
- prefixed_keys):
- """
- This function is abstracted from _fetch_cmd to support different ways
- of value extraction. In order to use this feature, _extract_value needs
- to be overridden in the subclass.
- """
- if expect_cas:
- _, key, flags, size, cas = line.split()
- else:
- try:
- _, key, flags, size = line.split()
- except Exception as e:
- raise ValueError("Unable to parse line %s: %s" % (line, e))
- value = None
- try:
- buf, value = _readvalue(self.sock, buf, int(size))
- except MemcacheUnexpectedCloseError:
- self.close()
- raise
- key = remapped_keys[key]
- value = self.serde.deserialize(key, value, int(flags))
- if expect_cas:
- return key, (value, cas), buf
- else:
- return key, value, buf
- def _fetch_cmd(self, name, keys, expect_cas):
- prefixed_keys = [self.check_key(k) for k in keys]
- remapped_keys = dict(zip(prefixed_keys, keys))
- # It is important for all keys to be listed in their original order.
- cmd = name
- if prefixed_keys:
- cmd += b' ' + b' '.join(prefixed_keys)
- cmd += b'\r\n'
- total_line = ''
- try:
- if self.sock is None:
- self._connect()
- self.sock.sendall(cmd)
- buf = b''
- line = None
- result = {}
- while True:
- try:
- buf, line, total_buf = _readline(self.sock, buf)
- total_line += total_buf
- except MemcacheUnexpectedCloseError:
- self.close()
- raise
- self._raise_errors(line, name)
- if line == b'END' or line == b'OK':
- return result
- elif line.startswith(b'VALUE'):
- key, value, buf = self._extract_value(expect_cas, line, buf,
- remapped_keys,
- prefixed_keys)
- result[key] = value
- elif name == b'stats' and line.startswith(b'STAT'):
- key_value = line.split()
- result[key_value[1]] = key_value[2]
- elif name == b'stats' and line.startswith(b'ITEM'):
- # For 'stats cachedump' commands
- key_value = line.split()
- result[key_value[1]] = b' '.join(key_value[2:])
- else:
- raise MemcacheUnknownError(line[:32])
- except Exception:
- self.close()
- if self.ignore_exc:
- return {}
- raise
- finally:
- self.debuglog('cmd = {}; ret = {}'.format(cmd, total_line))
- def _store_cmd(self, name, values, expire, noreply, flags=None, cas=None):
- cmds = []
- keys = []
- extra = b''
- if cas is not None:
- extra += b' ' + cas
- if noreply:
- extra += b' noreply'
- expire = self._check_integer(expire, "expire")
- for key, data in six.iteritems(values):
- # must be able to reliably map responses back to the original order
- keys.append(key)
- key = self.check_key(key)
- data, data_flags = self.serde.serialize(key, data)
- # If 'flags' was explicitly provided, it overrides the value
- # returned by the serializer.
- if flags is not None:
- data_flags = flags
- if not isinstance(data, six.binary_type):
- try:
- data = six.text_type(data).encode(self.encoding)
- except UnicodeEncodeError as e:
- raise MemcacheIllegalInputError(
- "Data values must be binary-safe: %s" % e)
- cmds.append(name + b' ' + key + b' ' +
- six.text_type(data_flags).encode(self.encoding) +
- b' ' + expire +
- b' ' + six.text_type(len(data)).encode(self.encoding) +
- extra + b'\r\n' + data + b'\r\n')
- if self.sock is None:
- self._connect()
- total_line = b''
- try:
- self.sock.sendall(b''.join(cmds))
- if noreply:
- return {k: True for k in keys}
- results = {}
- buf = b''
- line = None
- for key in keys:
- try:
- buf, line, total_buf = _readline(self.sock, buf)
- total_line += total_buf
- except MemcacheUnexpectedCloseError:
- self.close()
- raise
- self._raise_errors(line, name)
- if line in VALID_STORE_RESULTS[name]:
- results[key] = STORE_RESULTS_VALUE[line]
- else:
- raise MemcacheUnknownError(line[:32])
- return results
- except Exception:
- self.close()
- raise
- finally:
- self.debuglog('cmd = {}; ret = {}'.format(b''.join(cmds), total_line))
- def _misc_cmd(self, cmds, cmd_name, noreply):
- if self.sock is None:
- self._connect()
- total_line = b''
- try:
- self.sock.sendall(b''.join(cmds))
- if noreply:
- return []
- results = []
- buf = b''
- line = None
- for cmd in cmds:
- try:
- buf, line, total_buf = _readline(self.sock, buf)
- total_line += total_buf
- except MemcacheUnexpectedCloseError:
- self.close()
- raise
- self._raise_errors(line, cmd_name)
- results.append(line)
- return results
- except Exception:
- self.close()
- raise
- finally:
- self.debuglog('cmd = {}; ret = {}'.format(b''.join(cmds), total_line))
- def __setitem__(self, key, value):
- self.set(key, value, noreply=True)
- def __getitem__(self, key):
- value = self.get(key)
- if value is None:
- raise KeyError
- return value
- def __delitem__(self, key):
- self.delete(key, noreply=True)
- class PooledClient(object):
- """A thread-safe pool of clients (with the same client api).
- Args:
- max_pool_size: maximum pool size to use (going above this amount
- triggers a runtime error), by default this is 2147483648L
- when not provided (or none).
- pool_idle_timeout: pooled connections are discarded if they have been
- unused for this many seconds. A value of 0 indicates
- that pooled connections are never discarded.
- lock_generator: a callback/type that takes no arguments that will
- be called to create a lock or semaphore that can
- protect the pool from concurrent access (for example a
- eventlet lock or semaphore could be used instead)
- Further arguments are interpreted as for :py:class:`.Client` constructor.
- Note: if `serde` is given, the same object will be used for *all* clients
- in the pool. Your serde object must therefore be thread-safe.
- """
- #: :class:`Client` class used to create new clients
- client_class = Client
- def __init__(self,
- server,
- serde=None,
- serializer=None,
- deserializer=None,
- connect_timeout=None,
- timeout=None,
- no_delay=False,
- ignore_exc=False,
- socket_module=socket,
- socket_keepalive=None,
- key_prefix=b'',
- max_pool_size=None,
- pool_idle_timeout=0,
- lock_generator=None,
- default_noreply=True,
- allow_unicode_keys=False,
- encoding='ascii',
- tls_context=None):
- self.server = normalize_server_spec(server)
- self.serde = serde or LegacyWrappingSerde(serializer, deserializer)
- self.connect_timeout = connect_timeout
- self.timeout = timeout
- self.no_delay = no_delay
- self.ignore_exc = ignore_exc
- self.socket_module = socket_module
- self.socket_keepalive = socket_keepalive
- self.default_noreply = default_noreply
- self.allow_unicode_keys = allow_unicode_keys
- if isinstance(key_prefix, six.text_type):
- key_prefix = key_prefix.encode('ascii')
- if not isinstance(key_prefix, six.binary_type):
- raise TypeError("key_prefix should be bytes.")
- self.key_prefix = key_prefix
- self.client_pool = pool.ObjectPool(
- self._create_client,
- after_remove=lambda client: client.close(),
- max_size=max_pool_size,
- idle_timeout=pool_idle_timeout,
- lock_generator=lock_generator)
- self.encoding = encoding
- self.tls_context = tls_context
- def check_key(self, key):
- """Checks key and add key_prefix."""
- return check_key_helper(key, allow_unicode_keys=self.allow_unicode_keys,
- key_prefix=self.key_prefix)
- def _create_client(self):
- return self.client_class(
- self.server,
- serde=self.serde,
- connect_timeout=self.connect_timeout,
- timeout=self.timeout,
- no_delay=self.no_delay,
- # We need to know when it fails *always* so that we
- # can remove/destroy it from the pool...
- ignore_exc=False,
- socket_module=self.socket_module,
- socket_keepalive=self.socket_keepalive,
- key_prefix=self.key_prefix,
- default_noreply=self.default_noreply,
- allow_unicode_keys=self.allow_unicode_keys,
- tls_context=self.tls_context)
- def close(self):
- self.client_pool.clear()
- disconnect_all = close
- def set(self, key, value, expire=0, noreply=None, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.set(key, value, expire=expire, noreply=noreply,
- flags=flags)
- def set_many(self, values, expire=0, noreply=None, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.set_many(values, expire=expire, noreply=noreply,
- flags=flags)
- set_multi = set_many
- def replace(self, key, value, expire=0, noreply=None, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.replace(key, value, expire=expire, noreply=noreply,
- flags=flags)
- def append(self, key, value, expire=0, noreply=None, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.append(key, value, expire=expire, noreply=noreply,
- flags=flags)
- def prepend(self, key, value, expire=0, noreply=None, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.prepend(key, value, expire=expire, noreply=noreply,
- flags=flags)
- def cas(self, key, value, cas, expire=0, noreply=False, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.cas(key, value, cas,
- expire=expire, noreply=noreply, flags=flags)
- def get(self, key, default=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- try:
- return client.get(key, default)
- except Exception:
- if self.ignore_exc:
- return None
- else:
- raise
- def get_many(self, keys):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- try:
- return client.get_many(keys)
- except Exception:
- if self.ignore_exc:
- return {}
- else:
- raise
- get_multi = get_many
- def gets(self, key):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- try:
- return client.gets(key)
- except Exception:
- if self.ignore_exc:
- return (None, None)
- else:
- raise
- def gets_many(self, keys):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- try:
- return client.gets_many(keys)
- except Exception:
- if self.ignore_exc:
- return {}
- else:
- raise
- def delete(self, key, noreply=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.delete(key, noreply=noreply)
- def delete_many(self, keys, noreply=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.delete_many(keys, noreply=noreply)
- delete_multi = delete_many
- def add(self, key, value, expire=0, noreply=None, flags=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.add(key, value, expire=expire, noreply=noreply,
- flags=flags)
- def incr(self, key, value, noreply=False):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.incr(key, value, noreply=noreply)
- def decr(self, key, value, noreply=False):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.decr(key, value, noreply=noreply)
- def touch(self, key, expire=0, noreply=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.touch(key, expire=expire, noreply=noreply)
- def stats(self, *args):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- try:
- return client.stats(*args)
- except Exception:
- if self.ignore_exc:
- return {}
- else:
- raise
- def version(self):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.version()
- def flush_all(self, delay=0, noreply=None):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- return client.flush_all(delay=delay, noreply=noreply)
- def quit(self):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- try:
- client.quit()
- finally:
- self.client_pool.destroy(client)
- def shutdown(self, graceful=False):
- with self.client_pool.get_and_release(destroy_on_fail=True) as client:
- client.shutdown(graceful)
- def __setitem__(self, key, value):
- self.set(key, value, noreply=True)
- def __getitem__(self, key):
- value = self.get(key)
- if value is None:
- raise KeyError
- return value
- def __delitem__(self, key):
- self.delete(key, noreply=True)
- def _readline(sock, buf):
- """Read line of text from the socket.
- Read a line of text (delimited by "\r\n") from the socket, and
- return that line along with any trailing characters read from the
- socket.
- Args:
- sock: Socket object, should be connected.
- buf: String, zero or more characters, returned from an earlier
- call to _readline or _readvalue (pass an empty string on the
- first call).
- Returns:
- A tuple of (buf, line) where line is the full line read from the
- socket (minus the "\r\n" characters) and buf is any trailing
- characters read after the "\r\n" was found (which may be an empty
- string).
- """
- chunks = []
- last_char = b''
- total_buf = b''
- while True:
- # We're reading in chunks, so "\r\n" could appear in one chunk,
- # or across the boundary of two chunks, so we check for both
- # cases.
- # This case must appear first, since the buffer could have
- # later \r\n characters in it and we want to get the first \r\n.
- if last_char == b'\r' and buf[0:1] == b'\n':
- # Strip the last character from the last chunk.
- chunks[-1] = chunks[-1][:-1]
- return buf[1:], b''.join(chunks), total_buf
- elif buf.find(b'\r\n') != -1:
- before, sep, after = buf.partition(b"\r\n")
- chunks.append(before)
- return after, b''.join(chunks), total_buf
- if buf:
- chunks.append(buf)
- last_char = buf[-1:]
- buf = _recv(sock, RECV_SIZE)
- if not buf:
- raise MemcacheUnexpectedCloseError()
- total_buf += buf
- def _readvalue(sock, buf, size):
- """Read specified amount of bytes from the socket.
- Read size bytes, followed by the "\r\n" characters, from the socket,
- and return those bytes and any trailing bytes read after the "\r\n".
- Args:
- sock: Socket object, should be connected.
- buf: String, zero or more characters, returned from an earlier
- call to _readline or _readvalue (pass an empty string on the
- first call).
- size: Integer, number of bytes to read from the socket.
- Returns:
- A tuple of (buf, value) where value is the bytes read from the
- socket (there will be exactly size bytes) and buf is trailing
- characters read after the "\r\n" following the bytes (but not
- including the \r\n).
- """
- chunks = []
- rlen = size + 2
- while rlen - len(buf) > 0:
- if buf:
- rlen -= len(buf)
- chunks.append(buf)
- buf = _recv(sock, RECV_SIZE)
- if not buf:
- raise MemcacheUnexpectedCloseError()
- # Now we need to remove the \r\n from the end. There are two cases we care
- # about: the \r\n is all in the last buffer, or only the \n is in the last
- # buffer, and we need to remove the \r from the penultimate buffer.
- if rlen == 1:
- # replace the last chunk with the same string minus the last character,
- # which is always '\r' in this case.
- chunks[-1] = chunks[-1][:-1]
- else:
- # Just remove the "\r\n" from the latest chunk
- chunks.append(buf[:rlen - 2])
- return buf[rlen:], b''.join(chunks)
- def _recv(sock, size):
- """sock.recv() with retry on EINTR"""
- while True:
- try:
- return sock.recv(size)
- except IOError as e:
- if e.errno != errno.EINTR:
- raise
|