test_procmon.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.runner.procmon}.
  5. """
  6. from twisted.trial import unittest
  7. from twisted.runner.procmon import LoggingProtocol, ProcessMonitor
  8. from twisted.internet.error import (ProcessDone, ProcessTerminated,
  9. ProcessExitedAlready)
  10. from twisted.internet.task import Clock
  11. from twisted.python.failure import Failure
  12. from twisted.python import log
  13. from twisted.test.proto_helpers import MemoryReactor
  14. class DummyProcess(object):
  15. """
  16. An incomplete and fake L{IProcessTransport} implementation for testing how
  17. L{ProcessMonitor} behaves when its monitored processes exit.
  18. @ivar _terminationDelay: the delay in seconds after which the DummyProcess
  19. will appear to exit when it receives a TERM signal
  20. """
  21. pid = 1
  22. proto = None
  23. _terminationDelay = 1
  24. def __init__(self, reactor, executable, args, environment, path,
  25. proto, uid=None, gid=None, usePTY=0, childFDs=None):
  26. self.proto = proto
  27. self._reactor = reactor
  28. self._executable = executable
  29. self._args = args
  30. self._environment = environment
  31. self._path = path
  32. self._uid = uid
  33. self._gid = gid
  34. self._usePTY = usePTY
  35. self._childFDs = childFDs
  36. def signalProcess(self, signalID):
  37. """
  38. A partial implementation of signalProcess which can only handle TERM and
  39. KILL signals.
  40. - When a TERM signal is given, the dummy process will appear to exit
  41. after L{DummyProcess._terminationDelay} seconds with exit code 0
  42. - When a KILL signal is given, the dummy process will appear to exit
  43. immediately with exit code 1.
  44. @param signalID: The signal name or number to be issued to the process.
  45. @type signalID: C{str}
  46. """
  47. params = {
  48. "TERM": (self._terminationDelay, 0),
  49. "KILL": (0, 1)
  50. }
  51. if self.pid is None:
  52. raise ProcessExitedAlready()
  53. if signalID in params:
  54. delay, status = params[signalID]
  55. self._signalHandler = self._reactor.callLater(
  56. delay, self.processEnded, status)
  57. def processEnded(self, status):
  58. """
  59. Deliver the process ended event to C{self.proto}.
  60. """
  61. self.pid = None
  62. statusMap = {
  63. 0: ProcessDone,
  64. 1: ProcessTerminated,
  65. }
  66. self.proto.processEnded(Failure(statusMap[status](status)))
  67. class DummyProcessReactor(MemoryReactor, Clock):
  68. """
  69. @ivar spawnedProcesses: a list that keeps track of the fake process
  70. instances built by C{spawnProcess}.
  71. @type spawnedProcesses: C{list}
  72. """
  73. def __init__(self):
  74. MemoryReactor.__init__(self)
  75. Clock.__init__(self)
  76. self.spawnedProcesses = []
  77. def spawnProcess(self, processProtocol, executable, args=(), env={},
  78. path=None, uid=None, gid=None, usePTY=0,
  79. childFDs=None):
  80. """
  81. Fake L{reactor.spawnProcess}, that logs all the process
  82. arguments and returns a L{DummyProcess}.
  83. """
  84. proc = DummyProcess(self, executable, args, env, path,
  85. processProtocol, uid, gid, usePTY, childFDs)
  86. processProtocol.makeConnection(proc)
  87. self.spawnedProcesses.append(proc)
  88. return proc
  89. class ProcmonTests(unittest.TestCase):
  90. """
  91. Tests for L{ProcessMonitor}.
  92. """
  93. def setUp(self):
  94. """
  95. Create an L{ProcessMonitor} wrapped around a fake reactor.
  96. """
  97. self.reactor = DummyProcessReactor()
  98. self.pm = ProcessMonitor(reactor=self.reactor)
  99. self.pm.minRestartDelay = 2
  100. self.pm.maxRestartDelay = 10
  101. self.pm.threshold = 10
  102. def test_getStateIncludesProcesses(self):
  103. """
  104. The list of monitored processes must be included in the pickle state.
  105. """
  106. self.pm.addProcess("foo", ["arg1", "arg2"],
  107. uid=1, gid=2, env={})
  108. self.assertEqual(self.pm.__getstate__()['processes'],
  109. {'foo': (['arg1', 'arg2'], 1, 2, {})})
  110. def test_getStateExcludesReactor(self):
  111. """
  112. The private L{ProcessMonitor._reactor} instance variable should not be
  113. included in the pickle state.
  114. """
  115. self.assertNotIn('_reactor', self.pm.__getstate__())
  116. def test_addProcess(self):
  117. """
  118. L{ProcessMonitor.addProcess} only starts the named program if
  119. L{ProcessMonitor.startService} has been called.
  120. """
  121. self.pm.addProcess("foo", ["arg1", "arg2"],
  122. uid=1, gid=2, env={})
  123. self.assertEqual(self.pm.protocols, {})
  124. self.assertEqual(self.pm.processes,
  125. {"foo": (["arg1", "arg2"], 1, 2, {})})
  126. self.pm.startService()
  127. self.reactor.advance(0)
  128. self.assertEqual(list(self.pm.protocols.keys()), ["foo"])
  129. def test_addProcessDuplicateKeyError(self):
  130. """
  131. L{ProcessMonitor.addProcess} raises a C{KeyError} if a process with the
  132. given name already exists.
  133. """
  134. self.pm.addProcess("foo", ["arg1", "arg2"],
  135. uid=1, gid=2, env={})
  136. self.assertRaises(KeyError, self.pm.addProcess,
  137. "foo", ["arg1", "arg2"], uid=1, gid=2, env={})
  138. def test_addProcessEnv(self):
  139. """
  140. L{ProcessMonitor.addProcess} takes an C{env} parameter that is passed to
  141. L{IReactorProcess.spawnProcess}.
  142. """
  143. fakeEnv = {"KEY": "value"}
  144. self.pm.startService()
  145. self.pm.addProcess("foo", ["foo"], uid=1, gid=2, env=fakeEnv)
  146. self.reactor.advance(0)
  147. self.assertEqual(
  148. self.reactor.spawnedProcesses[0]._environment, fakeEnv)
  149. def test_removeProcess(self):
  150. """
  151. L{ProcessMonitor.removeProcess} removes the process from the public
  152. processes list.
  153. """
  154. self.pm.startService()
  155. self.pm.addProcess("foo", ["foo"])
  156. self.assertEqual(len(self.pm.processes), 1)
  157. self.pm.removeProcess("foo")
  158. self.assertEqual(len(self.pm.processes), 0)
  159. def test_removeProcessUnknownKeyError(self):
  160. """
  161. L{ProcessMonitor.removeProcess} raises a C{KeyError} if the given
  162. process name isn't recognised.
  163. """
  164. self.pm.startService()
  165. self.assertRaises(KeyError, self.pm.removeProcess, "foo")
  166. def test_startProcess(self):
  167. """
  168. When a process has been started, an instance of L{LoggingProtocol} will
  169. be added to the L{ProcessMonitor.protocols} dict and the start time of
  170. the process will be recorded in the L{ProcessMonitor.timeStarted}
  171. dictionary.
  172. """
  173. self.pm.addProcess("foo", ["foo"])
  174. self.pm.startProcess("foo")
  175. self.assertIsInstance(self.pm.protocols["foo"], LoggingProtocol)
  176. self.assertIn("foo", self.pm.timeStarted.keys())
  177. def test_startProcessAlreadyStarted(self):
  178. """
  179. L{ProcessMonitor.startProcess} silently returns if the named process is
  180. already started.
  181. """
  182. self.pm.addProcess("foo", ["foo"])
  183. self.pm.startProcess("foo")
  184. self.assertIsNone(self.pm.startProcess("foo"))
  185. def test_startProcessUnknownKeyError(self):
  186. """
  187. L{ProcessMonitor.startProcess} raises a C{KeyError} if the given
  188. process name isn't recognised.
  189. """
  190. self.assertRaises(KeyError, self.pm.startProcess, "foo")
  191. def test_stopProcessNaturalTermination(self):
  192. """
  193. L{ProcessMonitor.stopProcess} immediately sends a TERM signal to the
  194. named process.
  195. """
  196. self.pm.startService()
  197. self.pm.addProcess("foo", ["foo"])
  198. self.assertIn("foo", self.pm.protocols)
  199. # Configure fake process to die 1 second after receiving term signal
  200. timeToDie = self.pm.protocols["foo"].transport._terminationDelay = 1
  201. # Advance the reactor to just before the short lived process threshold
  202. # and leave enough time for the process to die
  203. self.reactor.advance(self.pm.threshold)
  204. # Then signal the process to stop
  205. self.pm.stopProcess("foo")
  206. # Advance the reactor just enough to give the process time to die and
  207. # verify that the process restarts
  208. self.reactor.advance(timeToDie)
  209. # We expect it to be restarted immediately
  210. self.assertEqual(self.reactor.seconds(),
  211. self.pm.timeStarted["foo"])
  212. def test_stopProcessForcedKill(self):
  213. """
  214. L{ProcessMonitor.stopProcess} kills a process which fails to terminate
  215. naturally within L{ProcessMonitor.killTime} seconds.
  216. """
  217. self.pm.startService()
  218. self.pm.addProcess("foo", ["foo"])
  219. self.assertIn("foo", self.pm.protocols)
  220. self.reactor.advance(self.pm.threshold)
  221. proc = self.pm.protocols["foo"].transport
  222. # Arrange for the fake process to live longer than the killTime
  223. proc._terminationDelay = self.pm.killTime + 1
  224. self.pm.stopProcess("foo")
  225. # If process doesn't die before the killTime, procmon should
  226. # terminate it
  227. self.reactor.advance(self.pm.killTime - 1)
  228. self.assertEqual(0.0, self.pm.timeStarted["foo"])
  229. self.reactor.advance(1)
  230. # We expect it to be immediately restarted
  231. self.assertEqual(self.reactor.seconds(), self.pm.timeStarted["foo"])
  232. def test_stopProcessUnknownKeyError(self):
  233. """
  234. L{ProcessMonitor.stopProcess} raises a C{KeyError} if the given process
  235. name isn't recognised.
  236. """
  237. self.assertRaises(KeyError, self.pm.stopProcess, "foo")
  238. def test_stopProcessAlreadyStopped(self):
  239. """
  240. L{ProcessMonitor.stopProcess} silently returns if the named process
  241. is already stopped. eg Process has crashed and a restart has been
  242. rescheduled, but in the meantime, the service is stopped.
  243. """
  244. self.pm.addProcess("foo", ["foo"])
  245. self.assertIsNone(self.pm.stopProcess("foo"))
  246. def test_outputReceivedCompleteLine(self):
  247. """
  248. Getting a complete output line generates a log message.
  249. """
  250. events = []
  251. self.addCleanup(log.removeObserver, events.append)
  252. log.addObserver(events.append)
  253. self.pm.addProcess("foo", ["foo"])
  254. # Schedule the process to start
  255. self.pm.startService()
  256. # Advance the reactor to start the process
  257. self.reactor.advance(0)
  258. self.assertIn("foo", self.pm.protocols)
  259. # Long time passes
  260. self.reactor.advance(self.pm.threshold)
  261. # Process greets
  262. self.pm.protocols["foo"].outReceived(b'hello world!\n')
  263. self.assertEquals(len(events), 1)
  264. message = events[0]['message']
  265. self.assertEquals(message, tuple([u'[foo] hello world!']))
  266. def test_outputReceivedCompleteLineInvalidUTF8(self):
  267. """
  268. Getting invalid UTF-8 results in the repr of the raw message
  269. """
  270. events = []
  271. self.addCleanup(log.removeObserver, events.append)
  272. log.addObserver(events.append)
  273. self.pm.addProcess("foo", ["foo"])
  274. # Schedule the process to start
  275. self.pm.startService()
  276. # Advance the reactor to start the process
  277. self.reactor.advance(0)
  278. self.assertIn("foo", self.pm.protocols)
  279. # Long time passes
  280. self.reactor.advance(self.pm.threshold)
  281. # Process greets
  282. self.pm.protocols["foo"].outReceived(b'\xffhello world!\n')
  283. self.assertEquals(len(events), 1)
  284. messages = events[0]['message']
  285. self.assertEquals(len(messages), 1)
  286. message = messages[0]
  287. tag, output = message.split(' ', 1)
  288. self.assertEquals(tag, '[foo]')
  289. self.assertEquals(output, repr(b'\xffhello world!'))
  290. def test_outputReceivedPartialLine(self):
  291. """
  292. Getting partial line results in no events until process end
  293. """
  294. events = []
  295. self.addCleanup(log.removeObserver, events.append)
  296. log.addObserver(events.append)
  297. self.pm.addProcess("foo", ["foo"])
  298. # Schedule the process to start
  299. self.pm.startService()
  300. # Advance the reactor to start the process
  301. self.reactor.advance(0)
  302. self.assertIn("foo", self.pm.protocols)
  303. # Long time passes
  304. self.reactor.advance(self.pm.threshold)
  305. # Process greets
  306. self.pm.protocols["foo"].outReceived(b'hello world!')
  307. self.assertEquals(len(events), 0)
  308. self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
  309. self.assertEquals(len(events), 1)
  310. message = events[0]['message']
  311. self.assertEquals(message, tuple([u'[foo] hello world!']))
  312. def test_connectionLostLongLivedProcess(self):
  313. """
  314. L{ProcessMonitor.connectionLost} should immediately restart a process
  315. if it has been running longer than L{ProcessMonitor.threshold} seconds.
  316. """
  317. self.pm.addProcess("foo", ["foo"])
  318. # Schedule the process to start
  319. self.pm.startService()
  320. # advance the reactor to start the process
  321. self.reactor.advance(0)
  322. self.assertIn("foo", self.pm.protocols)
  323. # Long time passes
  324. self.reactor.advance(self.pm.threshold)
  325. # Process dies after threshold
  326. self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
  327. self.assertNotIn("foo", self.pm.protocols)
  328. # Process should be restarted immediately
  329. self.reactor.advance(0)
  330. self.assertIn("foo", self.pm.protocols)
  331. def test_connectionLostMurderCancel(self):
  332. """
  333. L{ProcessMonitor.connectionLost} cancels a scheduled process killer and
  334. deletes the DelayedCall from the L{ProcessMonitor.murder} list.
  335. """
  336. self.pm.addProcess("foo", ["foo"])
  337. # Schedule the process to start
  338. self.pm.startService()
  339. # Advance 1s to start the process then ask ProcMon to stop it
  340. self.reactor.advance(1)
  341. self.pm.stopProcess("foo")
  342. # A process killer has been scheduled, delayedCall is active
  343. self.assertIn("foo", self.pm.murder)
  344. delayedCall = self.pm.murder["foo"]
  345. self.assertTrue(delayedCall.active())
  346. # Advance to the point at which the dummy process exits
  347. self.reactor.advance(
  348. self.pm.protocols["foo"].transport._terminationDelay)
  349. # Now the delayedCall has been cancelled and deleted
  350. self.assertFalse(delayedCall.active())
  351. self.assertNotIn("foo", self.pm.murder)
  352. def test_connectionLostProtocolDeletion(self):
  353. """
  354. L{ProcessMonitor.connectionLost} removes the corresponding
  355. ProcessProtocol instance from the L{ProcessMonitor.protocols} list.
  356. """
  357. self.pm.startService()
  358. self.pm.addProcess("foo", ["foo"])
  359. self.assertIn("foo", self.pm.protocols)
  360. self.pm.protocols["foo"].transport.signalProcess("KILL")
  361. self.reactor.advance(
  362. self.pm.protocols["foo"].transport._terminationDelay)
  363. self.assertNotIn("foo", self.pm.protocols)
  364. def test_connectionLostMinMaxRestartDelay(self):
  365. """
  366. L{ProcessMonitor.connectionLost} will wait at least minRestartDelay s
  367. and at most maxRestartDelay s
  368. """
  369. self.pm.minRestartDelay = 2
  370. self.pm.maxRestartDelay = 3
  371. self.pm.startService()
  372. self.pm.addProcess("foo", ["foo"])
  373. self.assertEqual(self.pm.delay["foo"], self.pm.minRestartDelay)
  374. self.reactor.advance(self.pm.threshold - 1)
  375. self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
  376. self.assertEqual(self.pm.delay["foo"], self.pm.maxRestartDelay)
  377. def test_connectionLostBackoffDelayDoubles(self):
  378. """
  379. L{ProcessMonitor.connectionLost} doubles the restart delay each time
  380. the process dies too quickly.
  381. """
  382. self.pm.startService()
  383. self.pm.addProcess("foo", ["foo"])
  384. self.reactor.advance(self.pm.threshold - 1) #9s
  385. self.assertIn("foo", self.pm.protocols)
  386. self.assertEqual(self.pm.delay["foo"], self.pm.minRestartDelay)
  387. # process dies within the threshold and should not restart immediately
  388. self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
  389. self.assertEqual(self.pm.delay["foo"], self.pm.minRestartDelay * 2)
  390. def test_startService(self):
  391. """
  392. L{ProcessMonitor.startService} starts all monitored processes.
  393. """
  394. self.pm.addProcess("foo", ["foo"])
  395. # Schedule the process to start
  396. self.pm.startService()
  397. # advance the reactor to start the process
  398. self.reactor.advance(0)
  399. self.assertIn("foo", self.pm.protocols)
  400. def test_stopService(self):
  401. """
  402. L{ProcessMonitor.stopService} should stop all monitored processes.
  403. """
  404. self.pm.addProcess("foo", ["foo"])
  405. self.pm.addProcess("bar", ["bar"])
  406. # Schedule the process to start
  407. self.pm.startService()
  408. # advance the reactor to start the processes
  409. self.reactor.advance(self.pm.threshold)
  410. self.assertIn("foo", self.pm.protocols)
  411. self.assertIn("bar", self.pm.protocols)
  412. self.reactor.advance(1)
  413. self.pm.stopService()
  414. # Advance to beyond the killTime - all monitored processes
  415. # should have exited
  416. self.reactor.advance(self.pm.killTime + 1)
  417. # The processes shouldn't be restarted
  418. self.assertEqual({}, self.pm.protocols)
  419. def test_stopServiceCancelRestarts(self):
  420. """
  421. L{ProcessMonitor.stopService} should cancel any scheduled process
  422. restarts.
  423. """
  424. self.pm.addProcess("foo", ["foo"])
  425. # Schedule the process to start
  426. self.pm.startService()
  427. # advance the reactor to start the processes
  428. self.reactor.advance(self.pm.threshold)
  429. self.assertIn("foo", self.pm.protocols)
  430. self.reactor.advance(1)
  431. # Kill the process early
  432. self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
  433. self.assertTrue(self.pm.restart['foo'].active())
  434. self.pm.stopService()
  435. # Scheduled restart should have been cancelled
  436. self.assertFalse(self.pm.restart['foo'].active())
  437. def test_stopServiceCleanupScheduledRestarts(self):
  438. """
  439. L{ProcessMonitor.stopService} should cancel all scheduled process
  440. restarts.
  441. """
  442. self.pm.threshold = 5
  443. self.pm.minRestartDelay = 5
  444. # Start service and add a process (started immediately)
  445. self.pm.startService()
  446. self.pm.addProcess("foo", ["foo"])
  447. # Stop the process after 1s
  448. self.reactor.advance(1)
  449. self.pm.stopProcess("foo")
  450. # Wait 1s for it to exit it will be scheduled to restart 5s later
  451. self.reactor.advance(1)
  452. # Meanwhile stop the service
  453. self.pm.stopService()
  454. # Advance to beyond the process restart time
  455. self.reactor.advance(6)
  456. # The process shouldn't have restarted because stopService has cancelled
  457. # all pending process restarts.
  458. self.assertEqual(self.pm.protocols, {})