- """
- This is the Scrapy engine which controls the Scheduler, Downloader and Spiders.
- For more information see docs/topics/architecture.rst
- """
- import logging
- from time import time
- from twisted.internet import defer, task
- from twisted.python.failure import Failure
- from scrapy import signals
- from scrapy.core.scraper import Scraper
- from scrapy.exceptions import DontCloseSpider
- from scrapy.http import Response, Request
- from scrapy.utils.misc import load_object
- from scrapy.utils.reactor import CallLaterOnce
- from scrapy.utils.log import logformatter_adapter, failure_to_exc_info
- logger = logging.getLogger(__name__)


class Slot(object):
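    """Bookkeeping for the single spider the engine runs: the set of
    in-progress requests, the start requests iterator, the scheduler, and
    the nextcall/heartbeat used to keep the engine loop ticking.
    """
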
    def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
        self.closing = False
        self.inprogress = set()  # requests in progress
        self.start_requests = iter(start_requests)
        self.close_if_idle = close_if_idle
        self.nextcall = nextcall
        self.scheduler = scheduler
        self.heartbeat = task.LoopingCall(nextcall.schedule)

    def add_request(self, request):
        self.inprogress.add(request)

    def remove_request(self, request):
        self.inprogress.remove(request)
        self._maybe_fire_closing()

    def close(self):
        self.closing = defer.Deferred()
        self._maybe_fire_closing()
        return self.closing

    def _maybe_fire_closing(self):
        if self.closing and not self.inprogress:
            if self.nextcall:
                self.nextcall.cancel()
                if self.heartbeat.running:
                    self.heartbeat.stop()
            self.closing.callback(None)


class ExecutionEngine(object):
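    """Coordinates the Scheduler, the Downloader and the Scraper for a single
    open spider, driven by callbacks scheduled on the Twisted reactor.
    """
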
    def __init__(self, crawler, spider_closed_callback):
        self.crawler = crawler
        self.settings = crawler.settings
        self.signals = crawler.signals
        self.logformatter = crawler.logformatter
        self.slot = None
        self.spider = None
        self.running = False
        self.paused = False
        self.scheduler_cls = load_object(self.settings['SCHEDULER'])
        downloader_cls = load_object(self.settings['DOWNLOADER'])
        self.downloader = downloader_cls(crawler)
        self.scraper = Scraper(crawler)
        self._spider_closed_callback = spider_closed_callback

    @defer.inlineCallbacks
    def start(self):
        """Start the execution engine"""
        assert not self.running, "Engine already running"
        self.start_time = time()
        yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
        self.running = True
        self._closewait = defer.Deferred()
        yield self._closewait

    def stop(self):
        """Stop the execution engine gracefully"""
        assert self.running, "Engine not running"
        self.running = False
        dfd = self._close_all_spiders()
        return dfd.addBoth(lambda _: self._finish_stopping_engine())

    def close(self):
        """Close the execution engine gracefully.

        If it has already been started, stop it. In all cases, close all
        spiders and the downloader.
        """
        if self.running:
            # Will also close spiders and downloader
            return self.stop()
        elif self.open_spiders:
            # Will also close downloader
            return self._close_all_spiders()
        else:
            return defer.succeed(self.downloader.close())

    def pause(self):
        """Pause the execution engine"""
        self.paused = True

    def unpause(self):
        """Resume the execution engine"""
        self.paused = False

    def _next_request(self, spider):
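        """Run one iteration of the engine loop: pull requests from the
        scheduler while there is capacity, feed in the next start request,
        and trigger idle handling when the spider has nothing left to do.
        """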
        slot = self.slot
        if not slot:
            return

        if self.paused:
            return

        while not self._needs_backout(spider):
            if not self._next_request_from_scheduler(spider):
                break

        if slot.start_requests and not self._needs_backout(spider):
            try:
                request = next(slot.start_requests)
            except StopIteration:
                slot.start_requests = None
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)

    def _needs_backout(self, spider):
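        """Return True when the loop should stop pulling new requests: the
        engine is not running, the slot is closing, or the downloader or
        scraper has too much work in flight.
        """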
        slot = self.slot
        return not self.running \
            or slot.closing \
            or self.downloader.needs_backout() \
            or self.scraper.slot.needs_backout()

    def _next_request_from_scheduler(self, spider):
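        """Pop the next request from the scheduler and send it to the
        downloader, chaining callbacks that handle the result, release the
        slot and reschedule the engine loop.
        """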
        slot = self.slot
        request = slot.scheduler.next_request()
        if not request:
            return
        d = self._download(request, spider)
        d.addBoth(self._handle_downloader_output, request, spider)
        d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.remove_request(request))
        d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        d.addBoth(lambda _: slot.nextcall.schedule())
        d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                           exc_info=failure_to_exc_info(f),
                                           extra={'spider': spider}))
        return d

    def _handle_downloader_output(self, response, request, spider):
        assert isinstance(response, (Request, Response, Failure)), response
        # downloader middleware can return requests (for example, redirects)
        if isinstance(response, Request):
            self.crawl(response, spider)
            return
        # response is a Response or Failure
        d = self.scraper.enqueue_scrape(response, request, spider)
        d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                            exc_info=failure_to_exc_info(f),
                                            extra={'spider': spider}))
        return d

    def spider_is_idle(self, spider):
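        """A spider is idle when the scraper and downloader have no pending
        work, all start requests have been consumed and the scheduler has no
        pending requests.
        """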
        if not self.scraper.slot.is_idle():
            # scraper is not idle
            return False

        if self.downloader.active:
            # downloader has pending requests
            return False

        if self.slot.start_requests is not None:
            # not all start requests are handled
            return False

        if self.slot.scheduler.has_pending_requests():
            # scheduler has pending requests
            return False

        return True

    @property
    def open_spiders(self):
        return [self.spider] if self.spider else []

    def has_capacity(self):
        """Does the engine have capacity to handle more spiders"""
        return not bool(self.slot)

    def crawl(self, request, spider):
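        """Schedule the request for download for the currently open spider
        and poke the engine loop so it gets picked up.
        """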
        assert spider in self.open_spiders, \
            "Spider %r not opened when crawling: %s" % (spider.name, request)
        self.schedule(request, spider)
        self.slot.nextcall.schedule()

    def schedule(self, request, spider):
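        """Enqueue the request in the scheduler, sending request_scheduled
        first and request_dropped if the scheduler rejects it.
        """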
        self.signals.send_catch_log(signal=signals.request_scheduled,
                                    request=request, spider=spider)
        if not self.slot.scheduler.enqueue_request(request):
            self.signals.send_catch_log(signal=signals.request_dropped,
                                        request=request, spider=spider)

    def download(self, request, spider):
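        """Download a request immediately, bypassing the scheduler, and
        return a Deferred that fires with the final Response (requests
        returned by downloader middleware, e.g. redirects, are downloaded
        again).
        """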
        d = self._download(request, spider)
        d.addBoth(self._downloaded, self.slot, request, spider)
        return d

    def _downloaded(self, response, slot, request, spider):
        slot.remove_request(request)
        return self.download(response, spider) \
            if isinstance(response, Request) else response

    def _download(self, request, spider):
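        """Hand the request to the downloader, tracking it in the slot; on
        success, log the crawl and send response_received, then reschedule
        the engine loop.
        """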
        slot = self.slot
        slot.add_request(request)

        def _on_success(response):
            assert isinstance(response, (Response, Request))
            if isinstance(response, Response):
                response.request = request  # tie request to response received
                logkws = self.logformatter.crawled(request, response, spider)
                if logkws is not None:
                    logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
                self.signals.send_catch_log(signal=signals.response_received,
                                            response=response, request=request, spider=spider)
            return response

        def _on_complete(_):
            slot.nextcall.schedule()
            return _

        dwld = self.downloader.fetch(request, spider)
        dwld.addCallbacks(_on_success)
        dwld.addBoth(_on_complete)
        return dwld

    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
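        """Open the given spider for crawling: build the scheduler, run the
        start requests through the spider middleware, allocate the single
        engine slot, then start the loop and its heartbeat.
        """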
        assert self.has_capacity(), "No free spider slot when opening %r" % \
            spider.name
        logger.info("Spider opened", extra={'spider': spider})
        nextcall = CallLaterOnce(self._next_request, spider)
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        slot.nextcall.schedule()
        slot.heartbeat.start(5)

    def _spider_idle(self, spider):
        """Called when a spider gets idle. This function is called when there
        are no remaining pages to download or schedule. It can be called
        multiple times. If some extension raises a DontCloseSpider exception
        (in the spider_idle signal handler) the spider is not closed until the
        next loop and this function is guaranteed to be called (at least) once
        again for this spider.
        """
        res = self.signals.send_catch_log(signal=signals.spider_idle,
                                          spider=spider, dont_log=DontCloseSpider)
        if any(isinstance(x, Failure) and isinstance(x.value, DontCloseSpider)
               for _, x in res):
            return

        if self.spider_is_idle(spider):
            self.close_spider(spider, reason='finished')

    def close_spider(self, spider, reason='cancelled'):
        """Close (cancel) spider and clear all its outstanding requests"""
        slot = self.slot
        if slot.closing:
            return slot.closing
        logger.info("Closing spider (%(reason)s)",
                    {'reason': reason},
                    extra={'spider': spider})

        dfd = slot.close()

        def log_failure(msg):
            def errback(failure):
                logger.error(
                    msg,
                    exc_info=failure_to_exc_info(failure),
                    extra={'spider': spider}
                )
            return errback

        dfd.addBoth(lambda _: self.downloader.close())
        dfd.addErrback(log_failure('Downloader close failure'))

        dfd.addBoth(lambda _: self.scraper.close_spider(spider))
        dfd.addErrback(log_failure('Scraper close failure'))

        dfd.addBoth(lambda _: slot.scheduler.close(reason))
        dfd.addErrback(log_failure('Scheduler close failure'))

        dfd.addBoth(lambda _: self.signals.send_catch_log_deferred(
            signal=signals.spider_closed, spider=spider, reason=reason))
        dfd.addErrback(log_failure('Error while sending spider_close signal'))

        dfd.addBoth(lambda _: self.crawler.stats.close_spider(spider, reason=reason))
        dfd.addErrback(log_failure('Stats close failure'))

        dfd.addBoth(lambda _: logger.info("Spider closed (%(reason)s)",
                                          {'reason': reason},
                                          extra={'spider': spider}))

        dfd.addBoth(lambda _: setattr(self, 'slot', None))
        dfd.addErrback(log_failure('Error while unassigning slot'))

        dfd.addBoth(lambda _: setattr(self, 'spider', None))
        dfd.addErrback(log_failure('Error while unassigning spider'))

        dfd.addBoth(lambda _: self._spider_closed_callback(spider))

        return dfd

    def _close_all_spiders(self):
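        """Close every open spider (the engine holds at most one) and return
        a DeferredList that fires once all of them have finished closing.
        """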
        dfds = [self.close_spider(s, reason='shutdown') for s in self.open_spiders]
        dlist = defer.DeferredList(dfds)
        return dlist

    @defer.inlineCallbacks
    def _finish_stopping_engine(self):
        yield self.signals.send_catch_log_deferred(signal=signals.engine_stopped)
        self._closewait.callback(None)