procmon.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # -*- test-case-name: twisted.runner.test.test_procmon -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Support for starting, monitoring, and restarting child processes.
  6. """
  7. from twisted.python import log
  8. from twisted.internet import error, protocol, reactor as _reactor
  9. from twisted.application import service
  10. from twisted.protocols import basic
  class DummyTransport:
      """
      Placeholder transport for L{LineLogger} instances.  Those loggers are
      fed data directly by L{LoggingProtocol} rather than by a real
      connection, so there is nothing real to report or to disconnect.

      A single module-level instance (C{transport}) is shared by every
      logger, so this class must stay stateless.
      """
      disconnecting = 0

      def loseConnection(self):
          """
          Ignore a request to drop the connection.

          L{twisted.protocols.basic.LineReceiver} calls
          C{transport.loseConnection()} when a line exceeds its maximum
          length.  Without this method that call raised C{AttributeError}.
          It deliberately leaves C{disconnecting} untouched.
          C{LineReceiver} consults that flag while splitting lines, and this
          instance is shared, so setting it would affect every logger.
          """


  transport = DummyTransport()
  class LineLogger(basic.LineReceiver):
      """
      Line receiver that logs each line of a monitored child's output,
      prefixed with C{tag} (the process name).
      """
      tag = None
      delimiter = b'\n'

      def lineReceived(self, line):
          """
          Log one line of output.

          The line is decoded as UTF-8 when possible.  Otherwise its C{repr}
          is logged, so undecodable bytes are still visible.
          """
          try:
              line = line.decode('utf-8')
          except UnicodeDecodeError:
              line = repr(line)

          log.msg(u'[%s] %s' % (self.tag, line))

      def lineLengthExceeded(self, line):
          """
          Log over-long output instead of disconnecting.

          The base implementation calls C{self.transport.loseConnection()}.
          These loggers have no real connection to drop, because data is
          pushed in by L{LoggingProtocol}.  Dropping the data would also
          silently lose the child's output.

          @param line: The unprocessed buffer handed over by
              L{basic.LineReceiver}.  Per that class's documentation it may
              be only part of a line or may span several lines, so it is
              split on C{delimiter} and each piece is logged separately.  A
              line cut at the buffer boundary can therefore appear as two
              log entries.
          """
          pieces = line.split(self.delimiter)
          # Data that ends with the delimiter yields a trailing empty piece
          # that is not a real line.
          if not pieces[-1]:
              pieces.pop()
          for piece in pieces:
              self.lineReceived(piece)
  class LoggingProtocol(protocol.ProcessProtocol):
      """
      Process protocol that logs a child process's stdout and stderr line by
      line, and reports the child's exit to its owning service.

      @ivar service: The object whose C{connectionLost(name)} is called when
          the process ends (set by C{ProcessMonitor.startProcess}).
      @ivar name: The monitored process's name; also used as the log tag.
      @ivar empty: True when no partial stdout line is buffered, i.e. no
          stdout data has arrived yet or the last chunk ended with a newline.
      @ivar errorEmpty: The same as C{empty}, for stderr.
      """
      service = None
      name = None
      empty = 1
      errorEmpty = 1

      def connectionMade(self):
          # stdout and stderr get separate line buffers, so a partial line on
          # one stream is never glued to data from the other.
          self.output = LineLogger()
          self.output.tag = self.name
          self.output.makeConnection(transport)

          self.errorOutput = LineLogger()
          self.errorOutput.tag = self.name
          self.errorOutput.makeConnection(transport)

      def outReceived(self, data):
          self.output.dataReceived(data)
          if data:
              # Use endswith: indexing bytes yields an int on Python 3, and
              # an int never equals b'\n'.
              self.empty = data.endswith(b'\n')

      def errReceived(self, data):
          self.errorOutput.dataReceived(data)
          if data:
              self.errorEmpty = data.endswith(b'\n')

      def processEnded(self, reason):
          # Flush any trailing line that the child did not terminate, so it
          # is still logged.
          if not self.empty:
              self.output.dataReceived(b'\n')
          if not self.errorEmpty:
              self.errorOutput.dataReceived(b'\n')
          self.service.connectionLost(self.name)
  class ProcessMonitor(service.Service):
      """
      ProcessMonitor runs processes, monitors their progress, and restarts
      them when they die.

      The ProcessMonitor will not attempt to restart a process that appears
      to die instantly.  With each "instant" death (less than 1 second, by
      default), it waits approximately twice as long as last time before
      restarting it.  A successful run resets the delay.

      The primary interface is L{addProcess} and L{removeProcess}.  When the
      service is running (that is, when the application it is attached to is
      running), adding a process automatically starts it.

      Each process has a name, and this name must uniquely identify the
      process.  In particular, attempting to add two processes with the same
      name will result in a C{KeyError}.

      @type threshold: C{float}
      @ivar threshold: How long a process has to live before the death is
          considered instant, in seconds.  The default value is 1 second.

      @type killTime: C{float}
      @ivar killTime: How long a process being killed has to get its affairs
          in order before it gets killed with an unmaskable signal.  The
          default value is 5 seconds.

      @type minRestartDelay: C{float}
      @ivar minRestartDelay: The minimum time (in seconds) to wait before
          attempting to restart a process.  Default 1s.

      @type maxRestartDelay: C{float}
      @ivar maxRestartDelay: The maximum time (in seconds) to wait before
          attempting to restart a process.  Default 3600s (1h).

      @type _reactor: L{IReactorProcess} provider
      @ivar _reactor: A provider of L{IReactorProcess} and L{IReactorTime}
          which will be used to spawn processes and register delayed calls.

      @ivar processes: Maps each process name to the C{(args, uid, gid, env)}
          tuple given to L{addProcess}.
      @ivar protocols: Maps the name of each currently running process to
          its L{LoggingProtocol}.
      @ivar delay: Maps each process name to the delay (in seconds) to use
          for its next restart after an "instant" death.
      @ivar timeStarted: Maps each process name to the reactor time at which
          it was last started.
      @ivar murder: Maps process names to pending delayed calls that send
          C{KILL} to a process that did not exit after C{TERM}.
      @ivar restart: Maps process names to pending delayed restart calls.
      """
      threshold = 1
      killTime = 5
      minRestartDelay = 1
      maxRestartDelay = 3600

      def __init__(self, reactor=_reactor):
          self._reactor = reactor

          self.processes = {}
          self.protocols = {}
          self.delay = {}
          self.timeStarted = {}
          self.murder = {}
          self.restart = {}

      def __getstate__(self):
          """
          Return the pickle state.

          The configured processes are kept.  The reactor and all runtime
          bookkeeping (running protocols, delays, start times, pending kills
          and restarts) are dropped.
          """
          dct = service.Service.__getstate__(self)
          del dct['_reactor']
          dct['protocols'] = {}
          dct['delay'] = {}
          dct['timeStarted'] = {}
          dct['murder'] = {}
          dct['restart'] = {}
          return dct

      def __setstate__(self, state):
          """
          Restore an instance from the state produced by L{__getstate__}.

          That state omits C{_reactor} and has an empty C{delay}.  Without
          repairing both, an unpickled monitor fails with C{AttributeError}
          when it spawns a process and with C{KeyError} when one exits.  The
          module's global reactor is used, since the original reactor is not
          part of the pickled state.
          """
          self.__dict__.update(state)
          self._reactor = _reactor
          for name in self.processes:
              self.delay.setdefault(name, self.minRestartDelay)

      def addProcess(self, name, args, uid=None, gid=None, env={}):
          """
          Add a new monitored process and start it immediately if the
          L{ProcessMonitor} service is running.

          Note that args are passed to the system call, not to the shell.
          If running the shell is desired, the common idiom is to use
          C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}

          @param name: A name for this process.  This value must be
              unique across all processes added to this monitor.
          @type name: C{str}
          @param args: The argv sequence for the process to launch.
          @param uid: The user ID to use to run the process.  If L{None},
              the current UID is used.
          @type uid: C{int}
          @param gid: The group ID to use to run the process.  If L{None},
              the current GID is used.
          @type gid: C{int}
          @param env: The environment to give to the launched process.  See
              L{IReactorProcess.spawnProcess}'s C{env} parameter.  The
              default C{{}} is kept (rather than L{None}) because
              C{spawnProcess} treats the two differently.
          @type env: C{dict}
          @raises: C{KeyError} if a process with the given name already
              exists
          """
          if name in self.processes:
              raise KeyError("remove %s first" % (name,))
          self.processes[name] = args, uid, gid, env
          self.delay[name] = self.minRestartDelay
          if self.running:
              self.startProcess(name)

      def removeProcess(self, name):
          """
          Stop the named process and remove it from the list of monitored
          processes.

          Any restart already scheduled for the process is cancelled, so the
          removed process is not started again.

          @type name: C{str}
          @param name: A string that uniquely identifies the process.
          @raises KeyError: If no process with the given name exists.
          """
          self.stopProcess(name)
          del self.processes[name]
          # A process that died recently may be waiting for a delayed
          # restart.  If that call fired now, startProcess would raise
          # KeyError because the name is no longer in self.processes.
          pendingRestart = self.restart.pop(name, None)
          if pendingRestart is not None and pendingRestart.active():
              pendingRestart.cancel()

      def startService(self):
          """
          Start all monitored processes.
          """
          service.Service.startService(self)
          for name in self.processes:
              self.startProcess(name)

      def stopService(self):
          """
          Stop all monitored processes and cancel all scheduled process
          restarts.
          """
          # Marking the service as not running first keeps connectionLost
          # from scheduling new restarts as the processes exit.
          service.Service.stopService(self)
          # Cancel any outstanding restarts
          for delayedCall in self.restart.values():
              if delayedCall.active():
                  delayedCall.cancel()
          for name in self.processes:
              self.stopProcess(name)

      def connectionLost(self, name):
          """
          Called when a monitored process exits.  If
          L{service.IService.running} is L{True} (i.e. the service is
          started), the process will be restarted.

          If the process had been running for more than
          L{ProcessMonitor.threshold} seconds, it will be restarted
          immediately.  If it had been running for less than
          L{ProcessMonitor.threshold} seconds, the restart will be delayed.
          Each time the process dies before the configured threshold, the
          restart delay is doubled, up to a maximum of C{maxRestartDelay}
          seconds.

          @type name: C{str}
          @param name: A string that uniquely identifies the process
              which exited.
          """
          # Cancel the scheduled _forceStopProcess function if the process
          # dies naturally
          if name in self.murder:
              if self.murder[name].active():
                  self.murder[name].cancel()
              del self.murder[name]

          del self.protocols[name]

          if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
              # The process died too fast - backoff
              nextDelay = self.delay[name]
              self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
          else:
              # Process had been running for a significant amount of time
              # restart immediately
              nextDelay = 0
              self.delay[name] = self.minRestartDelay

          # Schedule a process restart if the service is running and the
          # process has not been removed in the meantime
          if self.running and name in self.processes:
              self.restart[name] = self._reactor.callLater(nextDelay,
                                                           self.startProcess,
                                                           name)

      def startProcess(self, name):
          """
          Start the named process unless it is already running.

          The process is spawned through C{self._reactor} with a new
          L{LoggingProtocol}.  Its start time is recorded so that
          L{connectionLost} can tell whether it died "instantly".

          @param name: The name of the process to be started
          @raises KeyError: If no process with that name has been added.
          """
          # If a protocol instance already exists, it means the process is
          # already running
          if name in self.protocols:
              return

          args, uid, gid, env = self.processes[name]

          proto = LoggingProtocol()
          proto.service = self
          proto.name = name
          self.protocols[name] = proto
          self.timeStarted[name] = self._reactor.seconds()
          self._reactor.spawnProcess(proto, args[0], args, uid=uid,
                                     gid=gid, env=env)

      def _forceStopProcess(self, proc):
          """
          Send C{KILL} to C{proc}, ignoring a process that has already
          exited.

          @param proc: An L{IProcessTransport} provider
          """
          try:
              proc.signalProcess('KILL')
          except error.ProcessExitedAlready:
              pass

      def stopProcess(self, name):
          """
          Ask the named process to terminate with C{TERM}.  If it does not
          exit, a C{KILL} follows after L{killTime} seconds.

          If the process is not currently running, nothing is signalled.

          @param name: The name of the process to be stopped
          @raises KeyError: If no process with that name has been added.
          """
          if name not in self.processes:
              raise KeyError('Unrecognized process name: %s' % (name,))

          proto = self.protocols.get(name, None)
          if proto is not None:
              proc = proto.transport
              try:
                  proc.signalProcess('TERM')
              except error.ProcessExitedAlready:
                  pass
              else:
                  # Keep an already-pending kill rather than overwriting it:
                  # an overwritten delayed call would never be cancelled by
                  # connectionLost.
                  pendingKill = self.murder.get(name)
                  if pendingKill is None or not pendingKill.active():
                      self.murder[name] = self._reactor.callLater(
                          self.killTime,
                          self._forceStopProcess, proc)

      def restartAll(self):
          """
          Restart all processes.  This is useful for third party management
          services to allow a user to restart servers because of an outside
          change in circumstances -- for example, a new version of a library
          is installed.

          Each running process is stopped.  L{connectionLost} then restarts
          it once it exits, provided the service is running.
          """
          for name in self.processes:
              self.stopProcess(name)

      def __repr__(self):
          """
          Return a summary listing each process's name, its optional
          C{(uid:gid)} and its argv.
          """
          entries = []
          for name, proc in self.processes.items():
              args, uid, gid = proc[0], proc[1], proc[2]
              uidgid = ''
              if uid is not None:
                  uidgid = str(uid)
              if gid is not None:
                  uidgid += ':' + str(gid)

              if uidgid:
                  uidgid = '(' + uidgid + ')'
              entries.append('%r%s: %r' % (name, uidgid, args))
          return ('<' + self.__class__.__name__ + ' '
                  + ' '.join(entries)
                  + '>')