fake_redis_server.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. """
  4. Taken from http://charlesleifer.com/blog/building-a-simple-redis-server-with-python/
  5. """
  6. from gevent import socket
  7. from gevent.pool import Pool
  8. from gevent.server import StreamServer
  9. from collections import namedtuple
  10. from io import BytesIO
  11. from socket import error
  12. import logging
  13. logger = logging.getLogger(__name__)
  14. class CommandError(Exception): pass
  15. class Disconnect(Exception): pass
  16. Error = namedtuple('Error', ('message',))
  17. class ProtocolHandler(object):
  18. def __init__(self):
  19. self.handlers = {
  20. '+': self.handle_simple_string,
  21. '-': self.handle_error,
  22. ':': self.handle_integer,
  23. '$': self.handle_string,
  24. '*': self.handle_array,
  25. '%': self.handle_dict
  26. }
  27. def handle_request(self, socket_file):
  28. first_byte = socket_file.read(1)
  29. if not first_byte:
  30. raise Disconnect()
  31. try:
  32. # Delegate to the appropriate handler based on the first byte.
  33. return self.handlers[first_byte](socket_file)
  34. except KeyError:
  35. raise CommandError('bad request')
  36. def handle_simple_string(self, socket_file):
  37. return socket_file.readline().rstrip('\r\n')
  38. def handle_error(self, socket_file):
  39. return Error(socket_file.readline().rstrip('\r\n'))
  40. def handle_integer(self, socket_file):
  41. return int(socket_file.readline().rstrip('\r\n'))
  42. def handle_string(self, socket_file):
  43. # First read the length ($<length>\r\n).
  44. length = int(socket_file.readline().rstrip('\r\n'))
  45. if length == -1:
  46. return None # Special-case for NULLs.
  47. length += 2 # Include the trailing \r\n in count.
  48. return socket_file.read(length)[:-2]
  49. def handle_array(self, socket_file):
  50. num_elements = int(socket_file.readline().rstrip('\r\n'))
  51. return [self.handle_request(socket_file) for _ in range(num_elements)]
  52. def handle_dict(self, socket_file):
  53. num_items = int(socket_file.readline().rstrip('\r\n'))
  54. elements = [self.handle_request(socket_file)
  55. for _ in range(num_items * 2)]
  56. return dict(zip(elements[::2], elements[1::2]))
  57. def write_response(self, socket_file, data):
  58. buf = BytesIO()
  59. self._write(buf, data)
  60. buf.seek(0)
  61. socket_file.write(buf.getvalue())
  62. socket_file.flush()
  63. def _write(self, buf, data):
  64. if isinstance(data, str):
  65. data = data.encode('utf-8')
  66. if isinstance(data, bytes):
  67. buf.write('$%s\r\n%s\r\n' % (len(data), data))
  68. elif isinstance(data, int):
  69. buf.write(':%s\r\n' % data)
  70. elif isinstance(data, Error):
  71. buf.write('-%s\r\n' % error.message)
  72. elif isinstance(data, (list, tuple)):
  73. buf.write('*%s\r\n' % len(data))
  74. for item in data:
  75. self._write(buf, item)
  76. elif isinstance(data, dict):
  77. buf.write('%%%s\r\n' % len(data))
  78. for key in data:
  79. self._write(buf, key)
  80. self._write(buf, data[key])
  81. elif data is None:
  82. buf.write('$-1\r\n')
  83. else:
  84. raise CommandError('unrecognized type: %s' % type(data))
  85. class Server(object):
  86. def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
  87. self._pool = Pool(max_clients)
  88. self._server = StreamServer(
  89. (host, port),
  90. self.connection_handler,
  91. spawn=self._pool)
  92. self._protocol = ProtocolHandler()
  93. self._kv = {}
  94. self._commands = self.get_commands()
  95. def get_commands(self):
  96. return {
  97. 'GET': self.get,
  98. 'SET': self.set,
  99. 'DELETE': self.delete,
  100. 'FLUSH': self.flush,
  101. 'MGET': self.mget,
  102. 'MSET': self.mset
  103. }
  104. def connection_handler(self, conn, address):
  105. logger.info('Connection received: %s:%s' % address)
  106. # Convert "conn" (a socket object) into a file-like object.
  107. socket_file = conn.makefile('rwb')
  108. # Process client requests until client disconnects.
  109. while True:
  110. try:
  111. data = self._protocol.handle_request(socket_file)
  112. except Disconnect:
  113. logger.info('Client went away: %s:%s' % address)
  114. break
  115. try:
  116. resp = self.get_response(data)
  117. except CommandError as exc:
  118. logger.exception('Command error')
  119. resp = Error(exc.args[0])
  120. self._protocol.write_response(socket_file, resp)
  121. def run(self):
  122. self._server.serve_forever()
  123. def get_response(self, data):
  124. if not isinstance(data, list):
  125. try:
  126. data = data.split()
  127. except:
  128. raise CommandError('Request must be list or simple string.')
  129. if not data:
  130. raise CommandError('Missing command')
  131. command = data[0].upper()
  132. if command not in self._commands:
  133. raise CommandError('Unrecognized command: %s' % command)
  134. else:
  135. logger.debug('Received %s', command)
  136. return self._commands[command](*data[1:])
  137. def get(self, key):
  138. return self._kv.get(key)
  139. def set(self, key, value):
  140. self._kv[key] = value
  141. return 1
  142. def delete(self, key):
  143. if key in self._kv:
  144. del self._kv[key]
  145. return 1
  146. return 0
  147. def flush(self):
  148. kvlen = len(self._kv)
  149. self._kv.clear()
  150. return kvlen
  151. def mget(self, *keys):
  152. return [self._kv.get(key) for key in keys]
  153. def mset(self, *items):
  154. data = zip(items[::2], items[1::2])
  155. for key, value in data:
  156. self._kv[key] = value
  157. return len(data)
  158. class Client(object):
  159. def __init__(self, host='127.0.0.1', port=31337):
  160. self._protocol = ProtocolHandler()
  161. self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  162. self._socket.connect((host, port))
  163. self._fh = self._socket.makefile('rwb')
  164. def execute(self, *args):
  165. self._protocol.write_response(self._fh, args)
  166. resp = self._protocol.handle_request(self._fh)
  167. if isinstance(resp, Error):
  168. raise CommandError(resp.message)
  169. return resp
  170. def get(self, key):
  171. return self.execute('GET', key)
  172. def set(self, key, value):
  173. return self.execute('SET', key, value)
  174. def delete(self, key):
  175. return self.execute('DELETE', key)
  176. def flush(self):
  177. return self.execute('FLUSH')
  178. def mget(self, *keys):
  179. return self.execute('MGET', *keys)
  180. def mset(self, *items):
  181. return self.execute('MSET', *items)
  182. if __name__ == '__main__':
  183. from gevent import monkey
  184. monkey.patch_all()
  185. logger.addHandler(logging.StreamHandler())
  186. logger.setLevel(logging.DEBUG)
  187. Server().run()