| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030 |
- import struct
- import io
- import datetime
- import re
- from rdbtools.encodehelpers import STRING_ESCAPE_RAW, apply_escape_bytes, bval
- from .compat import range, str2regexp
- from .iowrapper import IOWrapper
- try:
- try:
- from cStringIO import StringIO as BytesIO
- except ImportError:
- from StringIO import StringIO as BytesIO
- except ImportError:
- from io import BytesIO
- try:
- import lzf
- HAS_PYTHON_LZF = True
- except ImportError:
- HAS_PYTHON_LZF = False
-
# Length-encoding selectors. The first two and REDIS_RDB_ENCVAL are compared
# against the two most significant bits of the first length byte; the 32/64-bit
# markers are compared against the whole first byte.
REDIS_RDB_6BITLEN = 0
REDIS_RDB_14BITLEN = 1
REDIS_RDB_32BITLEN = 0x80
REDIS_RDB_64BITLEN = 0x81
REDIS_RDB_ENCVAL = 3

# Opcodes that may appear where an object type byte is expected.
REDIS_RDB_OPCODE_AUX = 250
REDIS_RDB_OPCODE_RESIZEDB = 251
REDIS_RDB_OPCODE_EXPIRETIME_MS = 252
REDIS_RDB_OPCODE_EXPIRETIME = 253
REDIS_RDB_OPCODE_SELECTDB = 254
REDIS_RDB_OPCODE_EOF = 255

# Object type bytes.
REDIS_RDB_TYPE_STRING = 0
REDIS_RDB_TYPE_LIST = 1
REDIS_RDB_TYPE_SET = 2
REDIS_RDB_TYPE_ZSET = 3
REDIS_RDB_TYPE_HASH = 4
REDIS_RDB_TYPE_ZSET_2 = 5  # ZSET version 2 with doubles stored in binary.
REDIS_RDB_TYPE_MODULE = 6
REDIS_RDB_TYPE_MODULE_2 = 7
REDIS_RDB_TYPE_HASH_ZIPMAP = 9
REDIS_RDB_TYPE_LIST_ZIPLIST = 10
REDIS_RDB_TYPE_SET_INTSET = 11
REDIS_RDB_TYPE_ZSET_ZIPLIST = 12
REDIS_RDB_TYPE_HASH_ZIPLIST = 13
REDIS_RDB_TYPE_LIST_QUICKLIST = 14

# Special string encodings, selected when the length prefix is REDIS_RDB_ENCVAL.
REDIS_RDB_ENC_INT8 = 0
REDIS_RDB_ENC_INT16 = 1
REDIS_RDB_ENC_INT32 = 2
REDIS_RDB_ENC_LZF = 3

# Opcodes found inside a serialized module value.
REDIS_RDB_MODULE_OPCODE_EOF = 0  # End of module value.
REDIS_RDB_MODULE_OPCODE_SINT = 1
REDIS_RDB_MODULE_OPCODE_UINT = 2
REDIS_RDB_MODULE_OPCODE_FLOAT = 3
REDIS_RDB_MODULE_OPCODE_DOUBLE = 4
REDIS_RDB_MODULE_OPCODE_STRING = 5

