# media.py
from __future__ import print_function

import functools
import logging
from collections import defaultdict

from twisted.internet.defer import Deferred, DeferredList, _DefGen_Return
from twisted.python.failure import Failure

from scrapy.settings import Settings
from scrapy.utils.datatypes import SequenceExclude
from scrapy.utils.defer import mustbe_deferred, defer_result
from scrapy.utils.log import failure_to_exc_info
from scrapy.utils.misc import arg_to_iter
from scrapy.utils.request import request_fingerprint
  13. logger = logging.getLogger(__name__)
  14. class MediaPipeline(object):
  15. LOG_FAILED_RESULTS = True
  16. class SpiderInfo(object):
  17. def __init__(self, spider):
  18. self.spider = spider
  19. self.downloading = set()
  20. self.downloaded = {}
  21. self.waiting = defaultdict(list)
  22. def __init__(self, download_func=None, settings=None):
  23. self.download_func = download_func
  24. if isinstance(settings, dict) or settings is None:
  25. settings = Settings(settings)
  26. resolve = functools.partial(self._key_for_pipe,
  27. base_class_name="MediaPipeline",
  28. settings=settings)
  29. self.allow_redirects = settings.getbool(
  30. resolve('MEDIA_ALLOW_REDIRECTS'), False
  31. )
  32. self._handle_statuses(self.allow_redirects)
  33. def _handle_statuses(self, allow_redirects):
  34. self.handle_httpstatus_list = None
  35. if allow_redirects:
  36. self.handle_httpstatus_list = SequenceExclude(range(300, 400))
  37. def _key_for_pipe(self, key, base_class_name=None,
  38. settings=None):
  39. """
  40. >>> MediaPipeline()._key_for_pipe("IMAGES")
  41. 'IMAGES'
  42. >>> class MyPipe(MediaPipeline):
  43. ... pass
  44. >>> MyPipe()._key_for_pipe("IMAGES", base_class_name="MediaPipeline")
  45. 'MYPIPE_IMAGES'
  46. """
  47. class_name = self.__class__.__name__
  48. formatted_key = "{}_{}".format(class_name.upper(), key)
  49. if class_name == base_class_name or not base_class_name \
  50. or (settings and not settings.get(formatted_key)):
  51. return key
  52. return formatted_key
  53. @classmethod
  54. def from_crawler(cls, crawler):
  55. try:
  56. pipe = cls.from_settings(crawler.settings)
  57. except AttributeError:
  58. pipe = cls()
  59. pipe.crawler = crawler
  60. return pipe
  61. def open_spider(self, spider):
  62. self.spiderinfo = self.SpiderInfo(spider)
  63. def process_item(self, item, spider):
  64. info = self.spiderinfo
  65. requests = arg_to_iter(self.get_media_requests(item, info))
  66. dlist = [self._process_request(r, info) for r in requests]
  67. dfd = DeferredList(dlist, consumeErrors=1)
  68. return dfd.addCallback(self.item_completed, item, info)
  69. def _process_request(self, request, info):
  70. fp = request_fingerprint(request)
  71. cb = request.callback or (lambda _: _)
  72. eb = request.errback
  73. request.callback = None
  74. request.errback = None
  75. # Return cached result if request was already seen
  76. if fp in info.downloaded:
  77. return defer_result(info.downloaded[fp]).addCallbacks(cb, eb)
  78. # Otherwise, wait for result
  79. wad = Deferred().addCallbacks(cb, eb)
  80. info.waiting[fp].append(wad)
  81. # Check if request is downloading right now to avoid doing it twice
  82. if fp in info.downloading:
  83. return wad
  84. # Download request checking media_to_download hook output first
  85. info.downloading.add(fp)
  86. dfd = mustbe_deferred(self.media_to_download, request, info)
  87. dfd.addCallback(self._check_media_to_download, request, info)
  88. dfd.addBoth(self._cache_result_and_execute_waiters, fp, info)
  89. dfd.addErrback(lambda f: logger.error(
  90. f.value, exc_info=failure_to_exc_info(f), extra={'spider': info.spider})
  91. )
  92. return dfd.addBoth(lambda _: wad) # it must return wad at last
  93. def _modify_media_request(self, request):
  94. if self.handle_httpstatus_list:
  95. request.meta['handle_httpstatus_list'] = self.handle_httpstatus_list
  96. else:
  97. request.meta['handle_httpstatus_all'] = True
  98. def _check_media_to_download(self, result, request, info):
  99. if result is not None:
  100. return result
  101. if self.download_func:
  102. # this ugly code was left only to support tests. TODO: remove
  103. dfd = mustbe_deferred(self.download_func, request, info.spider)
  104. dfd.addCallbacks(
  105. callback=self.media_downloaded, callbackArgs=(request, info),
  106. errback=self.media_failed, errbackArgs=(request, info))
  107. else:
  108. self._modify_media_request(request)
  109. dfd = self.crawler.engine.download(request, info.spider)
  110. dfd.addCallbacks(
  111. callback=self.media_downloaded, callbackArgs=(request, info),
  112. errback=self.media_failed, errbackArgs=(request, info))
  113. return dfd
  114. def _cache_result_and_execute_waiters(self, result, fp, info):
  115. if isinstance(result, Failure):
  116. # minimize cached information for failure
  117. result.cleanFailure()
  118. result.frames = []
  119. result.stack = None
  120. # This code fixes a memory leak by avoiding to keep references to
  121. # the Request and Response objects on the Media Pipeline cache.
  122. #
  123. # Twisted inline callbacks pass return values using the function
  124. # twisted.internet.defer.returnValue, which encapsulates the return
  125. # value inside a _DefGen_Return base exception.
  126. #
  127. # What happens when the media_downloaded callback raises another
  128. # exception, for example a FileException('download-error') when
  129. # the Response status code is not 200 OK, is that it stores the
  130. # _DefGen_Return exception on the FileException context.
  131. #
  132. # To avoid keeping references to the Response and therefore Request
  133. # objects on the Media Pipeline cache, we should wipe the context of
  134. # the exception encapsulated by the Twisted Failure when its a
  135. # _DefGen_Return instance.
  136. #
  137. # This problem does not occur in Python 2.7 since we don't have
  138. # Exception Chaining (https://www.python.org/dev/peps/pep-3134/).
  139. context = getattr(result.value, '__context__', None)
  140. if isinstance(context, _DefGen_Return):
  141. setattr(result.value, '__context__', None)
  142. info.downloading.remove(fp)
  143. info.downloaded[fp] = result # cache result
  144. for wad in info.waiting.pop(fp):
  145. defer_result(result).chainDeferred(wad)
  146. ### Overridable Interface
  147. def media_to_download(self, request, info):
  148. """Check request before starting download"""
  149. pass
  150. def get_media_requests(self, item, info):
  151. """Returns the media requests to download"""
  152. pass
  153. def media_downloaded(self, response, request, info):
  154. """Handler for success downloads"""
  155. return response
  156. def media_failed(self, failure, request, info):
  157. """Handler for failed downloads"""
  158. return failure
  159. def item_completed(self, results, item, info):
  160. """Called per item when all media requests has been processed"""
  161. if self.LOG_FAILED_RESULTS:
  162. for ok, value in results:
  163. if not ok:
  164. logger.error(
  165. '%(class)s found errors processing %(item)s',
  166. {'class': self.__class__.__name__, 'item': item},
  167. exc_info=failure_to_exc_info(value),
  168. extra={'spider': info.spider}
  169. )
  170. return item