test_zipstream.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.python.zipstream}
  5. """
import random
import struct
import zipfile
from hashlib import md5

from twisted.python import zipstream, filepath
from twisted.trial import unittest
  11. class FileEntryMixin(object):
  12. """
  13. File entry classes should behave as file-like objects
  14. """
  15. def getFileEntry(self, contents):
  16. """
  17. Return an appropriate zip file entry
  18. """
  19. filename = self.mktemp()
  20. with zipfile.ZipFile(filename, 'w', self.compression) as z:
  21. z.writestr('content', contents)
  22. z = zipstream.ChunkingZipFile(filename, 'r')
  23. return z.readfile('content')
  24. def test_isatty(self):
  25. """
  26. zip files should not be ttys, so isatty() should be false
  27. """
  28. with self.getFileEntry('') as fileEntry:
  29. self.assertFalse(fileEntry.isatty())
  30. def test_closed(self):
  31. """
  32. The C{closed} attribute should reflect whether C{close()} has been
  33. called.
  34. """
  35. with self.getFileEntry('') as fileEntry:
  36. self.assertFalse(fileEntry.closed)
  37. self.assertTrue(fileEntry.closed)
  38. def test_readline(self):
  39. """
  40. C{readline()} should mirror L{file.readline} and return up to a single
  41. delimiter.
  42. """
  43. with self.getFileEntry(b'hoho\nho') as fileEntry:
  44. self.assertEqual(fileEntry.readline(), b'hoho\n')
  45. self.assertEqual(fileEntry.readline(), b'ho')
  46. self.assertEqual(fileEntry.readline(), b'')
  47. def test_next(self):
  48. """
  49. Zip file entries should implement the iterator protocol as files do.
  50. """
  51. with self.getFileEntry(b'ho\nhoho') as fileEntry:
  52. self.assertEqual(fileEntry.next(), b'ho\n')
  53. self.assertEqual(fileEntry.next(), b'hoho')
  54. self.assertRaises(StopIteration, fileEntry.next)
  55. def test_readlines(self):
  56. """
  57. C{readlines()} should return a list of all the lines.
  58. """
  59. with self.getFileEntry(b'ho\nho\nho') as fileEntry:
  60. self.assertEqual(fileEntry.readlines(), [b'ho\n', b'ho\n', b'ho'])
  61. def test_iteration(self):
  62. """
  63. C{__iter__()} and C{xreadlines()} should return C{self}.
  64. """
  65. with self.getFileEntry('') as fileEntry:
  66. self.assertIs(iter(fileEntry), fileEntry)
  67. self.assertIs(fileEntry.xreadlines(), fileEntry)
  68. def test_readWhole(self):
  69. """
  70. C{.read()} should read the entire file.
  71. """
  72. contents = b"Hello, world!"
  73. with self.getFileEntry(contents) as entry:
  74. self.assertEqual(entry.read(), contents)
  75. def test_readPartial(self):
  76. """
  77. C{.read(num)} should read num bytes from the file.
  78. """
  79. contents = "0123456789"
  80. with self.getFileEntry(contents) as entry:
  81. one = entry.read(4)
  82. two = entry.read(200)
  83. self.assertEqual(one, b"0123")
  84. self.assertEqual(two, b"456789")
  85. def test_tell(self):
  86. """
  87. C{.tell()} should return the number of bytes that have been read so
  88. far.
  89. """
  90. contents = "x" * 100
  91. with self.getFileEntry(contents) as entry:
  92. entry.read(2)
  93. self.assertEqual(entry.tell(), 2)
  94. entry.read(4)
  95. self.assertEqual(entry.tell(), 6)
  96. class DeflatedZipFileEntryTests(FileEntryMixin, unittest.TestCase):
  97. """
  98. DeflatedZipFileEntry should be file-like
  99. """
  100. compression = zipfile.ZIP_DEFLATED
  101. class ZipFileEntryTests(FileEntryMixin, unittest.TestCase):
  102. """
  103. ZipFileEntry should be file-like
  104. """
  105. compression = zipfile.ZIP_STORED
  106. class ZipstreamTests(unittest.TestCase):
  107. """
  108. Tests for twisted.python.zipstream
  109. """
  110. def setUp(self):
  111. """
  112. Creates junk data that can be compressed and a test directory for any
  113. files that will be created
  114. """
  115. self.testdir = filepath.FilePath(self.mktemp())
  116. self.testdir.makedirs()
  117. self.unzipdir = self.testdir.child('unzipped')
  118. self.unzipdir.makedirs()
  119. def makeZipFile(self, contents, directory=''):
  120. """
  121. Makes a zip file archive containing len(contents) files. Contents
  122. should be a list of strings, each string being the content of one file.
  123. """
  124. zpfilename = self.testdir.child('zipfile.zip').path
  125. with zipfile.ZipFile(zpfilename, 'w') as zpfile:
  126. for i, content in enumerate(contents):
  127. filename = str(i)
  128. if directory:
  129. filename = directory + "/" + filename
  130. zpfile.writestr(filename, content)
  131. return zpfilename
  132. def test_invalidMode(self):
  133. """
  134. A ChunkingZipFile opened in write-mode should not allow .readfile(),
  135. and raise a RuntimeError instead.
  136. """
  137. with zipstream.ChunkingZipFile(self.mktemp(), "w") as czf:
  138. self.assertRaises(RuntimeError, czf.readfile, "something")
  139. def test_closedArchive(self):
  140. """
  141. A closed ChunkingZipFile should raise a L{RuntimeError} when
  142. .readfile() is invoked.
  143. """
  144. czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r")
  145. czf.close()
  146. self.assertRaises(RuntimeError, czf.readfile, "something")
  147. def test_invalidHeader(self):
  148. """
  149. A zipfile entry with the wrong magic number should raise BadZipfile for
  150. readfile(), but that should not affect other files in the archive.
  151. """
  152. fn = self.makeZipFile(["test contents",
  153. "more contents"])
  154. with zipfile.ZipFile(fn, "r") as zf:
  155. zeroOffset = zf.getinfo("0").header_offset
  156. # Zero out just the one header.
  157. with open(fn, "r+b") as scribble:
  158. scribble.seek(zeroOffset, 0)
  159. scribble.write(b'0' * 4)
  160. with zipstream.ChunkingZipFile(fn) as czf:
  161. self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
  162. with czf.readfile("1") as zfe:
  163. self.assertEqual(zfe.read(), b"more contents")
  164. def test_filenameMismatch(self):
  165. """
  166. A zipfile entry with a different filename than is found in the central
  167. directory should raise BadZipfile.
  168. """
  169. fn = self.makeZipFile([b"test contents",
  170. b"more contents"])
  171. with zipfile.ZipFile(fn, "r") as zf:
  172. info = zf.getinfo("0")
  173. info.filename = "not zero"
  174. with open(fn, "r+b") as scribble:
  175. scribble.seek(info.header_offset, 0)
  176. scribble.write(info.FileHeader())
  177. with zipstream.ChunkingZipFile(fn) as czf:
  178. self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
  179. with czf.readfile("1") as zfe:
  180. self.assertEqual(zfe.read(), b"more contents")
  181. def test_unsupportedCompression(self):
  182. """
  183. A zipfile which describes an unsupported compression mechanism should
  184. raise BadZipfile.
  185. """
  186. fn = self.mktemp()
  187. with zipfile.ZipFile(fn, "w") as zf:
  188. zi = zipfile.ZipInfo("0")
  189. zf.writestr(zi, "some data")
  190. # Mangle its compression type in the central directory; can't do
  191. # this before the writestr call or zipfile will (correctly) tell us
  192. # not to pass bad compression types :)
  193. zi.compress_type = 1234
  194. with zipstream.ChunkingZipFile(fn) as czf:
  195. self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
  196. def test_extraData(self):
  197. """
  198. readfile() should skip over 'extra' data present in the zip metadata.
  199. """
  200. fn = self.mktemp()
  201. with zipfile.ZipFile(fn, 'w') as zf:
  202. zi = zipfile.ZipInfo("0")
  203. zi.extra = b"hello, extra"
  204. zf.writestr(zi, b"the real data")
  205. with zipstream.ChunkingZipFile(fn) as czf, czf.readfile("0") as zfe:
  206. self.assertEqual(zfe.read(), b"the real data")
  207. def test_unzipIterChunky(self):
  208. """
  209. L{twisted.python.zipstream.unzipIterChunky} returns an iterator which
  210. must be exhausted to completely unzip the input archive.
  211. """
  212. numfiles = 10
  213. contents = ['This is test file %d!' % i for i in range(numfiles)]
  214. contents = [i.encode("ascii") for i in contents]
  215. zpfilename = self.makeZipFile(contents)
  216. list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
  217. self.assertEqual(
  218. set(self.unzipdir.listdir()),
  219. set(map(str, range(numfiles))))
  220. for child in self.unzipdir.children():
  221. num = int(child.basename())
  222. self.assertEqual(child.getContent(), contents[num])
  223. def test_unzipIterChunkyDirectory(self):
  224. """
  225. The path to which a file is extracted by L{zipstream.unzipIterChunky}
  226. is determined by joining the C{directory} argument to C{unzip} with the
  227. path within the archive of the file being extracted.
  228. """
  229. numfiles = 10
  230. contents = ['This is test file %d!' % i for i in range(numfiles)]
  231. contents = [i.encode("ascii") for i in contents]
  232. zpfilename = self.makeZipFile(contents, 'foo')
  233. list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
  234. fileContents = {str(num).encode("ascii") for num in range(numfiles)}
  235. self.assertEqual(
  236. set(self.unzipdir.child(b'foo').listdir()),
  237. fileContents)
  238. for child in self.unzipdir.child(b'foo').children():
  239. num = int(child.basename())
  240. self.assertEqual(child.getContent(), contents[num])
  241. # XXX these tests are kind of gross and old, but I think unzipIterChunky is
  242. # kind of a gross function anyway. We should really write an abstract
  243. # copyTo/moveTo that operates on FilePath and make sure ZipPath can support
  244. # it, then just deprecate / remove this stuff.
  245. def _unzipIterChunkyTest(self, compression, chunksize, lower, upper):
  246. """
  247. unzipIterChunky should unzip the given number of bytes per iteration.
  248. """
  249. junk = b''
  250. for n in range(1000):
  251. num = round(random.random(), 12)
  252. numEncoded = str(num).encode("ascii")
  253. junk += b' '+numEncoded
  254. junkmd5 = md5(junk).hexdigest()
  255. tempdir = filepath.FilePath(self.mktemp())
  256. tempdir.makedirs()
  257. zfpath = tempdir.child('bigfile.zip').path
  258. self._makebigfile(zfpath, compression, junk)
  259. uziter = zipstream.unzipIterChunky(zfpath, tempdir.path,
  260. chunksize=chunksize)
  261. r = next(uziter)
  262. # test that the number of chunks is in the right ballpark;
  263. # this could theoretically be any number but statistically it
  264. # should always be in this range
  265. approx = lower < r < upper
  266. self.assertTrue(approx)
  267. for r in uziter:
  268. pass
  269. self.assertEqual(r, 0)
  270. with tempdir.child("zipstreamjunk").open() as f:
  271. newmd5 = md5(f.read()).hexdigest()
  272. self.assertEqual(newmd5, junkmd5)
  273. def test_unzipIterChunkyStored(self):
  274. """
  275. unzipIterChunky should unzip the given number of bytes per iteration on
  276. a stored archive.
  277. """
  278. self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45)
  279. def test_chunkyDeflated(self):
  280. """
  281. unzipIterChunky should unzip the given number of bytes per iteration on
  282. a deflated archive.
  283. """
  284. self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27)
  285. def _makebigfile(self, filename, compression, junk):
  286. """
  287. Create a zip file with the given file name and compression scheme.
  288. """
  289. with zipfile.ZipFile(filename, 'w', compression) as zf:
  290. for i in range(10):
  291. fn = 'zipstream%d' % i
  292. zf.writestr(fn, "")
  293. zf.writestr('zipstreamjunk', junk)