- """
- Feed Exports extension
- See documentation in docs/topics/feed-exports.rst
- """
- import os
- import sys
- import logging
- import posixpath
- from tempfile import NamedTemporaryFile
- from datetime import datetime
- import six
- from six.moves.urllib.parse import urlparse, unquote
- from ftplib import FTP
- from zope.interface import Interface, implementer
- from twisted.internet import defer, threads
- from w3lib.url import file_uri_to_path
- from scrapy import signals
- from scrapy.utils.ftp import ftp_makedirs_cwd
- from scrapy.exceptions import NotConfigured
- from scrapy.utils.misc import create_instance, load_object
- from scrapy.utils.log import failure_to_exc_info
- from scrapy.utils.python import without_none_values
- from scrapy.utils.boto import is_botocore
- logger = logging.getLogger(__name__)


class IFeedStorage(Interface):
    """Interface that all Feed Storages must implement"""

    def __init__(uri):
        """Initialize the storage with the parameters given in the URI"""

    def open(spider):
        """Open the storage for the given spider. It must return a file-like
        object that will be used for the exporters"""

    def store(file):
        """Store the given file stream"""


@implementer(IFeedStorage)
class BlockingFeedStorage(object):

    def open(self, spider):
        path = spider.crawler.settings['FEED_TEMPDIR']
        if path and not os.path.isdir(path):
            raise OSError('Not a Directory: ' + str(path))

        return NamedTemporaryFile(prefix='feed-', dir=path)

    def store(self, file):
        return threads.deferToThread(self._store_in_thread, file)

    def _store_in_thread(self, file):
        raise NotImplementedError
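

# A concrete blocking storage only needs to subclass BlockingFeedStorage and
# implement _store_in_thread(); open() (temporary file creation) and store()
# (deferring the write to a thread) are inherited. A minimal sketch, assuming
# a hypothetical CopyFeedStorage that is not part of this module:
#
#     class CopyFeedStorage(BlockingFeedStorage):
#
#         def __init__(self, uri):
#             self.path = file_uri_to_path(uri)
#
#         def _store_in_thread(self, file):
#             # Runs in a thread via threads.deferToThread(); `file` is the
#             # NamedTemporaryFile returned by BlockingFeedStorage.open().
#             file.seek(0)
#             with open(self.path, 'wb') as dest:
#                 dest.write(file.read())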


@implementer(IFeedStorage)
class StdoutFeedStorage(object):

    def __init__(self, uri, _stdout=None):
        if not _stdout:
            _stdout = sys.stdout if six.PY2 else sys.stdout.buffer
        self._stdout = _stdout

    def open(self, spider):
        return self._stdout

    def store(self, file):
        pass


@implementer(IFeedStorage)
class FileFeedStorage(object):

    def __init__(self, uri):
        self.path = file_uri_to_path(uri)

    def open(self, spider):
        dirname = os.path.dirname(self.path)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        return open(self.path, 'ab')

    def store(self, file):
        file.close()
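
# FileFeedStorage handles file:// feed URIs; note that open() appends ('ab'),
# so repeated runs with the same FEED_URI keep growing the same file.
# A hedged example setting (the path is illustrative):
#
#     FEED_URI = 'file:///tmp/export.jsonlines'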


class S3FeedStorage(BlockingFeedStorage):

    def __init__(self, uri, access_key=None, secret_key=None, acl=None):
        # BEGIN Backward compatibility for initialising without keys (and
        # without using from_crawler)
        no_defaults = access_key is None and secret_key is None
        if no_defaults:
            from scrapy.utils.project import get_project_settings
            settings = get_project_settings()
            if 'AWS_ACCESS_KEY_ID' in settings or 'AWS_SECRET_ACCESS_KEY' in settings:
                import warnings
                from scrapy.exceptions import ScrapyDeprecationWarning
                warnings.warn(
                    "Initialising `scrapy.extensions.feedexport.S3FeedStorage` "
                    "without AWS keys is deprecated. Please supply credentials or "
                    "use the `from_crawler()` constructor.",
                    category=ScrapyDeprecationWarning,
                    stacklevel=2
                )
                access_key = settings['AWS_ACCESS_KEY_ID']
                secret_key = settings['AWS_SECRET_ACCESS_KEY']
        # END Backward compatibility
        u = urlparse(uri)
        self.bucketname = u.hostname
        self.access_key = u.username or access_key
        self.secret_key = u.password or secret_key
        self.is_botocore = is_botocore()
        self.keyname = u.path[1:]  # remove first "/"
        self.acl = acl
        if self.is_botocore:
            import botocore.session
            session = botocore.session.get_session()
            self.s3_client = session.create_client(
                's3', aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key)
        else:
            import boto
            self.connect_s3 = boto.connect_s3

    @classmethod
    def from_crawler(cls, crawler, uri):
        return cls(
            uri=uri,
            access_key=crawler.settings['AWS_ACCESS_KEY_ID'],
            secret_key=crawler.settings['AWS_SECRET_ACCESS_KEY'],
            acl=crawler.settings['FEED_STORAGE_S3_ACL'] or None
        )

    def _store_in_thread(self, file):
        file.seek(0)
        if self.is_botocore:
            kwargs = {'ACL': self.acl} if self.acl else {}
            self.s3_client.put_object(
                Bucket=self.bucketname, Key=self.keyname, Body=file,
                **kwargs)
        else:
            conn = self.connect_s3(self.access_key, self.secret_key)
            bucket = conn.get_bucket(self.bucketname, validate=False)
            key = bucket.new_key(self.keyname)
            kwargs = {'policy': self.acl} if self.acl else {}
            key.set_contents_from_file(file, **kwargs)
            key.close()
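
# S3FeedStorage backs s3:// feed URIs. Credentials may be embedded in the URI
# (s3://key:secret@bucket/path) or, via from_crawler(), taken from the
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY settings; FEED_STORAGE_S3_ACL maps
# to the `acl` argument. A hedged example (bucket and path are illustrative):
#
#     FEED_URI = 's3://my-bucket/exports/%(name)s/%(time)s.json'
#     FEED_STORAGE_S3_ACL = 'public-read'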


class FTPFeedStorage(BlockingFeedStorage):

    def __init__(self, uri, use_active_mode=False):
        u = urlparse(uri)
        self.host = u.hostname
        self.port = int(u.port or '21')
        self.username = u.username
        self.password = unquote(u.password)
        self.path = u.path
        self.use_active_mode = use_active_mode

    @classmethod
    def from_crawler(cls, crawler, uri):
        return cls(
            uri=uri,
            use_active_mode=crawler.settings.getbool('FEED_STORAGE_FTP_ACTIVE')
        )

    def _store_in_thread(self, file):
        file.seek(0)
        ftp = FTP()
        ftp.connect(self.host, self.port)
        ftp.login(self.username, self.password)
        if self.use_active_mode:
            ftp.set_pasv(False)
        dirname, filename = posixpath.split(self.path)
        ftp_makedirs_cwd(ftp, dirname)
        ftp.storbinary('STOR %s' % filename, file)
        ftp.quit()
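
# FTPFeedStorage handles ftp:// feed URIs of the form
# ftp://user:password@host:port/path/to/export.csv (the password is
# URL-unquoted). Setting FEED_STORAGE_FTP_ACTIVE switches the transfer from
# the default passive mode to active mode. A hedged example (host and
# credentials are illustrative):
#
#     FEED_URI = 'ftp://scrapy:secret@ftp.example.com/exports/%(name)s.csv'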


class SpiderSlot(object):
    def __init__(self, file, exporter, storage, uri):
        self.file = file
        self.exporter = exporter
        self.storage = storage
        self.uri = uri
        self.itemcount = 0


class FeedExporter(object):

    def __init__(self, settings):
        self.settings = settings
        self.urifmt = settings['FEED_URI']
        if not self.urifmt:
            raise NotConfigured
        self.format = settings['FEED_FORMAT'].lower()
        self.export_encoding = settings['FEED_EXPORT_ENCODING']
        self.storages = self._load_components('FEED_STORAGES')
        self.exporters = self._load_components('FEED_EXPORTERS')
        if not self._storage_supported(self.urifmt):
            raise NotConfigured
        if not self._exporter_supported(self.format):
            raise NotConfigured
        self.store_empty = settings.getbool('FEED_STORE_EMPTY')
        self._exporting = False
        self.export_fields = settings.getlist('FEED_EXPORT_FIELDS') or None
        self.indent = None
        if settings.get('FEED_EXPORT_INDENT') is not None:
            self.indent = settings.getint('FEED_EXPORT_INDENT')
        uripar = settings['FEED_URI_PARAMS']
        self._uripar = load_object(uripar) if uripar else lambda x, y: None

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.settings)
        o.crawler = crawler
        crawler.signals.connect(o.open_spider, signals.spider_opened)
        crawler.signals.connect(o.close_spider, signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signals.item_scraped)
        return o

    def open_spider(self, spider):
        uri = self.urifmt % self._get_uri_params(spider)
        storage = self._get_storage(uri)
        file = storage.open(spider)
        exporter = self._get_exporter(file, fields_to_export=self.export_fields,
                                      encoding=self.export_encoding, indent=self.indent)
        if self.store_empty:
            exporter.start_exporting()
            self._exporting = True
        self.slot = SpiderSlot(file, exporter, storage, uri)

    def close_spider(self, spider):
        slot = self.slot
        if not slot.itemcount and not self.store_empty:
            # We need to call slot.storage.store nonetheless to get the file
            # properly closed.
            return defer.maybeDeferred(slot.storage.store, slot.file)
        if self._exporting:
            slot.exporter.finish_exporting()
            self._exporting = False
        logfmt = "%s %%(format)s feed (%%(itemcount)d items) in: %%(uri)s"
        log_args = {'format': self.format,
                    'itemcount': slot.itemcount,
                    'uri': slot.uri}
        d = defer.maybeDeferred(slot.storage.store, slot.file)
        d.addCallback(lambda _: logger.info(logfmt % "Stored", log_args,
                                            extra={'spider': spider}))
        d.addErrback(lambda f: logger.error(logfmt % "Error storing", log_args,
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

    def item_scraped(self, item, spider):
        slot = self.slot
        if not self._exporting:
            slot.exporter.start_exporting()
            self._exporting = True
        slot.exporter.export_item(item)
        slot.itemcount += 1
        return item

    def _load_components(self, setting_prefix):
        conf = without_none_values(self.settings.getwithbase(setting_prefix))
        d = {}
        for k, v in conf.items():
            try:
                d[k] = load_object(v)
            except NotConfigured:
                pass
        return d

    def _exporter_supported(self, format):
        if format in self.exporters:
            return True
        logger.error("Unknown feed format: %(format)s", {'format': format})

    def _storage_supported(self, uri):
        scheme = urlparse(uri).scheme
        if scheme in self.storages:
            try:
                self._get_storage(uri)
                return True
            except NotConfigured as e:
                logger.error("Disabled feed storage scheme: %(scheme)s. "
                             "Reason: %(reason)s",
                             {'scheme': scheme, 'reason': str(e)})
        else:
            logger.error("Unknown feed storage scheme: %(scheme)s",
                         {'scheme': scheme})

    def _get_instance(self, objcls, *args, **kwargs):
        return create_instance(
            objcls, self.settings, getattr(self, 'crawler', None),
            *args, **kwargs)

    def _get_exporter(self, *args, **kwargs):
        return self._get_instance(self.exporters[self.format], *args, **kwargs)

    def _get_storage(self, uri):
        return self._get_instance(self.storages[urlparse(uri).scheme], uri)

    def _get_uri_params(self, spider):
        params = {}
        for k in dir(spider):
            params[k] = getattr(spider, k)
        ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')
        params['time'] = ts
        self._uripar(params, spider)
        return params
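
# _get_uri_params() exposes every spider attribute plus a 'time' key to the
# FEED_URI template, so placeholders such as %(name)s and %(time)s are filled
# in when the feed is opened. A hedged example (the path is illustrative):
#
#     FEED_URI = 'file:///tmp/exports/%(name)s/%(time)s.json'
#
# For a spider named 'quotes' opened at 2019-01-01T10:00:00 UTC this expands
# to file:///tmp/exports/quotes/2019-01-01T10-00-00.json (colons in the
# timestamp are replaced with dashes).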