diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index e6ea59867a..308ee86a01 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -26,6 +26,7 @@ assert_can_connect_from_everywhere_4_6, assert_can_connect_locally_4, popen, + wait_for_log_line, ) @@ -66,12 +67,8 @@ def test_dashboard(loop): pytest.importorskip("bokeh") with popen(["dask-scheduler"], flush_output=False) as proc: - for line in proc.stdout: - if b"dashboard at" in line: - dashboard_port = int(line.decode().split(":")[-1].strip()) - break - else: - assert False # pragma: nocover + line = wait_for_log_line(b"dashboard at", proc.stdout) + dashboard_port = int(line.decode().split(":")[-1].strip()) with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): pass @@ -223,13 +220,9 @@ def test_dashboard_port_zero(loop): ["dask-scheduler", "--dashboard-address", ":0"], flush_output=False, ) as proc: - for line in proc.stdout: - if b"dashboard at" in line: - dashboard_port = int(line.decode().split(":")[-1].strip()) - assert dashboard_port != 0 - break - else: - assert False # pragma: nocover + line = wait_for_log_line(b"dashboard at", proc.stdout) + dashboard_port = int(line.decode().split(":")[-1].strip()) + assert dashboard_port != 0 PRELOAD_TEXT = """ diff --git a/distributed/cli/tests/test_dask_ssh.py b/distributed/cli/tests/test_dask_ssh.py index e087382fff..d7b011737f 100644 --- a/distributed/cli/tests/test_dask_ssh.py +++ b/distributed/cli/tests/test_dask_ssh.py @@ -4,7 +4,7 @@ from distributed import Client from distributed.cli.dask_ssh import main from distributed.compatibility import MACOS, WINDOWS -from distributed.utils_test import popen +from distributed.utils_test import popen, wait_for_log_line pytest.importorskip("paramiko") pytestmark = [ @@ -30,9 +30,7 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop): # This interrupt is necessary for the cluster to place output into the stdout # and stderr pipes proc.send_signal(2) - assert any( - b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15) - ) + wait_for_log_line(b"renamed to --nworkers", proc.stdout, max_lines=15) def test_ssh_cli_nworkers_with_nprocs_is_an_error(): @@ -40,6 +38,4 @@ def test_ssh_cli_nworkers_with_nprocs_is_an_error(): ["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"], flush_output=False, ) as proc: - assert any( - b"Both --nprocs and --nworkers" in proc.stdout.readline() for _ in range(15) - ) + wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 5b654cc8a8..ca8ed37aac 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -19,7 +19,7 @@ from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils import open_port -from distributed.utils_test import gen_cluster, popen, requires_ipv6 +from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line @pytest.mark.parametrize( @@ -246,9 +246,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s): ], flush_output=False, ) as worker: - assert any( - b"Not enough ports in range" in worker.stdout.readline() for _ in range(100) - ) + wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100) @pytest.mark.slow @@ -282,26 +280,14 @@ async def test_reconnect_deprecated(c, s): ["dask-worker", s.address, "--reconnect"], flush_output=False, ) as worker: - for _ in range(10): - line = worker.stdout.readline() - print(line) - if b"`--reconnect` option has been removed" in line: - break - else: - raise AssertionError("Message not printed, see stdout") + wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout) assert worker.wait() == 1 with popen( ["dask-worker", s.address, "--no-reconnect"], flush_output=False, ) as worker: - for _ in range(10): - line = worker.stdout.readline() - print(line) - if b"flag is deprecated, and will be removed" in line: - break - else: - raise AssertionError("Message not printed, see stdout") + wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout) await c.wait_for_workers(1) await c.shutdown() @@ -377,9 +363,7 @@ async def test_nworkers_requires_nanny(s): ["dask-worker", s.address, "--nworkers=2", "--no-nanny"], flush_output=False, ) as worker: - assert any( - b"Failed to launch worker" in worker.stdout.readline() for _ in range(15) - ) + wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15) @pytest.mark.slow @@ -419,9 +403,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s): flush_output=False, ) as worker: await c.wait_for_workers(2) - assert any( - b"renamed to --nworkers" in worker.stdout.readline() for _ in range(15) - ) + wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15) @gen_cluster(nthreads=[]) @@ -430,10 +412,7 @@ async def test_worker_cli_nworkers_with_nprocs_is_an_error(s): ["dask-worker", s.address, "--nprocs=2", "--nworkers=2"], flush_output=False, ) as worker: - assert any( - b"Both --nprocs and --nworkers" in worker.stdout.readline() - for _ in range(15) - ) + wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15) @pytest.mark.slow @@ -733,12 +712,10 @@ def test_error_during_startup(monkeypatch, nanny): ) as scheduler: start = time() # Wait for the scheduler to be up - while line := scheduler.stdout.readline(): - if b"Scheduler at" in line: - break - # Ensure this is not killed by pytest-timeout - if time() - start > 5: - raise TimeoutError("Scheduler failed to start in time.") + wait_for_log_line(b"Scheduler at", scheduler.stdout) + # Ensure this is not killed by pytest-timeout + if time() - start > 5: + raise TimeoutError("Scheduler failed to start in time.") with popen( [ diff --git a/distributed/utils_test.py b/distributed/utils_test.py index e9d0d91e68..46e795fd3c 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -25,7 +25,7 @@ from contextlib import contextmanager, nullcontext, suppress from itertools import count from time import sleep -from typing import Any, Generator, Literal +from typing import IO, Any, Generator, Iterator, Literal import pytest import yaml @@ -1257,7 +1257,9 @@ def _terminate_process(proc): @contextmanager -def popen(args: list[str], flush_output: bool = True, **kwargs): +def popen( + args: list[str], flush_output: bool = True, **kwargs +) -> Iterator[subprocess.Popen[bytes]]: """Start a shell command in a subprocess. Yields a subprocess.Popen object. @@ -2189,3 +2191,25 @@ def ucx_loop(): yield loop ucp.reset() loop.close() + + +def wait_for_log_line( + match: bytes, stream: IO[bytes] | None, max_lines: int | None = 10 +) -> bytes: + """ + Read lines from an IO stream until the match is found, and return the matching line. + + Prints each line to test stdout for easier debugging of failures. + """ + assert stream + i = 0 + while True: + if max_lines is not None and i == max_lines: + raise AssertionError( + f"{match!r} not found in {max_lines} log lines. See test stdout for details." + ) + line = stream.readline() + print(line) + if match in line: + return line + i += 1