Skip to content

Commit

Permalink
Run dask commands with a matching interpreter (#8975)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjwatson authored Jan 6, 2025
1 parent 1b92625 commit 40b7646
Show file tree
Hide file tree
Showing 10 changed files with 334 additions and 40 deletions.
115 changes: 104 additions & 11 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _get_dashboard_port(client: Client) -> int:


def test_defaults(loop, requires_default_ports):
with popen(["dask", "scheduler"]):
with popen([sys.executable, "-m", "dask", "scheduler"]):

async def f():
# Default behaviour is to listen on all addresses
Expand All @@ -53,7 +53,17 @@ async def f():

def test_hostport(loop):
port = open_port()
with popen(["dask", "scheduler", "--no-dashboard", "--host", f"127.0.0.1:{port}"]):
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--no-dashboard",
"--host",
f"127.0.0.1:{port}",
]
):

async def f():
# The scheduler's main port can't be contacted from the outside
Expand All @@ -65,7 +75,7 @@ async def f():


def test_no_dashboard(loop, requires_default_ports):
with popen(["dask", "scheduler", "--no-dashboard"]):
with popen([sys.executable, "-m", "dask", "scheduler", "--no-dashboard"]):
with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
response = requests.get("http://127.0.0.1:8787/status/")
assert response.status_code == 404
Expand All @@ -76,7 +86,7 @@ def test_dashboard(loop):
port = open_port()

with popen(
["dask", "scheduler", "--host", f"127.0.0.1:{port}"],
[sys.executable, "-m", "dask", "scheduler", "--host", f"127.0.0.1:{port}"],
):
with Client(f"127.0.0.1:{port}", loop=loop) as c:
dashboard_port = _get_dashboard_port(c)
Expand Down Expand Up @@ -109,6 +119,8 @@ def test_dashboard_non_standard_ports(loop):
port2 = open_port()
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
f"--port={port1}",
Expand Down Expand Up @@ -137,6 +149,8 @@ def test_multiple_protocols(loop):
port2 = open_port()
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--protocol=tcp,ws",
Expand All @@ -158,6 +172,8 @@ def test_dashboard_allowlist(loop):
port = open_port()
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
f"--port={port}",
Expand Down Expand Up @@ -198,6 +214,8 @@ def test_interface(loop):
port = open_port()
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
f"--port={port}",
Expand All @@ -208,6 +226,8 @@ def test_interface(loop):
) as s:
with popen(
[
sys.executable,
"-m",
"dask",
"worker",
f"127.0.0.1:{port}",
Expand Down Expand Up @@ -252,12 +272,24 @@ def check_pidfile(proc, pidfile):
assert proc.pid == pid

with tmpfile() as s:
with popen(["dask", "scheduler", "--pid-file", s, "--no-dashboard"]) as sched:
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--pid-file",
s,
"--no-dashboard",
]
) as sched:
check_pidfile(sched, s)

