Skip to content

Commit

Permalink
Add a new method interrupt to send SIGINT to tasks
Browse files Browse the repository at this point in the history
fix: #100
Required from dvc to stop the checkpoint during running.

1. Add a new method interrupt to send SIGINT to tasks
2. Modify the unit test for this new method.
3. Add `group` option to `send_signal`, `kill`, `terminate`, `interrrupt`
  • Loading branch information
karajan1001 committed Dec 8, 2022
1 parent 01b5843 commit 33a3250
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 40 deletions.
25 changes: 18 additions & 7 deletions src/dvc_task/proc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def run_signature(
immutable=immutable,
)

def send_signal(self, name: str, sig: int):
def send_signal(self, name: str, sig: int, group: bool = True):
"""Send `signal` to the specified named process."""
try:
process_info = self[name]
Expand All @@ -134,7 +134,12 @@ def handle_closed_process():

if process_info.returncode is None:
try:
os.kill(process_info.pid, sig)
if sys.platform != "win32" and group:
os.killpg( # pylint: disable=no-member
process_info.pid, sig
)
else:
os.kill(process_info.pid, sig)
except ProcessLookupError:
handle_closed_process()
raise
Expand All @@ -147,16 +152,22 @@ def handle_closed_process():
else:
raise ProcessLookupError

def terminate(self, name: str):
def interrupt(self, name: str, group: bool = True):
"""Send interrupt signal to specified named process"""
self.send_signal(name, signal.SIGINT, group)

def terminate(self, name: str, group: bool = True):
"""Terminate the specified named process."""
self.send_signal(name, signal.SIGTERM)
self.send_signal(name, signal.SIGTERM, group)

def kill(self, name: str):
def kill(self, name: str, group: bool = True):
"""Kill the specified named process."""
if sys.platform == "win32":
self.send_signal(name, signal.SIGTERM)
self.send_signal(name, signal.SIGTERM, group)
else:
self.send_signal(name, signal.SIGKILL) # pylint: disable=no-member
self.send_signal(
name, signal.SIGKILL, group # pylint: disable=no-member
)

def remove(self, name: str, force: bool = False):
"""Remove the specified named process from this manager.
Expand Down
60 changes: 27 additions & 33 deletions tests/proc/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ def test_send_signal(
):
"""Terminate signal should be sent."""
mock_kill = mocker.patch("os.kill")
process_manager.send_signal(running_process, signal.SIGTERM)
process_manager.send_signal(running_process, signal.SIGTERM, False)
mock_kill.assert_called_once_with(PID_RUNNING, signal.SIGTERM)

if sys.platform != "win32":
mock_killpg = mocker.patch("os.killpg")
process_manager.send_signal(running_process, signal.SIGINT, True)
mock_killpg.assert_called_once_with(PID_RUNNING, signal.SIGINT)

mock_kill.reset_mock()
with pytest.raises(ProcessLookupError):
process_manager.send_signal(finished_process, signal.SIGTERM)
Expand Down Expand Up @@ -61,43 +66,32 @@ def side_effect(*args):
process_manager.send_signal("nonexists", signal.SIGTERM)


def test_kill(
mocker: MockerFixture,
process_manager: ProcessManager,
finished_process: str,
running_process: str,
):
"""Kill signal should be sent."""
mock_kill = mocker.patch("os.kill")
process_manager.kill(running_process)
if sys.platform == "win32":
mock_kill.assert_called_once_with(PID_RUNNING, signal.SIGTERM)
else:
mock_kill.assert_called_once_with(
PID_RUNNING, signal.SIGKILL # pylint: disable=no-member
)

mock_kill.reset_mock()
with pytest.raises(ProcessLookupError):
process_manager.kill(finished_process)
mock_kill.assert_not_called()
if sys.platform == "win32":
SIGKILL = signal.SIGTERM
else:
SIGKILL = signal.SIGKILL # pylint: disable=no-member


def test_terminate(
@pytest.mark.parametrize(
"method, sig",
[
("kill", SIGKILL),
("terminate", signal.SIGTERM),
("interrupt", signal.SIGINT),
],
)
def test_kill_commands(
mocker: MockerFixture,
process_manager: ProcessManager,
running_process: str,
finished_process: str,
method: str,
sig: signal.Signals,
):
"""Terminate signal should be sent."""
mock_kill = mocker.patch("os.kill")
process_manager.terminate(running_process)
mock_kill.assert_called_once_with(PID_RUNNING, signal.SIGTERM)

mock_kill.reset_mock()
with pytest.raises(ProcessLookupError):
process_manager.terminate(finished_process)
mock_kill.assert_not_called()
"""Test shortcut for different signals."""
name = "process"
mock_kill = mocker.patch.object(process_manager, "send_signal")
func = getattr(process_manager, method)
func(name)
mock_kill.assert_called_once_with(name, sig, True)


def test_remove(
Expand Down

0 comments on commit 33a3250

Please sign in to comment.