baseserver.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. """Base class for implementing servers"""
  2. # Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
  3. import sys
  4. import _socket
  5. import errno
  6. from gevent.greenlet import Greenlet
  7. from gevent.event import Event
  8. from gevent.hub import get_hub
  9. from gevent._compat import string_types, integer_types, xrange
  10. __all__ = ['BaseServer']
  11. # We define a helper function to handle closing the socket in
  12. # do_handle; We'd like to bind it to a kwarg to avoid *any* lookups at
  13. # all, but that's incompatible with the calling convention of
  14. # do_handle. On CPython, this is ~20% faster than creating and calling
  15. # a closure and ~10% faster than using a @staticmethod. (In theory, we
  16. # could create a closure only once in set_handle, to wrap self._handle,
  17. # but this is safer from a backwards compat standpoint.)
  18. # we also avoid unpacking the *args tuple when calling/spawning this object
  19. # for a tiny improvement (benchmark shows a wash)
  20. def _handle_and_close_when_done(handle, close, args_tuple):
  21. try:
  22. return handle(*args_tuple)
  23. finally:
  24. close(*args_tuple)
  25. class BaseServer(object):
  26. """
  27. An abstract base class that implements some common functionality for the servers in gevent.
  28. :param listener: Either be an address that the server should bind
  29. on or a :class:`gevent.socket.socket` instance that is already
  30. bound (and put into listening mode in case of TCP socket).
  31. :keyword handle: If given, the request handler. The request
  32. handler can be defined in a few ways. Most commonly,
  33. subclasses will implement a ``handle`` method as an
  34. instance method. Alternatively, a function can be passed
  35. as the ``handle`` argument to the constructor. In either
  36. case, the handler can later be changed by calling
  37. :meth:`set_handle`.
  38. When the request handler returns, the socket used for the
  39. request will be closed. Therefore, the handler must not return if
  40. the socket is still in use (for example, by manually spawned greenlets).
  41. :keyword spawn: If provided, is called to create a new
  42. greenlet to run the handler. By default,
  43. :func:`gevent.spawn` is used (meaning there is no
  44. artificial limit on the number of concurrent requests). Possible values for *spawn*:
  45. - a :class:`gevent.pool.Pool` instance -- ``handle`` will be executed
  46. using :meth:`gevent.pool.Pool.spawn` only if the pool is not full.
  47. While it is full, no new connections are accepted;
  48. - :func:`gevent.spawn_raw` -- ``handle`` will be executed in a raw
  49. greenlet which has a little less overhead then :class:`gevent.Greenlet` instances spawned by default;
  50. - ``None`` -- ``handle`` will be executed right away, in the :class:`Hub` greenlet.
  51. ``handle`` cannot use any blocking functions as it would mean switching to the :class:`Hub`.
  52. - an integer -- a shortcut for ``gevent.pool.Pool(integer)``
  53. .. versionchanged:: 1.1a1
  54. When the *handle* function returns from processing a connection,
  55. the client socket will be closed. This resolves the non-deterministic
  56. closing of the socket, fixing ResourceWarnings under Python 3 and PyPy.
  57. """
  58. # pylint: disable=too-many-instance-attributes,bare-except,broad-except
  59. #: the number of seconds to sleep in case there was an error in accept() call
  60. #: for consecutive errors the delay will double until it reaches max_delay
  61. #: when accept() finally succeeds the delay will be reset to min_delay again
  62. min_delay = 0.01
  63. max_delay = 1
  64. #: Sets the maximum number of consecutive accepts that a process may perform on
  65. #: a single wake up. High values give higher priority to high connection rates,
  66. #: while lower values give higher priority to already established connections.
  67. #: Default is 100. Note, that in case of multiple working processes on the same
  68. #: listening value, it should be set to a lower value. (pywsgi.WSGIServer sets it
  69. #: to 1 when environ["wsgi.multiprocess"] is true)
  70. max_accept = 100
  71. _spawn = Greenlet.spawn
  72. #: the default timeout that we wait for the client connections to close in stop()
  73. stop_timeout = 1
  74. fatal_errors = (errno.EBADF, errno.EINVAL, errno.ENOTSOCK)
  75. def __init__(self, listener, handle=None, spawn='default'):
  76. self._stop_event = Event()
  77. self._stop_event.set()
  78. self._watcher = None
  79. self._timer = None
  80. self._handle = None
  81. # XXX: FIXME: Subclasses rely on the presence or absence of the
  82. # `socket` attribute to determine whether we are open/should be opened.
  83. # Instead, have it be None.
  84. self.pool = None
  85. try:
  86. self.set_listener(listener)
  87. self.set_spawn(spawn)
  88. self.set_handle(handle)
  89. self.delay = self.min_delay
  90. self.loop = get_hub().loop
  91. if self.max_accept < 1:
  92. raise ValueError('max_accept must be positive int: %r' % (self.max_accept, ))
  93. except:
  94. self.close()
  95. raise
  96. def set_listener(self, listener):
  97. if hasattr(listener, 'accept'):
  98. if hasattr(listener, 'do_handshake'):
  99. raise TypeError('Expected a regular socket, not SSLSocket: %r' % (listener, ))
  100. self.family = listener.family
  101. self.address = listener.getsockname()
  102. self.socket = listener
  103. else:
  104. self.family, self.address = parse_address(listener)
  105. def set_spawn(self, spawn):
  106. if spawn == 'default':
  107. self.pool = None
  108. self._spawn = self._spawn
  109. elif hasattr(spawn, 'spawn'):
  110. self.pool = spawn
  111. self._spawn = spawn.spawn
  112. elif isinstance(spawn, integer_types):
  113. from gevent.pool import Pool
  114. self.pool = Pool(spawn)
  115. self._spawn = self.pool.spawn
  116. else:
  117. self.pool = None
  118. self._spawn = spawn
  119. if hasattr(self.pool, 'full'):
  120. self.full = self.pool.full
  121. if self.pool is not None:
  122. self.pool._semaphore.rawlink(self._start_accepting_if_started)
  123. def set_handle(self, handle):
  124. if handle is not None:
  125. self.handle = handle
  126. if hasattr(self, 'handle'):
  127. self._handle = self.handle
  128. else:
  129. raise TypeError("'handle' must be provided")
  130. def _start_accepting_if_started(self, _event=None):
  131. if self.started:
  132. self.start_accepting()
  133. def start_accepting(self):
  134. if self._watcher is None:
  135. # just stop watcher without creating a new one?
  136. self._watcher = self.loop.io(self.socket.fileno(), 1)
  137. self._watcher.start(self._do_read)
  138. def stop_accepting(self):
  139. if self._watcher is not None:
  140. self._watcher.stop()
  141. self._watcher = None
  142. if self._timer is not None:
  143. self._timer.stop()
  144. self._timer = None
  145. def do_handle(self, *args):
  146. spawn = self._spawn
  147. handle = self._handle
  148. close = self.do_close
  149. try:
  150. if spawn is None:
  151. _handle_and_close_when_done(handle, close, args)
  152. else:
  153. spawn(_handle_and_close_when_done, handle, close, args)
  154. except:
  155. close(*args)
  156. raise
  157. def do_close(self, *args):
  158. pass
  159. def do_read(self):
  160. raise NotImplementedError()
  161. def _do_read(self):
  162. for _ in xrange(self.max_accept):
  163. if self.full():
  164. self.stop_accepting()
  165. return
  166. try:
  167. args = self.do_read()
  168. self.delay = self.min_delay
  169. if not args:
  170. return
  171. except:
  172. self.loop.handle_error(self, *sys.exc_info())
  173. ex = sys.exc_info()[1]
  174. if self.is_fatal_error(ex):
  175. self.close()
  176. sys.stderr.write('ERROR: %s failed with %s\n' % (self, str(ex) or repr(ex)))
  177. return
  178. if self.delay >= 0:
  179. self.stop_accepting()
  180. self._timer = self.loop.timer(self.delay)
  181. self._timer.start(self._start_accepting_if_started)
  182. self.delay = min(self.max_delay, self.delay * 2)
  183. break
  184. else:
  185. try:
  186. self.do_handle(*args)
  187. except:
  188. self.loop.handle_error((args[1:], self), *sys.exc_info())
  189. if self.delay >= 0:
  190. self.stop_accepting()
  191. self._timer = self.loop.timer(self.delay)
  192. self._timer.start(self._start_accepting_if_started)
  193. self.delay = min(self.max_delay, self.delay * 2)
  194. break
  195. def full(self):
  196. # copied from self.pool
  197. # pylint: disable=method-hidden
  198. return False
  199. def __repr__(self):
  200. return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo())
  201. def __str__(self):
  202. return '<%s %s>' % (type(self).__name__, self._formatinfo())
  203. def _formatinfo(self):
  204. if hasattr(self, 'socket'):
  205. try:
  206. fileno = self.socket.fileno()
  207. except Exception as ex:
  208. fileno = str(ex)
  209. result = 'fileno=%s ' % fileno
  210. else:
  211. result = ''
  212. try:
  213. if isinstance(self.address, tuple) and len(self.address) == 2:
  214. result += 'address=%s:%s' % self.address
  215. else:
  216. result += 'address=%s' % (self.address, )
  217. except Exception as ex:
  218. result += str(ex) or '<error>'
  219. handle = self.__dict__.get('handle')
  220. if handle is not None:
  221. fself = getattr(handle, '__self__', None)
  222. try:
  223. if fself is self:
  224. # Checks the __self__ of the handle in case it is a bound
  225. # method of self to prevent recursivly defined reprs.
  226. handle_repr = '<bound method %s.%s of self>' % (
  227. self.__class__.__name__,
  228. handle.__name__,
  229. )
  230. else:
  231. handle_repr = repr(handle)
  232. result += ' handle=' + handle_repr
  233. except Exception as ex:
  234. result += str(ex) or '<error>'
  235. return result
  236. @property
  237. def server_host(self):
  238. """IP address that the server is bound to (string)."""
  239. if isinstance(self.address, tuple):
  240. return self.address[0]
  241. @property
  242. def server_port(self):
  243. """Port that the server is bound to (an integer)."""
  244. if isinstance(self.address, tuple):
  245. return self.address[1]
  246. def init_socket(self):
  247. """If the user initialized the server with an address rather than socket,
  248. then this function will create a socket, bind it and put it into listening mode.
  249. It is not supposed to be called by the user, it is called by :meth:`start` before starting
  250. the accept loop."""
  251. pass
  252. @property
  253. def started(self):
  254. return not self._stop_event.is_set()
  255. def start(self):
  256. """Start accepting the connections.
  257. If an address was provided in the constructor, then also create a socket,
  258. bind it and put it into the listening mode.
  259. """
  260. self.init_socket()
  261. self._stop_event.clear()
  262. try:
  263. self.start_accepting()
  264. except:
  265. self.close()
  266. raise
  267. def close(self):
  268. """Close the listener socket and stop accepting."""
  269. self._stop_event.set()
  270. try:
  271. self.stop_accepting()
  272. finally:
  273. try:
  274. self.socket.close()
  275. except Exception:
  276. pass
  277. finally:
  278. self.__dict__.pop('socket', None)
  279. self.__dict__.pop('handle', None)
  280. self.__dict__.pop('_handle', None)
  281. self.__dict__.pop('_spawn', None)
  282. self.__dict__.pop('full', None)
  283. if self.pool is not None:
  284. self.pool._semaphore.unlink(self._start_accepting_if_started)
  285. @property
  286. def closed(self):
  287. return not hasattr(self, 'socket')
  288. def stop(self, timeout=None):
  289. """
  290. Stop accepting the connections and close the listening socket.
  291. If the server uses a pool to spawn the requests, then
  292. :meth:`stop` also waits for all the handlers to exit. If there
  293. are still handlers executing after *timeout* has expired
  294. (default 1 second, :attr:`stop_timeout`), then the currently
  295. running handlers in the pool are killed.
  296. If the server does not use a pool, then this merely stops accepting connections;
  297. any spawned greenlets that are handling requests continue running until
  298. they naturally complete.
  299. """
  300. self.close()
  301. if timeout is None:
  302. timeout = self.stop_timeout
  303. if self.pool:
  304. self.pool.join(timeout=timeout)
  305. self.pool.kill(block=True, timeout=1)
  306. def serve_forever(self, stop_timeout=None):
  307. """Start the server if it hasn't been already started and wait until it's stopped."""
  308. # add test that serve_forever exists on stop()
  309. if not self.started:
  310. self.start()
  311. try:
  312. self._stop_event.wait()
  313. finally:
  314. Greenlet.spawn(self.stop, timeout=stop_timeout).join()
  315. def is_fatal_error(self, ex):
  316. return isinstance(ex, _socket.error) and ex.args[0] in self.fatal_errors
  317. def _extract_family(host):
  318. if host.startswith('[') and host.endswith(']'):
  319. host = host[1:-1]
  320. return _socket.AF_INET6, host
  321. return _socket.AF_INET, host
  322. def _parse_address(address):
  323. if isinstance(address, tuple):
  324. if not address[0] or ':' in address[0]:
  325. return _socket.AF_INET6, address
  326. return _socket.AF_INET, address
  327. if ((isinstance(address, string_types) and ':' not in address)
  328. or isinstance(address, integer_types)): # noqa (pep8 E129)
  329. # Just a port
  330. return _socket.AF_INET6, ('', int(address))
  331. if not isinstance(address, string_types):
  332. raise TypeError('Expected tuple or string, got %s' % type(address))
  333. host, port = address.rsplit(':', 1)
  334. family, host = _extract_family(host)
  335. if host == '*':
  336. host = ''
  337. return family, (host, int(port))
  338. def parse_address(address):
  339. try:
  340. return _parse_address(address)
  341. except ValueError as ex:
  342. raise ValueError('Failed to parse address %r: %s' % (address, ex))