# Maps every object type byte to the logical type name used by the filters.
DATA_TYPE_MAPPING = {
    REDIS_RDB_TYPE_STRING: "string",
    REDIS_RDB_TYPE_LIST: "list",
    REDIS_RDB_TYPE_SET: "set",
    REDIS_RDB_TYPE_ZSET: "sortedset",
    REDIS_RDB_TYPE_HASH: "hash",
    REDIS_RDB_TYPE_ZSET_2: "sortedset",
    REDIS_RDB_TYPE_MODULE: "module",
    REDIS_RDB_TYPE_MODULE_2: "module",
    REDIS_RDB_TYPE_HASH_ZIPMAP: "hash",
    REDIS_RDB_TYPE_LIST_ZIPLIST: "list",
    REDIS_RDB_TYPE_SET_INTSET: "set",
    REDIS_RDB_TYPE_ZSET_ZIPLIST: "sortedset",
    REDIS_RDB_TYPE_HASH_ZIPLIST: "hash",
    REDIS_RDB_TYPE_LIST_QUICKLIST: "list",
}
class RdbCallback(object):
    """
    A Callback to handle events as the Redis dump file is parsed.
    This callback provides a serial and fast access to the dump file.

    Every event method is a no-op here; subclasses override the ones they
    care about.
    """

    def __init__(self, string_escape=None):
        """
        `string_escape` selects the escape method used by `encode_key` and
        `encode_value`. None (the default) selects STRING_ESCAPE_RAW.
        """
        if string_escape is None:
            self._escape = STRING_ESCAPE_RAW
        else:
            self._escape = string_escape

    def encode_key(self, key):
        """
        Escape a given key bytes with the instance chosen escape method.

        Key is not escaped if it contains only 'ASCII printable' bytes.
        """
        return apply_escape_bytes(key, self._escape, skip_printable=True)

    def encode_value(self, val):
        """Escape a given value bytes with the instance chosen escape method."""
        return apply_escape_bytes(val, self._escape)

    def start_rdb(self):
        """
        Called once we know we are dealing with a valid redis dump file
        """
        pass

    def aux_field(self, key, value):
        """
        Called in the beginning of the RDB with various meta data fields such as:
        redis-ver, redis-bits, ctime, used-mem
        exists since redis 3.2 (RDB v7)

        The parser stops parsing when this method returns a true value.
        """
        pass

    def start_database(self, db_number):
        """
        Called to indicate the start of database `db_number`

        Once a database starts, another database cannot start unless
        the first one completes and then `end_database` method is called

        Typically, callbacks store the current database number in a class variable
        """
        pass

    def start_module(self, key, module_name, expiry):
        """
        Called to indicate start of a module key

        :param key: string
        :param module_name: string
        :param expiry: expiry read for this key, or None
        :return: boolean to indicate whether to record the full buffer or not
        """
        return False

    def handle_module_data(self, key, opcode, data):
        """
        Called once for every item read from a module value

        `key` is the redis key
        `opcode` is the module opcode that introduced this item
        `data` is the decoded item
        """
        pass

    def end_module(self, key, buffer_size, buffer=None):
        """
        Called when the module value has been read completely

        `key` is the redis key
        `buffer_size` is the recorded size of the serialized value
        `buffer` is the recorded raw value when `start_module` asked for it, else None
        """
        pass

    def db_size(self, db_size, expires_size):
        """
        Called per database before the keys, with the key count in the main
        dictionary and the total volatile key count
        exists since redis 3.2 (RDB v7)
        """
        pass

    def set(self, key, value, expiry, info):
        """
        Callback to handle a key with a string value and an optional expiry

        `key` is the redis key
        `value` is a string or a number
        `expiry` is a datetime object. None means the object does not expire
        `info` is a dictionary containing additional information about this object.
        """
        pass

    def start_hash(self, key, length, expiry, info):
        """Callback to handle the start of a hash

        `key` is the redis key
        `length` is the number of elements in this hash.
        `expiry` is a `datetime` object. None means the object does not expire
        `info` is a dictionary containing additional information about this object.

        After `start_hash`, the method `hset` will be called with this `key` exactly `length` times.
        After that, the `end_hash` method will be called.
        """
        pass

    def hset(self, key, field, value):
        """
        Callback to insert a field=value pair in an existing hash

        `key` is the redis key for this hash
        `field` is a string
        `value` is the value to store for this field
        """
        pass

    def end_hash(self, key):
        """
        Called when there are no more elements in the hash

        `key` is the redis key for the hash
        """
        pass

    def start_set(self, key, cardinality, expiry, info):
        """
        Callback to handle the start of a set

        `key` is the redis key
        `cardinality` is the number of elements in this set
        `expiry` is a `datetime` object. None means the object does not expire
        `info` is a dictionary containing additional information about this object.

        After `start_set`, the method `sadd` will be called with `key` exactly `cardinality` times
        After that, the `end_set` method will be called to indicate the end of the set.

        Note : This callback handles both Int Sets and Regular Sets
        """
        pass

    def sadd(self, key, member):
        """
        Callback to insert a new member to this set

        `key` is the redis key for this set
        `member` is the member to insert into this set
        """
        pass

    def end_set(self, key):
        """
        Called when there are no more elements in this set

        `key` the redis key for this set
        """
        pass

    def start_list(self, key, expiry, info):
        """
        Callback to handle the start of a list

        `key` is the redis key for this list
        `expiry` is a `datetime` object. None means the object does not expire
        `info` is a dictionary containing additional information about this object.

        After `start_list`, the method `rpush` will be called with `key` once per element
        After that, the `end_list` method will be called to indicate the end of the list

        Note : This callback handles Zip Lists, Quick Lists and Linked Lists.
        """
        pass

    def rpush(self, key, value):
        """
        Callback to insert a new value into this list

        `key` is the redis key for this list
        `value` is the value to be inserted

        Elements must be inserted to the end (i.e. tail) of the existing list.
        """
        pass

    def end_list(self, key, info):
        """
        Called when there are no more elements in this list

        `key` the redis key for this list
        `info` is a dictionary containing additional information about this object that wasn't known in start_list.
        """
        pass

    def start_sorted_set(self, key, length, expiry, info):
        """
        Callback to handle the start of a sorted set

        `key` is the redis key for this sorted set
        `length` is the number of elements in this sorted set
        `expiry` is a `datetime` object. None means the object does not expire
        `info` is a dictionary containing additional information about this object.

        After `start_sorted_set`, the method `zadd` will be called with `key` exactly `length` times.
        Also, `zadd` will be called in a sorted order, so as to preserve the ordering of this sorted set.
        After that, the `end_sorted_set` method will be called to indicate the end of this sorted set

        Note : This callback handles sorted sets that are stored as ziplists or skiplists
        """
        pass

    def zadd(self, key, score, member):
        """Callback to insert a new value into this sorted set

        `key` is the redis key for this sorted set
        `score` is the score for this `member`
        `member` is the element being inserted
        """
        pass

    def end_sorted_set(self, key):
        """
        Called when there are no more elements in this sorted set

        `key` is the redis key for this sorted set
        """
        pass

    def end_database(self, db_number):
        """
        Called when the current database ends

        After `end_database`, one of the methods are called -
        1) `start_database` with a new database number
            OR
        2) `end_rdb` to indicate we have reached the end of the file
        """
        pass

    def end_rdb(self):
        """Called to indicate we have completed parsing of the dump file"""
        pass
