test_util.py 36 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165
  1. # -*- test-case-name: twisted.python.test.test_util
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tests for L{twisted.python.util}.
  6. """
  7. from __future__ import division, absolute_import
  8. import errno
  9. import os.path
  10. import shutil
  11. import sys
  12. import warnings
  13. try:
  14. import pwd, grp
  15. except ImportError:
  16. pwd = grp = None
  17. from twisted.trial import unittest
  18. from twisted.trial.util import suppress as SUPPRESS
  19. from twisted.python import util
  20. from twisted.python.filepath import FilePath
  21. from twisted.internet import reactor
  22. from twisted.internet.interfaces import IReactorProcess
  23. from twisted.internet.protocol import ProcessProtocol
  24. from twisted.internet.defer import Deferred
  25. from twisted.internet.error import ProcessDone
  26. from twisted.test.test_process import MockOS
  27. pyExe = FilePath(sys.executable)._asBytesPath()
  28. class UtilTests(unittest.TestCase):
  29. def testUniq(self):
  30. l = ["a", 1, "ab", "a", 3, 4, 1, 2, 2, 4, 6]
  31. self.assertEqual(util.uniquify(l), ["a", 1, "ab", 3, 4, 2, 6])
  32. def testRaises(self):
  33. self.assertTrue(util.raises(ZeroDivisionError, divmod, 1, 0))
  34. self.assertFalse(util.raises(ZeroDivisionError, divmod, 0, 1))
  35. try:
  36. util.raises(TypeError, divmod, 1, 0)
  37. except ZeroDivisionError:
  38. pass
  39. else:
  40. raise unittest.FailTest("util.raises didn't raise when it should have")
  41. def test_uidFromNumericString(self):
  42. """
  43. When L{uidFromString} is called with a base-ten string representation
  44. of an integer, it returns the integer.
  45. """
  46. self.assertEqual(util.uidFromString("100"), 100)
  47. def test_uidFromUsernameString(self):
  48. """
  49. When L{uidFromString} is called with a base-ten string representation
  50. of an integer, it returns the integer.
  51. """
  52. pwent = pwd.getpwuid(os.getuid())
  53. self.assertEqual(util.uidFromString(pwent.pw_name), pwent.pw_uid)
  54. if pwd is None:
  55. test_uidFromUsernameString.skip = (
  56. "Username/UID conversion requires the pwd module.")
  57. def test_gidFromNumericString(self):
  58. """
  59. When L{gidFromString} is called with a base-ten string representation
  60. of an integer, it returns the integer.
  61. """
  62. self.assertEqual(util.gidFromString("100"), 100)
  63. def test_gidFromGroupnameString(self):
  64. """
  65. When L{gidFromString} is called with a base-ten string representation
  66. of an integer, it returns the integer.
  67. """
  68. grent = grp.getgrgid(os.getgid())
  69. self.assertEqual(util.gidFromString(grent.gr_name), grent.gr_gid)
  70. if grp is None:
  71. test_gidFromGroupnameString.skip = (
  72. "Group Name/GID conversion requires the grp module.")
  73. class NameToLabelTests(unittest.TestCase):
  74. """
  75. Tests for L{nameToLabel}.
  76. """
  77. def test_nameToLabel(self):
  78. """
  79. Test the various kinds of inputs L{nameToLabel} supports.
  80. """
  81. nameData = [
  82. ('f', 'F'),
  83. ('fo', 'Fo'),
  84. ('foo', 'Foo'),
  85. ('fooBar', 'Foo Bar'),
  86. ('fooBarBaz', 'Foo Bar Baz'),
  87. ]
  88. for inp, out in nameData:
  89. got = util.nameToLabel(inp)
  90. self.assertEqual(
  91. got, out,
  92. "nameToLabel(%r) == %r != %r" % (inp, got, out))
  93. class UntilConcludesTests(unittest.TestCase):
  94. """
  95. Tests for L{untilConcludes}, an C{EINTR} helper.
  96. """
  97. def test_uninterruptably(self):
  98. """
  99. L{untilConcludes} calls the function passed to it until the function
  100. does not raise either L{OSError} or L{IOError} with C{errno} of
  101. C{EINTR}. It otherwise completes with the same result as the function
  102. passed to it.
  103. """
  104. def f(a, b):
  105. self.calls += 1
  106. exc = self.exceptions.pop()
  107. if exc is not None:
  108. raise exc(errno.EINTR, "Interrupted system call!")
  109. return a + b
  110. self.exceptions = [None]
  111. self.calls = 0
  112. self.assertEqual(util.untilConcludes(f, 1, 2), 3)
  113. self.assertEqual(self.calls, 1)
  114. self.exceptions = [None, OSError, IOError]
  115. self.calls = 0
  116. self.assertEqual(util.untilConcludes(f, 2, 3), 5)
  117. self.assertEqual(self.calls, 3)
  118. class SwitchUIDTests(unittest.TestCase):
  119. """
  120. Tests for L{util.switchUID}.
  121. """
  122. if getattr(os, "getuid", None) is None:
  123. skip = "getuid/setuid not available"
  124. def setUp(self):
  125. self.mockos = MockOS()
  126. self.patch(util, "os", self.mockos)
  127. self.patch(util, "initgroups", self.initgroups)
  128. self.initgroupsCalls = []
  129. def initgroups(self, uid, gid):
  130. """
  131. Save L{util.initgroups} calls in C{self.initgroupsCalls}.
  132. """
  133. self.initgroupsCalls.append((uid, gid))
  134. def test_uid(self):
  135. """
  136. L{util.switchUID} calls L{util.initgroups} and then C{os.setuid} with
  137. the given uid.
  138. """
  139. util.switchUID(12000, None)
  140. self.assertEqual(self.initgroupsCalls, [(12000, None)])
  141. self.assertEqual(self.mockos.actions, [("setuid", 12000)])
  142. def test_euid(self):
  143. """
  144. L{util.switchUID} calls L{util.initgroups} and then C{os.seteuid} with
  145. the given uid if the C{euid} parameter is set to C{True}.
  146. """
  147. util.switchUID(12000, None, True)
  148. self.assertEqual(self.initgroupsCalls, [(12000, None)])
  149. self.assertEqual(self.mockos.seteuidCalls, [12000])
  150. def test_currentUID(self):
  151. """
  152. If the current uid is the same as the uid passed to L{util.switchUID},
  153. then initgroups does not get called, but a warning is issued.
  154. """
  155. uid = self.mockos.getuid()
  156. util.switchUID(uid, None)
  157. self.assertEqual(self.initgroupsCalls, [])
  158. self.assertEqual(self.mockos.actions, [])
  159. currentWarnings = self.flushWarnings([util.switchUID])
  160. self.assertEqual(len(currentWarnings), 1)
  161. self.assertIn('tried to drop privileges and setuid %i' % uid,
  162. currentWarnings[0]['message'])
  163. self.assertIn(
  164. 'but uid is already %i' % uid, currentWarnings[0]['message'])
  165. def test_currentEUID(self):
  166. """
  167. If the current euid is the same as the euid passed to L{util.switchUID},
  168. then initgroups does not get called, but a warning is issued.
  169. """
  170. euid = self.mockos.geteuid()
  171. util.switchUID(euid, None, True)
  172. self.assertEqual(self.initgroupsCalls, [])
  173. self.assertEqual(self.mockos.seteuidCalls, [])
  174. currentWarnings = self.flushWarnings([util.switchUID])
  175. self.assertEqual(len(currentWarnings), 1)
  176. self.assertIn('tried to drop privileges and seteuid %i' % euid,
  177. currentWarnings[0]['message'])
  178. self.assertIn(
  179. 'but euid is already %i' % euid, currentWarnings[0]['message'])
  180. class MergeFunctionMetadataTests(unittest.TestCase):
  181. """
  182. Tests for L{mergeFunctionMetadata}.
  183. """
  184. def test_mergedFunctionBehavesLikeMergeTarget(self):
  185. """
  186. After merging C{foo}'s data into C{bar}, the returned function behaves
  187. as if it is C{bar}.
  188. """
  189. foo_object = object()
  190. bar_object = object()
  191. def foo():
  192. return foo_object
  193. def bar(x, y, ab, c=10, *d, **e):
  194. (a, b) = ab
  195. return bar_object
  196. baz = util.mergeFunctionMetadata(foo, bar)
  197. self.assertIs(baz(1, 2, (3, 4), quux=10), bar_object)
  198. def test_moduleIsMerged(self):
  199. """
  200. Merging C{foo} into C{bar} returns a function with C{foo}'s
  201. C{__module__}.
  202. """
  203. def foo():
  204. pass
  205. def bar():
  206. pass
  207. bar.__module__ = 'somewhere.else'
  208. baz = util.mergeFunctionMetadata(foo, bar)
  209. self.assertEqual(baz.__module__, foo.__module__)
  210. def test_docstringIsMerged(self):
  211. """
  212. Merging C{foo} into C{bar} returns a function with C{foo}'s docstring.
  213. """
  214. def foo():
  215. """
  216. This is foo.
  217. """
  218. def bar():
  219. """
  220. This is bar.
  221. """
  222. baz = util.mergeFunctionMetadata(foo, bar)
  223. self.assertEqual(baz.__doc__, foo.__doc__)
  224. def test_nameIsMerged(self):
  225. """
  226. Merging C{foo} into C{bar} returns a function with C{foo}'s name.
  227. """
  228. def foo():
  229. pass
  230. def bar():
  231. pass
  232. baz = util.mergeFunctionMetadata(foo, bar)
  233. self.assertEqual(baz.__name__, foo.__name__)
  234. def test_instanceDictionaryIsMerged(self):
  235. """
  236. Merging C{foo} into C{bar} returns a function with C{bar}'s
  237. dictionary, updated by C{foo}'s.
  238. """
  239. def foo():
  240. pass
  241. foo.a = 1
  242. foo.b = 2
  243. def bar():
  244. pass
  245. bar.b = 3
  246. bar.c = 4
  247. baz = util.mergeFunctionMetadata(foo, bar)
  248. self.assertEqual(foo.a, baz.a)
  249. self.assertEqual(foo.b, baz.b)
  250. self.assertEqual(bar.c, baz.c)
  251. class OrderedDictTests(unittest.TestCase):
  252. """
  253. Tests for L{util.OrderedDict}.
  254. """
  255. def test_deprecated(self):
  256. """
  257. L{util.OrderedDict} is deprecated.
  258. """
  259. from twisted.python.util import OrderedDict
  260. OrderedDict # Shh pyflakes
  261. currentWarnings = self.flushWarnings(offendingFunctions=[
  262. self.test_deprecated])
  263. self.assertEqual(
  264. currentWarnings[0]['message'],
  265. "twisted.python.util.OrderedDict was deprecated in Twisted "
  266. "15.5.0: Use collections.OrderedDict instead.")
  267. self.assertEqual(currentWarnings[0]['category'], DeprecationWarning)
  268. self.assertEqual(len(currentWarnings), 1)
  269. class InsensitiveDictTests(unittest.TestCase):
  270. """
  271. Tests for L{util.InsensitiveDict}.
  272. """
  273. def test_preserve(self):
  274. """
  275. L{util.InsensitiveDict} preserves the case of keys if constructed with
  276. C{preserve=True}.
  277. """
  278. dct = util.InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=1)
  279. self.assertEqual(dct['fnz'], {1:2})
  280. self.assertEqual(dct['foo'], 'bar')
  281. self.assertEqual(dct.copy(), dct)
  282. self.assertEqual(dct['foo'], dct.get('Foo'))
  283. self.assertIn(1, dct)
  284. self.assertIn('foo', dct)
  285. result = eval(repr(dct), {
  286. 'dct': dct,
  287. 'InsensitiveDict': util.InsensitiveDict,
  288. })
  289. self.assertEqual(result, dct)
  290. keys=['Foo', 'fnz', 1]
  291. for x in keys:
  292. self.assertIn(x, dct.keys())
  293. self.assertIn((x, dct[x]), dct.items())
  294. self.assertEqual(len(keys), len(dct))
  295. del dct[1]
  296. del dct['foo']
  297. self.assertEqual(dct.keys(), ['fnz'])
  298. def test_noPreserve(self):
  299. """
  300. L{util.InsensitiveDict} does not preserves the case of keys if
  301. constructed with C{preserve=False}.
  302. """
  303. dct = util.InsensitiveDict({'Foo':'bar', 1:2, 'fnz':{1:2}}, preserve=0)
  304. keys=['foo', 'fnz', 1]
  305. for x in keys:
  306. self.assertIn(x, dct.keys())
  307. self.assertIn((x, dct[x]), dct.items())
  308. self.assertEqual(len(keys), len(dct))
  309. del dct[1]
  310. del dct['foo']
  311. self.assertEqual(dct.keys(), ['fnz'])
  312. def test_unicode(self):
  313. """
  314. Unicode keys are case insensitive.
  315. """
  316. d = util.InsensitiveDict(preserve=False)
  317. d[u"Foo"] = 1
  318. self.assertEqual(d[u"FOO"], 1)
  319. self.assertEqual(d.keys(), [u"foo"])
  320. def test_bytes(self):
  321. """
  322. Bytes keys are case insensitive.
  323. """
  324. d = util.InsensitiveDict(preserve=False)
  325. d[b"Foo"] = 1
  326. self.assertEqual(d[b"FOO"], 1)
  327. self.assertEqual(d.keys(), [b"foo"])
  328. class PasswordTestingProcessProtocol(ProcessProtocol):
  329. """
  330. Write the string C{"secret\n"} to a subprocess and then collect all of
  331. its output and fire a Deferred with it when the process ends.
  332. """
  333. def connectionMade(self):
  334. self.output = []
  335. self.transport.write(b'secret\n')
  336. def childDataReceived(self, fd, output):
  337. self.output.append((fd, output))
  338. def processEnded(self, reason):
  339. self.finished.callback((reason, self.output))
  340. class GetPasswordTests(unittest.TestCase):
  341. if not IReactorProcess.providedBy(reactor):
  342. skip = "Process support required to test getPassword"
  343. def test_stdin(self):
  344. """
  345. Making sure getPassword accepts a password from standard input by
  346. running a child process which uses getPassword to read in a string
  347. which it then writes it out again. Write a string to the child
  348. process and then read one and make sure it is the right string.
  349. """
  350. p = PasswordTestingProcessProtocol()
  351. p.finished = Deferred()
  352. reactor.spawnProcess(
  353. p, pyExe,
  354. [pyExe,
  355. b'-c',
  356. (b'import sys\n'
  357. b'from twisted.python.util import getPassword\n'
  358. b'sys.stdout.write(getPassword())\n'
  359. b'sys.stdout.flush()\n')],
  360. env={b'PYTHONPATH': os.pathsep.join(sys.path).encode("utf8")})
  361. def processFinished(result):
  362. (reason, output) = result
  363. reason.trap(ProcessDone)
  364. self.assertIn((1, b'secret'), output)
  365. return p.finished.addCallback(processFinished)
  366. class SearchUpwardsTests(unittest.TestCase):
  367. def testSearchupwards(self):
  368. os.makedirs('searchupwards/a/b/c')
  369. open('searchupwards/foo.txt', 'w').close()
  370. open('searchupwards/a/foo.txt', 'w').close()
  371. open('searchupwards/a/b/c/foo.txt', 'w').close()
  372. os.mkdir('searchupwards/bar')
  373. os.mkdir('searchupwards/bam')
  374. os.mkdir('searchupwards/a/bar')
  375. os.mkdir('searchupwards/a/b/bam')
  376. actual=util.searchupwards('searchupwards/a/b/c',
  377. files=['foo.txt'],
  378. dirs=['bar', 'bam'])
  379. expected=os.path.abspath('searchupwards') + os.sep
  380. self.assertEqual(actual, expected)
  381. shutil.rmtree('searchupwards')
  382. actual=util.searchupwards('searchupwards/a/b/c',
  383. files=['foo.txt'],
  384. dirs=['bar', 'bam'])
  385. expected=None
  386. self.assertEqual(actual, expected)
  387. class IntervalDifferentialTests(unittest.TestCase):
  388. def testDefault(self):
  389. d = iter(util.IntervalDifferential([], 10))
  390. for i in range(100):
  391. self.assertEqual(next(d), (10, None))
  392. def testSingle(self):
  393. d = iter(util.IntervalDifferential([5], 10))
  394. for i in range(100):
  395. self.assertEqual(next(d), (5, 0))
  396. def testPair(self):
  397. d = iter(util.IntervalDifferential([5, 7], 10))
  398. for i in range(100):
  399. self.assertEqual(next(d), (5, 0))
  400. self.assertEqual(next(d), (2, 1))
  401. self.assertEqual(next(d), (3, 0))
  402. self.assertEqual(next(d), (4, 1))
  403. self.assertEqual(next(d), (1, 0))
  404. self.assertEqual(next(d), (5, 0))
  405. self.assertEqual(next(d), (1, 1))
  406. self.assertEqual(next(d), (4, 0))
  407. self.assertEqual(next(d), (3, 1))
  408. self.assertEqual(next(d), (2, 0))
  409. self.assertEqual(next(d), (5, 0))
  410. self.assertEqual(next(d), (0, 1))
  411. def testTriple(self):
  412. d = iter(util.IntervalDifferential([2, 4, 5], 10))
  413. for i in range(100):
  414. self.assertEqual(next(d), (2, 0))
  415. self.assertEqual(next(d), (2, 0))
  416. self.assertEqual(next(d), (0, 1))
  417. self.assertEqual(next(d), (1, 2))
  418. self.assertEqual(next(d), (1, 0))
  419. self.assertEqual(next(d), (2, 0))
  420. self.assertEqual(next(d), (0, 1))
  421. self.assertEqual(next(d), (2, 0))
  422. self.assertEqual(next(d), (0, 2))
  423. self.assertEqual(next(d), (2, 0))
  424. self.assertEqual(next(d), (0, 1))
  425. self.assertEqual(next(d), (2, 0))
  426. self.assertEqual(next(d), (1, 2))
  427. self.assertEqual(next(d), (1, 0))
  428. self.assertEqual(next(d), (0, 1))
  429. self.assertEqual(next(d), (2, 0))
  430. self.assertEqual(next(d), (2, 0))
  431. self.assertEqual(next(d), (0, 1))
  432. self.assertEqual(next(d), (0, 2))
  433. def testInsert(self):
  434. d = iter(util.IntervalDifferential([], 10))
  435. self.assertEqual(next(d), (10, None))
  436. d.addInterval(3)
  437. self.assertEqual(next(d), (3, 0))
  438. self.assertEqual(next(d), (3, 0))
  439. d.addInterval(6)
  440. self.assertEqual(next(d), (3, 0))
  441. self.assertEqual(next(d), (3, 0))
  442. self.assertEqual(next(d), (0, 1))
  443. self.assertEqual(next(d), (3, 0))
  444. self.assertEqual(next(d), (3, 0))
  445. self.assertEqual(next(d), (0, 1))
  446. def testRemove(self):
  447. d = iter(util.IntervalDifferential([3, 5], 10))
  448. self.assertEqual(next(d), (3, 0))
  449. self.assertEqual(next(d), (2, 1))
  450. self.assertEqual(next(d), (1, 0))
  451. d.removeInterval(3)
  452. self.assertEqual(next(d), (4, 0))
  453. self.assertEqual(next(d), (5, 0))
  454. d.removeInterval(5)
  455. self.assertEqual(next(d), (10, None))
  456. self.assertRaises(ValueError, d.removeInterval, 10)
  457. class Record(util.FancyEqMixin):
  458. """
  459. Trivial user of L{FancyEqMixin} used by tests.
  460. """
  461. compareAttributes = ('a', 'b')
  462. def __init__(self, a, b):
  463. self.a = a
  464. self.b = b
  465. class DifferentRecord(util.FancyEqMixin):
  466. """
  467. Trivial user of L{FancyEqMixin} which is not related to L{Record}.
  468. """
  469. compareAttributes = ('a', 'b')
  470. def __init__(self, a, b):
  471. self.a = a
  472. self.b = b
  473. class DerivedRecord(Record):
  474. """
  475. A class with an inheritance relationship to L{Record}.
  476. """
  477. class EqualToEverything(object):
  478. """
  479. A class the instances of which consider themselves equal to everything.
  480. """
  481. def __eq__(self, other):
  482. return True
  483. def __ne__(self, other):
  484. return False
  485. class EqualToNothing(object):
  486. """
  487. A class the instances of which consider themselves equal to nothing.
  488. """
  489. def __eq__(self, other):
  490. return False
  491. def __ne__(self, other):
  492. return True
  493. class EqualityTests(unittest.TestCase):
  494. """
  495. Tests for L{FancyEqMixin}.
  496. """
  497. def test_identity(self):
  498. """
  499. Instances of a class which mixes in L{FancyEqMixin} but which
  500. defines no comparison attributes compare by identity.
  501. """
  502. class Empty(util.FancyEqMixin):
  503. pass
  504. self.assertFalse(Empty() == Empty())
  505. self.assertTrue(Empty() != Empty())
  506. empty = Empty()
  507. self.assertTrue(empty == empty)
  508. self.assertFalse(empty != empty)
  509. def test_equality(self):
  510. """
  511. Instances of a class which mixes in L{FancyEqMixin} should compare
  512. equal if all of their attributes compare equal. They should not
  513. compare equal if any of their attributes do not compare equal.
  514. """
  515. self.assertTrue(Record(1, 2) == Record(1, 2))
  516. self.assertFalse(Record(1, 2) == Record(1, 3))
  517. self.assertFalse(Record(1, 2) == Record(2, 2))
  518. self.assertFalse(Record(1, 2) == Record(3, 4))
  519. def test_unequality(self):
  520. """
  521. Inequality between instances of a particular L{record} should be
  522. defined as the negation of equality.
  523. """
  524. self.assertFalse(Record(1, 2) != Record(1, 2))
  525. self.assertTrue(Record(1, 2) != Record(1, 3))
  526. self.assertTrue(Record(1, 2) != Record(2, 2))
  527. self.assertTrue(Record(1, 2) != Record(3, 4))
  528. def test_differentClassesEquality(self):
  529. """
  530. Instances of different classes which mix in L{FancyEqMixin} should not
  531. compare equal.
  532. """
  533. self.assertFalse(Record(1, 2) == DifferentRecord(1, 2))
  534. def test_differentClassesInequality(self):
  535. """
  536. Instances of different classes which mix in L{FancyEqMixin} should
  537. compare unequal.
  538. """
  539. self.assertTrue(Record(1, 2) != DifferentRecord(1, 2))
  540. def test_inheritedClassesEquality(self):
  541. """
  542. An instance of a class which derives from a class which mixes in
  543. L{FancyEqMixin} should compare equal to an instance of the base class
  544. if and only if all of their attributes compare equal.
  545. """
  546. self.assertTrue(Record(1, 2) == DerivedRecord(1, 2))
  547. self.assertFalse(Record(1, 2) == DerivedRecord(1, 3))
  548. self.assertFalse(Record(1, 2) == DerivedRecord(2, 2))
  549. self.assertFalse(Record(1, 2) == DerivedRecord(3, 4))
  550. def test_inheritedClassesInequality(self):
  551. """
  552. An instance of a class which derives from a class which mixes in
  553. L{FancyEqMixin} should compare unequal to an instance of the base
  554. class if any of their attributes compare unequal.
  555. """
  556. self.assertFalse(Record(1, 2) != DerivedRecord(1, 2))
  557. self.assertTrue(Record(1, 2) != DerivedRecord(1, 3))
  558. self.assertTrue(Record(1, 2) != DerivedRecord(2, 2))
  559. self.assertTrue(Record(1, 2) != DerivedRecord(3, 4))
  560. def test_rightHandArgumentImplementsEquality(self):
  561. """
  562. The right-hand argument to the equality operator is given a chance
  563. to determine the result of the operation if it is of a type
  564. unrelated to the L{FancyEqMixin}-based instance on the left-hand
  565. side.
  566. """
  567. self.assertTrue(Record(1, 2) == EqualToEverything())
  568. self.assertFalse(Record(1, 2) == EqualToNothing())
  569. def test_rightHandArgumentImplementsUnequality(self):
  570. """
  571. The right-hand argument to the non-equality operator is given a
  572. chance to determine the result of the operation if it is of a type
  573. unrelated to the L{FancyEqMixin}-based instance on the left-hand
  574. side.
  575. """
  576. self.assertFalse(Record(1, 2) != EqualToEverything())
  577. self.assertTrue(Record(1, 2) != EqualToNothing())
  578. class RunAsEffectiveUserTests(unittest.TestCase):
  579. """
  580. Test for the L{util.runAsEffectiveUser} function.
  581. """
  582. if getattr(os, "geteuid", None) is None:
  583. skip = "geteuid/seteuid not available"
  584. def setUp(self):
  585. self.mockos = MockOS()
  586. self.patch(os, "geteuid", self.mockos.geteuid)
  587. self.patch(os, "getegid", self.mockos.getegid)
  588. self.patch(os, "seteuid", self.mockos.seteuid)
  589. self.patch(os, "setegid", self.mockos.setegid)
  590. def _securedFunction(self, startUID, startGID, wantUID, wantGID):
  591. """
  592. Check if wanted UID/GID matched start or saved ones.
  593. """
  594. self.assertTrue(wantUID == startUID or
  595. wantUID == self.mockos.seteuidCalls[-1])
  596. self.assertTrue(wantGID == startGID or
  597. wantGID == self.mockos.setegidCalls[-1])
  598. def test_forwardResult(self):
  599. """
  600. L{util.runAsEffectiveUser} forwards the result obtained by calling the
  601. given function
  602. """
  603. result = util.runAsEffectiveUser(0, 0, lambda: 1)
  604. self.assertEqual(result, 1)
  605. def test_takeParameters(self):
  606. """
  607. L{util.runAsEffectiveUser} pass the given parameters to the given
  608. function.
  609. """
  610. result = util.runAsEffectiveUser(0, 0, lambda x: 2*x, 3)
  611. self.assertEqual(result, 6)
  612. def test_takesKeyworkArguments(self):
  613. """
  614. L{util.runAsEffectiveUser} pass the keyword parameters to the given
  615. function.
  616. """
  617. result = util.runAsEffectiveUser(0, 0, lambda x, y=1, z=1: x*y*z, 2, z=3)
  618. self.assertEqual(result, 6)
  619. def _testUIDGIDSwitch(self, startUID, startGID, wantUID, wantGID,
  620. expectedUIDSwitches, expectedGIDSwitches):
  621. """
  622. Helper method checking the calls to C{os.seteuid} and C{os.setegid}
  623. made by L{util.runAsEffectiveUser}, when switching from startUID to
  624. wantUID and from startGID to wantGID.
  625. """
  626. self.mockos.euid = startUID
  627. self.mockos.egid = startGID
  628. util.runAsEffectiveUser(
  629. wantUID, wantGID,
  630. self._securedFunction, startUID, startGID, wantUID, wantGID)
  631. self.assertEqual(self.mockos.seteuidCalls, expectedUIDSwitches)
  632. self.assertEqual(self.mockos.setegidCalls, expectedGIDSwitches)
  633. self.mockos.seteuidCalls = []
  634. self.mockos.setegidCalls = []
  635. def test_root(self):
  636. """
  637. Check UID/GID switches when current effective UID is root.
  638. """
  639. self._testUIDGIDSwitch(0, 0, 0, 0, [], [])
  640. self._testUIDGIDSwitch(0, 0, 1, 0, [1, 0], [])
  641. self._testUIDGIDSwitch(0, 0, 0, 1, [], [1, 0])
  642. self._testUIDGIDSwitch(0, 0, 1, 1, [1, 0], [1, 0])
  643. def test_UID(self):
  644. """
  645. Check UID/GID switches when current effective UID is non-root.
  646. """
  647. self._testUIDGIDSwitch(1, 0, 0, 0, [0, 1], [])
  648. self._testUIDGIDSwitch(1, 0, 1, 0, [], [])
  649. self._testUIDGIDSwitch(1, 0, 1, 1, [0, 1, 0, 1], [1, 0])
  650. self._testUIDGIDSwitch(1, 0, 2, 1, [0, 2, 0, 1], [1, 0])
  651. def test_GID(self):
  652. """
  653. Check UID/GID switches when current effective GID is non-root.
  654. """
  655. self._testUIDGIDSwitch(0, 1, 0, 0, [], [0, 1])
  656. self._testUIDGIDSwitch(0, 1, 0, 1, [], [])
  657. self._testUIDGIDSwitch(0, 1, 1, 1, [1, 0], [])
  658. self._testUIDGIDSwitch(0, 1, 1, 2, [1, 0], [2, 1])
  659. def test_UIDGID(self):
  660. """
  661. Check UID/GID switches when current effective UID/GID is non-root.
  662. """
  663. self._testUIDGIDSwitch(1, 1, 0, 0, [0, 1], [0, 1])
  664. self._testUIDGIDSwitch(1, 1, 0, 1, [0, 1], [])
  665. self._testUIDGIDSwitch(1, 1, 1, 0, [0, 1, 0, 1], [0, 1])
  666. self._testUIDGIDSwitch(1, 1, 1, 1, [], [])
  667. self._testUIDGIDSwitch(1, 1, 2, 1, [0, 2, 0, 1], [])
  668. self._testUIDGIDSwitch(1, 1, 1, 2, [0, 1, 0, 1], [2, 1])
  669. self._testUIDGIDSwitch(1, 1, 2, 2, [0, 2, 0, 1], [2, 1])
  670. class InitGroupsTests(unittest.TestCase):
  671. """
  672. Tests for L{util.initgroups}.
  673. """
  674. def setUp(self):
  675. self.addCleanup(setattr, util, "_initgroups", util._initgroups)
  676. self.addCleanup(setattr, util, "setgroups", util.setgroups)
  677. def test_initgroupsInStdlib(self):
  678. """
  679. Calling L{util.initgroups} will call the underlying stdlib
  680. implmentation.
  681. """
  682. calls = []
  683. util._initgroups = lambda x, y: calls.append((x, y))
  684. setgroupsCalls = []
  685. util.setgroups = setgroupsCalls.append
  686. util.initgroups(os.getuid(), 4)
  687. self.assertEqual(calls, [(pwd.getpwuid(os.getuid())[0], 4)])
  688. self.assertFalse(setgroupsCalls)
  689. if util._initgroups is None:
  690. test_initgroupsInStdlib.skip = ("stdlib support for initgroups is not "
  691. "available")
  692. class DeprecationTests(unittest.TestCase):
  693. """
  694. Tests for deprecations in C{twisted.python.util}.
  695. """
  696. def test_getPluginDirs(self):
  697. """
  698. L{util.getPluginDirs} is deprecated.
  699. """
  700. util.getPluginDirs()
  701. currentWarnings = self.flushWarnings(offendingFunctions=[
  702. self.test_getPluginDirs])
  703. self.assertEqual(
  704. currentWarnings[0]['message'],
  705. "twisted.python.util.getPluginDirs is deprecated since Twisted "
  706. "12.2.")
  707. self.assertEqual(currentWarnings[0]['category'], DeprecationWarning)
  708. self.assertEqual(len(currentWarnings), 1)
  709. def test_addPluginDir(self):
  710. """
  711. L{util.addPluginDir} is deprecated.
  712. """
  713. util.addPluginDir()
  714. currentWarnings = self.flushWarnings(offendingFunctions=[
  715. self.test_addPluginDir])
  716. self.assertEqual(
  717. currentWarnings[0]['message'],
  718. "twisted.python.util.addPluginDir is deprecated since Twisted "
  719. "12.2.")
  720. self.assertEqual(currentWarnings[0]['category'], DeprecationWarning)
  721. self.assertEqual(len(currentWarnings), 1)
  722. test_addPluginDir.suppress = [
  723. SUPPRESS(category=DeprecationWarning,
  724. message="twisted.python.util.getPluginDirs is deprecated")
  725. ]
  726. class SuppressedWarningsTests(unittest.TestCase):
  727. """
  728. Tests for L{util.runWithWarningsSuppressed}.
  729. """
  730. runWithWarningsSuppressed = staticmethod(util.runWithWarningsSuppressed)
  731. def test_runWithWarningsSuppressedFiltered(self):
  732. """
  733. Warnings from the function called by C{runWithWarningsSuppressed} are
  734. suppressed if they match the passed in filter.
  735. """
  736. filters = [(("ignore", ".*foo.*"), {}),
  737. (("ignore", ".*bar.*"), {})]
  738. self.runWithWarningsSuppressed(filters, warnings.warn, "ignore foo")
  739. self.runWithWarningsSuppressed(filters, warnings.warn, "ignore bar")
  740. self.assertEqual([], self.flushWarnings())
  741. def test_runWithWarningsSuppressedUnfiltered(self):
  742. """
  743. Warnings from the function called by C{runWithWarningsSuppressed} are
  744. not suppressed if they do not match the passed in filter.
  745. """
  746. filters = [(("ignore", ".*foo.*"), {}),
  747. (("ignore", ".*bar.*"), {})]
  748. self.runWithWarningsSuppressed(filters, warnings.warn, "don't ignore")
  749. self.assertEqual(
  750. ["don't ignore"], [w['message'] for w in self.flushWarnings()])
  751. def test_passThrough(self):
  752. """
  753. C{runWithWarningsSuppressed} returns the result of the function it
  754. called.
  755. """
  756. self.assertEqual(self.runWithWarningsSuppressed([], lambda: 4), 4)
  757. def test_noSideEffects(self):
  758. """
  759. Once C{runWithWarningsSuppressed} has returned, it no longer
  760. suppresses warnings.
  761. """
  762. filters = [(("ignore", ".*foo.*"), {}),
  763. (("ignore", ".*bar.*"), {})]
  764. self.runWithWarningsSuppressed(filters, lambda: None)
  765. warnings.warn("ignore foo")
  766. self.assertEqual(
  767. ["ignore foo"], [w['message'] for w in self.flushWarnings()])
  768. class FancyStrMixinTests(unittest.TestCase):
  769. """
  770. Tests for L{util.FancyStrMixin}.
  771. """
  772. def test_sequenceOfStrings(self):
  773. """
  774. If C{showAttributes} is set to a sequence of strings, C{__str__}
  775. renders using those by looking them up as attributes on the object.
  776. """
  777. class Foo(util.FancyStrMixin):
  778. showAttributes = ("first", "second")
  779. first = 1
  780. second = "hello"
  781. self.assertEqual(str(Foo()), "<Foo first=1 second='hello'>")
  782. def test_formatter(self):
  783. """
  784. If C{showAttributes} has an item that is a 2-tuple, C{__str__} renders
  785. the first item in the tuple as a key and the result of calling the
  786. second item with the value of the attribute named by the first item as
  787. the value.
  788. """
  789. class Foo(util.FancyStrMixin):
  790. showAttributes = (
  791. "first",
  792. ("second", lambda value: repr(value[::-1])))
  793. first = "hello"
  794. second = "world"
  795. self.assertEqual("<Foo first='hello' second='dlrow'>", str(Foo()))
  796. def test_override(self):
  797. """
  798. If C{showAttributes} has an item that is a 3-tuple, C{__str__} renders
  799. the second item in the tuple as a key, and the contents of the
  800. attribute named in the first item are rendered as the value. The value
  801. is formatted using the third item in the tuple.
  802. """
  803. class Foo(util.FancyStrMixin):
  804. showAttributes = ("first", ("second", "2nd", "%.1f"))
  805. first = 1
  806. second = 2.111
  807. self.assertEqual(str(Foo()), "<Foo first=1 2nd=2.1>")
  808. def test_fancybasename(self):
  809. """
  810. If C{fancybasename} is present, C{__str__} uses it instead of the class name.
  811. """
  812. class Foo(util.FancyStrMixin):
  813. fancybasename = "Bar"
  814. self.assertEqual(str(Foo()), "<Bar>")
  815. def test_repr(self):
  816. """
  817. C{__repr__} outputs the same content as C{__str__}.
  818. """
  819. class Foo(util.FancyStrMixin):
  820. showAttributes = ("first", "second")
  821. first = 1
  822. second = "hello"
  823. obj = Foo()
  824. self.assertEqual(str(obj), repr(obj))
  825. class PadToTests(unittest.TestCase):
  826. """
  827. Tests for L{util.padTo}.
  828. """
  829. def test_default(self):
  830. """
  831. L{None} values can be added to a list to cause it to have a certain
  832. length.
  833. """
  834. padded = util.padTo(3, [])
  835. self.assertEqual([None] * 3, padded)
  836. def test_specificDefaultValue(self):
  837. """
  838. A specific value can be added to a list to cause it to have a certain
  839. length.
  840. """
  841. padded = util.padTo(4, [], "x")
  842. self.assertEqual(["x"] * 4, padded)
  843. def test_padNonEmptyList(self):
  844. """
  845. A list which already has some items has the padding value added after
  846. those items.
  847. """
  848. padded = util.padTo(3, [1, 2], "z")
  849. self.assertEqual([1, 2, "z"], padded)
  850. def test_padToSmallerSize(self):
  851. """
  852. L{util.padTo} can't pad a list if the size requested is smaller than
  853. the size of the list to pad.
  854. """
  855. self.assertRaises(ValueError, util.padTo, 1, [1, 2])
  856. def test_alreadyPadded(self):
  857. """
  858. If the list is already the length indicated by the padding argument
  859. then a list with the same value is returned.
  860. """
  861. items = [1, 2]
  862. padded = util.padTo(len(items), items)
  863. self.assertEqual(items, padded)
  864. def test_alreadyPaddedCopies(self):
  865. """
  866. If the list is already the length indicated by the padding argument
  867. then the return value is a copy of the input.
  868. """
  869. items = [1, 2]
  870. padded = util.padTo(len(items), items)
  871. self.assertIsNot(padded, items)
  872. def test_makeCopy(self):
  873. """
  874. L{util.padTo} doesn't modify the input list but makes a copy.
  875. """
  876. items = []
  877. util.padTo(4, items)
  878. self.assertEqual([], items)
  879. class ReplaceIfTests(unittest.TestCase):
  880. """
  881. Tests for L{util._replaceIf}.
  882. """
  883. def test_replacesIfTrue(self):
  884. """
  885. L{util._replaceIf} swaps out the body of a function if the conditional
  886. is C{True}.
  887. """
  888. @util._replaceIf(True, lambda: "hi")
  889. def test():
  890. return "bye"
  891. self.assertEqual(test(), "hi")
  892. self.assertEqual(test.__name__, "test")
  893. self.assertEqual(test.__module__, "twisted.python.test.test_util")
  894. def test_keepsIfFalse(self):
  895. """
  896. L{util._replaceIf} keeps the original body of the function if the
  897. conditional is C{False}.
  898. """
  899. @util._replaceIf(False, lambda: "hi")
  900. def test():
  901. return "bye"
  902. self.assertEqual(test(), "bye")
  903. def test_multipleReplace(self):
  904. """
  905. In the case that multiple conditions are true, the first one
  906. (to the reader) is chosen by L{util._replaceIf}
  907. """
  908. @util._replaceIf(True, lambda: "hi")
  909. @util._replaceIf(False, lambda: "bar")
  910. @util._replaceIf(True, lambda: "baz")
  911. def test():
  912. return "bye"
  913. self.assertEqual(test(), "hi")
  914. def test_boolsOnly(self):
  915. """
  916. L{util._replaceIf}'s condition argument only accepts bools.
  917. """
  918. with self.assertRaises(ValueError) as e:
  919. @util._replaceIf("hi", "there")
  920. def test():
  921. """
  922. Some test function.
  923. """
  924. self.assertEqual(e.exception.args[0],
  925. ("condition argument to _replaceIf requires a bool, "
  926. "not 'hi'"))