process.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. #
  2. # Copyright 2011 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Utilities for working with multiple processes, including both forking
  16. the server into multiple processes and managing subprocesses.
  17. """
  18. from __future__ import absolute_import, division, print_function
  19. import errno
  20. import os
  21. import signal
  22. import subprocess
  23. import sys
  24. import time
  25. from binascii import hexlify
  26. from tornado.concurrent import Future, future_set_result_unless_cancelled
  27. from tornado import ioloop
  28. from tornado.iostream import PipeIOStream
  29. from tornado.log import gen_log
  30. from tornado.platform.auto import set_close_exec
  31. from tornado import stack_context
  32. from tornado.util import errno_from_exception, PY3
# Optional-dependency and compatibility shims evaluated once at import time.
try:
    import multiprocessing
except ImportError:
    # Multiprocessing is not available on Google App Engine.
    # cpu_count() checks for this None value and reports a single processor.
    multiprocessing = None

if PY3:
    # Python 3 has no separate ``long`` type; alias it so that
    # _reseed_random() can call ``long(...)`` on either major version.
    long = int

# Re-export this exception for convenience.
try:
    CalledProcessError = subprocess.CalledProcessError
except AttributeError:
    # The subprocess module exists in Google App Engine, but is empty.
    # This module isn't very useful in that case, but it should
    # at least be importable.
    # Anywhere else a missing attribute is unexpected, so re-raise it.
    if 'APPENGINE_RUNTIME' not in os.environ:
        raise
  49. def cpu_count():
  50. """Returns the number of processors on this machine."""
  51. if multiprocessing is None:
  52. return 1
  53. try:
  54. return multiprocessing.cpu_count()
  55. except NotImplementedError:
  56. pass
  57. try:
  58. return os.sysconf("SC_NPROCESSORS_CONF")
  59. except (AttributeError, ValueError):
  60. pass
  61. gen_log.error("Could not detect number of processors; assuming 1")
  62. return 1
  63. def _reseed_random():
  64. if 'random' not in sys.modules:
  65. return
  66. import random
  67. # If os.urandom is available, this method does the same thing as
  68. # random.seed (at least as of python 2.6). If os.urandom is not
  69. # available, we mix in the pid in addition to a timestamp.
  70. try:
  71. seed = long(hexlify(os.urandom(16)), 16)
  72. except NotImplementedError:
  73. seed = int(time.time() * 1000) ^ os.getpid()
  74. random.seed(seed)
  75. def _pipe_cloexec():
  76. r, w = os.pipe()
  77. set_close_exec(r)
  78. set_close_exec(w)
  79. return r, w
  80. _task_id = None
  81. def fork_processes(num_processes, max_restarts=100):
  82. """Starts multiple worker processes.
  83. If ``num_processes`` is None or <= 0, we detect the number of cores
  84. available on this machine and fork that number of child
  85. processes. If ``num_processes`` is given and > 0, we fork that
  86. specific number of sub-processes.
  87. Since we use processes and not threads, there is no shared memory
  88. between any server code.
  89. Note that multiple processes are not compatible with the autoreload
  90. module (or the ``autoreload=True`` option to `tornado.web.Application`
  91. which defaults to True when ``debug=True``).
  92. When using multiple processes, no IOLoops can be created or
  93. referenced until after the call to ``fork_processes``.
  94. In each child process, ``fork_processes`` returns its *task id*, a
  95. number between 0 and ``num_processes``. Processes that exit
  96. abnormally (due to a signal or non-zero exit status) are restarted
  97. with the same id (up to ``max_restarts`` times). In the parent
  98. process, ``fork_processes`` returns None if all child processes
  99. have exited normally, but will otherwise only exit by throwing an
  100. exception.
  101. """
  102. global _task_id
  103. assert _task_id is None
  104. if num_processes is None or num_processes <= 0:
  105. num_processes = cpu_count()
  106. gen_log.info("Starting %d processes", num_processes)
  107. children = {}
  108. def start_child(i):
  109. pid = os.fork()
  110. if pid == 0:
  111. # child process
  112. _reseed_random()
  113. global _task_id
  114. _task_id = i
  115. return i
  116. else:
  117. children[pid] = i
  118. return None
  119. for i in range(num_processes):
  120. id = start_child(i)
  121. if id is not None:
  122. return id
  123. num_restarts = 0
  124. while children:
  125. try:
  126. pid, status = os.wait()
  127. except OSError as e:
  128. if errno_from_exception(e) == errno.EINTR:
  129. continue
  130. raise
  131. if pid not in children:
  132. continue
  133. id = children.pop(pid)
  134. if os.WIFSIGNALED(status):
  135. gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
  136. id, pid, os.WTERMSIG(status))
  137. elif os.WEXITSTATUS(status) != 0:
  138. gen_log.warning("child %d (pid %d) exited with status %d, restarting",
  139. id, pid, os.WEXITSTATUS(status))
  140. else:
  141. gen_log.info("child %d (pid %d) exited normally", id, pid)
  142. continue
  143. num_restarts += 1
  144. if num_restarts > max_restarts:
  145. raise RuntimeError("Too many child restarts, giving up")
  146. new_id = start_child(id)
  147. if new_id is not None:
  148. return new_id
  149. # All child processes exited cleanly, so exit the master process
  150. # instead of just returning to right after the call to
  151. # fork_processes (which will probably just start up another IOLoop
  152. # unless the caller checks the return value).
  153. sys.exit(0)
  154. def task_id():
  155. """Returns the current task id, if any.
  156. Returns None if this process was not created by `fork_processes`.
  157. """
  158. global _task_id
  159. return _task_id
  160. class Subprocess(object):
  161. """Wraps ``subprocess.Popen`` with IOStream support.
  162. The constructor is the same as ``subprocess.Popen`` with the following
  163. additions:
  164. * ``stdin``, ``stdout``, and ``stderr`` may have the value
  165. ``tornado.process.Subprocess.STREAM``, which will make the corresponding
  166. attribute of the resulting Subprocess a `.PipeIOStream`. If this option
  167. is used, the caller is responsible for closing the streams when done
  168. with them.
  169. The ``Subprocess.STREAM`` option and the ``set_exit_callback`` and
  170. ``wait_for_exit`` methods do not work on Windows. There is
  171. therefore no reason to use this class instead of
  172. ``subprocess.Popen`` on that platform.
  173. .. versionchanged:: 5.0
  174. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  175. """
  176. STREAM = object()
  177. _initialized = False
  178. _waiting = {} # type: ignore
  179. def __init__(self, *args, **kwargs):
  180. self.io_loop = ioloop.IOLoop.current()
  181. # All FDs we create should be closed on error; those in to_close
  182. # should be closed in the parent process on success.
  183. pipe_fds = []
  184. to_close = []
  185. if kwargs.get('stdin') is Subprocess.STREAM:
  186. in_r, in_w = _pipe_cloexec()
  187. kwargs['stdin'] = in_r
  188. pipe_fds.extend((in_r, in_w))
  189. to_close.append(in_r)
  190. self.stdin = PipeIOStream(in_w)
  191. if kwargs.get('stdout') is Subprocess.STREAM:
  192. out_r, out_w = _pipe_cloexec()
  193. kwargs['stdout'] = out_w
  194. pipe_fds.extend((out_r, out_w))
  195. to_close.append(out_w)
  196. self.stdout = PipeIOStream(out_r)
  197. if kwargs.get('stderr') is Subprocess.STREAM:
  198. err_r, err_w = _pipe_cloexec()
  199. kwargs['stderr'] = err_w
  200. pipe_fds.extend((err_r, err_w))
  201. to_close.append(err_w)
  202. self.stderr = PipeIOStream(err_r)
  203. try:
  204. self.proc = subprocess.Popen(*args, **kwargs)
  205. except:
  206. for fd in pipe_fds:
  207. os.close(fd)
  208. raise
  209. for fd in to_close:
  210. os.close(fd)
  211. for attr in ['stdin', 'stdout', 'stderr', 'pid']:
  212. if not hasattr(self, attr): # don't clobber streams set above
  213. setattr(self, attr, getattr(self.proc, attr))
  214. self._exit_callback = None
  215. self.returncode = None
  216. def set_exit_callback(self, callback):
  217. """Runs ``callback`` when this process exits.
  218. The callback takes one argument, the return code of the process.
  219. This method uses a ``SIGCHLD`` handler, which is a global setting
  220. and may conflict if you have other libraries trying to handle the
  221. same signal. If you are using more than one ``IOLoop`` it may
  222. be necessary to call `Subprocess.initialize` first to designate
  223. one ``IOLoop`` to run the signal handlers.
  224. In many cases a close callback on the stdout or stderr streams
  225. can be used as an alternative to an exit callback if the
  226. signal handler is causing a problem.
  227. """
  228. self._exit_callback = stack_context.wrap(callback)
  229. Subprocess.initialize()
  230. Subprocess._waiting[self.pid] = self
  231. Subprocess._try_cleanup_process(self.pid)
  232. def wait_for_exit(self, raise_error=True):
  233. """Returns a `.Future` which resolves when the process exits.
  234. Usage::
  235. ret = yield proc.wait_for_exit()
  236. This is a coroutine-friendly alternative to `set_exit_callback`
  237. (and a replacement for the blocking `subprocess.Popen.wait`).
  238. By default, raises `subprocess.CalledProcessError` if the process
  239. has a non-zero exit status. Use ``wait_for_exit(raise_error=False)``
  240. to suppress this behavior and return the exit status without raising.
  241. .. versionadded:: 4.2
  242. """
  243. future = Future()
  244. def callback(ret):
  245. if ret != 0 and raise_error:
  246. # Unfortunately we don't have the original args any more.
  247. future.set_exception(CalledProcessError(ret, None))
  248. else:
  249. future_set_result_unless_cancelled(future, ret)
  250. self.set_exit_callback(callback)
  251. return future
  252. @classmethod
  253. def initialize(cls):
  254. """Initializes the ``SIGCHLD`` handler.
  255. The signal handler is run on an `.IOLoop` to avoid locking issues.
  256. Note that the `.IOLoop` used for signal handling need not be the
  257. same one used by individual Subprocess objects (as long as the
  258. ``IOLoops`` are each running in separate threads).
  259. .. versionchanged:: 5.0
  260. The ``io_loop`` argument (deprecated since version 4.1) has been
  261. removed.
  262. """
  263. if cls._initialized:
  264. return
  265. io_loop = ioloop.IOLoop.current()
  266. cls._old_sigchld = signal.signal(
  267. signal.SIGCHLD,
  268. lambda sig, frame: io_loop.add_callback_from_signal(cls._cleanup))
  269. cls._initialized = True
  270. @classmethod
  271. def uninitialize(cls):
  272. """Removes the ``SIGCHLD`` handler."""
  273. if not cls._initialized:
  274. return
  275. signal.signal(signal.SIGCHLD, cls._old_sigchld)
  276. cls._initialized = False
  277. @classmethod
  278. def _cleanup(cls):
  279. for pid in list(cls._waiting.keys()): # make a copy
  280. cls._try_cleanup_process(pid)
  281. @classmethod
  282. def _try_cleanup_process(cls, pid):
  283. try:
  284. ret_pid, status = os.waitpid(pid, os.WNOHANG)
  285. except OSError as e:
  286. if errno_from_exception(e) == errno.ECHILD:
  287. return
  288. if ret_pid == 0:
  289. return
  290. assert ret_pid == pid
  291. subproc = cls._waiting.pop(pid)
  292. subproc.io_loop.add_callback_from_signal(
  293. subproc._set_returncode, status)
  294. def _set_returncode(self, status):
  295. if os.WIFSIGNALED(status):
  296. self.returncode = -os.WTERMSIG(status)
  297. else:
  298. assert os.WIFEXITED(status)
  299. self.returncode = os.WEXITSTATUS(status)
  300. # We've taken over wait() duty from the subprocess.Popen
  301. # object. If we don't inform it of the process's return code,
  302. # it will log a warning at destruction in python 3.6+.
  303. self.proc.returncode = self.returncode
  304. if self._exit_callback:
  305. callback = self._exit_callback
  306. self._exit_callback = None
  307. callback(self.returncode)