feedexport.py

  1. """
  2. Feed Exports extension
  3. See documentation in docs/topics/feed-exports.rst
  4. """
  5. import os
  6. import sys
  7. import logging
  8. import posixpath
  9. from tempfile import NamedTemporaryFile
  10. from datetime import datetime
  11. import six
  12. from six.moves.urllib.parse import urlparse, unquote
  13. from ftplib import FTP
  14. from zope.interface import Interface, implementer
  15. from twisted.internet import defer, threads
  16. from w3lib.url import file_uri_to_path
  17. from scrapy import signals
  18. from scrapy.utils.ftp import ftp_makedirs_cwd
  19. from scrapy.exceptions import NotConfigured
  20. from scrapy.utils.misc import create_instance, load_object
  21. from scrapy.utils.log import failure_to_exc_info
  22. from scrapy.utils.python import without_none_values
  23. from scrapy.utils.boto import is_botocore
  24. logger = logging.getLogger(__name__)
class IFeedStorage(Interface):
    """Interface that all Feed Storages must implement"""

    def __init__(uri):
        """Initialize the storage with the parameters given in the URI"""

    def open(spider):
        """Open the storage for the given spider. It must return a file-like
        object that will be used for the exporters"""

    def store(file):
        """Store the given file stream"""


@implementer(IFeedStorage)
class BlockingFeedStorage(object):

    def open(self, spider):
        path = spider.crawler.settings['FEED_TEMPDIR']
        if path and not os.path.isdir(path):
            raise OSError('Not a Directory: ' + str(path))

        return NamedTemporaryFile(prefix='feed-', dir=path)

    def store(self, file):
        return threads.deferToThread(self._store_in_thread, file)

    def _store_in_thread(self, file):
        raise NotImplementedError
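
# BlockingFeedStorage buffers the feed in a NamedTemporaryFile and runs the
# (blocking) upload in a thread pool via deferToThread, so the Twisted reactor
# is never blocked. Subclasses only implement _store_in_thread(). A minimal
# sketch of a custom backend (class name and upload helper are hypothetical):
#
#     class WebDAVFeedStorage(BlockingFeedStorage):
#         def __init__(self, uri):
#             self.uri = uri
#
#         def _store_in_thread(self, file):
#             file.seek(0)
#             upload_to_webdav(self.uri, file)  # hypothetical helper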


@implementer(IFeedStorage)
class StdoutFeedStorage(object):

    def __init__(self, uri, _stdout=None):
        if not _stdout:
            _stdout = sys.stdout if six.PY2 else sys.stdout.buffer
        self._stdout = _stdout

    def open(self, spider):
        return self._stdout

    def store(self, file):
        pass


@implementer(IFeedStorage)
class FileFeedStorage(object):

    def __init__(self, uri):
        self.path = file_uri_to_path(uri)

    def open(self, spider):
        dirname = os.path.dirname(self.path)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        return open(self.path, 'ab')

    def store(self, file):
        file.close()
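
# FileFeedStorage writes straight to the local filesystem, creating missing
# parent directories and opening the target in append-binary mode ('ab'), so
# repeated runs with the same FEED_URI append to the existing file rather than
# overwriting it. Example settings (paths are illustrative):
#
#     FEED_URI = 'file:///tmp/exports/%(name)s/%(time)s.json'
#     FEED_FORMAT = 'json'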


class S3FeedStorage(BlockingFeedStorage):

    def __init__(self, uri, access_key=None, secret_key=None, acl=None):
        # BEGIN Backward compatibility for initialising without keys (and
        # without using from_crawler)
        no_defaults = access_key is None and secret_key is None
        if no_defaults:
            from scrapy.utils.project import get_project_settings
            settings = get_project_settings()
            if 'AWS_ACCESS_KEY_ID' in settings or 'AWS_SECRET_ACCESS_KEY' in settings:
                import warnings
                from scrapy.exceptions import ScrapyDeprecationWarning
                warnings.warn(
                    "Initialising `scrapy.extensions.feedexport.S3FeedStorage` "
                    "without AWS keys is deprecated. Please supply credentials or "
                    "use the `from_crawler()` constructor.",
                    category=ScrapyDeprecationWarning,
                    stacklevel=2
                )
                access_key = settings['AWS_ACCESS_KEY_ID']
                secret_key = settings['AWS_SECRET_ACCESS_KEY']
        # END Backward compatibility
        u = urlparse(uri)
        self.bucketname = u.hostname
        self.access_key = u.username or access_key
        self.secret_key = u.password or secret_key
        self.is_botocore = is_botocore()
        self.keyname = u.path[1:]  # remove first "/"
        self.acl = acl
        if self.is_botocore:
            import botocore.session
            session = botocore.session.get_session()
            self.s3_client = session.create_client(
                's3', aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key)
        else:
            import boto
            self.connect_s3 = boto.connect_s3

    @classmethod
    def from_crawler(cls, crawler, uri):
        return cls(
            uri=uri,
            access_key=crawler.settings['AWS_ACCESS_KEY_ID'],
            secret_key=crawler.settings['AWS_SECRET_ACCESS_KEY'],
            acl=crawler.settings['FEED_STORAGE_S3_ACL'] or None
        )

    def _store_in_thread(self, file):
        file.seek(0)
        if self.is_botocore:
            kwargs = {'ACL': self.acl} if self.acl else {}
            self.s3_client.put_object(
                Bucket=self.bucketname, Key=self.keyname, Body=file,
                **kwargs)
        else:
            conn = self.connect_s3(self.access_key, self.secret_key)
            bucket = conn.get_bucket(self.bucketname, validate=False)
            key = bucket.new_key(self.keyname)
            kwargs = {'policy': self.acl} if self.acl else {}
            key.set_contents_from_file(file, **kwargs)
            key.close()
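
# S3FeedStorage parses s3://bucket/path/to/file URIs; credentials come from
# the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY settings (via from_crawler)
# or can be embedded in the URI as s3://key:secret@bucket/path. The upload
# uses botocore's put_object() when botocore is available, otherwise the
# legacy boto fallback. Example settings (bucket name and ACL are
# illustrative):
#
#     FEED_URI = 's3://my-feeds-bucket/%(name)s/%(time)s.jl'
#     FEED_STORAGE_S3_ACL = 'private'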


class FTPFeedStorage(BlockingFeedStorage):

    def __init__(self, uri, use_active_mode=False):
        u = urlparse(uri)
        self.host = u.hostname
        self.port = int(u.port or '21')
        self.username = u.username
        self.password = unquote(u.password)
        self.path = u.path
        self.use_active_mode = use_active_mode

    @classmethod
    def from_crawler(cls, crawler, uri):
        return cls(
            uri=uri,
            use_active_mode=crawler.settings.getbool('FEED_STORAGE_FTP_ACTIVE')
        )

    def _store_in_thread(self, file):
        file.seek(0)
        ftp = FTP()
        ftp.connect(self.host, self.port)
        ftp.login(self.username, self.password)
        if self.use_active_mode:
            ftp.set_pasv(False)
        dirname, filename = posixpath.split(self.path)
        ftp_makedirs_cwd(ftp, dirname)
        ftp.storbinary('STOR %s' % filename, file)
        ftp.quit()
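
# FTPFeedStorage expects ftp://user:password@host:port/path/to/file URIs (the
# password is percent-decoded with unquote). Transfers are passive by default;
# setting FEED_STORAGE_FTP_ACTIVE = True switches to active mode via
# ftp.set_pasv(False). Missing remote directories are created with
# ftp_makedirs_cwd() before the STOR upload. Example (credentials and host are
# illustrative):
#
#     FEED_URI = 'ftp://exporter:s3cret@ftp.example.com/feeds/%(name)s.csv'
#     FEED_STORAGE_FTP_ACTIVE = False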


class SpiderSlot(object):
    def __init__(self, file, exporter, storage, uri):
        self.file = file
        self.exporter = exporter
        self.storage = storage
        self.uri = uri
        self.itemcount = 0


class FeedExporter(object):

    def __init__(self, settings):
        self.settings = settings
        self.urifmt = settings['FEED_URI']
        if not self.urifmt:
            raise NotConfigured
        self.format = settings['FEED_FORMAT'].lower()
        self.export_encoding = settings['FEED_EXPORT_ENCODING']
        self.storages = self._load_components('FEED_STORAGES')
        self.exporters = self._load_components('FEED_EXPORTERS')
        if not self._storage_supported(self.urifmt):
            raise NotConfigured
        if not self._exporter_supported(self.format):
            raise NotConfigured
        self.store_empty = settings.getbool('FEED_STORE_EMPTY')
        self._exporting = False
        self.export_fields = settings.getlist('FEED_EXPORT_FIELDS') or None
        self.indent = None
        if settings.get('FEED_EXPORT_INDENT') is not None:
            self.indent = settings.getint('FEED_EXPORT_INDENT')
        uripar = settings['FEED_URI_PARAMS']
        self._uripar = load_object(uripar) if uripar else lambda x, y: None
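
    # The extension is driven entirely by settings; with no FEED_URI it raises
    # NotConfigured and stays disabled. A typical configuration exercising the
    # options read above (values are illustrative):
    #
    #     FEED_URI = 's3://my-feeds-bucket/%(name)s/%(time)s.json'
    #     FEED_FORMAT = 'json'
    #     FEED_EXPORT_ENCODING = 'utf-8'
    #     FEED_EXPORT_FIELDS = ['title', 'price', 'url']
    #     FEED_EXPORT_INDENT = 2
    #     FEED_STORE_EMPTY = False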

    @classmethod
    def from_crawler(cls, crawler):
        o = cls(crawler.settings)
        o.crawler = crawler
        crawler.signals.connect(o.open_spider, signals.spider_opened)
        crawler.signals.connect(o.close_spider, signals.spider_closed)
        crawler.signals.connect(o.item_scraped, signals.item_scraped)
        return o

    def open_spider(self, spider):
        uri = self.urifmt % self._get_uri_params(spider)
        storage = self._get_storage(uri)
        file = storage.open(spider)
        exporter = self._get_exporter(file, fields_to_export=self.export_fields,
                                      encoding=self.export_encoding, indent=self.indent)
        if self.store_empty:
            exporter.start_exporting()
            self._exporting = True
        self.slot = SpiderSlot(file, exporter, storage, uri)

    def close_spider(self, spider):
        slot = self.slot
        if not slot.itemcount and not self.store_empty:
            # We need to call slot.storage.store nonetheless to get the file
            # properly closed.
            return defer.maybeDeferred(slot.storage.store, slot.file)
        if self._exporting:
            slot.exporter.finish_exporting()
            self._exporting = False
        logfmt = "%s %%(format)s feed (%%(itemcount)d items) in: %%(uri)s"
        log_args = {'format': self.format,
                    'itemcount': slot.itemcount,
                    'uri': slot.uri}
        d = defer.maybeDeferred(slot.storage.store, slot.file)
        d.addCallback(lambda _: logger.info(logfmt % "Stored", log_args,
                                            extra={'spider': spider}))
        d.addErrback(lambda f: logger.error(logfmt % "Error storing", log_args,
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

    def item_scraped(self, item, spider):
        slot = self.slot
        if not self._exporting:
            slot.exporter.start_exporting()
            self._exporting = True
        slot.exporter.export_item(item)
        slot.itemcount += 1
        return item
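
    # Lifecycle: spider_opened opens the storage and builds the slot;
    # start_exporting() is deferred until the first scraped item (or called
    # immediately when FEED_STORE_EMPTY is set, so empty feeds still get
    # written); spider_closed calls finish_exporting() and then hands the
    # buffered file to storage.store(), logging success or failure when the
    # returned Deferred fires.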

    def _load_components(self, setting_prefix):
        conf = without_none_values(self.settings.getwithbase(setting_prefix))
        d = {}
        for k, v in conf.items():
            try:
                d[k] = load_object(v)
            except NotConfigured:
                pass
        return d

    def _exporter_supported(self, format):
        if format in self.exporters:
            return True
        logger.error("Unknown feed format: %(format)s", {'format': format})

    def _storage_supported(self, uri):
        scheme = urlparse(uri).scheme
        if scheme in self.storages:
            try:
                self._get_storage(uri)
                return True
            except NotConfigured as e:
                logger.error("Disabled feed storage scheme: %(scheme)s. "
                             "Reason: %(reason)s",
                             {'scheme': scheme, 'reason': str(e)})
        else:
            logger.error("Unknown feed storage scheme: %(scheme)s",
                         {'scheme': scheme})

    def _get_instance(self, objcls, *args, **kwargs):
        return create_instance(
            objcls, self.settings, getattr(self, 'crawler', None),
            *args, **kwargs)

    def _get_exporter(self, *args, **kwargs):
        return self._get_instance(self.exporters[self.format], *args, **kwargs)

    def _get_storage(self, uri):
        return self._get_instance(self.storages[urlparse(uri).scheme], uri)

    def _get_uri_params(self, spider):
        params = {}
        for k in dir(spider):
            params[k] = getattr(spider, k)
        ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')
        params['time'] = ts
        self._uripar(params, spider)
        return params
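
    # _get_uri_params exposes every spider attribute plus a filesystem-safe
    # %(time)s timestamp for interpolation into FEED_URI (e.g.
    # 'file:///tmp/%(name)s-%(time)s.json'). The FEED_URI_PARAMS setting can
    # point to a function that adds or rewrites entries in place before
    # interpolation; a minimal sketch (module path and extra key are
    # illustrative):
    #
    #     # myproject/utils.py
    #     def uri_params(params, spider):
    #         params['batch'] = getattr(spider, 'batch_id', 'default')
    #
    #     # settings.py
    #     FEED_URI_PARAMS = 'myproject.utils.uri_params'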