# engine.py

  1. """
  2. This is the Scrapy engine which controls the Scheduler, Downloader and Spiders.
  3. For more information see docs/topics/architecture.rst
  4. """
  5. import logging
  6. from time import time
  7. from twisted.internet import defer, task
  8. from twisted.python.failure import Failure
  9. from scrapy import signals
  10. from scrapy.core.scraper import Scraper
  11. from scrapy.exceptions import DontCloseSpider
  12. from scrapy.http import Response, Request
  13. from scrapy.utils.misc import load_object
  14. from scrapy.utils.reactor import CallLaterOnce
  15. from scrapy.utils.log import logformatter_adapter, failure_to_exc_info
  16. logger = logging.getLogger(__name__)
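
# Data flow, as implemented below: ExecutionEngine._next_request() pulls
# requests from the scheduler and hands them to the downloader; downloader
# output is routed to the scraper, and any new requests a spider yields come
# back in through crawl()/schedule().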


class Slot(object):

    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set()  # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler
        self.heartbeat = task.LoopingCall(nextcall.schedule)

    def add_request(self, request):
        self.inprogress.add(request)

    def remove_request(self, request):
        self.inprogress.remove(request)
        self._maybe_fire_closing()

    def close(self):
        self.closing = defer.Deferred()
        self._maybe_fire_closing()
        return self.closing

    def _maybe_fire_closing(self):
        if self.closing and not self.inprogress:
            if self.nextcall:
                self.nextcall.cancel()
                if self.heartbeat.running:
                    self.heartbeat.stop()
            self.closing.callback(None)
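
# A minimal sketch of the Slot lifecycle as used by the engine below; the
# `request` objects are hypothetical stand-ins:
#
#     slot = Slot(start_requests, close_if_idle=True,
#                 nextcall=nextcall, scheduler=scheduler)
#     slot.add_request(request)      # track an in-flight request
#     slot.remove_request(request)   # may fire the closing Deferred
#     d = slot.close()               # fires once `inprogress` drains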


class ExecutionEngine(object):

    def __init__(self, crawler, spider_closed_callback):
        self.crawler = crawler
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
        self.slot = None
        self.spider = None
        self.running = False
        self.paused = False
        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
        downloader_cls = load_object(self.settings['DOWNLOADER'])
        self.downloader = downloader_cls(crawler)
        self.scraper = Scraper(crawler)
        self._spider_closed_callback = spider_closed_callback
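
    # A sketch of how a crawler typically drives this engine (assumed wiring,
    # modeled on scrapy.crawler.Crawler, not part of this module):
    #
    #     engine = ExecutionEngine(crawler, lambda _: crawler.stop())
    #     yield engine.open_spider(spider, iter(spider.start_requests()))
    #     yield defer.maybeDeferred(engine.start)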

    @defer.inlineCallbacks
    def start(self):
        """Start the execution engine"""
        assert not self.running, "Engine already running"
        self.start_time = time()
        yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
        self.running = True
        self._closewait = defer.Deferred()
        yield self._closewait

    def stop(self):
        """Stop the execution engine gracefully"""
        assert self.running, "Engine not running"
        self.running = False
        dfd = self._close_all_spiders()
        return dfd.addBoth(lambda _: self._finish_stopping_engine())

    def close(self):
        """Close the execution engine gracefully.

        If it has already been started, stop it. In all cases, close all spiders
        and the downloader.
        """
        if self.running:
            # Will also close spiders and downloader
            return self.stop()
        elif self.open_spiders:
            # Will also close downloader
            return self._close_all_spiders()
        else:
            return defer.succeed(self.downloader.close())

    def pause(self):
        """Pause the execution engine"""
        self.paused = True

    def unpause(self):
        """Resume the execution engine"""
        self.paused = False

    def _next_request(self, spider):
        slot = self.slot
        if not slot:
            return

        if self.paused:
            return

        while not self._needs_backout(spider):
            if not self._next_request_from_scheduler(spider):
                break

        if slot.start_requests and not self._needs_backout(spider):
            try:
                request = next(slot.start_requests)
            except StopIteration:
                slot.start_requests = None
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)
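
    # _next_request() is the heart of the pull loop: it drains the scheduler
    # until a backout condition holds, feeds at most one start request per
    # pass, and relies on nextcall.schedule() being called again as downloads
    # and scrapes complete.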

    def _needs_backout(self, spider):
        slot = self.slot
        return not self.running \
            or slot.closing \
            or self.downloader.needs_backout() \
            or self.scraper.slot.needs_backout()

    def _next_request_from_scheduler(self, spider):
        slot = self.slot
        request = slot.scheduler.next_request()
        if not request:
            return
        d = self._download(request, spider)
        d.addBoth(self._handle_downloader_output, request, spider)
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        return d
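
    # Each addBoth() above runs regardless of the previous step's outcome, and
    # the paired addErrback() logs a failure in that step so the rest of the
    # chain still executes.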

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

    def spider_is_idle(self, spider):
        if not self.scraper.slot.is_idle():
            # scraper is not idle
            return False

        if self.downloader.active:
            # downloader has pending requests
            return False

        if self.slot.start_requests is not None:
            # not all start requests are handled
            return False

        if self.slot.scheduler.has_pending_requests():
            # scheduler has pending requests
            return False

        return True

    @property
    def open_spiders(self):
        return [self.spider] if self.spider else []

    def has_capacity(self):
        """Does the engine have capacity to handle more spiders"""
        return not bool(self.slot)

    def crawl(self, request, spider):
        assert spider in self.open_spiders, \
            "Spider %r not opened when crawling: %s" % (spider.name, request)
        self.schedule(request, spider)
        self.slot.nextcall.schedule()

    def schedule(self, request, spider):
        self.signals.send_catch_log(signal=signals.request_scheduled,
                                    request=request, spider=spider)
        if not self.slot.scheduler.enqueue_request(request):
            self.signals.send_catch_log(signal=signals.request_dropped,
                                        request=request, spider=spider)
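
    # An extension can observe the request_dropped signal sent above; a
    # minimal sketch (the handler name is hypothetical):
    #
    #     def on_dropped(request, spider):
    #         logger.warning('Dropped by scheduler: %s', request)
    #
    #     crawler.signals.connect(on_dropped, signal=signals.request_dropped)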

    def download(self, request, spider):
        d = self._download(request, spider)
        d.addBoth(self._downloaded, self.slot, request, spider)
        return d

    def _downloaded(self, response, slot, request, spider):
        slot.remove_request(request)
        return self.download(response, spider) \
            if isinstance(response, Request) else response

    def _download(self, request, spider):
        slot = self.slot
        slot.add_request(request)

        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request  # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                if logkws is not None:
                    logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld
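
    # _on_success is registered only as a callback, so download failures skip
    # straight to the caller's errbacks, while _on_complete runs on both the
    # success and failure paths to wake the engine loop again.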

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
        slot.heartbeat.start(5)  # keep scheduling _next_request every 5 seconds

    def _spider_idle(self, spider):
        """Called when a spider gets idle. This function is called when there
        are no remaining pages to download or schedule. It can be called
        multiple times. If some extension raises a DontCloseSpider exception
        (in the spider_idle signal handler) the spider is not closed until the
        next loop and this function is guaranteed to be called (at least) once
        again for this spider.
        """
        res = self.signals.send_catch_log(signal=signals.spider_idle,
                                          spider=spider, dont_log=DontCloseSpider)
        if any(isinstance(x, Failure) and isinstance(x.value, DontCloseSpider)
               for _, x in res):
            return

        if self.spider_is_idle(spider):
            self.close_spider(spider, reason='finished')
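
    # A minimal sketch of a spider_idle handler that vetoes idle shutdown
    # (the handler and its pending-work check are hypothetical):
    #
    #     from scrapy.exceptions import DontCloseSpider
    #
    #     def on_idle(spider):
    #         if work_is_still_pending():
    #             raise DontCloseSpider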

    def close_spider(self, spider, reason='cancelled'):
        """Close (cancel) spider and clear all its outstanding requests"""
        slot = self.slot
        if slot.closing:
            return slot.closing
        logger.info("Closing spider (%(reason)s)",
                    {'reason': reason},
                    extra={'spider': spider})

        dfd = slot.close()

        def log_failure(msg):
            def errback(failure):
                logger.error(
                    msg,
                    exc_info=failure_to_exc_info(failure),
                    extra={'spider': spider}
                )
            return errback

        dfd.addBoth(lambda _: self.downloader.close())
        dfd.addErrback(log_failure('Downloader close failure'))

        dfd.addBoth(lambda _: self.scraper.close_spider(spider))
        dfd.addErrback(log_failure('Scraper close failure'))

        dfd.addBoth(lambda _: slot.scheduler.close(reason))
        dfd.addErrback(log_failure('Scheduler close failure'))

        dfd.addBoth(lambda _: self.signals.send_catch_log_deferred(
            signal=signals.spider_closed, spider=spider, reason=reason))
        dfd.addErrback(log_failure('Error while sending spider_close signal'))

        dfd.addBoth(lambda _: self.crawler.stats.close_spider(spider, reason=reason))
        dfd.addErrback(log_failure('Stats close failure'))

        dfd.addBoth(lambda _: logger.info("Spider closed (%(reason)s)",
                                          {'reason': reason},
                                          extra={'spider': spider}))

        dfd.addBoth(lambda _: setattr(self, 'slot', None))
        dfd.addErrback(log_failure('Error while unassigning slot'))

        dfd.addBoth(lambda _: setattr(self, 'spider', None))
        dfd.addErrback(log_failure('Error while unassigning spider'))

        dfd.addBoth(lambda _: self._spider_closed_callback(spider))

        return dfd
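
    # Teardown above runs strictly in order: downloader, scraper, scheduler,
    # the spider_closed signal, stats, then the slot/spider references; each
    # log_failure() errback reports a failing step without aborting the rest.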

    def _close_all_spiders(self):
        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
        dlist = defer.DeferredList(dfds)
        return dlist

    @defer.inlineCallbacks
    def _finish_stopping_engine(self):
        yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped)
        self._closewait.callback(None)