  1. """
  2. Files Pipeline
  3. See documentation in topics/media-pipeline.rst
  4. """
  5. import functools
  6. import hashlib
  7. import mimetypes
  8. import os
  9. import os.path
  10. import time
  11. import logging
  12. from email.utils import parsedate_tz, mktime_tz
  13. from six.moves.urllib.parse import urlparse
  14. from collections import defaultdict
  15. import six
  16. try:
  17. from cStringIO import StringIO as BytesIO
  18. except ImportError:
  19. from io import BytesIO
  20. from twisted.internet import defer, threads
  21. from scrapy.pipelines.media import MediaPipeline
  22. from scrapy.settings import Settings
  23. from scrapy.exceptions import NotConfigured, IgnoreRequest
  24. from scrapy.http import Request
  25. from scrapy.utils.misc import md5sum
  26. from scrapy.utils.log import failure_to_exc_info
  27. from scrapy.utils.python import to_bytes
  28. from scrapy.utils.request import referer_str
  29. from scrapy.utils.boto import is_botocore
  30. from scrapy.utils.datatypes import CaselessDict
  31. logger = logging.getLogger(__name__)


class FileException(Exception):
    """General media error exception"""


class FSFilesStore(object):

    def __init__(self, basedir):
        if '://' in basedir:
            basedir = basedir.split('://', 1)[1]
        self.basedir = basedir
        self._mkdir(self.basedir)
        self.created_directories = defaultdict(set)

    def persist_file(self, path, buf, info, meta=None, headers=None):
        absolute_path = self._get_filesystem_path(path)
        self._mkdir(os.path.dirname(absolute_path), info)
        with open(absolute_path, 'wb') as f:
            f.write(buf.getvalue())

    def stat_file(self, path, info):
        absolute_path = self._get_filesystem_path(path)
        try:
            last_modified = os.path.getmtime(absolute_path)
        except os.error:
            return {}

        with open(absolute_path, 'rb') as f:
            checksum = md5sum(f)

        return {'last_modified': last_modified, 'checksum': checksum}

    def _get_filesystem_path(self, path):
        path_comps = path.split('/')
        return os.path.join(self.basedir, *path_comps)

    def _mkdir(self, dirname, domain=None):
        seen = self.created_directories[domain] if domain else set()
        if dirname not in seen:
            if not os.path.exists(dirname):
                os.makedirs(dirname)
            seen.add(dirname)
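
# Example of direct FSFilesStore use (a minimal sketch; the target directory
# and payload are assumptions):
#
#     store = FSFilesStore('/tmp/files')
#     store.persist_file('full/example.bin', BytesIO(b'payload'), info=None)
#
# persist_file() creates any missing intermediate directories before writing.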


class S3FilesStore(object):

    AWS_ACCESS_KEY_ID = None
    AWS_SECRET_ACCESS_KEY = None
    AWS_ENDPOINT_URL = None
    AWS_REGION_NAME = None
    AWS_USE_SSL = None
    AWS_VERIFY = None

    POLICY = 'private'  # Overridden from settings.FILES_STORE_S3_ACL in
                        # FilesPipeline.from_settings.
    HEADERS = {
        'Cache-Control': 'max-age=172800',
    }

    def __init__(self, uri):
        self.is_botocore = is_botocore()
        if self.is_botocore:
            import botocore.session
            session = botocore.session.get_session()
            self.s3_client = session.create_client(
                's3',
                aws_access_key_id=self.AWS_ACCESS_KEY_ID,
                aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
                endpoint_url=self.AWS_ENDPOINT_URL,
                region_name=self.AWS_REGION_NAME,
                use_ssl=self.AWS_USE_SSL,
                verify=self.AWS_VERIFY
            )
        else:
            from boto.s3.connection import S3Connection
            self.S3Connection = S3Connection
        assert uri.startswith('s3://')
        self.bucket, self.prefix = uri[5:].split('/', 1)

    def stat_file(self, path, info):
        def _onsuccess(boto_key):
            if self.is_botocore:
                checksum = boto_key['ETag'].strip('"')
                last_modified = boto_key['LastModified']
                modified_stamp = time.mktime(last_modified.timetuple())
            else:
                checksum = boto_key.etag.strip('"')
                last_modified = boto_key.last_modified
                modified_tuple = parsedate_tz(last_modified)
                modified_stamp = int(mktime_tz(modified_tuple))
            return {'checksum': checksum, 'last_modified': modified_stamp}

        return self._get_boto_key(path).addCallback(_onsuccess)

    def _get_boto_bucket(self):
        # disable ssl (is_secure=False) because of this python bug:
        # https://bugs.python.org/issue5103
        c = self.S3Connection(self.AWS_ACCESS_KEY_ID, self.AWS_SECRET_ACCESS_KEY, is_secure=False)
        return c.get_bucket(self.bucket, validate=False)

    def _get_boto_key(self, path):
        key_name = '%s%s' % (self.prefix, path)
        if self.is_botocore:
            return threads.deferToThread(
                self.s3_client.head_object,
                Bucket=self.bucket,
                Key=key_name)
        else:
            b = self._get_boto_bucket()
            return threads.deferToThread(b.get_key, key_name)

    def persist_file(self, path, buf, info, meta=None, headers=None):
        """Upload file to S3 storage"""
        key_name = '%s%s' % (self.prefix, path)
        buf.seek(0)
        if self.is_botocore:
            extra = self._headers_to_botocore_kwargs(self.HEADERS)
            if headers:
                extra.update(self._headers_to_botocore_kwargs(headers))
            return threads.deferToThread(
                self.s3_client.put_object,
                Bucket=self.bucket,
                Key=key_name,
                Body=buf,
                Metadata={k: str(v) for k, v in six.iteritems(meta or {})},
                ACL=self.POLICY,
                **extra)
        else:
            b = self._get_boto_bucket()
            k = b.new_key(key_name)
            if meta:
                for metakey, metavalue in six.iteritems(meta):
                    k.set_metadata(metakey, str(metavalue))
            h = self.HEADERS.copy()
            if headers:
                h.update(headers)
            return threads.deferToThread(
                k.set_contents_from_string, buf.getvalue(),
                headers=h, policy=self.POLICY)

    def _headers_to_botocore_kwargs(self, headers):
        """Convert headers to botocore keyword arguments."""
        # This is required while we need to support both boto and botocore.
        mapping = CaselessDict({
            'Content-Type': 'ContentType',
            'Cache-Control': 'CacheControl',
            'Content-Disposition': 'ContentDisposition',
            'Content-Encoding': 'ContentEncoding',
            'Content-Language': 'ContentLanguage',
            'Content-Length': 'ContentLength',
            'Content-MD5': 'ContentMD5',
            'Expires': 'Expires',
            'X-Amz-Grant-Full-Control': 'GrantFullControl',
            'X-Amz-Grant-Read': 'GrantRead',
            'X-Amz-Grant-Read-ACP': 'GrantReadACP',
            'X-Amz-Grant-Write-ACP': 'GrantWriteACP',
            'X-Amz-Object-Lock-Legal-Hold': 'ObjectLockLegalHoldStatus',
            'X-Amz-Object-Lock-Mode': 'ObjectLockMode',
            'X-Amz-Object-Lock-Retain-Until-Date': 'ObjectLockRetainUntilDate',
            'X-Amz-Request-Payer': 'RequestPayer',
            'X-Amz-Server-Side-Encryption': 'ServerSideEncryption',
            'X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id': 'SSEKMSKeyId',
            'X-Amz-Server-Side-Encryption-Context': 'SSEKMSEncryptionContext',
            'X-Amz-Server-Side-Encryption-Customer-Algorithm': 'SSECustomerAlgorithm',
            'X-Amz-Server-Side-Encryption-Customer-Key': 'SSECustomerKey',
            'X-Amz-Server-Side-Encryption-Customer-Key-Md5': 'SSECustomerKeyMD5',
            'X-Amz-Storage-Class': 'StorageClass',
            'X-Amz-Tagging': 'Tagging',
            'X-Amz-Website-Redirect-Location': 'WebsiteRedirectLocation',
        })
        extra = {}
        for key, value in six.iteritems(headers):
            try:
                kwarg = mapping[key]
            except KeyError:
                raise TypeError(
                    'Header "%s" is not supported by botocore' % key)
            else:
                extra[kwarg] = value
        return extra
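
# Example mapping (values are illustrative): given
#     {'Cache-Control': 'max-age=172800', 'X-Amz-Storage-Class': 'STANDARD_IA'}
# _headers_to_botocore_kwargs() returns
#     {'CacheControl': 'max-age=172800', 'StorageClass': 'STANDARD_IA'}
# which persist_file() then passes as keyword arguments to put_object().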


class GCSFilesStore(object):

    GCS_PROJECT_ID = None

    CACHE_CONTROL = 'max-age=172800'

    # The bucket's default object ACL will be applied to the object.
    # Overridden from settings.FILES_STORE_GCS_ACL in FilesPipeline.from_settings.
    POLICY = None

    def __init__(self, uri):
        from google.cloud import storage
        client = storage.Client(project=self.GCS_PROJECT_ID)
        bucket, prefix = uri[5:].split('/', 1)
        self.bucket = client.bucket(bucket)
        self.prefix = prefix

    def stat_file(self, path, info):
        def _onsuccess(blob):
            if blob:
                checksum = blob.md5_hash
                last_modified = time.mktime(blob.updated.timetuple())
                return {'checksum': checksum, 'last_modified': last_modified}
            else:
                return {}

        # Look the blob up under the same prefixed path that persist_file
        # writes to, so already-stored files are actually found.
        blob_path = self.prefix + path
        return threads.deferToThread(self.bucket.get_blob, blob_path).addCallback(_onsuccess)

    def _get_content_type(self, headers):
        if headers and 'Content-Type' in headers:
            return headers['Content-Type']
        else:
            return 'application/octet-stream'

    def persist_file(self, path, buf, info, meta=None, headers=None):
        blob = self.bucket.blob(self.prefix + path)
        blob.cache_control = self.CACHE_CONTROL
        blob.metadata = {k: str(v) for k, v in six.iteritems(meta or {})}
        return threads.deferToThread(
            blob.upload_from_string,
            data=buf.getvalue(),
            content_type=self._get_content_type(headers),
            predefined_acl=self.POLICY
        )
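
# A GCS store URI has the form 'gs://<bucket>/<prefix>/' (a sketch; bucket and
# prefix names are assumptions):
#
#     FILES_STORE = 'gs://my-bucket/scraped-files/'
#
# Note that the URI parsing above requires at least one '/' after the bucket
# name.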


class FilesPipeline(MediaPipeline):
    """Abstract pipeline that implements file downloading.

    This pipeline tries to minimize network transfers and file processing by
    performing a stat on each file and determining whether it is new, up to
    date or expired.

    ``new`` files are those that the pipeline has never processed and need to
    be downloaded from the supplier site for the first time.

    ``uptodate`` files are those that the pipeline has already processed and
    are still valid.

    ``expired`` files are those that the pipeline has already processed but
    whose last modification was made long ago, so a reprocessing is
    recommended to refresh them in case of change.
    """

    MEDIA_NAME = "file"
    EXPIRES = 90
    STORE_SCHEMES = {
        '': FSFilesStore,
        'file': FSFilesStore,
        's3': S3FilesStore,
        'gs': GCSFilesStore,
    }
    DEFAULT_FILES_URLS_FIELD = 'file_urls'
    DEFAULT_FILES_RESULT_FIELD = 'files'

    def __init__(self, store_uri, download_func=None, settings=None):
        if not store_uri:
            raise NotConfigured

        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        cls_name = "FilesPipeline"
        self.store = self._get_store(store_uri)
        resolve = functools.partial(self._key_for_pipe,
                                    base_class_name=cls_name,
                                    settings=settings)
        self.expires = settings.getint(
            resolve('FILES_EXPIRES'), self.EXPIRES
        )
        if not hasattr(self, "FILES_URLS_FIELD"):
            self.FILES_URLS_FIELD = self.DEFAULT_FILES_URLS_FIELD
        if not hasattr(self, "FILES_RESULT_FIELD"):
            self.FILES_RESULT_FIELD = self.DEFAULT_FILES_RESULT_FIELD
        self.files_urls_field = settings.get(
            resolve('FILES_URLS_FIELD'), self.FILES_URLS_FIELD
        )
        self.files_result_field = settings.get(
            resolve('FILES_RESULT_FIELD'), self.FILES_RESULT_FIELD
        )

        super(FilesPipeline, self).__init__(download_func=download_func, settings=settings)

    @classmethod
    def from_settings(cls, settings):
        s3store = cls.STORE_SCHEMES['s3']
        s3store.AWS_ACCESS_KEY_ID = settings['AWS_ACCESS_KEY_ID']
        s3store.AWS_SECRET_ACCESS_KEY = settings['AWS_SECRET_ACCESS_KEY']
        s3store.AWS_ENDPOINT_URL = settings['AWS_ENDPOINT_URL']
        s3store.AWS_REGION_NAME = settings['AWS_REGION_NAME']
        s3store.AWS_USE_SSL = settings['AWS_USE_SSL']
        s3store.AWS_VERIFY = settings['AWS_VERIFY']
        s3store.POLICY = settings['FILES_STORE_S3_ACL']

        gcs_store = cls.STORE_SCHEMES['gs']
        gcs_store.GCS_PROJECT_ID = settings['GCS_PROJECT_ID']
        gcs_store.POLICY = settings['FILES_STORE_GCS_ACL'] or None

        store_uri = settings['FILES_STORE']
        return cls(store_uri, settings=settings)
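
    # The credentials and ACLs above come from standard Scrapy settings, e.g.
    # (values are hypothetical):
    #
    #     AWS_ACCESS_KEY_ID = 'AKIA...'
    #     AWS_SECRET_ACCESS_KEY = '...'
    #     FILES_STORE_S3_ACL = 'public-read'
    #     GCS_PROJECT_ID = 'my-project-id'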

    def _get_store(self, uri):
        if os.path.isabs(uri):  # to support win32 paths like: C:\\some\dir
            scheme = 'file'
        else:
            scheme = urlparse(uri).scheme
        store_cls = self.STORE_SCHEMES[scheme]
        return store_cls(uri)

    def media_to_download(self, request, info):
        def _onsuccess(result):
            if not result:
                return  # returning None forces download

            last_modified = result.get('last_modified', None)
            if not last_modified:
                return  # returning None forces download

            age_seconds = time.time() - last_modified
            age_days = age_seconds / 60 / 60 / 24
            if age_days > self.expires:
                return  # returning None forces download

            referer = referer_str(request)
            logger.debug(
                'File (uptodate): Downloaded %(medianame)s from %(request)s '
                'referred in <%(referer)s>',
                {'medianame': self.MEDIA_NAME, 'request': request,
                 'referer': referer},
                extra={'spider': info.spider}
            )
            self.inc_stats(info.spider, 'uptodate')

            checksum = result.get('checksum', None)
            return {'url': request.url, 'path': path, 'checksum': checksum}

        path = self.file_path(request, info=info)
        dfd = defer.maybeDeferred(self.store.stat_file, path, info)
        dfd.addCallbacks(_onsuccess, lambda _: None)
        dfd.addErrback(
            lambda f:
            logger.error(self.__class__.__name__ + '.store.stat_file',
                         exc_info=failure_to_exc_info(f),
                         extra={'spider': info.spider})
        )
        return dfd

    def media_failed(self, failure, request, info):
        if not isinstance(failure.value, IgnoreRequest):
            referer = referer_str(request)
            logger.warning(
                'File (unknown-error): Error downloading %(medianame)s from '
                '%(request)s referred in <%(referer)s>: %(exception)s',
                {'medianame': self.MEDIA_NAME, 'request': request,
                 'referer': referer, 'exception': failure.value},
                extra={'spider': info.spider}
            )

        raise FileException

    def media_downloaded(self, response, request, info):
        referer = referer_str(request)

        if response.status != 200:
            logger.warning(
                'File (code: %(status)s): Error downloading file from '
                '%(request)s referred in <%(referer)s>',
                {'status': response.status,
                 'request': request, 'referer': referer},
                extra={'spider': info.spider}
            )
            raise FileException('download-error')

        if not response.body:
            logger.warning(
                'File (empty-content): Empty file from %(request)s referred '
                'in <%(referer)s>: no-content',
                {'request': request, 'referer': referer},
                extra={'spider': info.spider}
            )
            raise FileException('empty-content')

        status = 'cached' if 'cached' in response.flags else 'downloaded'
        logger.debug(
            'File (%(status)s): Downloaded file from %(request)s referred in '
            '<%(referer)s>',
            {'status': status, 'request': request, 'referer': referer},
            extra={'spider': info.spider}
        )
        self.inc_stats(info.spider, status)

        try:
            path = self.file_path(request, response=response, info=info)
            checksum = self.file_downloaded(response, request, info)
        except FileException as exc:
            logger.warning(
                'File (error): Error processing file from %(request)s '
                'referred in <%(referer)s>: %(errormsg)s',
                {'request': request, 'referer': referer, 'errormsg': str(exc)},
                extra={'spider': info.spider}, exc_info=True
            )
            raise
        except Exception as exc:
            logger.error(
                'File (unknown-error): Error processing file from %(request)s '
                'referred in <%(referer)s>',
                {'request': request, 'referer': referer},
                exc_info=True, extra={'spider': info.spider}
            )
            raise FileException(str(exc))

        return {'url': request.url, 'path': path, 'checksum': checksum}

    def inc_stats(self, spider, status):
        spider.crawler.stats.inc_value('file_count', spider=spider)
        spider.crawler.stats.inc_value('file_status_count/%s' % status, spider=spider)

    ### Overridable Interface
    def get_media_requests(self, item, info):
        return [Request(x) for x in item.get(self.files_urls_field, [])]

    def file_downloaded(self, response, request, info):
        path = self.file_path(request, response=response, info=info)
        buf = BytesIO(response.body)
        checksum = md5sum(buf)
        buf.seek(0)
        self.store.persist_file(path, buf, info)
        return checksum

    def item_completed(self, results, item, info):
        if isinstance(item, dict) or self.files_result_field in item.fields:
            item[self.files_result_field] = [x for ok, x in results if ok]
        return item

    def file_path(self, request, response=None, info=None):
        media_guid = hashlib.sha1(to_bytes(request.url)).hexdigest()
        media_ext = os.path.splitext(request.url)[1]
        # Handle empty and wild extensions by trying to guess the mime type,
        # then the extension, defaulting to an empty string otherwise.
        if media_ext not in mimetypes.types_map:
            media_ext = ''
            media_type = mimetypes.guess_type(request.url)[0]
            if media_type:
                media_ext = mimetypes.guess_extension(media_type)
        return 'full/%s%s' % (media_guid, media_ext)
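
    # For example (illustrative URL), a request for
    # 'http://www.example.com/files/report.pdf' is stored as
    # 'full/<40-hex-char SHA1 of the URL>.pdf' under the store root.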