async.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. # -*- coding: utf-8 -*-
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. from datetime import datetime
  6. import errno
  7. import socket
  8. import ssl
  9. import sys
  10. import gunicorn.http as http
  11. import gunicorn.http.wsgi as wsgi
  12. import gunicorn.util as util
  13. import gunicorn.workers.base as base
  14. from gunicorn import six
  15. ALREADY_HANDLED = object()
  16. class AsyncWorker(base.Worker):
  17. def __init__(self, *args, **kwargs):
  18. super(AsyncWorker, self).__init__(*args, **kwargs)
  19. self.worker_connections = self.cfg.worker_connections
  20. def timeout_ctx(self):
  21. raise NotImplementedError()
  22. def is_already_handled(self, respiter):
  23. # some workers will need to overload this function to raise a StopIteration
  24. return respiter == ALREADY_HANDLED
  25. def handle(self, listener, client, addr):
  26. req = None
  27. try:
  28. parser = http.RequestParser(self.cfg, client)
  29. try:
  30. listener_name = listener.getsockname()
  31. if not self.cfg.keepalive:
  32. req = six.next(parser)
  33. self.handle_request(listener_name, req, client, addr)
  34. else:
  35. # keepalive loop
  36. proxy_protocol_info = {}
  37. while True:
  38. req = None
  39. with self.timeout_ctx():
  40. req = six.next(parser)
  41. if not req:
  42. break
  43. if req.proxy_protocol_info:
  44. proxy_protocol_info = req.proxy_protocol_info
  45. else:
  46. req.proxy_protocol_info = proxy_protocol_info
  47. self.handle_request(listener_name, req, client, addr)
  48. except http.errors.NoMoreData as e:
  49. self.log.debug("Ignored premature client disconnection. %s", e)
  50. except StopIteration as e:
  51. self.log.debug("Closing connection. %s", e)
  52. except ssl.SSLError:
  53. # pass to next try-except level
  54. six.reraise(*sys.exc_info())
  55. except EnvironmentError:
  56. # pass to next try-except level
  57. six.reraise(*sys.exc_info())
  58. except Exception as e:
  59. self.handle_error(req, client, addr, e)
  60. except ssl.SSLError as e:
  61. if e.args[0] == ssl.SSL_ERROR_EOF:
  62. self.log.debug("ssl connection closed")
  63. client.close()
  64. else:
  65. self.log.debug("Error processing SSL request.")
  66. self.handle_error(req, client, addr, e)
  67. except EnvironmentError as e:
  68. if e.errno not in (errno.EPIPE, errno.ECONNRESET):
  69. self.log.exception("Socket error processing request.")
  70. else:
  71. if e.errno == errno.ECONNRESET:
  72. self.log.debug("Ignoring connection reset")
  73. else:
  74. self.log.debug("Ignoring EPIPE")
  75. except Exception as e:
  76. self.handle_error(req, client, addr, e)
  77. finally:
  78. util.close(client)
  79. def handle_request(self, listener_name, req, sock, addr):
  80. request_start = datetime.now()
  81. environ = {}
  82. resp = None
  83. try:
  84. self.cfg.pre_request(self, req)
  85. resp, environ = wsgi.create(req, sock, addr,
  86. listener_name, self.cfg)
  87. environ["wsgi.multithread"] = True
  88. self.nr += 1
  89. if self.alive and self.nr >= self.max_requests:
  90. self.log.info("Autorestarting worker after current request.")
  91. resp.force_close()
  92. self.alive = False
  93. if not self.cfg.keepalive:
  94. resp.force_close()
  95. respiter = self.wsgi(environ, resp.start_response)
  96. if self.is_already_handled(respiter):
  97. return False
  98. try:
  99. if isinstance(respiter, environ['wsgi.file_wrapper']):
  100. resp.write_file(respiter)
  101. else:
  102. for item in respiter:
  103. resp.write(item)
  104. resp.close()
  105. request_time = datetime.now() - request_start
  106. self.log.access(resp, req, environ, request_time)
  107. finally:
  108. if hasattr(respiter, "close"):
  109. respiter.close()
  110. if resp.should_close():
  111. raise StopIteration()
  112. except StopIteration:
  113. raise
  114. except EnvironmentError:
  115. # If the original exception was a socket.error we delegate
  116. # handling it to the caller (where handle() might ignore it)
  117. six.reraise(*sys.exc_info())
  118. except Exception:
  119. if resp and resp.headers_sent:
  120. # If the requests have already been sent, we should close the
  121. # connection to indicate the error.
  122. self.log.exception("Error handling request")
  123. try:
  124. sock.shutdown(socket.SHUT_RDWR)
  125. sock.close()
  126. except EnvironmentError:
  127. pass
  128. raise StopIteration()
  129. raise
  130. finally:
  131. try:
  132. self.cfg.post_request(self, req, environ, resp)
  133. except Exception:
  134. self.log.exception("Exception in post_request hook")
  135. return True