geventlet.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
# -*- coding: utf-8 -*-
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. from functools import partial
  6. import errno
  7. import sys
  8. try:
  9. import eventlet
  10. except ImportError:
  11. raise RuntimeError("You need eventlet installed to use this worker.")
  12. # validate the eventlet version
  13. if eventlet.version_info < (0, 9, 7):
  14. raise RuntimeError("You need eventlet >= 0.9.7")
  15. from eventlet import hubs, greenthread
  16. from eventlet.greenio import GreenSocket
  17. from eventlet.hubs import trampoline
  18. from eventlet.wsgi import ALREADY_HANDLED as EVENTLET_ALREADY_HANDLED
  19. import greenlet
  20. from gunicorn.http.wsgi import sendfile as o_sendfile
  21. from gunicorn.workers.async import AsyncWorker
  22. def _eventlet_sendfile(fdout, fdin, offset, nbytes):
  23. while True:
  24. try:
  25. return o_sendfile(fdout, fdin, offset, nbytes)
  26. except OSError as e:
  27. if e.args[0] == errno.EAGAIN:
  28. trampoline(fdout, write=True)
  29. else:
  30. raise
  31. def _eventlet_serve(sock, handle, concurrency):
  32. """
  33. Serve requests forever.
  34. This code is nearly identical to ``eventlet.convenience.serve`` except
  35. that it attempts to join the pool at the end, which allows for gunicorn
  36. graceful shutdowns.
  37. """
  38. pool = eventlet.greenpool.GreenPool(concurrency)
  39. server_gt = eventlet.greenthread.getcurrent()
  40. while True:
  41. try:
  42. conn, addr = sock.accept()
  43. gt = pool.spawn(handle, conn, addr)
  44. gt.link(_eventlet_stop, server_gt, conn)
  45. conn, addr, gt = None, None, None
  46. except eventlet.StopServe:
  47. sock.close()
  48. pool.waitall()
  49. return
  50. def _eventlet_stop(client, server, conn):
  51. """
  52. Stop a greenlet handling a request and close its connection.
  53. This code is lifted from eventlet so as not to depend on undocumented
  54. functions in the library.
  55. """
  56. try:
  57. try:
  58. client.wait()
  59. finally:
  60. conn.close()
  61. except greenlet.GreenletExit:
  62. pass
  63. except Exception:
  64. greenthread.kill(server, *sys.exc_info())
  65. def patch_sendfile():
  66. from gunicorn.http import wsgi
  67. if o_sendfile is not None:
  68. setattr(wsgi, "sendfile", _eventlet_sendfile)
  69. class EventletWorker(AsyncWorker):
  70. def patch(self):
  71. hubs.use_hub()
  72. eventlet.monkey_patch(os=False)
  73. patch_sendfile()
  74. def is_already_handled(self, respiter):
  75. if respiter == EVENTLET_ALREADY_HANDLED:
  76. raise StopIteration()
  77. else:
  78. return super(EventletWorker, self).is_already_handled(respiter)
  79. def init_process(self):
  80. self.patch()
  81. super(EventletWorker, self).init_process()
  82. def handle_quit(self, sig, frame):
  83. eventlet.spawn(super(EventletWorker, self).handle_quit, sig, frame)
  84. def timeout_ctx(self):
  85. return eventlet.Timeout(self.cfg.keepalive or None, False)
  86. def handle(self, listener, client, addr):
  87. if self.cfg.is_ssl:
  88. client = eventlet.wrap_ssl(client, server_side=True,
  89. **self.cfg.ssl_options)
  90. super(EventletWorker, self).handle(listener, client, addr)
  91. def run(self):
  92. acceptors = []
  93. for sock in self.sockets:
  94. gsock = GreenSocket(sock)
  95. gsock.setblocking(1)
  96. hfun = partial(self.handle, gsock)
  97. acceptor = eventlet.spawn(_eventlet_serve, gsock, hfun,
  98. self.worker_connections)
  99. acceptors.append(acceptor)
  100. eventlet.sleep(0.0)
  101. while self.alive:
  102. self.notify()
  103. eventlet.sleep(1.0)
  104. self.notify()
  105. try:
  106. with eventlet.Timeout(self.cfg.graceful_timeout) as t:
  107. [a.kill(eventlet.StopServe()) for a in acceptors]
  108. [a.wait() for a in acceptors]
  109. except eventlet.Timeout as te:
  110. if te != t:
  111. raise
  112. [a.kill() for a in acceptors]