test_systemd.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.python.systemd}.
  5. """
  6. from __future__ import division, absolute_import
  7. import os
  8. from twisted.trial.unittest import TestCase
  9. from twisted.python.systemd import ListenFDs
  10. class InheritedDescriptorsMixin(object):
  11. """
  12. Mixin for a L{TestCase} subclass which defines test methods for some kind of
  13. systemd sd-daemon class. In particular, it defines tests for a
  14. C{inheritedDescriptors} method.
  15. """
  16. def test_inheritedDescriptors(self):
  17. """
  18. C{inheritedDescriptors} returns a list of integers giving the file
  19. descriptors which were inherited from systemd.
  20. """
  21. sddaemon = self.getDaemon(7, 3)
  22. self.assertEqual([7, 8, 9], sddaemon.inheritedDescriptors())
  23. def test_repeated(self):
  24. """
  25. Any subsequent calls to C{inheritedDescriptors} return the same list.
  26. """
  27. sddaemon = self.getDaemon(7, 3)
  28. self.assertEqual(
  29. sddaemon.inheritedDescriptors(),
  30. sddaemon.inheritedDescriptors())
  31. class MemoryOnlyMixin(object):
  32. """
  33. Mixin for a L{TestCase} subclass which creates creating a fake, in-memory
  34. implementation of C{inheritedDescriptors}. This provides verification that
  35. the fake behaves in a compatible way with the real implementation.
  36. """
  37. def getDaemon(self, start, count):
  38. """
  39. Invent C{count} new I{file descriptors} (actually integers, attached to
  40. no real file description), starting at C{start}. Construct and return a
  41. new L{ListenFDs} which will claim those integers represent inherited
  42. file descriptors.
  43. """
  44. return ListenFDs(range(start, start + count))
  45. class EnvironmentMixin(object):
  46. """
  47. Mixin for a L{TestCase} subclass which creates a real implementation of
  48. C{inheritedDescriptors} which is based on the environment variables set by
  49. systemd. To facilitate testing, this mixin will also create a fake
  50. environment dictionary and add keys to it to make it look as if some
  51. descriptors have been inherited.
  52. """
  53. def initializeEnvironment(self, count, pid):
  54. """
  55. Create a copy of the process environment and add I{LISTEN_FDS} and
  56. I{LISTEN_PID} (the environment variables set by systemd) to it.
  57. """
  58. result = os.environ.copy()
  59. result['LISTEN_FDS'] = str(count)
  60. result['LISTEN_PID'] = str(pid)
  61. return result
  62. def getDaemon(self, start, count):
  63. """
  64. Create a new L{ListenFDs} instance, initialized with a fake environment
  65. dictionary which will be set up as systemd would have set it up if
  66. C{count} descriptors were being inherited. The descriptors will also
  67. start at C{start}.
  68. """
  69. fakeEnvironment = self.initializeEnvironment(count, os.getpid())
  70. return ListenFDs.fromEnvironment(environ=fakeEnvironment, start=start)
  71. class MemoryOnlyTests(MemoryOnlyMixin, InheritedDescriptorsMixin, TestCase):
  72. """
  73. Apply tests to L{ListenFDs}, explicitly constructed with some fake file
  74. descriptors.
  75. """
  76. class EnvironmentTests(EnvironmentMixin, InheritedDescriptorsMixin, TestCase):
  77. """
  78. Apply tests to L{ListenFDs}, constructed based on an environment dictionary.
  79. """
  80. def test_secondEnvironment(self):
  81. """
  82. Only a single L{Environment} can extract inherited file descriptors.
  83. """
  84. fakeEnvironment = self.initializeEnvironment(3, os.getpid())
  85. first = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  86. second = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  87. self.assertEqual(list(range(3, 6)), first.inheritedDescriptors())
  88. self.assertEqual([], second.inheritedDescriptors())
  89. def test_mismatchedPID(self):
  90. """
  91. If the current process PID does not match the PID in the environment, no
  92. inherited descriptors are reported.
  93. """
  94. fakeEnvironment = self.initializeEnvironment(3, os.getpid() + 1)
  95. sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  96. self.assertEqual([], sddaemon.inheritedDescriptors())
  97. def test_missingPIDVariable(self):
  98. """
  99. If the I{LISTEN_PID} environment variable is not present, no inherited
  100. descriptors are reported.
  101. """
  102. fakeEnvironment = self.initializeEnvironment(3, os.getpid())
  103. del fakeEnvironment['LISTEN_PID']
  104. sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  105. self.assertEqual([], sddaemon.inheritedDescriptors())
  106. def test_nonIntegerPIDVariable(self):
  107. """
  108. If the I{LISTEN_PID} environment variable is set to a string that cannot
  109. be parsed as an integer, no inherited descriptors are reported.
  110. """
  111. fakeEnvironment = self.initializeEnvironment(3, "hello, world")
  112. sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  113. self.assertEqual([], sddaemon.inheritedDescriptors())
  114. def test_missingFDSVariable(self):
  115. """
  116. If the I{LISTEN_FDS} environment variable is not present, no inherited
  117. descriptors are reported.
  118. """
  119. fakeEnvironment = self.initializeEnvironment(3, os.getpid())
  120. del fakeEnvironment['LISTEN_FDS']
  121. sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  122. self.assertEqual([], sddaemon.inheritedDescriptors())
  123. def test_nonIntegerFDSVariable(self):
  124. """
  125. If the I{LISTEN_FDS} environment variable is set to a string that cannot
  126. be parsed as an integer, no inherited descriptors are reported.
  127. """
  128. fakeEnvironment = self.initializeEnvironment("hello, world", os.getpid())
  129. sddaemon = ListenFDs.fromEnvironment(environ=fakeEnvironment)
  130. self.assertEqual([], sddaemon.inheritedDescriptors())
  131. def test_defaultEnviron(self):
  132. """
  133. If the process environment is not explicitly passed to
  134. L{Environment.__init__}, the real process environment dictionary is
  135. used.
  136. """
  137. self.patch(os, 'environ', {
  138. 'LISTEN_PID': str(os.getpid()),
  139. 'LISTEN_FDS': '5'})
  140. sddaemon = ListenFDs.fromEnvironment()
  141. self.assertEqual(list(range(3, 3 + 5)),
  142. sddaemon.inheritedDescriptors())