Skip to content

Commit

Permalink
Improve tests that watch for subprocess logs (#6461)
Browse files Browse the repository at this point in the history
Tests like #6395 will fail (timeout) because a log statement doesn't get printed, but since you never get to see what _was_ printed, CI failures are hard to debug.

Adds a `wait_for_log_line` helper that tees the output to stdout, so you can at least see what happened.
  • Loading branch information
gjoseph92 authored May 27, 2022
1 parent 1346671 commit 34a9409
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 56 deletions.
19 changes: 6 additions & 13 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
assert_can_connect_from_everywhere_4_6,
assert_can_connect_locally_4,
popen,
wait_for_log_line,
)


Expand Down Expand Up @@ -66,12 +67,8 @@ def test_dashboard(loop):
pytest.importorskip("bokeh")

with popen(["dask-scheduler"], flush_output=False) as proc:
for line in proc.stdout:
if b"dashboard at" in line:
dashboard_port = int(line.decode().split(":")[-1].strip())
break
else:
assert False # pragma: nocover
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())

with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
pass
Expand Down Expand Up @@ -223,13 +220,9 @@ def test_dashboard_port_zero(loop):
["dask-scheduler", "--dashboard-address", ":0"],
flush_output=False,
) as proc:
for line in proc.stdout:
if b"dashboard at" in line:
dashboard_port = int(line.decode().split(":")[-1].strip())
assert dashboard_port != 0
break
else:
assert False # pragma: nocover
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())
assert dashboard_port != 0


PRELOAD_TEXT = """
Expand Down
10 changes: 3 additions & 7 deletions distributed/cli/tests/test_dask_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from distributed import Client
from distributed.cli.dask_ssh import main
from distributed.compatibility import MACOS, WINDOWS
from distributed.utils_test import popen
from distributed.utils_test import popen, wait_for_log_line

pytest.importorskip("paramiko")
pytestmark = [
Expand All @@ -30,16 +30,12 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
# This interrupt is necessary for the cluster to place output into the stdout
# and stderr pipes
proc.send_signal(2)
assert any(
b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"renamed to --nworkers", proc.stdout, max_lines=15)


def test_ssh_cli_nworkers_with_nprocs_is_an_error():
with popen(
["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
flush_output=False,
) as proc:
assert any(
b"Both --nprocs and --nworkers" in proc.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15)
45 changes: 11 additions & 34 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils import open_port
from distributed.utils_test import gen_cluster, popen, requires_ipv6
from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line


@pytest.mark.parametrize(
Expand Down Expand Up @@ -246,9 +246,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s):
],
flush_output=False,
) as worker:
assert any(
b"Not enough ports in range" in worker.stdout.readline() for _ in range(100)
)
wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100)


@pytest.mark.slow
Expand Down Expand Up @@ -282,26 +280,14 @@ async def test_reconnect_deprecated(c, s):
["dask-worker", s.address, "--reconnect"],
flush_output=False,
) as worker:
for _ in range(10):
line = worker.stdout.readline()
print(line)
if b"`--reconnect` option has been removed" in line:
break
else:
raise AssertionError("Message not printed, see stdout")
wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout)
assert worker.wait() == 1

with popen(
["dask-worker", s.address, "--no-reconnect"],
flush_output=False,
) as worker:
for _ in range(10):
line = worker.stdout.readline()
print(line)
if b"flag is deprecated, and will be removed" in line:
break
else:
raise AssertionError("Message not printed, see stdout")
wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout)
await c.wait_for_workers(1)
await c.shutdown()

Expand Down Expand Up @@ -377,9 +363,7 @@ async def test_nworkers_requires_nanny(s):
["dask-worker", s.address, "--nworkers=2", "--no-nanny"],
flush_output=False,
) as worker:
assert any(
b"Failed to launch worker" in worker.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15)


@pytest.mark.slow
Expand Down Expand Up @@ -419,9 +403,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
flush_output=False,
) as worker:
await c.wait_for_workers(2)
assert any(
b"renamed to --nworkers" in worker.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15)


@gen_cluster(nthreads=[])
Expand All @@ -430,10 +412,7 @@ async def test_worker_cli_nworkers_with_nprocs_is_an_error(s):
["dask-worker", s.address, "--nprocs=2", "--nworkers=2"],
flush_output=False,
) as worker:
assert any(
b"Both --nprocs and --nworkers" in worker.stdout.readline()
for _ in range(15)
)
wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15)


@pytest.mark.slow
Expand Down Expand Up @@ -733,12 +712,10 @@ def test_error_during_startup(monkeypatch, nanny):
) as scheduler:
start = time()
# Wait for the scheduler to be up
while line := scheduler.stdout.readline():
if b"Scheduler at" in line:
break
# Ensure this is not killed by pytest-timeout
if time() - start > 5:
raise TimeoutError("Scheduler failed to start in time.")
wait_for_log_line(b"Scheduler at", scheduler.stdout)
# Ensure this is not killed by pytest-timeout
if time() - start > 5:
raise TimeoutError("Scheduler failed to start in time.")

with popen(
[
Expand Down
28 changes: 26 additions & 2 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from contextlib import contextmanager, nullcontext, suppress
from itertools import count
from time import sleep
from typing import Any, Generator, Literal
from typing import IO, Any, Generator, Iterator, Literal

import pytest
import yaml
Expand Down Expand Up @@ -1257,7 +1257,9 @@ def _terminate_process(proc):


@contextmanager
def popen(args: list[str], flush_output: bool = True, **kwargs):
def popen(
args: list[str], flush_output: bool = True, **kwargs
) -> Iterator[subprocess.Popen[bytes]]:
"""Start a shell command in a subprocess.
Yields a subprocess.Popen object.
Expand Down Expand Up @@ -2189,3 +2191,25 @@ def ucx_loop():
yield loop
ucp.reset()
loop.close()


def wait_for_log_line(
    match: bytes, stream: IO[bytes] | None, max_lines: int | None = 10
) -> bytes:
    """
    Read lines from an IO stream until the match is found, and return the matching line.

    Prints each line to test stdout for easier debugging of failures.

    Parameters
    ----------
    match
        Byte string to search for within each line.
    stream
        Binary stream to read from, e.g. the ``stdout`` of a subprocess.
        Typed as optional so that ``Popen.stdout`` can be passed directly;
        passing ``None`` is an error.
    max_lines
        Maximum number of lines to read before giving up.
        ``None`` means keep reading until the match is found or the stream ends.

    Returns
    -------
    The first line read from ``stream`` that contains ``match``.

    Raises
    ------
    AssertionError
        If ``stream`` is None, if ``max_lines`` lines were read without a match,
        or if the stream reached end-of-file before a match was found.
    """
    # Explicit raise rather than a bare ``assert`` so the check has a message
    # and is not stripped under ``python -O``.
    if stream is None:
        raise AssertionError("No stream to read log lines from (stream is None).")

    lines_read = 0
    while max_lines is None or lines_read < max_lines:
        line = stream.readline()
        if not line:
            # ``readline`` returns empty bytes only at EOF. Without this check,
            # ``max_lines=None`` would loop forever once the writer closed the
            # stream, and a finite ``max_lines`` would be spent on empty reads.
            raise AssertionError(
                f"{match!r} not found before the stream ended "
                f"({lines_read} log lines read). See test stdout for details."
            )
        print(line)
        if match in line:
            return line
        lines_read += 1

    raise AssertionError(
        f"{match!r} not found in {max_lines} log lines. See test stdout for details."
    )

0 comments on commit 34a9409

Please sign in to comment.