# pqueues.py
  1. import hashlib
  2. import logging
  3. from collections import namedtuple
  4. from queuelib import PriorityQueue
  5. from scrapy.utils.reqser import request_to_dict, request_from_dict
  6. logger = logging.getLogger(__name__)
  7. def _path_safe(text):
  8. """
  9. Return a filesystem-safe version of a string ``text``
  10. >>> _path_safe('simple.org').startswith('simple.org')
  11. True
  12. >>> _path_safe('dash-underscore_.org').startswith('dash-underscore_.org')
  13. True
  14. >>> _path_safe('some@symbol?').startswith('some_symbol_')
  15. True
  16. """
  17. pathable_slot = "".join([c if c.isalnum() or c in '-._' else '_'
  18. for c in text])
  19. # as we replace some letters we can get collision for different slots
  20. # add we add unique part
  21. unique_slot = hashlib.md5(text.encode('utf8')).hexdigest()
  22. return '-'.join([pathable_slot, unique_slot])
  23. class _Priority(namedtuple("_Priority", ["priority", "slot"])):
  24. """ Slot-specific priority. It is a hack - ``(priority, slot)`` tuple
  25. which can be used instead of int priorities in queues:
  26. * they are ordered in the same way - order is still by priority value,
  27. min(prios) works;
  28. * str(p) representation is guaranteed to be different when slots
  29. are different - this is important because str(p) is used to create
  30. queue files on disk;
  31. * they have readable str(p) representation which is safe
  32. to use as a file name.
  33. """
  34. __slots__ = ()
  35. def __str__(self):
  36. return '%s_%s' % (self.priority, _path_safe(str(self.slot)))
  37. class _SlotPriorityQueues(object):
  38. """ Container for multiple priority queues. """
  39. def __init__(self, pqfactory, slot_startprios=None):
  40. """
  41. ``pqfactory`` is a factory for creating new PriorityQueues.
  42. It must be a function which accepts a single optional ``startprios``
  43. argument, with a list of priorities to create queues for.
  44. ``slot_startprios`` is a ``{slot: startprios}`` dict.
  45. """
  46. self.pqfactory = pqfactory
  47. self.pqueues = {} # slot -> priority queue
  48. for slot, startprios in (slot_startprios or {}).items():
  49. self.pqueues[slot] = self.pqfactory(startprios)
  50. def pop_slot(self, slot):
  51. """ Pop an object from a priority queue for this slot """
  52. queue = self.pqueues[slot]
  53. request = queue.pop()
  54. if len(queue) == 0:
  55. del self.pqueues[slot]
  56. return request
  57. def push_slot(self, slot, obj, priority):
  58. """ Push an object to a priority queue for this slot """
  59. if slot not in self.pqueues:
  60. self.pqueues[slot] = self.pqfactory()
  61. queue = self.pqueues[slot]
  62. queue.push(obj, priority)
  63. def close(self):
  64. active = {slot: queue.close()
  65. for slot, queue in self.pqueues.items()}
  66. self.pqueues.clear()
  67. return active
  68. def __len__(self):
  69. return sum(len(x) for x in self.pqueues.values()) if self.pqueues else 0
  70. def __contains__(self, slot):
  71. return slot in self.pqueues
  72. class ScrapyPriorityQueue(PriorityQueue):
  73. """
  74. PriorityQueue which works with scrapy.Request instances and
  75. can optionally convert them to/from dicts before/after putting to a queue.
  76. """
  77. def __init__(self, crawler, qfactory, startprios=(), serialize=False):
  78. super(ScrapyPriorityQueue, self).__init__(qfactory, startprios)
  79. self.serialize = serialize
  80. self.spider = crawler.spider
  81. @classmethod
  82. def from_crawler(cls, crawler, qfactory, startprios=(), serialize=False):
  83. return cls(crawler, qfactory, startprios, serialize)
  84. def push(self, request, priority=0):
  85. if self.serialize:
  86. request = request_to_dict(request, self.spider)
  87. super(ScrapyPriorityQueue, self).push(request, priority)
  88. def pop(self):
  89. request = super(ScrapyPriorityQueue, self).pop()
  90. if request and self.serialize:
  91. request = request_from_dict(request, self.spider)
  92. return request
  93. class DownloaderInterface(object):
  94. def __init__(self, crawler):
  95. self.downloader = crawler.engine.downloader
  96. def stats(self, possible_slots):
  97. return [(self._active_downloads(slot), slot)
  98. for slot in possible_slots]
  99. def get_slot_key(self, request):
  100. return self.downloader._get_slot_key(request, None)
  101. def _active_downloads(self, slot):
  102. """ Return a number of requests in a Downloader for a given slot """
  103. if slot not in self.downloader.slots:
  104. return 0
  105. return len(self.downloader.slots[slot].active)
  106. class DownloaderAwarePriorityQueue(object):
  107. """ PriorityQueue which takes Downlaoder activity in account:
  108. domains (slots) with the least amount of active downloads are dequeued
  109. first.
  110. """
  111. @classmethod
  112. def from_crawler(cls, crawler, qfactory, slot_startprios=None, serialize=False):
  113. return cls(crawler, qfactory, slot_startprios, serialize)
  114. def __init__(self, crawler, qfactory, slot_startprios=None, serialize=False):
  115. if crawler.settings.getint('CONCURRENT_REQUESTS_PER_IP') != 0:
  116. raise ValueError('"%s" does not support CONCURRENT_REQUESTS_PER_IP'
  117. % (self.__class__,))
  118. if slot_startprios and not isinstance(slot_startprios, dict):
  119. raise ValueError("DownloaderAwarePriorityQueue accepts "
  120. "``slot_startprios`` as a dict; %r instance "
  121. "is passed. Most likely, it means the state is"
  122. "created by an incompatible priority queue. "
  123. "Only a crawl started with the same priority "
  124. "queue class can be resumed." %
  125. slot_startprios.__class__)
  126. slot_startprios = {
  127. slot: [_Priority(p, slot) for p in startprios]
  128. for slot, startprios in (slot_startprios or {}).items()}
  129. def pqfactory(startprios=()):
  130. return ScrapyPriorityQueue(crawler, qfactory, startprios, serialize)
  131. self._slot_pqueues = _SlotPriorityQueues(pqfactory, slot_startprios)
  132. self.serialize = serialize
  133. self._downloader_interface = DownloaderInterface(crawler)
  134. def pop(self):
  135. stats = self._downloader_interface.stats(self._slot_pqueues.pqueues)
  136. if not stats:
  137. return
  138. slot = min(stats)[1]
  139. request = self._slot_pqueues.pop_slot(slot)
  140. return request
  141. def push(self, request, priority):
  142. slot = self._downloader_interface.get_slot_key(request)
  143. priority_slot = _Priority(priority=priority, slot=slot)
  144. self._slot_pqueues.push_slot(slot, request, priority_slot)
  145. def close(self):
  146. active = self._slot_pqueues.close()
  147. return {slot: [p.priority for p in startprios]
  148. for slot, startprios in active.items()}
  149. def __len__(self):
  150. return len(self._slot_pqueues)