class RdbParser(object):
    """
    A Parser for Redis RDB Files

    This class is similar in spirit to a SAX parser for XML files.
    The dump file is parsed sequentially. As and when objects are discovered,
    appropriate methods in the callback are called.

    Typical usage :
        callback = MyRdbCallback() # Typically a subclass of RdbCallback
        parser = RdbParser(callback)
        parser.parse('/var/redis/6379/dump.rdb')

    filter is a dictionary with the following keys
        {"dbs" : [0, 1], "keys" : "foo.*", "types" : ["hash", "set", "sortedset", "list", "string"]}

    If filter is None, results will not be filtered
    If dbs, keys or types is None or Empty, no filtering will be done on that axis
    """

    # Alphabet used by `_decode_module_id` to turn 6-bit groups into characters.
    charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'

    def __init__(self, callback, filters = None):
        """
        `callback` is the object that will receive parse events
        `filters` is the optional filter dictionary described on the class
        """
        self._callback = callback
        self._key = None
        self._expiry = None
        self.init_filter(filters)
        self._rdb_version = 0

    def parse(self, filename):
        """
        Parse a redis rdb dump file, and call methods in the
        callback object during the parsing operation.
        """
        # parse_fd takes ownership of the file object and closes it.
        self.parse_fd(open(filename, "rb"))

    def parse_fd(self, fd):
        """
        Parse an already opened binary file object.

        `fd` is used as a context manager, so it is closed when parsing
        finishes or fails.
        """
        with fd as f:
            self.verify_magic_string(f.read(5))
            self.verify_version(f.read(4))
            self._callback.start_rdb()

            is_first_database = True
            db_number = 0
            while True:
                self._expiry = None
                data_type = read_unsigned_char(f)

                # An expiry opcode is followed by the real type byte.
                # Both forms are normalised to microseconds for to_datetime.
                if data_type == REDIS_RDB_OPCODE_EXPIRETIME_MS:
                    self._expiry = to_datetime(read_unsigned_long(f) * 1000)
                    data_type = read_unsigned_char(f)
                elif data_type == REDIS_RDB_OPCODE_EXPIRETIME:
                    self._expiry = to_datetime(read_unsigned_int(f) * 1000000)
                    data_type = read_unsigned_char(f)

                if data_type == REDIS_RDB_OPCODE_SELECTDB:
                    if not is_first_database:
                        self._callback.end_database(db_number)
                    is_first_database = False
                    db_number = self.read_length(f)
                    self._callback.start_database(db_number)
                    continue

                if data_type == REDIS_RDB_OPCODE_AUX:
                    aux_key = self.read_string(f)
                    aux_val = self.read_string(f)
                    ret = self._callback.aux_field(aux_key, aux_val)
                    if ret:
                        break  # TODO: make all callbacks return abort flag
                    continue

                if data_type == REDIS_RDB_OPCODE_RESIZEDB:
                    db_size = self.read_length(f)
                    expire_size = self.read_length(f)
                    self._callback.db_size(db_size, expire_size)
                    continue

                if data_type == REDIS_RDB_OPCODE_EOF:
                    self._callback.end_database(db_number)
                    self._callback.end_rdb()
                    if self._rdb_version >= 5:
                        # Consume the 8 trailing bytes that follow EOF in v5+ files.
                        f.read(8)
                    break

                if self.matches_filter(db_number):
                    self._key = self.read_string(f)
                    if self.matches_filter(db_number, self._key, data_type):
                        self.read_object(f, data_type)
                    else:
                        self.skip_object(f, data_type)
                else:
                    self.skip_key_and_object(f, data_type)

    def read_length_with_encoding(self, f):
        """
        Read a length prefix from `f`.

        Returns `(length, is_encoded)`. When `is_encoded` is True, `length`
        is not a length but the identifier of a special string encoding.
        Raises Exception when the first byte is not a known prefix.
        """
        length = 0
        is_encoded = False
        first_byte = read_unsigned_char(f)
        enc_type = (first_byte & 0xC0) >> 6
        if enc_type == REDIS_RDB_ENCVAL:
            is_encoded = True
            length = first_byte & 0x3F
        elif enc_type == REDIS_RDB_6BITLEN:
            length = first_byte & 0x3F
        elif enc_type == REDIS_RDB_14BITLEN:
            second_byte = read_unsigned_char(f)
            length = ((first_byte & 0x3F) << 8) | second_byte
        elif first_byte == REDIS_RDB_32BITLEN:
            length = read_unsigned_int_be(f)
        elif first_byte == REDIS_RDB_64BITLEN:
            length = read_unsigned_long_be(f)
        else:
            raise Exception('read_length_with_encoding', "Invalid string encoding %s (encoding byte 0x%X)" % (enc_type, first_byte))
        return (length, is_encoded)

    def read_length(self, f):
        """Read a length prefix from `f` and return only the length part."""
        return self.read_length_with_encoding(f)[0]

    def read_string(self, f):
        """
        Read one string object from `f`.

        Returns an int for the integer encodings, the decompressed bytes for
        the LZF encoding and the raw bytes otherwise.
        Raises Exception for an unknown special encoding.
        """
        length, is_encoded = self.read_length_with_encoding(f)
        if not is_encoded:
            return f.read(length)

        if length == REDIS_RDB_ENC_INT8:
            return read_signed_char(f)
        if length == REDIS_RDB_ENC_INT16:
            return read_signed_short(f)
        if length == REDIS_RDB_ENC_INT32:
            return read_signed_int(f)
        if length == REDIS_RDB_ENC_LZF:
            clen = self.read_length(f)
            expected_length = self.read_length(f)
            return self.lzf_decompress(f.read(clen), expected_length)
        raise Exception('read_string', "Invalid string encoding %s"%(length))

    def read_float(self, f):
        """
        Read a length-prefixed textual double from `f` and return a float.

        The length byte values 253, 254 and 255 stand for nan, +inf and -inf.
        """
        dbl_length = read_unsigned_char(f)
        if dbl_length == 253:
            return float('nan')
        elif dbl_length == 254:
            return float('inf')
        elif dbl_length == 255:
            return float('-inf')
        data = f.read(dbl_length)
        # float() accepts both str and bytes, so this yields a number whether
        # the stream hands back text (Python 2) or bytes (Python 3).
        return float(data)

    # Read an object for the stream
    # f is the redis file
    # enc_type is the type of object
    def read_object(self, f, enc_type):
        """
        Read the value of the current key (`self._key`) from `f` and report it
        through the callback.

        Raises Exception for REDIS_RDB_TYPE_MODULE and for unknown types.
        """
        if enc_type == REDIS_RDB_TYPE_STRING:
            val = self.read_string(f)
            self._callback.set(self._key, val, self._expiry, info={'encoding':'string'})
        elif enc_type == REDIS_RDB_TYPE_LIST:
            # A redis list is just a sequence of strings
            # We successively read strings from the stream and create a list from it
            # The lists are in order i.e. the first string is the head,
            # and the last string is the tail of the list
            length = self.read_length(f)
            self._callback.start_list(self._key, self._expiry, info={'encoding':'linkedlist' })
            for count in range(0, length):
                val = self.read_string(f)
                self._callback.rpush(self._key, val)
            self._callback.end_list(self._key, info={'encoding':'linkedlist' })
        elif enc_type == REDIS_RDB_TYPE_SET:
            # A redis set is just a sequence of strings
            # We successively read strings from the stream and create a set from it
            # Note that the order of strings is non-deterministic
            length = self.read_length(f)
            self._callback.start_set(self._key, length, self._expiry, info={'encoding':'hashtable'})
            for count in range(0, length):
                val = self.read_string(f)
                self._callback.sadd(self._key, val)
            self._callback.end_set(self._key)
        elif enc_type == REDIS_RDB_TYPE_ZSET or enc_type == REDIS_RDB_TYPE_ZSET_2:
            length = self.read_length(f)
            self._callback.start_sorted_set(self._key, length, self._expiry, info={'encoding':'skiplist'})
            for count in range(0, length):
                val = self.read_string(f)
                # Version 2 stores the score as a binary double, version 1 as text.
                score = read_double(f) if enc_type == REDIS_RDB_TYPE_ZSET_2 else self.read_float(f)
                self._callback.zadd(self._key, score, val)
            self._callback.end_sorted_set(self._key)
        elif enc_type == REDIS_RDB_TYPE_HASH:
            length = self.read_length(f)
            self._callback.start_hash(self._key, length, self._expiry, info={'encoding':'hashtable'})
            for count in range(0, length):
                field = self.read_string(f)
                value = self.read_string(f)
                self._callback.hset(self._key, field, value)
            self._callback.end_hash(self._key)
        elif enc_type == REDIS_RDB_TYPE_HASH_ZIPMAP:
            self.read_zipmap(f)
        elif enc_type == REDIS_RDB_TYPE_LIST_ZIPLIST:
            self.read_ziplist(f)
        elif enc_type == REDIS_RDB_TYPE_SET_INTSET:
            self.read_intset(f)
        elif enc_type == REDIS_RDB_TYPE_ZSET_ZIPLIST:
            self.read_zset_from_ziplist(f)
        elif enc_type == REDIS_RDB_TYPE_HASH_ZIPLIST:
            self.read_hash_from_ziplist(f)
        elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST:
            self.read_list_from_quicklist(f)
        elif enc_type == REDIS_RDB_TYPE_MODULE:
            raise Exception('read_object', 'Unable to read Redis Modules RDB objects (key %s)' % self._key)
        elif enc_type == REDIS_RDB_TYPE_MODULE_2:
            self.read_module(f)
        else:
            raise Exception('read_object', 'Invalid object type %d for key %s' % (enc_type, self._key))

    def skip_key_and_object(self, f, data_type):
        """Advance `f` past a key and its value without reporting anything."""
        self.skip_string(f)
        self.skip_object(f, data_type)

    def skip_string(self, f):
        """Advance `f` past one string object without decoding it."""
        length, is_encoded = self.read_length_with_encoding(f)
        bytes_to_skip = 0
        if is_encoded:
            if length == REDIS_RDB_ENC_INT8:
                bytes_to_skip = 1
            elif length == REDIS_RDB_ENC_INT16:
                bytes_to_skip = 2
            elif length == REDIS_RDB_ENC_INT32:
                bytes_to_skip = 4
            elif length == REDIS_RDB_ENC_LZF:
                clen = self.read_length(f)
                # The uncompressed length must still be consumed from the stream.
                self.read_length(f)
                bytes_to_skip = clen
        else:
            bytes_to_skip = length

        skip(f, bytes_to_skip)

    def skip_float(self, f):
        """Advance `f` past one length-prefixed textual double."""
        dbl_length = read_unsigned_char(f)
        # 253..255 are the nan/inf markers and carry no payload.
        if dbl_length < 253:
            skip(f, dbl_length)

    def skip_binary_double(self, f):
        """Advance `f` past one 8-byte binary double."""
        skip(f, 8)

    def skip_object(self, f, enc_type):
        """
        Advance `f` past the value of the current key without invoking any
        callback.

        Raises Exception for REDIS_RDB_TYPE_MODULE and for unknown types.
        """
        skip_strings = 0
        if enc_type == REDIS_RDB_TYPE_STRING:
            skip_strings = 1
        elif enc_type == REDIS_RDB_TYPE_LIST:
            skip_strings = self.read_length(f)
        elif enc_type == REDIS_RDB_TYPE_SET:
            skip_strings = self.read_length(f)
        elif enc_type == REDIS_RDB_TYPE_ZSET or enc_type == REDIS_RDB_TYPE_ZSET_2:
            length = self.read_length(f)
            for x in range(length):
                self.skip_string(f)
                if enc_type == REDIS_RDB_TYPE_ZSET_2:
                    self.skip_binary_double(f)
                else:
                    self.skip_float(f)
        elif enc_type == REDIS_RDB_TYPE_HASH:
            skip_strings = self.read_length(f) * 2
        elif enc_type in (REDIS_RDB_TYPE_HASH_ZIPMAP,
                          REDIS_RDB_TYPE_LIST_ZIPLIST,
                          REDIS_RDB_TYPE_SET_INTSET,
                          REDIS_RDB_TYPE_ZSET_ZIPLIST,
                          REDIS_RDB_TYPE_HASH_ZIPLIST):
            # These compact encodings are stored as a single string blob.
            skip_strings = 1
        elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST:
            skip_strings = self.read_length(f)
        elif enc_type == REDIS_RDB_TYPE_MODULE:
            raise Exception('skip_object', 'Unable to skip Redis Modules RDB objects (key %s)' % self._key)
        elif enc_type == REDIS_RDB_TYPE_MODULE_2:
            # Skipped keys must not reach the callback, so the module value is
            # walked without emitting module events.
            self._skip_module(f)
        else:
            raise Exception('skip_object', 'Invalid object type %d for key %s' % (enc_type, self._key))
        for x in range(0, skip_strings):
            self.skip_string(f)

    def _skip_module(self, f):
        """Advance `f` past one module (version 2) value without any callback."""
        self.read_length_with_encoding(f)  # module id, not needed when skipping
        opcode = self.read_length(f)
        while opcode != REDIS_RDB_MODULE_OPCODE_EOF:
            if opcode == REDIS_RDB_MODULE_OPCODE_SINT or opcode == REDIS_RDB_MODULE_OPCODE_UINT:
                self.read_length(f)
            elif opcode == REDIS_RDB_MODULE_OPCODE_FLOAT:
                skip(f, 4)
            elif opcode == REDIS_RDB_MODULE_OPCODE_DOUBLE:
                self.skip_binary_double(f)
            elif opcode == REDIS_RDB_MODULE_OPCODE_STRING:
                self.skip_string(f)
            else:
                raise Exception("Unknown module opcode %s" % opcode)
            opcode = self.read_length(f)

    def read_intset(self, f):
        """Read a set stored as an intset blob and report it through the callback."""
        raw_string = self.read_string(f)
        buff = BytesIO(raw_string)
        encoding = read_unsigned_int(buff)
        num_entries = read_unsigned_int(buff)
        self._callback.start_set(self._key, num_entries, self._expiry, info={'encoding':'intset', 'sizeof_value':len(raw_string)})
        # `encoding` is the width in bytes of every entry.
        readers = {8: read_signed_long, 4: read_signed_int, 2: read_signed_short}
        reader = readers.get(encoding)
        for x in range(0, num_entries):
            if reader is None:
                raise Exception('read_intset', 'Invalid encoding %d for key %s' % (encoding, self._key))
            self._callback.sadd(self._key, reader(buff))
        self._callback.end_set(self._key)

    def _open_ziplist(self, raw_string):
        """Wrap a ziplist blob in a stream positioned at its first entry.

        Returns `(stream, num_entries)`.
        """
        buff = BytesIO(raw_string)
        read_unsigned_int(buff)  # zlbytes, unused
        read_unsigned_int(buff)  # tail offset, unused
        num_entries = read_unsigned_short(buff)
        return buff, num_entries

    def _check_ziplist_end(self, buff, source):
        """Consume the ziplist terminator byte; raise Exception(source, ...) if it is not 255."""
        zlist_end = read_unsigned_char(buff)
        if zlist_end != 255:
            raise Exception(source, "Invalid zip list end - %d for key %s" % (zlist_end, self._key))

    def read_ziplist(self, f):
        """Read a list stored as a single ziplist blob and report it through the callback."""
        raw_string = self.read_string(f)
        buff, num_entries = self._open_ziplist(raw_string)
        self._callback.start_list(self._key, self._expiry, info={'encoding':'ziplist', 'sizeof_value':len(raw_string)})
        for x in range(0, num_entries):
            val = self.read_ziplist_entry(buff)
            self._callback.rpush(self._key, val)
        self._check_ziplist_end(buff, 'read_ziplist')
        self._callback.end_list(self._key, info={'encoding':'ziplist'})

    def read_list_from_quicklist(self, f):
        """Read a list stored as a sequence of ziplist blobs and report it through the callback."""
        count = self.read_length(f)
        total_size = 0
        self._callback.start_list(self._key, self._expiry, info={'encoding': 'quicklist', 'zips': count})
        for i in range(0, count):
            raw_string = self.read_string(f)
            total_size += len(raw_string)
            buff, num_entries = self._open_ziplist(raw_string)
            for x in range(0, num_entries):
                self._callback.rpush(self._key, self.read_ziplist_entry(buff))
            self._check_ziplist_end(buff, 'read_quicklist')
        self._callback.end_list(self._key, info={'encoding': 'quicklist', 'zips': count, 'sizeof_value': total_size})

    def read_zset_from_ziplist(self, f):
        """Read a sorted set stored as a ziplist of member/score pairs."""
        raw_string = self.read_string(f)
        buff, num_entries = self._open_ziplist(raw_string)
        if (num_entries % 2):
            raise Exception('read_zset_from_ziplist', "Expected even number of elements, but found %d for key %s" % (num_entries, self._key))
        num_entries = num_entries // 2
        self._callback.start_sorted_set(self._key, num_entries, self._expiry, info={'encoding':'ziplist', 'sizeof_value':len(raw_string)})
        for x in range(0, num_entries):
            member = self.read_ziplist_entry(buff)
            score = self.read_ziplist_entry(buff)
            # Scores that were not integer-encoded arrive as text.
            if isinstance(score, bytes):
                score = float(score)
            self._callback.zadd(self._key, score, member)
        self._check_ziplist_end(buff, 'read_zset_from_ziplist')
        self._callback.end_sorted_set(self._key)

    def read_hash_from_ziplist(self, f):
        """Read a hash stored as a ziplist of field/value pairs."""
        raw_string = self.read_string(f)
        buff, num_entries = self._open_ziplist(raw_string)
        if (num_entries % 2):
            raise Exception('read_hash_from_ziplist', "Expected even number of elements, but found %d for key %s" % (num_entries, self._key))
        num_entries = num_entries // 2
        self._callback.start_hash(self._key, num_entries, self._expiry, info={'encoding':'ziplist', 'sizeof_value':len(raw_string)})
        for x in range(0, num_entries):
            field = self.read_ziplist_entry(buff)
            value = self.read_ziplist_entry(buff)
            self._callback.hset(self._key, field, value)
        self._check_ziplist_end(buff, 'read_hash_from_ziplist')
        self._callback.end_hash(self._key)

    def read_ziplist_entry(self, f):
        """
        Read one ziplist entry from `f`.

        Returns bytes for string entries and an int for integer entries.
        Raises Exception for an unknown entry header.
        """
        value = None
        prev_length = read_unsigned_char(f)
        if prev_length == 254:
            # 254 marks a 4-byte previous-entry length; it is read only to advance.
            read_unsigned_int(f)
        entry_header = read_unsigned_char(f)
        if (entry_header >> 6) == 0:
            length = entry_header & 0x3F
            value = f.read(length)
        elif (entry_header >> 6) == 1:
            length = ((entry_header & 0x3F) << 8) | read_unsigned_char(f)
            value = f.read(length)
        elif (entry_header >> 6) == 2:
            length = read_unsigned_int_be(f)
            value = f.read(length)
        elif (entry_header >> 4) == 12:
            value = read_signed_short(f)
        elif (entry_header >> 4) == 13:
            value = read_signed_int(f)
        elif (entry_header >> 4) == 14:
            value = read_signed_long(f)
        elif (entry_header == 240):
            value = read_24bit_signed_number(f)
        elif (entry_header == 254):
            value = read_signed_char(f)
        elif (entry_header >= 241 and entry_header <= 253):
            # Small integers are stored inside the header itself.
            value = entry_header - 241
        else:
            raise Exception('read_ziplist_entry', 'Invalid entry_header %d for key %s' % (entry_header, self._key))
        return value

    def read_zipmap(self, f):
        """Read a hash stored as a zipmap blob and report it through the callback."""
        raw_string = self.read_string(f)
        buff = io.BytesIO(bytearray(raw_string))
        num_entries = read_unsigned_char(buff)
        self._callback.start_hash(self._key, num_entries, self._expiry, info={'encoding':'zipmap', 'sizeof_value':len(raw_string)})
        while True:
            next_length = self.read_zipmap_next_length(buff)
            if next_length is None:
                break
            key = buff.read(next_length)
            next_length = self.read_zipmap_next_length(buff)
            if next_length is None:
                raise Exception('read_zip_map', 'Unexepcted end of zip map for key %s' % self._key)
            free = read_unsigned_char(buff)
            value = buff.read(next_length)
            # Values that look like integers are reported as int.
            try:
                value = int(value)
            except ValueError:
                pass

            # `free` unused bytes follow the value.
            skip(buff, free)
            self._callback.hset(self._key, key, value)
        self._callback.end_hash(self._key)

    def read_zipmap_next_length(self, f):
        """Read a zipmap length; returns None when the end marker (255) is met."""
        num = read_unsigned_char(f)
        if num < 254:
            return num
        elif num == 254:
            return read_unsigned_int(f)
        else:
            return None

    def read_module(self, f):
        """
        Read a module (version 2) value and report it through the module callbacks.

        Raises Exception for an unknown module opcode.
        """
        # this method is based on the actual implementation in redis (src/rdb.c:rdbLoadObject)
        iowrapper = IOWrapper(f)
        iowrapper.start_recording_size()
        iowrapper.start_recording()
        module_id, _ = self.read_length_with_encoding(iowrapper)
        record_buffer = self._callback.start_module(self._key, self._decode_module_id(module_id), self._expiry)

        if not record_buffer:
            iowrapper.stop_recording()

        opcode = self.read_length(iowrapper)
        while opcode != REDIS_RDB_MODULE_OPCODE_EOF:
            if opcode == REDIS_RDB_MODULE_OPCODE_SINT or opcode == REDIS_RDB_MODULE_OPCODE_UINT:
                data = self.read_length(iowrapper)
            elif opcode == REDIS_RDB_MODULE_OPCODE_FLOAT:
                # Module floats are 4-byte binary values, not the
                # length-prefixed text handled by read_float.
                data = struct.unpack('<f', iowrapper.read(4))[0]
            elif opcode == REDIS_RDB_MODULE_OPCODE_DOUBLE:
                data = read_double(iowrapper)
            elif opcode == REDIS_RDB_MODULE_OPCODE_STRING:
                data = self.read_string(iowrapper)
            else:
                raise Exception("Unknown module opcode %s" % opcode)
            self._callback.handle_module_data(self._key, opcode, data)
            # read the next item in the module data type
            opcode = self.read_length(iowrapper)

        buffer = None
        if record_buffer:
            # prepend the buffer with REDIS_RDB_TYPE_MODULE_2 type
            buffer = struct.pack('B', REDIS_RDB_TYPE_MODULE_2) + iowrapper.get_recorded_buffer()
            iowrapper.stop_recording()
        self._callback.end_module(self._key, buffer_size=iowrapper.get_recorded_size(), buffer=buffer)

    def _decode_module_id(self, module_id):
        """
        decode module id to string
        based on @antirez moduleTypeNameByID function from redis/src/module.c
        :param module_id: 64bit integer
        :return: string
        """
        name = [''] * 9
        # The low 10 bits are dropped; the rest holds nine 6-bit characters.
        module_id >>= 10
        for i in reversed(range(9)):
            name[i] = self.charset[module_id & 63]
            module_id >>= 6
        return ''.join(name)

    def verify_magic_string(self, magic_string):
        """Raise Exception unless `magic_string` is b'REDIS'."""
        if magic_string != b'REDIS':
            raise Exception('verify_magic_string', 'Invalid File Format')

    def verify_version(self, version_str):
        """Parse the version field and store it; raise Exception outside 1..8."""
        version = int(version_str)
        if version < 1 or version > 8:
            raise Exception('verify_version', 'Invalid RDB version number %d' % version)
        self._rdb_version = version

    def init_filter(self, filters):
        """
        Normalise the user supplied `filters` dictionary into `self._filters`.

        A missing, None or empty entry disables filtering on that axis.
        Raises Exception when `dbs` or `types` has an unsupported type.
        """
        self._filters = {}
        if not filters:
            filters = {}

        dbs = filters.get('dbs')
        if dbs is None:
            self._filters['dbs'] = None
        elif isinstance(dbs, int):
            self._filters['dbs'] = (dbs, )
        elif isinstance(dbs, (list, tuple)):
            self._filters['dbs'] = [int(x) for x in dbs]
        else:
            raise Exception('init_filter', 'invalid value for dbs in filter %s' % (dbs, ))

        if not ('keys' in filters and filters['keys']):
            self._filters['keys'] = re.compile(b".*")
        else:
            self._filters['keys'] = str2regexp(filters['keys'])

        if not ('not_keys' in filters and filters['not_keys']):
            self._filters['not_keys'] = None
        else:
            self._filters['not_keys'] = str2regexp(filters['not_keys'])

        types = filters.get('types')
        if not types:
            self._filters['types'] = ('set', 'hash', 'sortedset', 'module', 'string', 'list')
        elif isinstance(types, (str, bytes)):
            # Logical type names are native strings, so a bytes name given on
            # Python 3 is decoded before it is stored.
            if not isinstance(types, str):
                types = types.decode('ascii')
            self._filters['types'] = (types, )
        elif isinstance(types, (list, tuple)):
            self._filters['types'] = [str(x) for x in types]
        else:
            raise Exception('init_filter', 'invalid value for types in filter %s' % (types, ))

    def matches_filter(self, db_number, key=None, data_type=None):
        """
        Return True when the given database / key / type pass the filters.

        `key` and `data_type` are only checked when they are not None.
        """
        if self._filters['dbs'] and (not db_number in self._filters['dbs']):
            return False

        # Compare against None rather than truthiness so that an empty key or
        # the integer key 0 is still subject to the key filters.
        if key is not None:
            if isinstance(key, (bytes, str)):
                key_to_match = key
            else:
                # Integer-encoded keys are matched by their decimal text.
                key_to_match = str(key).encode('utf-8')
            if self._filters['not_keys'] and (self._filters['not_keys'].match(key_to_match)):
                return False
            if not self._filters['keys'].match(key_to_match):
                return False

        if data_type is not None and (not self.get_logical_type(data_type) in self._filters['types']):
            return False
        return True

    def get_logical_type(self, data_type):
        """Return the logical type name for an object type byte."""
        return DATA_TYPE_MAPPING[data_type]

    def lzf_decompress(self, compressed, expected_length):
        """
        Decompress an LZF compressed string.

        Uses the `lzf` extension when it is installed and a pure Python
        fallback otherwise. The fallback raises Exception when a
        back-reference points before the start of the output or when the
        result is not `expected_length` bytes long.
        """
        if HAS_PYTHON_LZF:
            return lzf.decompress(compressed, expected_length)

        in_stream = bytearray(compressed)
        in_len = len(in_stream)
        in_index = 0
        out_stream = bytearray()
        while in_index < in_len:
            ctrl = in_stream[in_index]
            in_index = in_index + 1
            if ctrl < 32:
                # Literal run: copy the next ctrl + 1 bytes verbatim.
                for x in range(0, ctrl + 1):
                    out_stream.append(in_stream[in_index])
                    in_index = in_index + 1
            else:
                # Back-reference: copy length + 2 bytes from earlier output.
                length = ctrl >> 5
                if length == 7:
                    length = length + in_stream[in_index]
                    in_index = in_index + 1
                ref = len(out_stream) - ((ctrl & 0x1f) << 8) - in_stream[in_index] - 1
                in_index = in_index + 1
                if ref < 0:
                    # A negative index would silently read from the end of the buffer.
                    raise Exception('lzf_decompress', 'Invalid back-reference %d for key %s' % (ref, self._key))
                for x in range(0, length + 2):
                    out_stream.append(out_stream[ref])
                    ref = ref + 1
        if len(out_stream) != expected_length:
            raise Exception('lzf_decompress', 'Expected lengths do not match %d != %d for key %s' % (len(out_stream), expected_length, self._key))
        return bytes(out_stream)
