callbacks.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. import calendar
  2. import codecs
  3. import json
  4. from rdbtools.parser import RdbCallback
  5. from rdbtools import encodehelpers
  6. class JSONCallback(RdbCallback):
  7. def __init__(self, out, string_escape=None):
  8. if string_escape is None:
  9. string_escape = encodehelpers.STRING_ESCAPE_UTF8
  10. super(JSONCallback, self).__init__(string_escape)
  11. self._out = out
  12. self._is_first_db = True
  13. self._has_databases = False
  14. self._is_first_key_in_db = True
  15. self._elements_in_key = 0
  16. self._element_index = 0
  17. def encode_key(self, key):
  18. key = encodehelpers.bytes_to_unicode(key, self._escape, skip_printable=True)
  19. return codecs.encode(json.dumps(key), 'utf-8')
  20. def encode_value(self, val):
  21. val = encodehelpers.bytes_to_unicode(val, self._escape)
  22. return codecs.encode(json.dumps(val), 'utf-8')
  23. def start_rdb(self):
  24. self._out.write(b'[')
  25. def start_database(self, db_number):
  26. if not self._is_first_db:
  27. self._out.write(b'},')
  28. self._out.write(b'{')
  29. self._is_first_db = False
  30. self._has_databases = True
  31. self._is_first_key_in_db = True
  32. def end_database(self, db_number):
  33. pass
  34. def end_rdb(self):
  35. if self._has_databases:
  36. self._out.write(b'}')
  37. self._out.write(b']')
  38. def _start_key(self, key, length):
  39. if not self._is_first_key_in_db:
  40. self._out.write(b',')
  41. self._out.write(b'\r\n')
  42. self._is_first_key_in_db = False
  43. self._elements_in_key = length
  44. self._element_index = 0
  45. def _end_key(self, key):
  46. pass
  47. def _write_comma(self):
  48. if self._element_index > 0 and self._element_index < self._elements_in_key :
  49. self._out.write(b',')
  50. self._element_index = self._element_index + 1
  51. def set(self, key, value, expiry, info):
  52. self._start_key(key, 0)
  53. self._out.write(self.encode_key(key) + b':' + self.encode_value(value))
  54. def start_hash(self, key, length, expiry, info):
  55. self._start_key(key, length)
  56. self._out.write(self.encode_key(key) + b':{')
  57. def hset(self, key, field, value):
  58. self._write_comma()
  59. self._out.write(self.encode_key(field) + b':' + self.encode_value(value))
  60. def end_hash(self, key):
  61. self._end_key(key)
  62. self._out.write(b'}')
  63. def start_set(self, key, cardinality, expiry, info):
  64. self._start_key(key, cardinality)
  65. self._out.write(self.encode_key(key) + b':[')
  66. def sadd(self, key, member):
  67. self._write_comma()
  68. self._out.write(self.encode_value(member))
  69. def end_set(self, key):
  70. self._end_key(key)
  71. self._out.write(b']')
  72. def start_list(self, key, expiry, info):
  73. self._start_key(key, 0)
  74. self._out.write(self.encode_key(key) + b':[')
  75. def rpush(self, key, value) :
  76. self._elements_in_key += 1
  77. self._write_comma()
  78. self._out.write(self.encode_value(value))
  79. def end_list(self, key, info):
  80. self._end_key(key)
  81. self._out.write(b']')
  82. def start_sorted_set(self, key, length, expiry, info):
  83. self._start_key(key, length)
  84. self._out.write(self.encode_key(key) + b':{')
  85. def zadd(self, key, score, member):
  86. self._write_comma()
  87. self._out.write(self.encode_key(member) + b':' + self.encode_value(score))
  88. def end_sorted_set(self, key):
  89. self._end_key(key)
  90. self._out.write(b'}')
  91. class KeysOnlyCallback(RdbCallback):
  92. def __init__(self, out, string_escape=None):
  93. super(KeysOnlyCallback, self).__init__(string_escape)
  94. self._out = out
  95. def _keyout(self, key):
  96. self._out.write(self.encode_key(key) + b'\n')
  97. def set(self, key, value, expiry, info):
  98. self._keyout(key)
  99. def start_hash(self, key, length, expiry, info):
  100. self._keyout(key)
  101. def hset(self, key, field, value):
  102. self._keyout(key)
  103. def start_set(self, key, cardinality, expiry, info):
  104. self._keyout(key)
  105. def sadd(self, key, member):
  106. self._keyout(key)
  107. def start_list(self, key, expiry, info):
  108. self._keyout(key)
  109. def rpush(self, key, value) :
  110. self._keyout(key)
  111. def start_sorted_set(self, key, length, expiry, info):
  112. self._keyout(key)
  113. def zadd(self, key, score, member):
  114. self._keyout(key)
  115. class KeyValsOnlyCallback(RdbCallback):
  116. def __init__(self, out, string_escape=None):
  117. super(KeyValsOnlyCallback, self).__init__(string_escape)
  118. self._out = out
  119. self._is_first_db = True
  120. self._has_databases = False
  121. self._is_first_key_in_db = True
  122. self._elements_in_key = 0
  123. self._element_index = 0
  124. def _start_key(self, key, length):
  125. if not self._is_first_key_in_db:
  126. self._out.write(b',')
  127. self._out.write(b'\r\n')
  128. self._is_first_key_in_db = False
  129. self._elements_in_key = length
  130. self._element_index = 0
  131. def _end_key(self, key):
  132. pass
  133. def _write_comma(self):
  134. if self._element_index > 0 and self._element_index < self._elements_in_key :
  135. self._out.write(b',')
  136. self._element_index = self._element_index + 1
  137. def set(self, key, value, expiry, info):
  138. self._start_key(key, 0)
  139. self._out.write(self.encode_key(key) + b' ' + self.encode_value(value))
  140. def start_hash(self, key, length, expiry, info):
  141. self._start_key(key, length)
  142. self._out.write(self.encode_key(key) + b' ')
  143. def hset(self, key, field, value):
  144. self._write_comma()
  145. self._out.write(self.encode_key(field) + b' ' + self.encode_value(value))
  146. def end_hash(self, key):
  147. self._end_key(key)
  148. def start_set(self, key, cardinality, expiry, info):
  149. self._start_key(key, cardinality)
  150. self._out.write(self.encode_key(key) + b' ')
  151. def sadd(self, key, member):
  152. self._write_comma()
  153. self._out.write(self.encode_value(member))
  154. def end_set(self, key):
  155. self._end_key(key)
  156. def start_list(self, key, expiry, info):
  157. self._start_key(key, 0)
  158. self._out.write(self.encode_key(key) + b' ')
  159. def rpush(self, key, value) :
  160. self._elements_in_key += 1
  161. self._write_comma()
  162. self._out.write(self.encode_value(value))
  163. def end_list(self, key, info):
  164. self._end_key(key)
  165. def start_sorted_set(self, key, length, expiry, info):
  166. self._start_key(key, length)
  167. self._out.write(self.encode_key(key) + b' ')
  168. def zadd(self, key, score, member):
  169. self._write_comma()
  170. self._out.write(self.encode_key(member) + b' ' + self.encode_value(score))
  171. def end_sorted_set(self, key):
  172. self._end_key(key)
  173. class DiffCallback(RdbCallback):
  174. '''Prints the contents of RDB in a format that is unix sort friendly,
  175. so that two rdb files can be diffed easily'''
  176. def __init__(self, out, string_escape=None):
  177. if string_escape is None:
  178. string_escape = encodehelpers.STRING_ESCAPE_PRINT
  179. super(DiffCallback, self).__init__(string_escape)
  180. self._out = out
  181. self._index = 0
  182. self._dbnum = 0
  183. def dbstr(self):
  184. return b'db=' + encodehelpers.num2bytes(self._dbnum) + b' '
  185. def start_rdb(self):
  186. pass
  187. def start_database(self, db_number):
  188. self._dbnum = db_number
  189. def end_database(self, db_number):
  190. pass
  191. def end_rdb(self):
  192. pass
  193. def set(self, key, value, expiry, info):
  194. self._out.write(self.dbstr() + self.encode_key(key) + b' -> ' + self.encode_value(value))
  195. self.newline()
  196. def start_hash(self, key, length, expiry, info):
  197. pass
  198. def hset(self, key, field, value):
  199. self._out.write(
  200. self.dbstr() + self.encode_key(key) + b' . ' + self.encode_key(field) + b' -> ' + self.encode_value(value))
  201. self.newline()
  202. def end_hash(self, key):
  203. pass
  204. def start_set(self, key, cardinality, expiry, info):
  205. pass
  206. def sadd(self, key, member):
  207. self._out.write(self.dbstr() + self.encode_key(key) + b' { ' + self.encode_value(member) + b' }')
  208. self.newline()
  209. def end_set(self, key):
  210. pass
  211. def start_list(self, key, expiry, info):
  212. self._index = 0
  213. def rpush(self, key, value) :
  214. istr = encodehelpers.num2bytes(self._index)
  215. self._out.write(self.dbstr() + self.encode_key(key) + b'[' + istr + b'] -> ' + self.encode_value(value))
  216. self.newline()
  217. self._index = self._index + 1
  218. def end_list(self, key, info):
  219. pass
  220. def start_sorted_set(self, key, length, expiry, info):
  221. pass
  222. def zadd(self, key, score, member):
  223. self._out.write(self.dbstr() + self.encode_key(key) +
  224. b' -> {' + self.encode_key(member) + b', score=' + self.encode_value(score) + b'}')
  225. self.newline()
  226. def end_sorted_set(self, key):
  227. pass
  228. def newline(self):
  229. self._out.write(b'\r\n')
  230. def _unix_timestamp(dt):
  231. return calendar.timegm(dt.utctimetuple())
  232. class ProtocolCallback(RdbCallback):
  233. def __init__(self, out, string_escape=None):
  234. super(ProtocolCallback, self).__init__(string_escape)
  235. self._out = out
  236. self.reset()
  237. def reset(self):
  238. self._expires = {}
  239. def set_expiry(self, key, dt):
  240. self._expires[key] = dt
  241. def get_expiry_seconds(self, key):
  242. if key in self._expires:
  243. return _unix_timestamp(self._expires[key])
  244. return None
  245. def expires(self, key):
  246. return key in self._expires
  247. def pre_expiry(self, key, expiry):
  248. if expiry is not None:
  249. self.set_expiry(key, expiry)
  250. def post_expiry(self, key):
  251. if self.expires(key):
  252. self.expireat(key, self.get_expiry_seconds(key))
  253. def emit(self, *args):
  254. self._out.write(codecs.encode("*%s\r\n" % len(args), 'ascii'))
  255. for arg in args:
  256. val = encodehelpers.apply_escape_bytes(arg, self._escape)
  257. self._out.write(codecs.encode("$%s\r\n" % len(val), 'ascii'))
  258. self._out.write(val + b"\r\n")
  259. def start_database(self, db_number):
  260. self.reset()
  261. self.select(db_number)
  262. # String handling
  263. def set(self, key, value, expiry, info):
  264. self.pre_expiry(key, expiry)
  265. self.emit(b'SET', key, value)
  266. self.post_expiry(key)
  267. # Hash handling
  268. def start_hash(self, key, length, expiry, info):
  269. self.pre_expiry(key, expiry)
  270. def hset(self, key, field, value):
  271. self.emit(b'HSET', key, field, value)
  272. def end_hash(self, key):
  273. self.post_expiry(key)
  274. # Set handling
  275. def start_set(self, key, cardinality, expiry, info):
  276. self.pre_expiry(key, expiry)
  277. def sadd(self, key, member):
  278. self.emit(b'SADD', key, member)
  279. def end_set(self, key):
  280. self.post_expiry(key)
  281. # List handling
  282. def start_list(self, key, expiry, info):
  283. self.pre_expiry(key, expiry)
  284. def rpush(self, key, value):
  285. self.emit(b'RPUSH', key, value)
  286. def end_list(self, key, info):
  287. self.post_expiry(key)
  288. # Sorted set handling
  289. def start_sorted_set(self, key, length, expiry, info):
  290. self.pre_expiry(key, expiry)
  291. def zadd(self, key, score, member):
  292. self.emit(b'ZADD', key, score, member)
  293. def end_sorted_set(self, key):
  294. self.post_expiry(key)
  295. # Other misc commands
  296. def select(self, db_number):
  297. self.emit(b'SELECT', db_number)
  298. def expireat(self, key, timestamp):
  299. self.emit(b'EXPIREAT', key, timestamp)