each.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. from py.log import Producer
  2. from xdist.workermanage import parse_spec_config
  3. from xdist.report import report_collection_diff
  4. class EachScheduling(object):
  5. """Implement scheduling of test items on all nodes
  6. If a node gets added after the test run is started then it is
  7. assumed to replace a node which got removed before it finished
  8. its collection. In this case it will only be used if a node
  9. with the same spec got removed earlier.
  10. Any nodes added after the run is started will only get items
  11. assigned if a node with a matching spec was removed before it
  12. finished all its pending items. The new node will then be
  13. assigned the remaining items from the removed node.
  14. """
  15. def __init__(self, config, log=None):
  16. self.config = config
  17. self.numnodes = len(parse_spec_config(config))
  18. self.node2collection = {}
  19. self.node2pending = {}
  20. self._started = []
  21. self._removed2pending = {}
  22. if log is None:
  23. self.log = Producer("eachsched")
  24. else:
  25. self.log = log.eachsched
  26. self.collection_is_completed = False
  27. @property
  28. def nodes(self):
  29. """A list of all nodes in the scheduler."""
  30. return list(self.node2pending.keys())
  31. @property
  32. def tests_finished(self):
  33. if not self.collection_is_completed:
  34. return False
  35. if self._removed2pending:
  36. return False
  37. for pending in self.node2pending.values():
  38. if len(pending) >= 2:
  39. return False
  40. return True
  41. @property
  42. def has_pending(self):
  43. """Return True if there are pending test items
  44. This indicates that collection has finished and nodes are
  45. still processing test items, so this can be thought of as
  46. "the scheduler is active".
  47. """
  48. for pending in self.node2pending.values():
  49. if pending:
  50. return True
  51. return False
  52. def add_node(self, node):
  53. assert node not in self.node2pending
  54. self.node2pending[node] = []
  55. def add_node_collection(self, node, collection):
  56. """Add the collected test items from a node
  57. Collection is complete once all nodes have submitted their
  58. collection. In this case its pending list is set to an empty
  59. list. When the collection is already completed this
  60. submission is from a node which was restarted to replace a
  61. dead node. In this case we already assign the pending items
  62. here. In either case ``.schedule()`` will instruct the
  63. node to start running the required tests.
  64. """
  65. assert node in self.node2pending
  66. if not self.collection_is_completed:
  67. self.node2collection[node] = list(collection)
  68. self.node2pending[node] = []
  69. if len(self.node2collection) >= self.numnodes:
  70. self.collection_is_completed = True
  71. elif self._removed2pending:
  72. for deadnode in self._removed2pending:
  73. if deadnode.gateway.spec == node.gateway.spec:
  74. dead_collection = self.node2collection[deadnode]
  75. if collection != dead_collection:
  76. msg = report_collection_diff(
  77. dead_collection,
  78. collection,
  79. deadnode.gateway.id,
  80. node.gateway.id,
  81. )
  82. self.log(msg)
  83. return
  84. pending = self._removed2pending.pop(deadnode)
  85. self.node2pending[node] = pending
  86. break
  87. def mark_test_complete(self, node, item_index, duration=0):
  88. self.node2pending[node].remove(item_index)
  89. def remove_node(self, node):
  90. # KeyError if we didn't get an add_node() yet
  91. pending = self.node2pending.pop(node)
  92. if not pending:
  93. return
  94. crashitem = self.node2collection[node][pending.pop(0)]
  95. if pending:
  96. self._removed2pending[node] = pending
  97. return crashitem
  98. def schedule(self):
  99. """Schedule the test items on the nodes
  100. If the node's pending list is empty it is a new node which
  101. needs to run all the tests. If the pending list is already
  102. populated (by ``.add_node_collection()``) then it replaces a
  103. dead node and we only need to run those tests.
  104. """
  105. assert self.collection_is_completed
  106. for node, pending in self.node2pending.items():
  107. if node in self._started:
  108. continue
  109. if not pending:
  110. pending[:] = range(len(self.node2collection[node]))
  111. node.send_runtest_all()
  112. else:
  113. node.send_runtest_some(pending)
  114. self._started.append(node)