Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement work-stealing scheduler #862

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/858.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New ``worksteal`` scheduler, based on the idea of `work stealing <https://en.wikipedia.org/wiki/Work_stealing>`_. It's similar to ``load`` scheduler, but it should handle tests with significantly differing duration better, and, at the same time, it should provide similar or better reuse of fixtures.
amezin marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 9 additions & 0 deletions docs/distribution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,13 @@ The test distribution algorithm is configured with the ``--dist`` command-line o
This will make sure ``test1`` and ``TestA::test2`` will run in the same worker.
Tests without the ``xdist_group`` mark are distributed normally as in the ``--dist=load`` mode.

* ``--dist worksteal``: Initially, tests are distributed evenly among all
available workers. When a worker completes most of its assigned tests and
doesn't have enough tests to continue (currently, every worker needs at least
two tests in its queue), an attempt is made to reassign ("steal") a portion
of tests from some other worker's queue. The results should be similar to
the ``load`` method, but ``worksteal`` should handle tests with significantly
differing duration better, and, at the same time, it should provide similar
or better reuse of fixtures.

* ``--dist no``: The normal pytest execution mode, runs one test at a time (no distribution at all).
13 changes: 13 additions & 0 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
LoadScopeScheduling,
LoadFileScheduling,
LoadGroupScheduling,
WorkStealingScheduling,
)


Expand Down Expand Up @@ -100,6 +101,7 @@ def pytest_xdist_make_scheduler(self, config, log):
"loadscope": LoadScopeScheduling,
"loadfile": LoadFileScheduling,
"loadgroup": LoadGroupScheduling,
"worksteal": WorkStealingScheduling,
}
return schedulers[dist](config, log)

Expand Down Expand Up @@ -282,6 +284,17 @@ def worker_runtest_protocol_complete(self, node, item_index, duration):
"""
self.sched.mark_test_complete(node, item_index, duration)

def worker_unscheduled(self, node, indices):
"""
Emitted when a node fires the 'unscheduled' event, signalling that
some tests have been removed from the worker's queue and should be
sent to some worker again.
nicoddemus marked this conversation as resolved.
Show resolved Hide resolved

This should happen only in response to 'steal' command, so schedulers
not using 'steal' command don't have to implement it.
"""
self.sched.remove_pending_tests_from_node(node, indices)

def worker_collectreport(self, node, rep):
"""Emitted when a node calls the pytest_collectreport hook.

Expand Down
12 changes: 11 additions & 1 deletion src/xdist/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,15 @@ def pytest_addoption(parser):
"--dist",
metavar="distmode",
action="store",
choices=["each", "load", "loadscope", "loadfile", "loadgroup", "no"],
choices=[
"each",
"load",
"loadscope",
"loadfile",
"loadgroup",
"worksteal",
"no",
],
dest="dist",
default="no",
help=(
Expand All @@ -107,6 +115,8 @@ def pytest_addoption(parser):
"loadfile: load balance by sending test grouped by file"
" to any available environment.\n\n"
"loadgroup: like load, but sends tests marked with 'xdist_group' to the same worker.\n\n"
"worksteal: split the test suite between available environments,"
" then rebalance when any worker runs out of tests.\n\n"
"(default) no: run tests inprocess, don't distribute."
),
)
Expand Down
78 changes: 54 additions & 24 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
needs not to be installed in remote environments.
"""

import contextlib
import sys
import os
import time
Expand Down Expand Up @@ -56,14 +57,21 @@ def worker_title(title):


class WorkerInteractor:
SHUTDOWN_MARK = object()

def __init__(self, config, channel):
self.config = config
self.workerid = config.workerinput.get("workerid", "?")
self.testrunuid = config.workerinput["testrunuid"]
self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug)
self.channel = channel
self.torun = self._make_queue()
self.nextitem_index = None
config.pluginmanager.register(self)

def _make_queue(self):
return self.channel.gateway.execmodel.queue.Queue()

def sendevent(self, name, **kwargs):
self.log("sending", name, kwargs)
self.channel.send((name, kwargs))
Expand Down Expand Up @@ -92,38 +100,60 @@ def pytest_sessionfinish(self, exitstatus):
def pytest_collection(self, session):
self.sendevent("collectionstart")

def handle_command(self, command):
nicoddemus marked this conversation as resolved.
Show resolved Hide resolved
if command is self.SHUTDOWN_MARK:
self.torun.put(self.SHUTDOWN_MARK)
return

name, kwargs = command

self.log("received command", name, kwargs)
if name == "runtests":
for i in kwargs["indices"]:
self.torun.put(i)
elif name == "runtests_all":
for i in range(len(self.session.items)):
self.torun.put(i)
elif name == "shutdown":
self.torun.put(self.SHUTDOWN_MARK)
elif name == "steal":
self.steal(kwargs["indices"])

def steal(self, indices):
indices = set(indices)
stolen = []

old_queue, self.torun = self.torun, self._make_queue()

def old_queue_get_nowait_noraise():
with contextlib.suppress(self.channel.gateway.execmodel.queue.Empty):
return old_queue.get_nowait()

for i in iter(old_queue_get_nowait_noraise, None):
if i in indices:
stolen.append(i)
else:
self.torun.put(i)

self.sendevent("unscheduled", indices=stolen)

@pytest.hookimpl
def pytest_runtestloop(self, session):
self.log("entering main loop")
torun = []
while 1:
try:
name, kwargs = self.channel.receive()
except EOFError:
return True
self.log("received command", name, kwargs)
if name == "runtests":
torun.extend(kwargs["indices"])
elif name == "runtests_all":
torun.extend(range(len(session.items)))
self.log("items to run:", torun)
# only run if we have an item and a next item
while len(torun) >= 2:
self.run_one_test(torun)
if name == "shutdown":
if torun:
self.run_one_test(torun)
break
self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK)
self.nextitem_index = self.torun.get()
while self.nextitem_index is not self.SHUTDOWN_MARK:
self.run_one_test()
return True

def run_one_test(self, torun):
def run_one_test(self):
items = self.session.items
self.item_index = torun.pop(0)
self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get()
item = items[self.item_index]
if torun:
nextitem = items[torun[0]]
else:
if self.nextitem_index is self.SHUTDOWN_MARK:
nextitem = None
else:
nextitem = items[self.nextitem_index]

worker_title("[pytest-xdist running] %s" % item.nodeid)

Expand Down
1 change: 1 addition & 0 deletions src/xdist/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from xdist.scheduler.loadfile import LoadFileScheduling # noqa
from xdist.scheduler.loadscope import LoadScopeScheduling # noqa
from xdist.scheduler.loadgroup import LoadGroupScheduling # noqa
from xdist.scheduler.worksteal import WorkStealingScheduling # noqa
Loading