with tmpfile() as w:
with popen(
[
sys.executable,
"-m",
"dask",
"worker",
f"127.0.0.1:{port}",
Expand All @@ -273,6 +305,8 @@ def test_scheduler_port_zero(loop):
with tmpfile() as fn:
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--no-dashboard",
Expand All @@ -292,6 +326,8 @@ def test_dashboard_port_zero(loop):
port = open_port()
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--host",
Expand Down Expand Up @@ -329,6 +365,8 @@ def check_scheduler():
with tmpfile() as fn:
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--scheduler-file",
Expand Down Expand Up @@ -359,6 +397,8 @@ def check_scheduler():
with tmpfile() as fn:
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--scheduler-file",
Expand All @@ -382,6 +422,8 @@ def test_preload_remote_module(loop, tmp_path):
):
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--scheduler-file",
Expand All @@ -408,7 +450,9 @@ def test_preload_config(loop):
with tmpfile() as fn:
env = os.environ.copy()
env["DASK_DISTRIBUTED__SCHEDULER__PRELOAD"] = PRELOAD_TEXT
with popen(["dask", "scheduler", "--scheduler-file", fn], env=env):
with popen(
[sys.executable, "-m", "dask", "scheduler", "--scheduler-file", fn], env=env
):
with Client(scheduler_file=fn, loop=loop) as c:
assert (
c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.foo)
Expand Down Expand Up @@ -446,6 +490,8 @@ def check_passthrough():
print(fn)
with popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--scheduler-file",
Expand Down Expand Up @@ -477,7 +523,16 @@ def check_passthrough():
with tmpfile() as fn2:
print(fn2)
with popen(
["dask", "scheduler", "--scheduler-file", fn2, "--preload", path],
[
sys.executable,
"-m",
"dask",
"scheduler",
"--scheduler-file",
fn2,
"--preload",
path,
],
stdout=sys.stdout,
stderr=sys.stderr,
):
Expand Down Expand Up @@ -530,10 +585,20 @@ def dask_setup(worker):
"""
port = open_port()
with popen(
["dask", "scheduler", "--no-dashboard", "--host", f"127.0.0.1:{port}"]
[
sys.executable,
"-m",
"dask",
"scheduler",
"--no-dashboard",
"--host",
f"127.0.0.1:{port}",
]
) as s:
with popen(
[
sys.executable,
"-m",
"dask",
"worker",
f"localhost:{port}",
Expand All @@ -555,9 +620,37 @@ def dask_setup(worker):
def test_multiple_workers(loop):
scheduler_address = f"127.0.0.1:{open_port()}"
with (
popen(["dask", "scheduler", "--no-dashboard", "--host", scheduler_address]),
popen(["dask", "worker", scheduler_address, "--no-dashboard"]),
popen(["dask", "worker", scheduler_address, "--no-dashboard"]),
popen(
[
sys.executable,
"-m",
"dask",
"scheduler",
"--no-dashboard",
"--host",
scheduler_address,
]
),
popen(
[
sys.executable,
"-m",
"dask",
"worker",
scheduler_address,
"--no-dashboard",
]
),
popen(
[
sys.executable,
"-m",
"dask",
"worker",
scheduler_address,
"--no-dashboard",
]
),
Client(scheduler_address, loop=loop) as c,
):
start = time()
Expand Down
15 changes: 14 additions & 1 deletion distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import signal
import subprocess
import sys

import pytest
import yaml
Expand All @@ -19,6 +20,8 @@ async def test_text():
port = open_port()
with popen(
[
sys.executable,
"-m",
"dask",
"spec",
"--spec",
Expand All @@ -27,6 +30,8 @@ async def test_text():
):
with popen(
[
sys.executable,
"-m",
"dask",
"spec",
"tcp://localhost:%d" % port,
Expand Down Expand Up @@ -55,6 +60,8 @@ async def test_file(c, s, tmp_path):
)
with popen(
[
sys.executable,
"-m",
"dask",
"spec",
s.address,
Expand All @@ -72,6 +79,8 @@ async def test_file(c, s, tmp_path):
def test_errors():
with popen(
[
sys.executable,
"-m",
"dask",
"spec",
"--spec",
Expand All @@ -85,7 +94,7 @@ def test_errors():
assert "exactly one" in line
assert "--spec" in line and "--spec-file" in line

with popen(["dask", "spec"], capture_output=True) as proc:
with popen([sys.executable, "-m", "dask", "spec"], capture_output=True) as proc:
line = proc.stdout.readline().decode()
assert "exactly one" in line
assert "--spec" in line and "--spec-file" in line
Expand All @@ -97,6 +106,8 @@ def test_errors():
@gen_cluster(client=True, nthreads=[])
async def test_signal_handling_worker(c, s, worker_type, sig):
worker = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"dask",
"spec",
"--spec",
Expand Down Expand Up @@ -136,6 +147,8 @@ async def test_signal_handling_worker(c, s, worker_type, sig):
async def test_signal_handling_scheduler(sig):
port = open_port()
scheduler = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"dask",
"spec",
"--spec",
Expand Down
Loading

0 comments on commit 40b7646

Please sign in to comment.