base.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # -*- coding: utf-8 -
  2. #
  3. # This file is part of gunicorn released under the MIT license.
  4. # See the NOTICE for more information.
  5. from datetime import datetime
  6. import os
  7. from random import randint
  8. import signal
  9. from ssl import SSLError
  10. import sys
  11. import time
  12. import traceback
  13. from gunicorn import six
  14. from gunicorn import util
  15. from gunicorn.workers.workertmp import WorkerTmp
  16. from gunicorn.reloader import reloader_engines
  17. from gunicorn.http.errors import (
  18. InvalidHeader, InvalidHeaderName, InvalidRequestLine, InvalidRequestMethod,
  19. InvalidHTTPVersion, LimitRequestLine, LimitRequestHeaders,
  20. )
  21. from gunicorn.http.errors import InvalidProxyLine, ForbiddenProxyRequest
  22. from gunicorn.http.wsgi import default_environ, Response
  23. from gunicorn.six import MAXSIZE
  24. class Worker(object):
  25. SIGNALS = [getattr(signal, "SIG%s" % x)
  26. for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
  27. PIPE = []
  28. def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
  29. """\
  30. This is called pre-fork so it shouldn't do anything to the
  31. current process. If there's a need to make process wide
  32. changes you'll want to do that in ``self.init_process()``.
  33. """
  34. self.age = age
  35. self.pid = "[booting]"
  36. self.ppid = ppid
  37. self.sockets = sockets
  38. self.app = app
  39. self.timeout = timeout
  40. self.cfg = cfg
  41. self.booted = False
  42. self.aborted = False
  43. self.reloader = None
  44. self.nr = 0
  45. jitter = randint(0, cfg.max_requests_jitter)
  46. self.max_requests = cfg.max_requests + jitter or MAXSIZE
  47. self.alive = True
  48. self.log = log
  49. self.tmp = WorkerTmp(cfg)
  50. def __str__(self):
  51. return "<Worker %s>" % self.pid
  52. def notify(self):
  53. """\
  54. Your worker subclass must arrange to have this method called
  55. once every ``self.timeout`` seconds. If you fail in accomplishing
  56. this task, the master process will murder your workers.
  57. """
  58. self.tmp.notify()
  59. def run(self):
  60. """\
  61. This is the mainloop of a worker process. You should override
  62. this method in a subclass to provide the intended behaviour
  63. for your particular evil schemes.
  64. """
  65. raise NotImplementedError()
  66. def init_process(self):
  67. """\
  68. If you override this method in a subclass, the last statement
  69. in the function should be to call this method with
  70. super(MyWorkerClass, self).init_process() so that the ``run()``
  71. loop is initiated.
  72. """
  73. # set environment' variables
  74. if self.cfg.env:
  75. for k, v in self.cfg.env.items():
  76. os.environ[k] = v
  77. util.set_owner_process(self.cfg.uid, self.cfg.gid,
  78. initgroups=self.cfg.initgroups)
  79. # Reseed the random number generator
  80. util.seed()
  81. # For waking ourselves up
  82. self.PIPE = os.pipe()
  83. for p in self.PIPE:
  84. util.set_non_blocking(p)
  85. util.close_on_exec(p)
  86. # Prevent fd inheritance
  87. [util.close_on_exec(s) for s in self.sockets]
  88. util.close_on_exec(self.tmp.fileno())
  89. self.wait_fds = self.sockets + [self.PIPE[0]]
  90. self.log.close_on_exec()
  91. self.init_signals()
  92. # start the reloader
  93. if self.cfg.reload:
  94. def changed(fname):
  95. self.log.info("Worker reloading: %s modified", fname)
  96. self.alive = False
  97. self.cfg.worker_int(self)
  98. time.sleep(0.1)
  99. sys.exit(0)
  100. reloader_cls = reloader_engines[self.cfg.reload_engine]
  101. self.reloader = reloader_cls(callback=changed)
  102. self.reloader.start()
  103. self.load_wsgi()
  104. self.cfg.post_worker_init(self)
  105. # Enter main run loop
  106. self.booted = True
  107. self.run()
  108. def load_wsgi(self):
  109. try:
  110. self.wsgi = self.app.wsgi()
  111. except SyntaxError as e:
  112. if self.cfg.reload == 'off':
  113. raise
  114. self.log.exception(e)
  115. # fix from PR #1228
  116. # storing the traceback into exc_tb will create a circular reference.
  117. # per https://docs.python.org/2/library/sys.html#sys.exc_info warning,
  118. # delete the traceback after use.
  119. try:
  120. exc_type, exc_val, exc_tb = sys.exc_info()
  121. self.reloader.add_extra_file(exc_val.filename)
  122. tb_string = six.StringIO()
  123. traceback.print_tb(exc_tb, file=tb_string)
  124. self.wsgi = util.make_fail_app(tb_string.getvalue())
  125. finally:
  126. del exc_tb
  127. def init_signals(self):
  128. # reset signaling
  129. [signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
  130. # init new signaling
  131. signal.signal(signal.SIGQUIT, self.handle_quit)
  132. signal.signal(signal.SIGTERM, self.handle_exit)
  133. signal.signal(signal.SIGINT, self.handle_quit)
  134. signal.signal(signal.SIGWINCH, self.handle_winch)
  135. signal.signal(signal.SIGUSR1, self.handle_usr1)
  136. signal.signal(signal.SIGABRT, self.handle_abort)
  137. # Don't let SIGTERM and SIGUSR1 disturb active requests
  138. # by interrupting system calls
  139. if hasattr(signal, 'siginterrupt'): # python >= 2.6
  140. signal.siginterrupt(signal.SIGTERM, False)
  141. signal.siginterrupt(signal.SIGUSR1, False)
  142. if hasattr(signal, 'set_wakeup_fd'):
  143. signal.set_wakeup_fd(self.PIPE[1])
  144. def handle_usr1(self, sig, frame):
  145. self.log.reopen_files()
  146. def handle_exit(self, sig, frame):
  147. self.alive = False
  148. def handle_quit(self, sig, frame):
  149. self.alive = False
  150. # worker_int callback
  151. self.cfg.worker_int(self)
  152. time.sleep(0.1)
  153. sys.exit(0)
  154. def handle_abort(self, sig, frame):
  155. self.alive = False
  156. self.cfg.worker_abort(self)
  157. sys.exit(1)
  158. def handle_error(self, req, client, addr, exc):
  159. request_start = datetime.now()
  160. addr = addr or ('', -1) # unix socket case
  161. if isinstance(exc, (InvalidRequestLine, InvalidRequestMethod,
  162. InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
  163. LimitRequestLine, LimitRequestHeaders,
  164. InvalidProxyLine, ForbiddenProxyRequest,
  165. SSLError)):
  166. status_int = 400
  167. reason = "Bad Request"
  168. if isinstance(exc, InvalidRequestLine):
  169. mesg = "Invalid Request Line '%s'" % str(exc)
  170. elif isinstance(exc, InvalidRequestMethod):
  171. mesg = "Invalid Method '%s'" % str(exc)
  172. elif isinstance(exc, InvalidHTTPVersion):
  173. mesg = "Invalid HTTP Version '%s'" % str(exc)
  174. elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
  175. mesg = "%s" % str(exc)
  176. if not req and hasattr(exc, "req"):
  177. req = exc.req # for access log
  178. elif isinstance(exc, LimitRequestLine):
  179. mesg = "%s" % str(exc)
  180. elif isinstance(exc, LimitRequestHeaders):
  181. mesg = "Error parsing headers: '%s'" % str(exc)
  182. elif isinstance(exc, InvalidProxyLine):
  183. mesg = "'%s'" % str(exc)
  184. elif isinstance(exc, ForbiddenProxyRequest):
  185. reason = "Forbidden"
  186. mesg = "Request forbidden"
  187. status_int = 403
  188. elif isinstance(exc, SSLError):
  189. reason = "Forbidden"
  190. mesg = "'%s'" % str(exc)
  191. status_int = 403
  192. msg = "Invalid request from ip={ip}: {error}"
  193. self.log.debug(msg.format(ip=addr[0], error=str(exc)))
  194. else:
  195. if hasattr(req, "uri"):
  196. self.log.exception("Error handling request %s", req.uri)
  197. status_int = 500
  198. reason = "Internal Server Error"
  199. mesg = ""
  200. if req is not None:
  201. request_time = datetime.now() - request_start
  202. environ = default_environ(req, client, self.cfg)
  203. environ['REMOTE_ADDR'] = addr[0]
  204. environ['REMOTE_PORT'] = str(addr[1])
  205. resp = Response(req, client, self.cfg)
  206. resp.status = "%s %s" % (status_int, reason)
  207. resp.response_length = len(mesg)
  208. self.log.access(resp, req, environ, request_time)
  209. try:
  210. util.write_error(client, status_int, reason, mesg)
  211. except:
  212. self.log.debug("Failed to send error message.")
  213. def handle_winch(self, sig, fname):
  214. # Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
  215. self.log.debug("worker: SIGWINCH ignored.")