diff --git a/distributed/_signals.py b/distributed/_signals.py new file mode 100644 index 0000000000..f25b7245c4 --- /dev/null +++ b/distributed/_signals.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import asyncio +import logging +import signal +from typing import Any + +logger = logging.getLogger(__name__) + + +async def wait_for_signals(signals: list[signal.Signals]) -> None: + """Wait for the passed signals by setting global signal handlers""" + loop = asyncio.get_running_loop() + event = asyncio.Event() + + old_handlers: dict[int, Any] = {} + + def handle_signal(signum, frame): + # Restore old signal handler to allow for quicker exit + # if the user sends the signal again. + signal.signal(signum, old_handlers[signum]) + logger.info("Received signal %s (%d)", signal.Signals(signum).name, signum) + loop.call_soon_threadsafe(event.set) + + for sig in signals: + old_handlers[sig] = signal.signal(sig, handle_signal) + + await event.wait() diff --git a/distributed/cli/dask_scheduler.py b/distributed/cli/dask_scheduler.py index 700765ba6f..b976a5c378 100755 --- a/distributed/cli/dask_scheduler.py +++ b/distributed/cli/dask_scheduler.py @@ -12,7 +12,7 @@ from tornado.ioloop import IOLoop from distributed import Scheduler -from distributed.cli.utils import wait_for_signals +from distributed._signals import wait_for_signals from distributed.preloading import validate_preload_argv from distributed.proctitle import ( enable_proctitle_on_children, diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 0b22db6ae3..207b34fe95 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -20,7 +20,7 @@ from dask.system import CPU_COUNT from distributed import Nanny -from distributed.cli.utils import wait_for_signals +from distributed._signals import wait_for_signals from distributed.comm import get_address_host_port from distributed.deploy.utils import nprocesses_nthreads from distributed.preloading import validate_preload_argv diff --git a/distributed/cli/utils.py b/distributed/cli/utils.py index 7b932e7736..fe4f0aa3f8 100644 --- a/distributed/cli/utils.py +++ b/distributed/cli/utils.py @@ -1,33 +1,10 @@ -from __future__ import annotations - -import asyncio -import logging -import signal -from typing import Any +import warnings from tornado.ioloop import IOLoop -logger = logging.getLogger(__name__) - - -async def wait_for_signals(signals: list[signal.Signals]) -> None: - """Wait for the passed signals by setting global signal handlers""" - loop = asyncio.get_running_loop() - event = asyncio.Event() - - old_handlers: dict[int, Any] = {} - - def handle_signal(signum, frame): - # Restore old signal handler to allow for quicker exit - # if the user sends the signal again. - signal.signal(signum, old_handlers[signum]) - logger.info("Received signal %s (%d)", signal.Signals(signum).name, signum) - loop.call_soon_threadsafe(event.set) - - for sig in signals: - old_handlers[sig] = signal.signal(sig, handle_signal) - - await event.wait() +warnings.warn( + "the distributed.cli.utils module is deprecated", DeprecationWarning, stacklevel=2 +) def install_signal_handlers(loop=None, cleanup=None):