def skip(f, free):
    """Advance the stream `f` by `free` bytes; a falsy `free` leaves it untouched."""
    if not free:
        return
    # Reading and discarding works on streams that cannot seek.
    f.read(free)
def to_datetime(usecs_since_epoch):
    """
    Convert microseconds since the Unix epoch to a naive UTC `datetime`.

    The whole-second part is capped at 221925052800; the microsecond
    remainder is applied unchanged.
    """
    seconds_since_epoch, useconds = divmod(usecs_since_epoch, 1000000)
    if seconds_since_epoch > 221925052800:
        seconds_since_epoch = 221925052800
    # Plain timedelta arithmetic gives the same naive UTC value as
    # utcfromtimestamp without its platform limits or deprecation.
    epoch = datetime.datetime(1970, 1, 1)
    return epoch + datetime.timedelta(seconds=seconds_since_epoch, microseconds=useconds)
-
def read_signed_char(f):
    """Read one byte from `f` and return it as a signed integer."""
    (value,) = struct.unpack('b', f.read(1))
    return value
-
def read_unsigned_char(f):
    """Read one byte from `f` and return it as an unsigned integer."""
    (value,) = struct.unpack('B', f.read(1))
    return value
def read_signed_short(f):
    """Read a 2-byte little-endian signed integer from `f`."""
    # '<' pins little-endian order and the 2-byte size, so the result does
    # not depend on the byte order of the machine running the parser.
    return struct.unpack('<h', f.read(2))[0]
