gz.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import struct
  2. try:
  3. from cStringIO import StringIO as BytesIO
  4. except ImportError:
  5. from io import BytesIO
  6. from gzip import GzipFile
  7. import six
  8. import re
  9. from scrapy.utils.decorators import deprecated
  10. # - Python>=3.5 GzipFile's read() has issues returning leftover
  11. # uncompressed data when input is corrupted
  12. # (regression or bug-fix compared to Python 3.4)
  13. # - read1(), which fetches data before raising EOFError on next call
  14. # works here but is only available from Python>=3.3
  15. # - scrapy does not support Python 3.2
  16. # - Python 2.7 GzipFile works fine with standard read() + extrabuf
  17. if six.PY2:
  18. def read1(gzf, size=-1):
  19. return gzf.read(size)
  20. else:
  21. def read1(gzf, size=-1):
  22. return gzf.read1(size)
  23. def gunzip(data):
  24. """Gunzip the given data and return as much data as possible.
  25. This is resilient to CRC checksum errors.
  26. """
  27. f = GzipFile(fileobj=BytesIO(data))
  28. output_list = []
  29. chunk = b'.'
  30. while chunk:
  31. try:
  32. chunk = read1(f, 8196)
  33. output_list.append(chunk)
  34. except (IOError, EOFError, struct.error):
  35. # complete only if there is some data, otherwise re-raise
  36. # see issue 87 about catching struct.error
  37. # some pages are quite small so output_list is empty and f.extrabuf
  38. # contains the whole page content
  39. if output_list or getattr(f, 'extrabuf', None):
  40. try:
  41. output_list.append(f.extrabuf[-f.extrasize:])
  42. finally:
  43. break
  44. else:
  45. raise
  46. return b''.join(output_list)
  47. _is_gzipped = re.compile(br'^application/(x-)?gzip\b', re.I).search
  48. _is_octetstream = re.compile(br'^(application|binary)/octet-stream\b', re.I).search
  49. @deprecated
  50. def is_gzipped(response):
  51. """Return True if the response is gzipped, or False otherwise"""
  52. ctype = response.headers.get('Content-Type', b'')
  53. cenc = response.headers.get('Content-Encoding', b'').lower()
  54. return (_is_gzipped(ctype) or
  55. (_is_octetstream(ctype) and cenc in (b'gzip', b'x-gzip')))
  56. def gzip_magic_number(response):
  57. return response.body[:3] == b'\x1f\x8b\x08'