loadscope.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. from collections import OrderedDict
  2. from _pytest.runner import CollectReport
  3. from py.log import Producer
  4. from xdist.report import report_collection_diff
  5. from xdist.workermanage import parse_spec_config
  6. class LoadScopeScheduling(object):
  7. """Implement load scheduling across nodes, but grouping test by scope.
  8. This distributes the tests collected across all nodes so each test is run
  9. just once. All nodes collect and submit the list of tests and when all
  10. collections are received it is verified they are identical collections.
  11. Then the collection gets divided up in work units, grouped by test scope,
  12. and those work units get submitted to nodes. Whenever a node finishes an
  13. item, it calls ``.mark_test_complete()`` which will trigger the scheduler
  14. to assign more work units if the number of pending tests for the node falls
  15. below a low-watermark.
  16. When created, ``numnodes`` defines how many nodes are expected to submit a
  17. collection. This is used to know when all nodes have finished collection.
  18. Attributes:
  19. :numnodes: The expected number of nodes taking part. The actual number of
  20. nodes will vary during the scheduler's lifetime as nodes are added by
  21. the DSession as they are brought up and removed either because of a dead
  22. node or normal shutdown. This number is primarily used to know when the
  23. initial collection is completed.
  24. :collection: The final list of tests collected by all nodes once it is
  25. validated to be identical between all the nodes. It is initialised to
  26. None until ``.schedule()`` is called.
  27. :workqueue: Ordered dictionary that maps all available scopes with their
  28. associated tests (nodeid). Nodeids are in turn associated with their
  29. completion status. One entry of the workqueue is called a work unit.
  30. In turn, a collection of work unit is called a workload.
  31. ::
  32. workqueue = {
  33. '<full>/<path>/<to>/test_module.py': {
  34. '<full>/<path>/<to>/test_module.py::test_case1': False,
  35. '<full>/<path>/<to>/test_module.py::test_case2': False,
  36. (...)
  37. },
  38. (...)
  39. }
  40. :assigned_work: Ordered dictionary that maps worker nodes with their
  41. assigned work units.
  42. ::
  43. assigned_work = {
  44. '<worker node A>': {
  45. '<full>/<path>/<to>/test_module.py': {
  46. '<full>/<path>/<to>/test_module.py::test_case1': False,
  47. '<full>/<path>/<to>/test_module.py::test_case2': False,
  48. (...)
  49. },
  50. (...)
  51. },
  52. (...)
  53. }
  54. :registered_collections: Ordered dictionary that maps worker nodes with
  55. their collection of tests gathered during test discovery.
  56. ::
  57. registered_collections = {
  58. '<worker node A>': [
  59. '<full>/<path>/<to>/test_module.py::test_case1',
  60. '<full>/<path>/<to>/test_module.py::test_case2',
  61. ],
  62. (...)
  63. }
  64. :log: A py.log.Producer instance.
  65. :config: Config object, used for handling hooks.
  66. """
  67. def __init__(self, config, log=None):
  68. self.numnodes = len(parse_spec_config(config))
  69. self.collection = None
  70. self.workqueue = OrderedDict()
  71. self.assigned_work = OrderedDict()
  72. self.registered_collections = OrderedDict()
  73. if log is None:
  74. self.log = Producer("loadscopesched")
  75. else:
  76. self.log = log.loadscopesched
  77. self.config = config
  78. @property
  79. def nodes(self):
  80. """A list of all active nodes in the scheduler."""
  81. return list(self.assigned_work.keys())
  82. @property
  83. def collection_is_completed(self):
  84. """Boolean indication initial test collection is complete.
  85. This is a boolean indicating all initial participating nodes have
  86. finished collection. The required number of initial nodes is defined
  87. by ``.numnodes``.
  88. """
  89. return len(self.registered_collections) >= self.numnodes
  90. @property
  91. def tests_finished(self):
  92. """Return True if all tests have been executed by the nodes."""
  93. if not self.collection_is_completed:
  94. return False
  95. if self.workqueue:
  96. return False
  97. for assigned_unit in self.assigned_work.values():
  98. if self._pending_of(assigned_unit) >= 2:
  99. return False
  100. return True
  101. @property
  102. def has_pending(self):
  103. """Return True if there are pending test items.
  104. This indicates that collection has finished and nodes are still
  105. processing test items, so this can be thought of as
  106. "the scheduler is active".
  107. """
  108. if self.workqueue:
  109. return True
  110. for assigned_unit in self.assigned_work.values():
  111. if self._pending_of(assigned_unit) > 0:
  112. return True
  113. return False
  114. def add_node(self, node):
  115. """Add a new node to the scheduler.
  116. From now on the node will be assigned work units to be executed.
  117. Called by the ``DSession.worker_workerready`` hook when it successfully
  118. bootstraps a new node.
  119. """
  120. assert node not in self.assigned_work
  121. self.assigned_work[node] = OrderedDict()
  122. def remove_node(self, node):
  123. """Remove a node from the scheduler.
  124. This should be called either when the node crashed or at shutdown time.
  125. In the former case any pending items assigned to the node will be
  126. re-scheduled.
  127. Called by the hooks:
  128. - ``DSession.worker_workerfinished``.
  129. - ``DSession.worker_errordown``.
  130. Return the item being executed while the node crashed or None if the
  131. node has no more pending items.
  132. """
  133. workload = self.assigned_work.pop(node)
  134. if not self._pending_of(workload):
  135. return None
  136. # The node crashed, identify test that crashed
  137. for work_unit in workload.values():
  138. for nodeid, completed in work_unit.items():
  139. if not completed:
  140. crashitem = nodeid
  141. break
  142. else:
  143. continue
  144. break
  145. else:
  146. raise RuntimeError(
  147. "Unable to identify crashitem on a workload with " "pending items"
  148. )
  149. # Made uncompleted work unit available again
  150. self.workqueue.update(workload)
  151. for node in self.assigned_work:
  152. self._reschedule(node)
  153. return crashitem
  154. def add_node_collection(self, node, collection):
  155. """Add the collected test items from a node.
  156. The collection is stored in the ``.registered_collections`` dictionary.
  157. Called by the hook:
  158. - ``DSession.worker_collectionfinish``.
  159. """
  160. # Check that add_node() was called on the node before
  161. assert node in self.assigned_work
  162. # A new node has been added later, perhaps an original one died.
  163. if self.collection_is_completed:
  164. # Assert that .schedule() should have been called by now
  165. assert self.collection
  166. # Check that the new collection matches the official collection
  167. if collection != self.collection:
  168. other_node = next(iter(self.registered_collections.keys()))
  169. msg = report_collection_diff(
  170. self.collection, collection, other_node.gateway.id, node.gateway.id
  171. )
  172. self.log(msg)
  173. return
  174. self.registered_collections[node] = list(collection)
  175. def mark_test_complete(self, node, item_index, duration=0):
  176. """Mark test item as completed by node.
  177. Called by the hook:
  178. - ``DSession.worker_testreport``.
  179. """
  180. nodeid = self.registered_collections[node][item_index]
  181. scope = self._split_scope(nodeid)
  182. self.assigned_work[node][scope][nodeid] = True
  183. self._reschedule(node)
  184. def _assign_work_unit(self, node):
  185. """Assign a work unit to a node."""
  186. assert self.workqueue
  187. # Grab a unit of work
  188. scope, work_unit = self.workqueue.popitem(last=False)
  189. # Keep track of the assigned work
  190. assigned_to_node = self.assigned_work.setdefault(node, default=OrderedDict())
  191. assigned_to_node[scope] = work_unit
  192. # Ask the node to execute the workload
  193. worker_collection = self.registered_collections[node]
  194. nodeids_indexes = [
  195. worker_collection.index(nodeid)
  196. for nodeid, completed in work_unit.items()
  197. if not completed
  198. ]
  199. node.send_runtest_some(nodeids_indexes)
  200. def _split_scope(self, nodeid):
  201. """Determine the scope (grouping) of a nodeid.
  202. There are usually 3 cases for a nodeid::
  203. example/loadsuite/test/test_beta.py::test_beta0
  204. example/loadsuite/test/test_delta.py::Delta1::test_delta0
  205. example/loadsuite/epsilon/__init__.py::epsilon.epsilon
  206. #. Function in a test module.
  207. #. Method of a class in a test module.
  208. #. Doctest in a function in a package.
  209. This function will group tests with the scope determined by splitting
  210. the first ``::`` from the right. That is, classes will be grouped in a
  211. single work unit, and functions from a test module will be grouped by
  212. their module. In the above example, scopes will be::
  213. example/loadsuite/test/test_beta.py
  214. example/loadsuite/test/test_delta.py::Delta1
  215. example/loadsuite/epsilon/__init__.py
  216. """
  217. return nodeid.rsplit("::", 1)[0]
  218. def _pending_of(self, workload):
  219. """Return the number of pending tests in a workload."""
  220. pending = sum(list(scope.values()).count(False) for scope in workload.values())
  221. return pending
  222. def _reschedule(self, node):
  223. """Maybe schedule new items on the node.
  224. If there are any globally pending work units left then this will check
  225. if the given node should be given any more tests.
  226. """
  227. # Do not add more work to a node shutting down
  228. if node.shutting_down:
  229. return
  230. # Check that more work is available
  231. if not self.workqueue:
  232. return
  233. self.log("Number of units waiting for node:", len(self.workqueue))
  234. # Check that the node is almost depleted of work
  235. # 2: Heuristic of minimum tests to enqueue more work
  236. if self._pending_of(self.assigned_work[node]) > 2:
  237. return
  238. # Pop one unit of work and assign it
  239. self._assign_work_unit(node)
  240. def schedule(self):
  241. """Initiate distribution of the test collection.
  242. Initiate scheduling of the items across the nodes. If this gets called
  243. again later it behaves the same as calling ``._reschedule()`` on all
  244. nodes so that newly added nodes will start to be used.
  245. If ``.collection_is_completed`` is True, this is called by the hook:
  246. - ``DSession.worker_collectionfinish``.
  247. """
  248. assert self.collection_is_completed
  249. # Initial distribution already happened, reschedule on all nodes
  250. if self.collection is not None:
  251. for node in self.nodes:
  252. self._reschedule(node)
  253. return
  254. # Check that all nodes collected the same tests
  255. if not self._check_nodes_have_same_collection():
  256. self.log("**Different tests collected, aborting run**")
  257. return
  258. # Collections are identical, create the final list of items
  259. self.collection = list(next(iter(self.registered_collections.values())))
  260. if not self.collection:
  261. return
  262. # Determine chunks of work (scopes)
  263. for nodeid in self.collection:
  264. scope = self._split_scope(nodeid)
  265. work_unit = self.workqueue.setdefault(scope, default=OrderedDict())
  266. work_unit[nodeid] = False
  267. # Avoid having more workers than work
  268. extra_nodes = len(self.nodes) - len(self.workqueue)
  269. if extra_nodes > 0:
  270. self.log("Shuting down {0} nodes".format(extra_nodes))
  271. for _ in range(extra_nodes):
  272. unused_node, assigned = self.assigned_work.popitem(last=True)
  273. self.log("Shuting down unused node {0}".format(unused_node))
  274. unused_node.shutdown()
  275. # Assign initial workload
  276. for node in self.nodes:
  277. self._assign_work_unit(node)
  278. # Ensure nodes start with at least two work units if possible (#277)
  279. for node in self.nodes:
  280. self._reschedule(node)
  281. # Initial distribution sent all tests, start node shutdown
  282. if not self.workqueue:
  283. for node in self.nodes:
  284. node.shutdown()
  285. def _check_nodes_have_same_collection(self):
  286. """Return True if all nodes have collected the same items.
  287. If collections differ, this method returns False while logging
  288. the collection differences and posting collection errors to
  289. pytest_collectreport hook.
  290. """
  291. node_collection_items = list(self.registered_collections.items())
  292. first_node, col = node_collection_items[0]
  293. same_collection = True
  294. for node, collection in node_collection_items[1:]:
  295. msg = report_collection_diff(
  296. col, collection, first_node.gateway.id, node.gateway.id
  297. )
  298. if not msg:
  299. continue
  300. same_collection = False
  301. self.log(msg)
  302. if self.config is None:
  303. continue
  304. rep = CollectReport(node.gateway.id, "failed", longrepr=msg, result=[])
  305. self.config.hook.pytest_collectreport(report=rep)
  306. return same_collection