123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.application.runner._pidfile}.
- """
- from functools import wraps
- import errno
- from os import getpid, name as SYSTEM_NAME
- from io import BytesIO
- from zope.interface import implementer
- from zope.interface.verify import verifyObject
- from twisted.python.filepath import IFilePath
- from twisted.python.runtime import platform
- from ...runner import _pidfile
- from .._pidfile import (
- IPIDFile, PIDFile, NonePIDFile,
- AlreadyRunningError, InvalidPIDFileError, StalePIDFileError,
- NoPIDFound,
- )
- import twisted.trial.unittest
- from twisted.trial.unittest import SkipTest
- def ifPlatformSupported(f):
- """
- Decorator for tests that are not expected to work on all platforms.
- Calling L{PIDFile.isRunning} currently raises L{NotImplementedError} on
- non-POSIX platforms.
- On an unsupported platform, we expect to see any test that calls
- L{PIDFile.isRunning} to raise either L{NotImplementedError}, L{SkipTest},
- or C{self.failureException}.
- (C{self.failureException} may occur in a test that checks for a specific
- exception but it gets NotImplementedError instead.)
- @param f: The test method to decorate.
- @type f: method
- @return: The wrapped callable.
- """
- @wraps(f)
- def wrapper(self, *args, **kwargs):
- supported = platform.getType() == "posix"
- if supported:
- return f(self, *args, **kwargs)
- else:
- e = self.assertRaises(
- (NotImplementedError, SkipTest, self.failureException),
- f, self, *args, **kwargs
- )
- if isinstance(e, NotImplementedError):
- self.assertTrue(
- str(e).startswith("isRunning is not implemented on ")
- )
- return wrapper
- class PIDFileTests(twisted.trial.unittest.TestCase):
- """
- Tests for L{PIDFile}.
- """
- def test_interface(self):
- """
- L{PIDFile} conforms to L{IPIDFile}.
- """
- pidFile = PIDFile(DummyFilePath())
- verifyObject(IPIDFile, pidFile)
- def test_formatWithPID(self):
- """
- L{PIDFile._format} returns the expected format when given a PID.
- """
- self.assertEqual(PIDFile._format(pid=1337), b"1337\n")
- def test_readWithPID(self):
- """
- L{PIDFile.read} returns the PID from the given file path.
- """
- pid = 1337
- pidFile = PIDFile(DummyFilePath(PIDFile._format(pid=pid)))
- self.assertEqual(pid, pidFile.read())
- def test_readEmptyPID(self):
- """
- L{PIDFile.read} raises L{InvalidPIDFileError} when given an empty file
- path.
- """
- pidValue = b""
- pidFile = PIDFile(DummyFilePath(b""))
- e = self.assertRaises(InvalidPIDFileError, pidFile.read)
- self.assertEqual(
- str(e),
- "non-integer PID value in PID file: {!r}".format(pidValue)
- )
- def test_readWithBogusPID(self):
- """
- L{PIDFile.read} raises L{InvalidPIDFileError} when given an empty file
- path.
- """
- pidValue = b"$foo!"
- pidFile = PIDFile(DummyFilePath(pidValue))
- e = self.assertRaises(InvalidPIDFileError, pidFile.read)
- self.assertEqual(
- str(e),
- "non-integer PID value in PID file: {!r}".format(pidValue)
- )
- def test_readDoesntExist(self):
- """
- L{PIDFile.read} raises L{NoPIDFound} when given a non-existing file
- path.
- """
- pidFile = PIDFile(DummyFilePath())
- e = self.assertRaises(NoPIDFound, pidFile.read)
- self.assertEqual(str(e), "PID file does not exist")
- def test_readOpenRaisesOSErrorNotENOENT(self):
- """
- L{PIDFile.read} re-raises L{OSError} if the associated C{errno} is
- anything other than L{errno.ENOENT}.
- """
- def oops(mode="r"):
- raise OSError(errno.EIO, "I/O error")
- self.patch(DummyFilePath, "open", oops)
- pidFile = PIDFile(DummyFilePath())
- error = self.assertRaises(OSError, pidFile.read)
- self.assertEqual(error.errno, errno.EIO)
- def test_writePID(self):
- """
- L{PIDFile._write} stores the given PID.
- """
- pid = 1995
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(pid)
- self.assertEqual(pidFile.read(), pid)
- def test_writePIDInvalid(self):
- """
- L{PIDFile._write} raises L{ValueError} when given an invalid PID.
- """
- pidFile = PIDFile(DummyFilePath())
- self.assertRaises(ValueError, pidFile._write, u"burp")
- def test_writeRunningPID(self):
- """
- L{PIDFile.writeRunningPID} stores the PID for the current process.
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile.writeRunningPID()
- self.assertEqual(pidFile.read(), getpid())
- def test_remove(self):
- """
- L{PIDFile.remove} removes the PID file.
- """
- pidFile = PIDFile(DummyFilePath(b""))
- self.assertTrue(pidFile.filePath.exists())
- pidFile.remove()
- self.assertFalse(pidFile.filePath.exists())
- @ifPlatformSupported
- def test_isRunningDoesExist(self):
- """
- L{PIDFile.isRunning} returns true for a process that does exist.
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(1337)
- def kill(pid, signal):
- return # Don't actually kill anything
- self.patch(_pidfile, "kill", kill)
- self.assertTrue(pidFile.isRunning())
- @ifPlatformSupported
- def test_isRunningThis(self):
- """
- L{PIDFile.isRunning} returns true for this process (which is running).
- @note: This differs from L{PIDFileTests.test_isRunningDoesExist} in
- that it actually invokes the C{kill} system call, which is useful for
- testing of our chosen method for probing the existence of a process.
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile.writeRunningPID()
- self.assertTrue(pidFile.isRunning())
- @ifPlatformSupported
- def test_isRunningDoesNotExist(self):
- """
- L{PIDFile.isRunning} raises L{StalePIDFileError} for a process that
- does not exist (errno=ESRCH).
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(1337)
- def kill(pid, signal):
- raise OSError(errno.ESRCH, "No such process")
- self.patch(_pidfile, "kill", kill)
- self.assertRaises(StalePIDFileError, pidFile.isRunning)
- @ifPlatformSupported
- def test_isRunningNotAllowed(self):
- """
- L{PIDFile.isRunning} returns true for a process that we are not allowed
- to kill (errno=EPERM).
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(1337)
- def kill(pid, signal):
- raise OSError(errno.EPERM, "Operation not permitted")
- self.patch(_pidfile, "kill", kill)
- self.assertTrue(pidFile.isRunning())
- @ifPlatformSupported
- def test_isRunningInit(self):
- """
- L{PIDFile.isRunning} returns true for a process that we are not allowed
- to kill (errno=EPERM).
- @note: This differs from L{PIDFileTests.test_isRunningNotAllowed} in
- that it actually invokes the C{kill} system call, which is useful for
- testing of our chosen method for probing the existence of a process
- that we are not allowed to kill.
- @note: In this case, we try killing C{init}, which is process #1 on
- POSIX systems, so this test is not portable. C{init} should always be
- running and should not be killable by non-root users.
- """
- if SYSTEM_NAME != "posix":
- raise SkipTest("This test assumes POSIX")
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(1) # PID 1 is init on POSIX systems
- self.assertTrue(pidFile.isRunning())
- @ifPlatformSupported
- def test_isRunningUnknownErrno(self):
- """
- L{PIDFile.isRunning} re-raises L{OSError} if the attached C{errno}
- value from L{os.kill} is not an expected one.
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile.writeRunningPID()
- def kill(pid, signal):
- raise OSError(errno.EEXIST, "File exists")
- self.patch(_pidfile, "kill", kill)
- self.assertRaises(OSError, pidFile.isRunning)
- def test_isRunningNoPIDFile(self):
- """
- L{PIDFile.isRunning} returns false if the PID file doesn't exist.
- """
- pidFile = PIDFile(DummyFilePath())
- self.assertFalse(pidFile.isRunning())
- def test_contextManager(self):
- """
- When used as a context manager, a L{PIDFile} will store the current pid
- on entry, then removes the PID file on exit.
- """
- pidFile = PIDFile(DummyFilePath())
- self.assertFalse(pidFile.filePath.exists())
- with pidFile:
- self.assertTrue(pidFile.filePath.exists())
- self.assertEqual(pidFile.read(), getpid())
- self.assertFalse(pidFile.filePath.exists())
- @ifPlatformSupported
- def test_contextManagerDoesntExist(self):
- """
- When used as a context manager, a L{PIDFile} will replace the
- underlying PIDFile rather than raising L{AlreadyRunningError} if the
- contained PID file exists but refers to a non-running PID.
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(1337)
- def kill(pid, signal):
- raise OSError(errno.ESRCH, "No such process")
- self.patch(_pidfile, "kill", kill)
- e = self.assertRaises(StalePIDFileError, pidFile.isRunning)
- self.assertEqual(str(e), "PID file refers to non-existing process")
- with pidFile:
- self.assertEqual(pidFile.read(), getpid())
- @ifPlatformSupported
- def test_contextManagerAlreadyRunning(self):
- """
- When used as a context manager, a L{PIDFile} will raise
- L{AlreadyRunningError} if the there is already a running process with
- the contained PID.
- """
- pidFile = PIDFile(DummyFilePath())
- pidFile._write(1337)
- def kill(pid, signal):
- return # Don't actually kill anything
- self.patch(_pidfile, "kill", kill)
- self.assertTrue(pidFile.isRunning())
- self.assertRaises(AlreadyRunningError, pidFile.__enter__)
- class NonePIDFileTests(twisted.trial.unittest.TestCase):
- """
- Tests for L{NonePIDFile}.
- """
- def test_interface(self):
- """
- L{NonePIDFile} conforms to L{IPIDFile}.
- """
- pidFile = NonePIDFile()
- verifyObject(IPIDFile, pidFile)
- def test_read(self):
- """
- L{NonePIDFile.read} raises L{NoPIDFound}.
- """
- pidFile = NonePIDFile()
- e = self.assertRaises(NoPIDFound, pidFile.read)
- self.assertEqual(str(e), "PID file does not exist")
- def test_write(self):
- """
- L{NonePIDFile._write} raises L{OSError} with an errno of L{errno.EPERM}.
- """
- pidFile = NonePIDFile()
- error = self.assertRaises(OSError, pidFile._write, 0)
- self.assertEqual(error.errno, errno.EPERM)
- def test_writeRunningPID(self):
- """
- L{NonePIDFile.writeRunningPID} raises L{OSError} with an errno of
- L{errno.EPERM}.
- """
- pidFile = NonePIDFile()
- error = self.assertRaises(OSError, pidFile.writeRunningPID)
- self.assertEqual(error.errno, errno.EPERM)
- def test_remove(self):
- """
- L{NonePIDFile.remove} raises L{OSError} with an errno of L{errno.EPERM}.
- """
- pidFile = NonePIDFile()
- error = self.assertRaises(OSError, pidFile.remove)
- self.assertEqual(error.errno, errno.ENOENT)
- def test_isRunning(self):
- """
- L{NonePIDFile.isRunning} returns L{False}.
- """
- pidFile = NonePIDFile()
- self.assertEqual(pidFile.isRunning(), False)
- def test_contextManager(self):
- """
- When used as a context manager, a L{NonePIDFile} doesn't raise, despite
- not existing.
- """
- pidFile = NonePIDFile()
- with pidFile:
- pass
- @implementer(IFilePath)
- class DummyFilePath(object):
- """
- In-memory L{IFilePath}.
- """
- def __init__(self, content=None):
- self.setContent(content)
- def open(self, mode="r"):
- if not self._exists:
- raise OSError(errno.ENOENT, "No such file or directory")
- return BytesIO(self.getContent())
- def setContent(self, content):
- self._exists = content is not None
- self._content = content
- def getContent(self):
- return self._content
- def remove(self):
- self.setContent(None)
- def exists(self):
- return self._exists
|