scraper.py

  1. """This module implements the Scraper component which parses responses and
  2. extracts information from them"""
  3. import logging
  4. from collections import deque
  5. from twisted.python.failure import Failure
  6. from twisted.internet import defer
  7. from scrapy.utils.defer import defer_result, defer_succeed, parallel, iter_errback
  8. from scrapy.utils.spider import iterate_spider_output
  9. from scrapy.utils.misc import load_object
  10. from scrapy.utils.log import logformatter_adapter, failure_to_exc_info
  11. from scrapy.exceptions import CloseSpider, DropItem, IgnoreRequest
  12. from scrapy import signals
  13. from scrapy.http import Request, Response
  14. from scrapy.item import BaseItem
  15. from scrapy.core.spidermw import SpiderMiddlewareManager
  16. from scrapy.utils.request import referer_str
  17. logger = logging.getLogger(__name__)


class Slot(object):
    """Scraper slot (one per running spider)"""

    MIN_RESPONSE_SIZE = 1024
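    # Accounting floor: every queued result is charged at least this many
    # bytes against ``active_size``, so Failures (which carry no body) still
    # count toward the budget checked by ``needs_backout()``.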

    def __init__(self, max_active_size=5000000):
        self.max_active_size = max_active_size
        self.queue = deque()
        self.active = set()
        self.active_size = 0
        self.itemproc_size = 0
        self.closing = None

    def add_response_request(self, response, request):
        deferred = defer.Deferred()
        self.queue.append((response, request, deferred))
        if isinstance(response, Response):
            self.active_size += max(len(response.body), self.MIN_RESPONSE_SIZE)
        else:
            self.active_size += self.MIN_RESPONSE_SIZE
        return deferred

    def next_response_request_deferred(self):
        response, request, deferred = self.queue.popleft()
        self.active.add(request)
        return response, request, deferred

    def finish_response(self, response, request):
        self.active.remove(request)
        if isinstance(response, Response):
            self.active_size -= max(len(response.body), self.MIN_RESPONSE_SIZE)
        else:
            self.active_size -= self.MIN_RESPONSE_SIZE

    def is_idle(self):
        return not (self.queue or self.active)
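
    # The engine checks this and stops feeding new responses into the
    # scraper while more than ``max_active_size`` bytes (5 MB by default)
    # of response data are being processed at once.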
    def needs_backout(self):
        return self.active_size > self.max_active_size


class Scraper(object):

    def __init__(self, crawler):
        self.slot = None
        self.spidermw = SpiderMiddlewareManager.from_crawler(crawler)
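        # ITEM_PROCESSOR defaults to 'scrapy.pipelines.ItemPipelineManager',
        # which runs scraped items through the enabled item pipelines.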
        itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR'])
        self.itemproc = itemproc_cls.from_crawler(crawler)
        self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS')
        self.crawler = crawler
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter

    @defer.inlineCallbacks
    def open_spider(self, spider):
        """Open the given spider for scraping and allocate resources for it"""
        self.slot = Slot()
        yield self.itemproc.open_spider(spider)

    def close_spider(self, spider):
        """Close a spider being scraped and release its resources"""
        slot = self.slot
        slot.closing = defer.Deferred()
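        # ``closing`` fires with the spider once the slot drains (see
        # ``_check_if_closing``); the item processor is then shut down.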
        slot.closing.addCallback(self.itemproc.close_spider)
        self._check_if_closing(spider, slot)
        return slot.closing

    def is_idle(self):
        """Return True if there are no more spiders to process"""
        return not self.slot

    def _check_if_closing(self, spider, slot):
        if slot.closing and slot.is_idle():
            slot.closing.callback(spider)

    def enqueue_scrape(self, response, request, spider):
        slot = self.slot
        dfd = slot.add_response_request(response, request)

        def finish_scraping(_):
            slot.finish_response(response, request)
            self._check_if_closing(spider, slot)
            self._scrape_next(spider, slot)
            return _
        dfd.addBoth(finish_scraping)
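        # ``finish_scraping`` passes results and failures through unchanged,
        # so this errback only fires on bugs in the scraper machinery itself.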
        dfd.addErrback(
            lambda f: logger.error('Scraper bug processing %(request)s',
                                   {'request': request},
                                   exc_info=failure_to_exc_info(f),
                                   extra={'spider': spider}))
        self._scrape_next(spider, slot)
        return dfd

    def _scrape_next(self, spider, slot):
        while slot.queue:
            response, request, deferred = slot.next_response_request_deferred()
            self._scrape(response, request, spider).chainDeferred(deferred)

    def _scrape(self, response, request, spider):
        """Handle the downloaded response or failure through the spider
        callback/errback"""
        assert isinstance(response, (Response, Failure))
        dfd = self._scrape2(response, request, spider)  # returns spider's processed output
        dfd.addErrback(self.handle_spider_error, request, response, spider)
        dfd.addCallback(self.handle_spider_output, request, response, spider)
        return dfd

    def _scrape2(self, request_result, request, spider):
        """Handle the different cases of request's result being a Response or a
        Failure"""
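        # Successful responses are run through the spider middleware chain;
        # download failures bypass it and go straight to the request errback.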
        if not isinstance(request_result, Failure):
            return self.spidermw.scrape_response(
                self.call_spider, request_result, request, spider)
        else:
            dfd = self.call_spider(request_result, request, spider)
            return dfd.addErrback(
                self._log_download_errors, request_result, request, spider)

    def call_spider(self, result, request, spider):
        result.request = request
        dfd = defer_result(result)
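        # defer_result() wraps the value in an already-fired Deferred: a
        # Response triggers the callback chain, a Failure triggers the
        # errback chain.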
        dfd.addCallbacks(callback=request.callback or spider.parse,
                         errback=request.errback,
                         callbackKeywords=request.cb_kwargs)
        return dfd.addCallback(iterate_spider_output)

    def handle_spider_error(self, _failure, request, response, spider):
        exc = _failure.value
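        # CloseSpider raised in a callback is not treated as an error: it is
        # a request to shut the spider down with the given reason.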
        if isinstance(exc, CloseSpider):
            self.crawler.engine.close_spider(spider, exc.reason or 'cancelled')
            return
        logger.error(
            "Spider error processing %(request)s (referer: %(referer)s)",
            {'request': request, 'referer': referer_str(request)},
            exc_info=failure_to_exc_info(_failure),
            extra={'spider': spider}
        )
        self.signals.send_catch_log(
            signal=signals.spider_error,
            failure=_failure, response=response,
            spider=spider
        )
        self.crawler.stats.inc_value(
            "spider_exceptions/%s" % _failure.value.__class__.__name__,
            spider=spider
        )

    def handle_spider_output(self, result, request, response, spider):
        if not result:
            return defer_succeed(None)
        it = iter_errback(result, self.handle_spider_error, request, response, spider)
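        # Consume the spider output iterable, processing up to
        # CONCURRENT_ITEMS (default 100) entries in parallel.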
        dfd = parallel(it, self.concurrent_items,
                       self._process_spidermw_output, request, response, spider)
        return dfd

    def _process_spidermw_output(self, output, request, response, spider):
        """Process each Request/Item (given in the output parameter) returned
        from the given spider
        """
        if isinstance(output, Request):
            self.crawler.engine.crawl(request=output, spider=spider)
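        # Items (BaseItem instances or dicts) go to the item processor; its
        # Deferred is returned so parallel() waits for pipeline completion.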
        elif isinstance(output, (BaseItem, dict)):
            self.slot.itemproc_size += 1
            dfd = self.itemproc.process_item(output, spider)
            dfd.addBoth(self._itemproc_finished, output, response, spider)
            return dfd
        elif output is None:
            pass
        else:
            typename = type(output).__name__
            logger.error('Spider must return Request, BaseItem, dict or None, '
                         'got %(typename)r in %(request)s',
                         {'request': request, 'typename': typename},
                         extra={'spider': spider})

    def _log_download_errors(self, spider_failure, download_failure, request, spider):
        """Log and silence errors that come from the engine (typically download
        errors that got propagated through here)
        """
        if (isinstance(download_failure, Failure) and
                not download_failure.check(IgnoreRequest)):
            if download_failure.frames:
                logger.error('Error downloading %(request)s',
                             {'request': request},
                             exc_info=failure_to_exc_info(download_failure),
                             extra={'spider': spider})
            else:
                errmsg = download_failure.getErrorMessage()
                if errmsg:
                    logger.error('Error downloading %(request)s: %(errmsg)s',
                                 {'request': request, 'errmsg': errmsg},
                                 extra={'spider': spider})
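
        # If the spider errback raised a fresh exception, keep propagating
        # it; when the spider failure is the download failure itself,
        # returning None silences it here (it was already logged above).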
        if spider_failure is not download_failure:
            return spider_failure

    def _itemproc_finished(self, output, item, response, spider):
        """ItemProcessor finished for the given ``item`` and returned ``output``
        """
        self.slot.itemproc_size -= 1
        if isinstance(output, Failure):
            ex = output.value
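            # DropItem is the conventional way for a pipeline to discard an
            # item; it is logged via the log formatter, not as an error.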
            if isinstance(ex, DropItem):
                logkws = self.logformatter.dropped(item, ex, response, spider)
                if logkws is not None:
                    logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                return self.signals.send_catch_log_deferred(
                    signal=signals.item_dropped, item=item, response=response,
                    spider=spider, exception=output.value)
            else:
                logger.error('Error processing %(item)s', {'item': item},
                             exc_info=failure_to_exc_info(output),
                             extra={'spider': spider})
                return self.signals.send_catch_log_deferred(
                    signal=signals.item_error, item=item, response=response,
                    spider=spider, failure=output)
        else:
            logkws = self.logformatter.scraped(output, response, spider)
            if logkws is not None:
                logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_scraped, item=output, response=response,
                spider=spider)