import os
import json
import logging
import warnings
from os.path import join, exists

from queuelib import PriorityQueue

from scrapy.utils.misc import load_object, create_instance
from scrapy.utils.job import job_dir
from scrapy.utils.deprecate import ScrapyDeprecationWarning


logger = logging.getLogger(__name__)


class Scheduler(object):
    """
    Scrapy scheduler. It lets callers enqueue requests and later fetch the
    next request to download. The scheduler also handles duplicate
    filtering, via the dupefilter.

    Prioritization and queueing are not performed by the scheduler itself.
    The user sets a ``priority`` field on each Request, and a PriorityQueue
    (defined by :setting:`SCHEDULER_PRIORITY_QUEUE`) uses these priorities
    to dequeue requests in the desired order.

    The scheduler uses two PriorityQueue instances, configured to work
    in memory and on disk (the latter is optional). When the on-disk queue
    is present it is used by default, and the in-memory queue serves as a
    fallback for requests the disk queue cannot handle (i.e. cannot
    serialize).

    :setting:`SCHEDULER_MEMORY_QUEUE` and :setting:`SCHEDULER_DISK_QUEUE`
    specify the lower-level queue classes that the PriorityQueue instances
    are instantiated with, to keep requests in memory and on disk
    respectively.

    Overall, the scheduler is an object that holds several PriorityQueue
    instances (in-memory and on-disk) and implements fallback logic between
    them. It also handles the dupefilter.
    """

    def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
                 logunser=False, stats=None, pqclass=None, crawler=None):
        self.df = dupefilter
        self.dqdir = self._dqdir(jobdir)
        self.pqclass = pqclass
        self.dqclass = dqclass
        self.mqclass = mqclass
        self.logunser = logunser
        self.stats = stats
        self.crawler = crawler

    @classmethod
    def from_crawler(cls, crawler):
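        """Build a scheduler from crawler settings; this is the entry point
        the execution engine uses to instantiate the scheduler."""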
        settings = crawler.settings
        dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
        dupefilter = create_instance(dupefilter_cls, settings, crawler)
        pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE'])
        if pqclass is PriorityQueue:
            warnings.warn("SCHEDULER_PRIORITY_QUEUE='queuelib.PriorityQueue'"
                          " is no longer supported because of API changes; "
                          "please use 'scrapy.pqueues.ScrapyPriorityQueue'",
                          ScrapyDeprecationWarning)
            from scrapy.pqueues import ScrapyPriorityQueue
            pqclass = ScrapyPriorityQueue
        dqclass = load_object(settings['SCHEDULER_DISK_QUEUE'])
        mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE'])
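        # LOG_UNSERIALIZABLE_REQUESTS is the older spelling of
        # SCHEDULER_DEBUG; it takes precedence when explicitly set.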
        logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS',
                                    settings.getbool('SCHEDULER_DEBUG'))
        return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass,
                   mqclass=mqclass, crawler=crawler)

    def has_pending_requests(self):
        return len(self) > 0

    def open(self, spider):
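        """Called when the spider is opened: create the in-memory queue (and
        the disk queue when a job directory is configured) and open the
        dupefilter."""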
        self.spider = spider
        self.mqs = self._mq()
        self.dqs = self._dq() if self.dqdir else None
        return self.df.open()

    def close(self, reason):
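        """Called when the spider is closed: persist the disk queue state
        (if any) and close the dupefilter."""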
        if self.dqs:
            state = self.dqs.close()
            self._write_dqs_state(self.dqdir, state)
        return self.df.close(reason)

    def enqueue_request(self, request):
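        """Schedule a request unless the dupefilter rejects it.

        The disk queue is tried first; requests it cannot serialize fall
        back to the memory queue. Returns False for filtered duplicates and
        True when the request was queued.
        """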
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True

    def next_request(self):
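        """Return the next request to download, or None when both queues are
        empty. The memory queue is popped first, then the disk queue."""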
        request = self.mqs.pop()
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request

    def __len__(self):
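        # Total number of pending requests across the disk and memory queues.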
        return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)

    def _dqpush(self, request):
        if self.dqs is None:
            return
        try:
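            # The priority is negated because the underlying queue pops the
            # lowest value first; this makes higher-priority requests come
            # out of the queue first.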
            self.dqs.push(request, -request.priority)
        except ValueError as e:  # non serializable request
            if self.logunser:
                msg = ("Unable to serialize request: %(request)s - reason:"
                       " %(reason)s - no more unserializable requests will be"
                       " logged (stats being collected)")
                logger.warning(msg, {'request': request, 'reason': e},
                               exc_info=True, extra={'spider': self.spider})
                self.logunser = False
            self.stats.inc_value('scheduler/unserializable',
                                 spider=self.spider)
            return
        else:
            return True

    def _mqpush(self, request):
        self.mqs.push(request, -request.priority)

    def _dqpop(self):
        if self.dqs:
            return self.dqs.pop()

    def _newmq(self, priority):
        """ Factory for creating memory queues. """
        return self.mqclass()

    def _newdq(self, priority):
        """ Factory for creating disk queues. """
        path = join(self.dqdir, 'p%s' % (priority, ))
        return self.dqclass(path)

    def _mq(self):
        """ Create a new priority queue instance, with in-memory storage """
        return create_instance(self.pqclass, None, self.crawler, self._newmq,
                               serialize=False)

    def _dq(self):
        """ Create a new priority queue instance, with disk storage """
        state = self._read_dqs_state(self.dqdir)
        q = create_instance(self.pqclass,
                            None,
                            self.crawler,
                            self._newdq,
                            state,
                            serialize=True)
        if q:
            logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                        {'queuesize': len(q)}, extra={'spider': self.spider})
        return q

    def _dqdir(self, jobdir):
        """ Return the directory where disk queue state is kept,
        creating it if needed. """
        if jobdir:
            dqdir = join(jobdir, 'requests.queue')
            if not exists(dqdir):
                os.makedirs(dqdir)
            return dqdir

    def _read_dqs_state(self, dqdir):
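        # The state returned by the disk queue's close() is persisted to
        # active.json by _write_dqs_state(), so a stopped crawl can be
        # resumed later from the same job directory.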
        path = join(dqdir, 'active.json')
        if not exists(path):
            return ()
        with open(path) as f:
            return json.load(f)

    def _write_dqs_state(self, dqdir, state):
        with open(join(dqdir, 'active.json'), 'w') as f:
            json.dump(state, f)
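

# Usage sketch (assumes a running ``crawler``, ``spider`` and ``request``
# object, which are not defined in this module): the execution engine
# drives the scheduler roughly like this:
#
#     scheduler = Scheduler.from_crawler(crawler)
#     scheduler.open(spider)
#     scheduler.enqueue_request(request)   # returns False for duplicates
#     while scheduler.has_pending_requests():
#         req = scheduler.next_request()   # memory queue first, then disk
#     scheduler.close('finished')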