test_dirdbm.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test cases for dirdbm module.
  5. """
  6. import shutil
  7. from base64 import b64decode
  8. from twisted.trial import unittest
  9. from twisted.persisted import dirdbm
  10. from twisted.python.compat import _PY3
  11. from twisted.python.filepath import FilePath
  12. class DirDbmTests(unittest.TestCase):
  13. def setUp(self):
  14. self.path = FilePath(self.mktemp())
  15. self.dbm = dirdbm.open(self.path.path)
  16. self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'))
  17. def testAll(self):
  18. k = b64decode("//==")
  19. self.dbm[k] = b"a"
  20. self.dbm[k] = b"a"
  21. self.assertEqual(self.dbm[k], b"a")
  22. def testRebuildInteraction(self):
  23. from twisted.persisted import dirdbm
  24. from twisted.python import rebuild
  25. s = dirdbm.Shelf('dirdbm.rebuild.test')
  26. s[b'key'] = b'value'
  27. rebuild.rebuild(dirdbm)
  28. # print s['key']
  29. if _PY3:
  30. testRebuildInteraction.skip=(
  31. "Does not work on Python 3 (https://tm.tl/8887)")
  32. def testDbm(self):
  33. d = self.dbm
  34. # Insert keys
  35. keys = []
  36. values = set()
  37. for k, v in self.items:
  38. d[k] = v
  39. keys.append(k)
  40. values.add(v)
  41. keys.sort()
  42. # Check they exist
  43. for k, v in self.items:
  44. self.assertIn(k, d)
  45. self.assertEqual(d[k], v)
  46. # Check non existent key
  47. try:
  48. d[b"XXX"]
  49. except KeyError:
  50. pass
  51. else:
  52. assert 0, "didn't raise KeyError on non-existent key"
  53. # Check keys(), values() and items()
  54. dbkeys = d.keys()
  55. dbvalues = set(d.values())
  56. dbitems = set(d.items())
  57. dbkeys.sort()
  58. items = set(self.items)
  59. self.assertEqual(keys, dbkeys,
  60. ".keys() output didn't match: %s != %s" %
  61. (repr(keys), repr(dbkeys)))
  62. self.assertEqual(values, dbvalues,
  63. ".values() output didn't match: %s != %s" %
  64. (repr(values), repr(dbvalues)))
  65. self.assertEqual(items, dbitems,
  66. "items() didn't match: %s != %s" %
  67. (repr(items), repr(dbitems)))
  68. copyPath = self.mktemp()
  69. d2 = d.copyTo(copyPath)
  70. copykeys = d.keys()
  71. copyvalues = set(d.values())
  72. copyitems = set(d.items())
  73. copykeys.sort()
  74. self.assertEqual(dbkeys, copykeys,
  75. ".copyTo().keys() didn't match: %s != %s" %
  76. (repr(dbkeys), repr(copykeys)))
  77. self.assertEqual(dbvalues, copyvalues,
  78. ".copyTo().values() didn't match: %s != %s" %
  79. (repr(dbvalues), repr(copyvalues)))
  80. self.assertEqual(dbitems, copyitems,
  81. ".copyTo().items() didn't match: %s != %s" %
  82. (repr(dbkeys), repr(copyitems)))
  83. d2.clear()
  84. self.assertTrue(len(d2.keys()) == len(d2.values()) ==
  85. len(d2.items()) == len(d2) == 0, ".clear() failed")
  86. self.assertNotEqual(len(d), len(d2))
  87. shutil.rmtree(copyPath)
  88. # Delete items
  89. for k, v in self.items:
  90. del d[k]
  91. self.assertNotIn(k, d, "key is still in database, even though we deleted it")
  92. self.assertEqual(len(d.keys()), 0, "database has keys")
  93. self.assertEqual(len(d.values()), 0, "database has values")
  94. self.assertEqual(len(d.items()), 0, "database has items")
  95. self.assertEqual(len(d), 0, "database has items")
  96. def testModificationTime(self):
  97. import time
  98. # The mtime value for files comes from a different place than the
  99. # gettimeofday() system call. On linux, gettimeofday() can be
  100. # slightly ahead (due to clock drift which gettimeofday() takes into
  101. # account but which open()/write()/close() do not), and if we are
  102. # close to the edge of the next second, time.time() can give a value
  103. # which is larger than the mtime which results from a subsequent
  104. # write(). I consider this a kernel bug, but it is beyond the scope
  105. # of this test. Thus we keep the range of acceptability to 3 seconds time.
  106. # -warner
  107. self.dbm[b"k"] = b"v"
  108. self.assertTrue(abs(time.time() - self.dbm.getModificationTime(b"k")) <= 3)
  109. self.assertRaises(KeyError, self.dbm.getModificationTime, b"nokey")
  110. def testRecovery(self):
  111. """
  112. DirDBM: test recovery from directory after a faked crash
  113. """
  114. k = self.dbm._encode(b"key1")
  115. with self.path.child(k + b".rpl").open(mode="wb") as f:
  116. f.write(b"value")
  117. k2 = self.dbm._encode(b"key2")
  118. with self.path.child(k2).open(mode="wb") as f:
  119. f.write(b"correct")
  120. with self.path.child(k2 + b".rpl").open(mode="wb") as f:
  121. f.write(b"wrong")
  122. with self.path.child("aa.new").open(mode="wb") as f:
  123. f.write(b"deleted")
  124. dbm = dirdbm.DirDBM(self.path.path)
  125. self.assertEqual(dbm[b"key1"], b"value")
  126. self.assertEqual(dbm[b"key2"], b"correct")
  127. self.assertFalse(self.path.globChildren("*.new"))
  128. self.assertFalse(self.path.globChildren("*.rpl"))
  129. def test_nonStringKeys(self):
  130. """
  131. L{dirdbm.DirDBM} operations only support string keys: other types
  132. should raise a L{TypeError}.
  133. """
  134. self.assertRaises(TypeError, self.dbm.__setitem__, 2, "3")
  135. try:
  136. self.assertRaises(TypeError, self.dbm.__setitem__, "2", 3)
  137. except unittest.FailTest:
  138. # dirdbm.Shelf.__setitem__ supports non-string values
  139. self.assertIsInstance(self.dbm, dirdbm.Shelf)
  140. self.assertRaises(TypeError, self.dbm.__getitem__, 2)
  141. self.assertRaises(TypeError, self.dbm.__delitem__, 2)
  142. self.assertRaises(TypeError, self.dbm.has_key, 2)
  143. self.assertRaises(TypeError, self.dbm.__contains__, 2)
  144. self.assertRaises(TypeError, self.dbm.getModificationTime, 2)
  145. def test_failSet(self):
  146. """
  147. Failure path when setting an item.
  148. """
  149. def _writeFail(path, data):
  150. path.setContent(data)
  151. raise IOError("fail to write")
  152. self.dbm[b"failkey"] = b"test"
  153. self.patch(self.dbm, "_writeFile", _writeFail)
  154. self.assertRaises(IOError, self.dbm.__setitem__, b"failkey", b"test2")
  155. class ShelfTests(DirDbmTests):
  156. def setUp(self):
  157. self.path = FilePath(self.mktemp())
  158. self.dbm = dirdbm.Shelf(self.path.path)
  159. self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'),
  160. (b'int', 12), (b'float', 12.0), (b'tuple', (None, 12)))
  161. testCases = [DirDbmTests, ShelfTests]