123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Test cases for dirdbm module.
- """
- import shutil
- from base64 import b64decode
- from twisted.trial import unittest
- from twisted.persisted import dirdbm
- from twisted.python.compat import _PY3
- from twisted.python.filepath import FilePath
- class DirDbmTests(unittest.TestCase):
- def setUp(self):
- self.path = FilePath(self.mktemp())
- self.dbm = dirdbm.open(self.path.path)
- self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'))
- def testAll(self):
- k = b64decode("//==")
- self.dbm[k] = b"a"
- self.dbm[k] = b"a"
- self.assertEqual(self.dbm[k], b"a")
- def testRebuildInteraction(self):
- from twisted.persisted import dirdbm
- from twisted.python import rebuild
- s = dirdbm.Shelf('dirdbm.rebuild.test')
- s[b'key'] = b'value'
- rebuild.rebuild(dirdbm)
- # print s['key']
- if _PY3:
- testRebuildInteraction.skip=(
- "Does not work on Python 3 (https://tm.tl/8887)")
- def testDbm(self):
- d = self.dbm
- # Insert keys
- keys = []
- values = set()
- for k, v in self.items:
- d[k] = v
- keys.append(k)
- values.add(v)
- keys.sort()
- # Check they exist
- for k, v in self.items:
- self.assertIn(k, d)
- self.assertEqual(d[k], v)
- # Check non existent key
- try:
- d[b"XXX"]
- except KeyError:
- pass
- else:
- assert 0, "didn't raise KeyError on non-existent key"
- # Check keys(), values() and items()
- dbkeys = d.keys()
- dbvalues = set(d.values())
- dbitems = set(d.items())
- dbkeys.sort()
- items = set(self.items)
- self.assertEqual(keys, dbkeys,
- ".keys() output didn't match: %s != %s" %
- (repr(keys), repr(dbkeys)))
- self.assertEqual(values, dbvalues,
- ".values() output didn't match: %s != %s" %
- (repr(values), repr(dbvalues)))
- self.assertEqual(items, dbitems,
- "items() didn't match: %s != %s" %
- (repr(items), repr(dbitems)))
- copyPath = self.mktemp()
- d2 = d.copyTo(copyPath)
- copykeys = d.keys()
- copyvalues = set(d.values())
- copyitems = set(d.items())
- copykeys.sort()
- self.assertEqual(dbkeys, copykeys,
- ".copyTo().keys() didn't match: %s != %s" %
- (repr(dbkeys), repr(copykeys)))
- self.assertEqual(dbvalues, copyvalues,
- ".copyTo().values() didn't match: %s != %s" %
- (repr(dbvalues), repr(copyvalues)))
- self.assertEqual(dbitems, copyitems,
- ".copyTo().items() didn't match: %s != %s" %
- (repr(dbkeys), repr(copyitems)))
- d2.clear()
- self.assertTrue(len(d2.keys()) == len(d2.values()) ==
- len(d2.items()) == len(d2) == 0, ".clear() failed")
- self.assertNotEqual(len(d), len(d2))
- shutil.rmtree(copyPath)
- # Delete items
- for k, v in self.items:
- del d[k]
- self.assertNotIn(k, d, "key is still in database, even though we deleted it")
- self.assertEqual(len(d.keys()), 0, "database has keys")
- self.assertEqual(len(d.values()), 0, "database has values")
- self.assertEqual(len(d.items()), 0, "database has items")
- self.assertEqual(len(d), 0, "database has items")
- def testModificationTime(self):
- import time
- # The mtime value for files comes from a different place than the
- # gettimeofday() system call. On linux, gettimeofday() can be
- # slightly ahead (due to clock drift which gettimeofday() takes into
- # account but which open()/write()/close() do not), and if we are
- # close to the edge of the next second, time.time() can give a value
- # which is larger than the mtime which results from a subsequent
- # write(). I consider this a kernel bug, but it is beyond the scope
- # of this test. Thus we keep the range of acceptability to 3 seconds time.
- # -warner
- self.dbm[b"k"] = b"v"
- self.assertTrue(abs(time.time() - self.dbm.getModificationTime(b"k")) <= 3)
- self.assertRaises(KeyError, self.dbm.getModificationTime, b"nokey")
- def testRecovery(self):
- """
- DirDBM: test recovery from directory after a faked crash
- """
- k = self.dbm._encode(b"key1")
- with self.path.child(k + b".rpl").open(mode="wb") as f:
- f.write(b"value")
- k2 = self.dbm._encode(b"key2")
- with self.path.child(k2).open(mode="wb") as f:
- f.write(b"correct")
- with self.path.child(k2 + b".rpl").open(mode="wb") as f:
- f.write(b"wrong")
- with self.path.child("aa.new").open(mode="wb") as f:
- f.write(b"deleted")
- dbm = dirdbm.DirDBM(self.path.path)
- self.assertEqual(dbm[b"key1"], b"value")
- self.assertEqual(dbm[b"key2"], b"correct")
- self.assertFalse(self.path.globChildren("*.new"))
- self.assertFalse(self.path.globChildren("*.rpl"))
- def test_nonStringKeys(self):
- """
- L{dirdbm.DirDBM} operations only support string keys: other types
- should raise a L{TypeError}.
- """
- self.assertRaises(TypeError, self.dbm.__setitem__, 2, "3")
- try:
- self.assertRaises(TypeError, self.dbm.__setitem__, "2", 3)
- except unittest.FailTest:
- # dirdbm.Shelf.__setitem__ supports non-string values
- self.assertIsInstance(self.dbm, dirdbm.Shelf)
- self.assertRaises(TypeError, self.dbm.__getitem__, 2)
- self.assertRaises(TypeError, self.dbm.__delitem__, 2)
- self.assertRaises(TypeError, self.dbm.has_key, 2)
- self.assertRaises(TypeError, self.dbm.__contains__, 2)
- self.assertRaises(TypeError, self.dbm.getModificationTime, 2)
- def test_failSet(self):
- """
- Failure path when setting an item.
- """
- def _writeFail(path, data):
- path.setContent(data)
- raise IOError("fail to write")
- self.dbm[b"failkey"] = b"test"
- self.patch(self.dbm, "_writeFile", _writeFail)
- self.assertRaises(IOError, self.dbm.__setitem__, b"failkey", b"test2")
- class ShelfTests(DirDbmTests):
- def setUp(self):
- self.path = FilePath(self.mktemp())
- self.dbm = dirdbm.Shelf(self.path.path)
- self.items = ((b'abc', b'foo'), (b'/lalal', b'\000\001'), (b'\000\012', b'baz'),
- (b'int', 12), (b'float', 12.0), (b'tuple', (None, 12)))
- testCases = [DirDbmTests, ShelfTests]
|