-
def read_unsigned_short(f):
    """Read a 2-byte little-endian unsigned integer from `f`."""
    # '<' pins little-endian order and the 2-byte size, so the result does
    # not depend on the byte order of the machine running the parser.
    return struct.unpack('<H', f.read(2))[0]
def read_signed_int(f):
    """Read a 4-byte little-endian signed integer from `f`."""
    # '<' pins little-endian order and the 4-byte size, so the result does
    # not depend on the byte order of the machine running the parser.
    return struct.unpack('<i', f.read(4))[0]
-
def read_unsigned_int(f):
    """Read a 4-byte little-endian unsigned integer from `f`."""
    # '<' pins little-endian order and the 4-byte size, so the result does
    # not depend on the byte order of the machine running the parser.
    return struct.unpack('<I', f.read(4))[0]
def read_unsigned_int_be(f):
    """Read a 4-byte big-endian unsigned integer from `f`."""
    (value,) = struct.unpack('>I', f.read(4))
    return value
def read_24bit_signed_number(f):
    """Read a 3-byte little-endian signed integer from `f`."""
    # Place the three bytes in the upper part of a 4-byte little-endian word,
    # then shift the padding byte out; the arithmetic shift keeps the sign.
    padded = b'\x00' + f.read(3)
    return struct.unpack('<i', padded)[0] >> 8
