Skip to content

Commit

Permalink
Move wait_for_signals to private module and deprecate `distributed.…
Browse files Browse the repository at this point in the history
…cli.utils` (#6367)
  • Loading branch information
hendrikmakait authored May 20, 2022
1 parent 8865ccc commit fb3589c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 29 deletions.
28 changes: 28 additions & 0 deletions distributed/_signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

import asyncio
import logging
import signal
from typing import Any

logger = logging.getLogger(__name__)


async def wait_for_signals(signals: list[signal.Signals]) -> None:
"""Wait for the passed signals by setting global signal handlers"""
loop = asyncio.get_running_loop()
event = asyncio.Event()

old_handlers: dict[int, Any] = {}

def handle_signal(signum, frame):
# Restore old signal handler to allow for quicker exit
# if the user sends the signal again.
signal.signal(signum, old_handlers[signum])
logger.info("Received signal %s (%d)", signal.Signals(signum).name, signum)
loop.call_soon_threadsafe(event.set)

for sig in signals:
old_handlers[sig] = signal.signal(sig, handle_signal)

await event.wait()
2 changes: 1 addition & 1 deletion distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tornado.ioloop import IOLoop

from distributed import Scheduler
from distributed.cli.utils import wait_for_signals
from distributed._signals import wait_for_signals
from distributed.preloading import validate_preload_argv
from distributed.proctitle import (
enable_proctitle_on_children,
Expand Down
2 changes: 1 addition & 1 deletion distributed/cli/dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dask.system import CPU_COUNT

from distributed import Nanny
from distributed.cli.utils import wait_for_signals
from distributed._signals import wait_for_signals
from distributed.comm import get_address_host_port
from distributed.deploy.utils import nprocesses_nthreads
from distributed.preloading import validate_preload_argv
Expand Down
31 changes: 4 additions & 27 deletions distributed/cli/utils.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,10 @@
from __future__ import annotations

import asyncio
import logging
import signal
from typing import Any
import warnings

from tornado.ioloop import IOLoop

logger = logging.getLogger(__name__)


async def wait_for_signals(signals: list[signal.Signals]) -> None:
"""Wait for the passed signals by setting global signal handlers"""
loop = asyncio.get_running_loop()
event = asyncio.Event()

old_handlers: dict[int, Any] = {}

def handle_signal(signum, frame):
# Restore old signal handler to allow for quicker exit
# if the user sends the signal again.
signal.signal(signum, old_handlers[signum])
logger.info("Received signal %s (%d)", signal.Signals(signum).name, signum)
loop.call_soon_threadsafe(event.set)

for sig in signals:
old_handlers[sig] = signal.signal(sig, handle_signal)

await event.wait()
warnings.warn(
"the distributed.cli.utils module is deprecated", DeprecationWarning, stacklevel=2
)


def install_signal_handlers(loop=None, cleanup=None):
Expand Down

0 comments on commit fb3589c

Please sign in to comment.