- """
- Files Pipeline
- See documentation in topics/media-pipeline.rst
- """
- import functools
- import hashlib
- import mimetypes
- import os
- import os.path
- import time
- import logging
- from email.utils import parsedate_tz, mktime_tz
- from six.moves.urllib.parse import urlparse
- from collections import defaultdict
- import six
- try:
- from cStringIO import StringIO as BytesIO
- except ImportError:
- from io import BytesIO
- from twisted.internet import defer, threads
- from scrapy.pipelines.media import MediaPipeline
- from scrapy.settings import Settings
- from scrapy.exceptions import NotConfigured, IgnoreRequest
- from scrapy.http import Request
- from scrapy.utils.misc import md5sum
- from scrapy.utils.log import failure_to_exc_info
- from scrapy.utils.python import to_bytes
- from scrapy.utils.request import referer_str
- from scrapy.utils.boto import is_botocore
- from scrapy.utils.datatypes import CaselessDict
- logger = logging.getLogger(__name__)
- class FileException(Exception):
- """General media error exception"""
- class FSFilesStore(object):
- def __init__(self, basedir):
- if '://' in basedir:
- basedir = basedir.split('://', 1)[1]
- self.basedir = basedir
- self._mkdir(self.basedir)
- self.created_directories = defaultdict(set)
- def persist_file(self, path, buf, info, meta=None, headers=None):
- absolute_path = self._get_filesystem_path(path)
- self._mkdir(os.path.dirname(absolute_path), info)
- with open(absolute_path, 'wb') as f:
- f.write(buf.getvalue())
- def stat_file(self, path, info):
- absolute_path = self._get_filesystem_path(path)
- try:
- last_modified = os.path.getmtime(absolute_path)
- except os.error:
- return {}
- with open(absolute_path, 'rb') as f:
- checksum = md5sum(f)
- return {'last_modified': last_modified, 'checksum': checksum}
- def _get_filesystem_path(self, path):
- path_comps = path.split('/')
- return os.path.join(self.basedir, *path_comps)
- def _mkdir(self, dirname, domain=None):
- seen = self.created_directories[domain] if domain else set()
- if dirname not in seen:
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- seen.add(dirname)
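- # Usage sketch (illustrative, not part of the original module; assumes a
- # writable local directory):
- #
- #   from io import BytesIO
- #   store = FSFilesStore('/tmp/files-store')
- #   store.persist_file('full/example.bin', BytesIO(b'payload'), info=None)
- #   store.stat_file('full/example.bin', info=None)
- #   # -> {'last_modified': <mtime>, 'checksum': '<md5 hex digest>'}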
- class S3FilesStore(object):
- AWS_ACCESS_KEY_ID = None
- AWS_SECRET_ACCESS_KEY = None
- AWS_ENDPOINT_URL = None
- AWS_REGION_NAME = None
- AWS_USE_SSL = None
- AWS_VERIFY = None
- POLICY = 'private' # Overridden from settings.FILES_STORE_S3_ACL in
- # FilesPipeline.from_settings.
- HEADERS = {
- 'Cache-Control': 'max-age=172800',
- }
- def __init__(self, uri):
- self.is_botocore = is_botocore()
- if self.is_botocore:
- import botocore.session
- session = botocore.session.get_session()
- self.s3_client = session.create_client(
- 's3',
- aws_access_key_id=self.AWS_ACCESS_KEY_ID,
- aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY,
- endpoint_url=self.AWS_ENDPOINT_URL,
- region_name=self.AWS_REGION_NAME,
- use_ssl=self.AWS_USE_SSL,
- verify=self.AWS_VERIFY
- )
- else:
- from boto.s3.connection import S3Connection
- self.S3Connection = S3Connection
- assert uri.startswith('s3://')
- self.bucket, self.prefix = uri[5:].split('/', 1)
- def stat_file(self, path, info):
- def _onsuccess(boto_key):
- if self.is_botocore:
- checksum = boto_key['ETag'].strip('"')
- last_modified = boto_key['LastModified']
- modified_stamp = time.mktime(last_modified.timetuple())
- else:
- checksum = boto_key.etag.strip('"')
- last_modified = boto_key.last_modified
- modified_tuple = parsedate_tz(last_modified)
- modified_stamp = int(mktime_tz(modified_tuple))
- return {'checksum': checksum, 'last_modified': modified_stamp}
- return self._get_boto_key(path).addCallback(_onsuccess)
- def _get_boto_bucket(self):
- # disable ssl (is_secure=False) because of this python bug:
- # https://bugs.python.org/issue5103
- c = self.S3Connection(self.AWS_ACCESS_KEY_ID, self.AWS_SECRET_ACCESS_KEY, is_secure=False)
- return c.get_bucket(self.bucket, validate=False)
- def _get_boto_key(self, path):
- key_name = '%s%s' % (self.prefix, path)
- if self.is_botocore:
- return threads.deferToThread(
- self.s3_client.head_object,
- Bucket=self.bucket,
- Key=key_name)
- else:
- b = self._get_boto_bucket()
- return threads.deferToThread(b.get_key, key_name)
- def persist_file(self, path, buf, info, meta=None, headers=None):
- """Upload file to S3 storage"""
- key_name = '%s%s' % (self.prefix, path)
- buf.seek(0)
- if self.is_botocore:
- extra = self._headers_to_botocore_kwargs(self.HEADERS)
- if headers:
- extra.update(self._headers_to_botocore_kwargs(headers))
- return threads.deferToThread(
- self.s3_client.put_object,
- Bucket=self.bucket,
- Key=key_name,
- Body=buf,
- Metadata={k: str(v) for k, v in six.iteritems(meta or {})},
- ACL=self.POLICY,
- **extra)
- else:
- b = self._get_boto_bucket()
- k = b.new_key(key_name)
- if meta:
- for metakey, metavalue in six.iteritems(meta):
- k.set_metadata(metakey, str(metavalue))
- h = self.HEADERS.copy()
- if headers:
- h.update(headers)
- return threads.deferToThread(
- k.set_contents_from_string, buf.getvalue(),
- headers=h, policy=self.POLICY)
- def _headers_to_botocore_kwargs(self, headers):
- """ Convert headers to botocore keyword agruments.
- """
- # This is required while we need to support both boto and botocore.
- mapping = CaselessDict({
- 'Content-Type': 'ContentType',
- 'Cache-Control': 'CacheControl',
- 'Content-Disposition': 'ContentDisposition',
- 'Content-Encoding': 'ContentEncoding',
- 'Content-Language': 'ContentLanguage',
- 'Content-Length': 'ContentLength',
- 'Content-MD5': 'ContentMD5',
- 'Expires': 'Expires',
- 'X-Amz-Grant-Full-Control': 'GrantFullControl',
- 'X-Amz-Grant-Read': 'GrantRead',
- 'X-Amz-Grant-Read-ACP': 'GrantReadACP',
- 'X-Amz-Grant-Write-ACP': 'GrantWriteACP',
- 'X-Amz-Object-Lock-Legal-Hold': 'ObjectLockLegalHoldStatus',
- 'X-Amz-Object-Lock-Mode': 'ObjectLockMode',
- 'X-Amz-Object-Lock-Retain-Until-Date': 'ObjectLockRetainUntilDate',
- 'X-Amz-Request-Payer': 'RequestPayer',
- 'X-Amz-Server-Side-Encryption': 'ServerSideEncryption',
- 'X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id': 'SSEKMSKeyId',
- 'X-Amz-Server-Side-Encryption-Context': 'SSEKMSEncryptionContext',
- 'X-Amz-Server-Side-Encryption-Customer-Algorithm': 'SSECustomerAlgorithm',
- 'X-Amz-Server-Side-Encryption-Customer-Key': 'SSECustomerKey',
- 'X-Amz-Server-Side-Encryption-Customer-Key-Md5': 'SSECustomerKeyMD5',
- 'X-Amz-Storage-Class': 'StorageClass',
- 'X-Amz-Tagging': 'Tagging',
- 'X-Amz-Website-Redirect-Location': 'WebsiteRedirectLocation',
- })
- extra = {}
- for key, value in six.iteritems(headers):
- try:
- kwarg = mapping[key]
- except KeyError:
- raise TypeError(
- 'Header "%s" is not supported by botocore' % key)
- else:
- extra[kwarg] = value
- return extra
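- # Illustrative example (not part of the original module): the mapping above is
- # case-insensitive, so HTTP-style headers translate to botocore keyword
- # arguments like this:
- #
- #   {'cache-control': 'max-age=3600', 'Content-Type': 'image/png'}
- #   # -> {'CacheControl': 'max-age=3600', 'ContentType': 'image/png'}
- #
- # Headers without a mapping raise TypeError.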
- class GCSFilesStore(object):
- GCS_PROJECT_ID = None
- CACHE_CONTROL = 'max-age=172800'
- # The bucket's default object ACL will be applied to the object.
- # Overridden from settings.FILES_STORE_GCS_ACL in FilesPipeline.from_settings.
- POLICY = None
- def __init__(self, uri):
- from google.cloud import storage
- client = storage.Client(project=self.GCS_PROJECT_ID)
- bucket, prefix = uri[5:].split('/', 1)
- self.bucket = client.bucket(bucket)
- self.prefix = prefix
- def stat_file(self, path, info):
- def _onsuccess(blob):
- if blob:
- checksum = blob.md5_hash
- last_modified = time.mktime(blob.updated.timetuple())
- return {'checksum': checksum, 'last_modified': last_modified}
- else:
- return {}
- return threads.deferToThread(self.bucket.get_blob, path).addCallback(_onsuccess)
- def _get_content_type(self, headers):
- if headers and 'Content-Type' in headers:
- return headers['Content-Type']
- else:
- return 'application/octet-stream'
- def persist_file(self, path, buf, info, meta=None, headers=None):
- blob = self.bucket.blob(self.prefix + path)
- blob.cache_control = self.CACHE_CONTROL
- blob.metadata = {k: str(v) for k, v in six.iteritems(meta or {})}
- return threads.deferToThread(
- blob.upload_from_string,
- data=buf.getvalue(),
- content_type=self._get_content_type(headers),
- predefined_acl=self.POLICY
- )
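- # Illustrative example (not part of the original module): a GCS store URI is
- # split after the 'gs://' scheme into bucket and key prefix, e.g.
- #
- #   'gs://my-bucket/downloads/'  ->  bucket 'my-bucket', prefix 'downloads/'
- #
- # so a file path 'full/abc.jpg' is uploaded as 'downloads/full/abc.jpg'.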
- class FilesPipeline(MediaPipeline):
- """Abstract pipeline that implement the file downloading
- This pipeline tries to minimize network transfers and file processing,
- doing stat of the files and determining if file is new, uptodate or
- expired.
- ``new`` files are those that pipeline never processed and needs to be
- downloaded from supplier site the first time.
- ``uptodate`` files are the ones that the pipeline processed and are still
- valid files.
- ``expired`` files are those that pipeline already processed but the last
- modification was made long time ago, so a reprocessing is recommended to
- refresh it in case of change.
- """
- MEDIA_NAME = "file"
- EXPIRES = 90
- STORE_SCHEMES = {
- '': FSFilesStore,
- 'file': FSFilesStore,
- 's3': S3FilesStore,
- 'gs': GCSFilesStore,
- }
- DEFAULT_FILES_URLS_FIELD = 'file_urls'
- DEFAULT_FILES_RESULT_FIELD = 'files'
- def __init__(self, store_uri, download_func=None, settings=None):
- if not store_uri:
- raise NotConfigured
- if isinstance(settings, dict) or settings is None:
- settings = Settings(settings)
- cls_name = "FilesPipeline"
- self.store = self._get_store(store_uri)
- resolve = functools.partial(self._key_for_pipe,
- base_class_name=cls_name,
- settings=settings)
- self.expires = settings.getint(
- resolve('FILES_EXPIRES'), self.EXPIRES
- )
- if not hasattr(self, "FILES_URLS_FIELD"):
- self.FILES_URLS_FIELD = self.DEFAULT_FILES_URLS_FIELD
- if not hasattr(self, "FILES_RESULT_FIELD"):
- self.FILES_RESULT_FIELD = self.DEFAULT_FILES_RESULT_FIELD
- self.files_urls_field = settings.get(
- resolve('FILES_URLS_FIELD'), self.FILES_URLS_FIELD
- )
- self.files_result_field = settings.get(
- resolve('FILES_RESULT_FIELD'), self.FILES_RESULT_FIELD
- )
- super(FilesPipeline, self).__init__(download_func=download_func, settings=settings)
- @classmethod
- def from_settings(cls, settings):
- s3store = cls.STORE_SCHEMES['s3']
- s3store.AWS_ACCESS_KEY_ID = settings['AWS_ACCESS_KEY_ID']
- s3store.AWS_SECRET_ACCESS_KEY = settings['AWS_SECRET_ACCESS_KEY']
- s3store.AWS_ENDPOINT_URL = settings['AWS_ENDPOINT_URL']
- s3store.AWS_REGION_NAME = settings['AWS_REGION_NAME']
- s3store.AWS_USE_SSL = settings['AWS_USE_SSL']
- s3store.AWS_VERIFY = settings['AWS_VERIFY']
- s3store.POLICY = settings['FILES_STORE_S3_ACL']
- gcs_store = cls.STORE_SCHEMES['gs']
- gcs_store.GCS_PROJECT_ID = settings['GCS_PROJECT_ID']
- gcs_store.POLICY = settings['FILES_STORE_GCS_ACL'] or None
- store_uri = settings['FILES_STORE']
- return cls(store_uri, settings=settings)
- def _get_store(self, uri):
- if os.path.isabs(uri): # to support win32 paths like: C:\\some\dir
- scheme = 'file'
- else:
- scheme = urlparse(uri).scheme
- store_cls = self.STORE_SCHEMES[scheme]
- return store_cls(uri)
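- # Illustrative example (not part of the original module): how FILES_STORE
- # values map to storage backends through _get_store:
- #
- #   '/data/files'         -> FSFilesStore  (absolute filesystem path)
- #   'file:///data/files'  -> FSFilesStore
- #   's3://bucket/files/'  -> S3FilesStore
- #   'gs://bucket/files/'  -> GCSFilesStore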
- def media_to_download(self, request, info):
- def _onsuccess(result):
- if not result:
- return # returning None forces download
- last_modified = result.get('last_modified', None)
- if not last_modified:
- return # returning None forces download
- age_seconds = time.time() - last_modified
- age_days = age_seconds / 60 / 60 / 24
- if age_days > self.expires:
- return # returning None forces download
- referer = referer_str(request)
- logger.debug(
- 'File (uptodate): Downloaded %(medianame)s from %(request)s '
- 'referred in <%(referer)s>',
- {'medianame': self.MEDIA_NAME, 'request': request,
- 'referer': referer},
- extra={'spider': info.spider}
- )
- self.inc_stats(info.spider, 'uptodate')
- checksum = result.get('checksum', None)
- return {'url': request.url, 'path': path, 'checksum': checksum}
- path = self.file_path(request, info=info)
- dfd = defer.maybeDeferred(self.store.stat_file, path, info)
- dfd.addCallbacks(_onsuccess, lambda _: None)
- dfd.addErrback(
- lambda f:
- logger.error(self.__class__.__name__ + '.store.stat_file',
- exc_info=failure_to_exc_info(f),
- extra={'spider': info.spider})
- )
- return dfd
- def media_failed(self, failure, request, info):
- if not isinstance(failure.value, IgnoreRequest):
- referer = referer_str(request)
- logger.warning(
- 'File (unknown-error): Error downloading %(medianame)s from '
- '%(request)s referred in <%(referer)s>: %(exception)s',
- {'medianame': self.MEDIA_NAME, 'request': request,
- 'referer': referer, 'exception': failure.value},
- extra={'spider': info.spider}
- )
- raise FileException
- def media_downloaded(self, response, request, info):
- referer = referer_str(request)
- if response.status != 200:
- logger.warning(
- 'File (code: %(status)s): Error downloading file from '
- '%(request)s referred in <%(referer)s>',
- {'status': response.status,
- 'request': request, 'referer': referer},
- extra={'spider': info.spider}
- )
- raise FileException('download-error')
- if not response.body:
- logger.warning(
- 'File (empty-content): Empty file from %(request)s referred '
- 'in <%(referer)s>: no-content',
- {'request': request, 'referer': referer},
- extra={'spider': info.spider}
- )
- raise FileException('empty-content')
- status = 'cached' if 'cached' in response.flags else 'downloaded'
- logger.debug(
- 'File (%(status)s): Downloaded file from %(request)s referred in '
- '<%(referer)s>',
- {'status': status, 'request': request, 'referer': referer},
- extra={'spider': info.spider}
- )
- self.inc_stats(info.spider, status)
- try:
- path = self.file_path(request, response=response, info=info)
- checksum = self.file_downloaded(response, request, info)
- except FileException as exc:
- logger.warning(
- 'File (error): Error processing file from %(request)s '
- 'referred in <%(referer)s>: %(errormsg)s',
- {'request': request, 'referer': referer, 'errormsg': str(exc)},
- extra={'spider': info.spider}, exc_info=True
- )
- raise
- except Exception as exc:
- logger.error(
- 'File (unknown-error): Error processing file from %(request)s '
- 'referred in <%(referer)s>',
- {'request': request, 'referer': referer},
- exc_info=True, extra={'spider': info.spider}
- )
- raise FileException(str(exc))
- return {'url': request.url, 'path': path, 'checksum': checksum}
- def inc_stats(self, spider, status):
- spider.crawler.stats.inc_value('file_count', spider=spider)
- spider.crawler.stats.inc_value('file_status_count/%s' % status, spider=spider)
- ### Overridable Interface
- def get_media_requests(self, item, info):
- return [Request(x) for x in item.get(self.files_urls_field, [])]
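- # Illustrative example (not part of the original module): with the default
- # FILES_URLS_FIELD, an item such as
- #
- #   {'file_urls': ['https://example.com/a.pdf'], 'name': 'a'}
- #
- # yields one Request per URL for this pipeline to download.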
- def file_downloaded(self, response, request, info):
- path = self.file_path(request, response=response, info=info)
- buf = BytesIO(response.body)
- checksum = md5sum(buf)
- buf.seek(0)
- self.store.persist_file(path, buf, info)
- return checksum
- def item_completed(self, results, item, info):
- if isinstance(item, dict) or self.files_result_field in item.fields:
- item[self.files_result_field] = [x for ok, x in results if ok]
- return item
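- # Illustrative example (not part of the original module): ``results`` is a
- # list of (success, value) tuples produced by the media pipeline, e.g.
- #
- #   [(True, {'url': ..., 'path': 'full/<sha1>.pdf', 'checksum': ...}),
- #    (False, <Failure>)]
- #
- # Only the successful entries are copied into item[self.files_result_field].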
- def file_path(self, request, response=None, info=None):
- media_guid = hashlib.sha1(to_bytes(request.url)).hexdigest()
- media_ext = os.path.splitext(request.url)[1]
- # Handle empty and wildcard extensions by trying to guess the mime type
- # and then the extension, defaulting to an empty string otherwise
- if media_ext not in mimetypes.types_map:
- media_ext = ''
- media_type = mimetypes.guess_type(request.url)[0]
- if media_type:
- media_ext = mimetypes.guess_extension(media_type)
- return 'full/%s%s' % (media_guid, media_ext)
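- # Illustrative sketch (not part of the original module): the default file_path
- # stores everything under 'full/' named by the SHA1 hash of the URL, e.g.
- # 'https://example.com/report.pdf' -> 'full/<40-char-hex>.pdf'. Subclasses may
- # override it; the class name below is an assumption for the sketch.
- #
- #   class PerSpiderFilesPipeline(FilesPipeline):
- #       def file_path(self, request, response=None, info=None):
- #           default_path = super(PerSpiderFilesPipeline, self).file_path(
- #               request, response=response, info=info)
- #           spider_name = info.spider.name if info else 'unknown'
- #           return '%s/%s' % (spider_name, default_path)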