-
def read_signed_long(f):
    """Read an 8-byte little-endian signed integer from `f`."""
    # '<' pins little-endian order and the 8-byte size, so the result does
    # not depend on the byte order of the machine running the parser.
    return struct.unpack('<q', f.read(8))[0]
-
def read_unsigned_long(f):
    """Read an 8-byte little-endian unsigned integer from `f`."""
    # '<' pins little-endian order and the 8-byte size, so the result does
    # not depend on the byte order of the machine running the parser.
    return struct.unpack('<Q', f.read(8))[0]
-
def read_unsigned_long_be(f):
    """Read an 8-byte big-endian unsigned integer from `f`."""
    (value,) = struct.unpack('>Q', f.read(8))
    return value
def read_double(f):
    """Read an 8-byte little-endian IEEE 754 double from `f`."""
    # '<' pins little-endian order, so the result does not depend on the
    # byte order of the machine running the parser.
    return struct.unpack('<d', f.read(8))[0]
def string_as_hexcode(string):
    """Print the hexadecimal code of every element of `string`, one per line.

    Elements that are already integers are printed as they are; anything
    else is converted with `ord` first.
    """
    for item in string:
        code = item if isinstance(item, int) else ord(item)
        print(hex(code))
class DebugCallback(RdbCallback):
    """
    Callback that prints every parse event to standard output.

    The `info` parameters default to None so the methods accept both the
    keyword form used by the parser and the older call form without `info`.
    """

    def start_rdb(self):
        print('[')

    def aux_field(self, key, value):
        print('aux:[%s:%s]' % (key, value))

    def start_database(self, db_number):
        print('{')

    def db_size(self, db_size, expires_size):
        print('db_size: %s, expires_size %s' % (db_size, expires_size))

    def set(self, key, value, expiry, info=None):
        print('"%s" : "%s"' % (str(key), str(value)))

    def start_hash(self, key, length, expiry, info=None):
        print('"%s" : {' % str(key))

    def hset(self, key, field, value):
        print('"%s" : "%s"' % (str(field), str(value)))

    def end_hash(self, key):
        print('}')

    def start_set(self, key, cardinality, expiry, info=None):
        print('"%s" : [' % str(key))

    def sadd(self, key, member):
        print('"%s"' % str(member))

    def end_set(self, key):
        print(']')

    def start_list(self, key, expiry, info):
        print('"%s" : [' % str(key))

    def rpush(self, key, value):
        print('"%s"' % str(value))

    def end_list(self, key, info):
        print(']')

    def start_sorted_set(self, key, length, expiry, info=None):
        print('"%s" : {' % str(key))

    def zadd(self, key, score, member):
        print('"%s" : "%s"' % (str(member), str(score)))

    def end_sorted_set(self, key):
        print('}')

    def end_database(self, db_number):
        print('}')

    def end_rdb(self):
        print(']')
|