123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- from itertools import cycle
- from py.log import Producer
- from _pytest.runner import CollectReport
- from xdist.workermanage import parse_spec_config
- from xdist.report import report_collection_diff
class LoadScheduling(object):
    """Implement load scheduling across nodes.

    This distributes the tests collected across all nodes so each test
    is run just once.  All nodes collect and submit the test suite and
    when all collections are received it is verified they are
    identical collections.  Then the collection gets divided up in
    chunks and chunks get submitted to nodes.  Whenever a node finishes
    an item, it calls ``.mark_test_complete()`` which will trigger the
    scheduler to assign more tests if the number of pending tests for
    the node falls below a low-watermark.

    When created, ``numnodes`` defines how many nodes are expected to
    submit a collection.  This is used to know when all nodes have
    finished collection or how large the chunks need to be created.

    Attributes:

    :numnodes: The expected number of nodes taking part.  The actual
       number of nodes will vary during the scheduler's lifetime as
       nodes are added by the DSession as they are brought up and
       removed either because of a dead node or normal shutdown.  This
       number is primarily used to know when the initial collection is
       completed.

    :node2collection: Map of nodes and their test collection.  All
       collections should always be identical.

    :node2pending: Map of nodes and the indices of their pending
       tests.  The indices are an index into ``.collection`` (which is
       identical to their own collection stored in
       ``.node2collection``).

    :collection: The one collection once it is validated to be
       identical between all the nodes.  It is initialised to None
       until ``.schedule()`` is called.

    :pending: List of indices of globally pending tests.  These are
       tests which have not yet been allocated to a chunk for a node
       to process.

    :log: A py.log.Producer instance.

    :config: Config object, used for handling hooks.
    """

    def __init__(self, config, log=None):
        """Create a scheduler for the nodes described by ``config``.

        :config: Config object; the number of expected nodes is the
           length of ``parse_spec_config(config)``.
        :log: Optional parent log producer.  When given, its
           ``loadsched`` attribute is used; otherwise a new
           ``Producer("loadsched")`` is created.
        """
        self.numnodes = len(parse_spec_config(config))
        self.node2collection = {}
        self.node2pending = {}
        self.pending = []
        self.collection = None
        if log is None:
            self.log = Producer("loadsched")
        else:
            self.log = log.loadsched
        self.config = config

    @property
    def nodes(self):
        """A list of all nodes in the scheduler."""
        return list(self.node2pending)

    @property
    def collection_is_completed(self):
        """Boolean indication initial test collection is complete.

        This is a boolean indicating all initial participating nodes
        have finished collection.  The required number of initial
        nodes is defined by ``.numnodes``.
        """
        return len(self.node2collection) >= self.numnodes

    @property
    def tests_finished(self):
        """Return True if all tests have been executed by the nodes."""
        if not self.collection_is_completed or self.pending:
            return False
        # A node still holding two or more items is considered busy; a
        # node holding at most one is treated as finished.
        # NOTE(review): presumably the last item is only reported once
        # the node is told to shut down -- confirm against the worker.
        return all(len(assigned) < 2 for assigned in self.node2pending.values())

    @property
    def has_pending(self):
        """Return True if there are pending test items.

        This indicates that collection has finished and nodes are
        still processing test items, so this can be thought of as
        "the scheduler is active".
        """
        # Either unallocated tests remain, or some node still has
        # allocated tests it has not reported as complete.
        return bool(self.pending) or any(self.node2pending.values())

    def add_node(self, node):
        """Add a new node to the scheduler.

        From now on the node will be allocated chunks of tests to
        execute.

        Called by the ``DSession.worker_workerready`` hook when it
        successfully bootstraps a new node.
        """
        assert node not in self.node2pending
        self.node2pending[node] = []

    def add_node_collection(self, node, collection):
        """Add the collected test items from a node.

        The collection is stored in the ``.node2collection`` map.  If
        the initial collection phase is already complete and the new
        collection differs from the validated one, the difference is
        logged and the node's collection is not stored.

        Called by the ``DSession.worker_collectionfinish`` hook.
        """
        assert node in self.node2pending
        if self.collection_is_completed:
            # A new node has been added later, perhaps an original one
            # died.  .schedule() should have been called by now, which
            # sets .collection.  Compare against None rather than
            # truthiness: an empty collection is valid once scheduled.
            assert self.collection is not None
            if collection != self.collection:
                other_node = next(iter(self.node2collection.keys()))
                msg = report_collection_diff(
                    self.collection, collection, other_node.gateway.id, node.gateway.id
                )
                self.log(msg)
                return
        self.node2collection[node] = list(collection)

    def mark_test_complete(self, node, item_index, duration=0):
        """Mark test item as completed by node.

        The duration it took to execute the item is used as a hint to
        the scheduler.

        This is called by the ``DSession.worker_testreport`` hook.
        """
        self.node2pending[node].remove(item_index)
        self.check_schedule(node, duration=duration)

    def check_schedule(self, node, duration=0):
        """Maybe schedule new items on the node.

        If there are any globally pending tests left then this will
        check if the given node should be given any more tests.  The
        ``duration`` of the last test is optionally used as a
        heuristic to influence how many tests the node is assigned.
        Nothing is done for a node that is already shutting down.
        """
        if node.shutting_down:
            return

        if self.pending:
            # how many nodes do we have?
            num_nodes = len(self.node2pending)
            # if our node goes below a heuristic minimum, fill it out to
            # heuristic maximum
            items_per_node_min = max(2, len(self.pending) // num_nodes // 4)
            items_per_node_max = max(2, len(self.pending) // num_nodes // 2)
            node_pending = self.node2pending[node]
            if len(node_pending) < items_per_node_min:
                if duration >= 0.1 and len(node_pending) >= 2:
                    # seems the node is doing long-running tests
                    # and has enough items to continue
                    # so let's rather wait with sending new items
                    return
                num_send = items_per_node_max - len(node_pending)
                self._send_tests(node, num_send)

        self.log("num items waiting for node:", len(self.pending))

    def remove_node(self, node):
        """Remove a node from the scheduler.

        This should be called either when the node crashed or at
        shutdown time.  In the former case any pending items assigned
        to the node will be re-scheduled.  Called by the
        ``DSession.worker_workerfinished`` and
        ``DSession.worker_errordown`` hooks.

        Return the item which was being executed while the node
        crashed or None if the node has no more pending items.
        """
        pending = self.node2pending.pop(node)
        if not pending:
            return None

        # The node crashed, reassign pending items.  The first pending
        # index is taken to be the item that was running; it is reported
        # to the caller and not re-queued.
        crashitem = self.collection[pending.pop(0)]
        self.pending.extend(pending)
        for remaining_node in self.node2pending:
            self.check_schedule(remaining_node)
        return crashitem

    def schedule(self):
        """Initiate distribution of the test collection.

        Initiate scheduling of the items across the nodes.  If this
        gets called again later it behaves the same as calling
        ``.check_schedule()`` on all nodes so that newly added nodes
        will start to be used.

        This is called by the ``DSession.worker_collectionfinish`` hook
        if ``.collection_is_completed`` is True.
        """
        assert self.collection_is_completed

        # Initial distribution already happened, reschedule on all nodes
        if self.collection is not None:
            for node in self.nodes:
                self.check_schedule(node)
            return

        # XXX allow nodes to have different collections
        if not self._check_nodes_have_same_collection():
            self.log("**Different tests collected, aborting run**")
            return

        # Collections are identical, create the index of pending items.
        self.collection = next(iter(self.node2collection.values()))
        self.pending[:] = range(len(self.collection))
        if not self.collection:
            return

        # Send a batch of tests to run. If we don't have at least two
        # tests per node, we have to send them all so that we can send
        # shutdown signals and get all nodes working.
        initial_batch = max(len(self.pending) // 4, 2 * len(self.nodes))

        # distribute tests round-robin up to the batch size
        # (or until we run out)
        nodes = cycle(self.nodes)
        for _ in range(initial_batch):
            self._send_tests(next(nodes), 1)

        if not self.pending:
            # initial distribution sent all tests, start node shutdown
            for node in self.nodes:
                node.shutdown()

    def _send_tests(self, node, num):
        """Move up to ``num`` globally pending indices to ``node``.

        The indices are taken from the front of ``.pending``, recorded
        in ``.node2pending`` and handed to ``node.send_runtest_some()``.
        Nothing is sent when no indices are available.
        """
        tests_per_node = self.pending[:num]
        if tests_per_node:
            del self.pending[:num]
            self.node2pending[node].extend(tests_per_node)
            node.send_runtest_some(tests_per_node)

    def _check_nodes_have_same_collection(self):
        """Return True if all nodes have collected the same items.

        If collections differ, this method returns False while logging
        the collection differences and posting collection errors to
        pytest_collectreport hook.
        """
        node_collection_items = list(self.node2collection.items())
        first_node, col = node_collection_items[0]
        same_collection = True
        # Every other node is compared against the first one.
        for node, collection in node_collection_items[1:]:
            msg = report_collection_diff(
                col, collection, first_node.gateway.id, node.gateway.id
            )
            if msg:
                same_collection = False
                self.log(msg)
                if self.config is not None:
                    rep = CollectReport(
                        node.gateway.id, "failed", longrepr=msg, result=[]
                    )
                    self.config.hook.pytest_collectreport(report=rep)

        return same_collection
|