Handle SIGTERM received by agent gracefully #8691

Merged May 10, 2023 (134 commits)
134 commits
5a85027
Handle SIGTERM signal gracefully
ddelange Dec 20, 2022
12e8307
PR Suggestions
ddelange Dec 27, 2022
338bbd0
Add graceful SIGTERM shutdown test
ddelange Dec 27, 2022
3352ac7
Add returncode assertions
ddelange Dec 27, 2022
4da8b06
Merge branch 'main' into patch-1
ddelange Dec 27, 2022
8908490
Merge branch 'main' into patch-1
ddelange Jan 4, 2023
faa9052
pre-commit run --all-files
ddelange Jan 6, 2023
39c27ed
Merge branch 'main' into patch-1
ddelange Jan 6, 2023
513678d
Merge branch 'main' into patch-1
ddelange Jan 6, 2023
7fa5fa8
Bump SHUTDOWN_TIMEOUT for tests
ddelange Jan 7, 2023
b6859d0
Merge branch 'main' into patch-1
ddelange Jan 7, 2023
cab59e5
Merge branch 'main' into patch-1
ddelange Jan 7, 2023
a7004a5
Merge branch 'main' into patch-1
ddelange Jan 9, 2023
200005c
Merge branch 'main' into patch-1
ddelange Jan 9, 2023
1f386ea
Add print statements
ddelange Jan 13, 2023
98dce31
Switch from subprocess to anyio
ddelange Jan 26, 2023
21c4042
Cosmetics
ddelange Jan 26, 2023
39abfb1
Amend docstring
ddelange Jan 26, 2023
8621870
Merge branch 'main' of https://github.com/prefecthq/prefect into sigt…
ddelange Jan 26, 2023
fc75845
Add output to AssertionError
ddelange Jan 26, 2023
462728d
Use anyio.fail_after
ddelange Jan 26, 2023
e526e76
Fix test_sigterm_sends_sigterm_directly
ddelange Jan 26, 2023
8f48e36
Remove returncode assertions
ddelange Jan 26, 2023
05dad74
Reduce sleep
ddelange Jan 26, 2023
d6a5b0b
Typo
ddelange Jan 26, 2023
fa01b41
Close stdout tempfile
ddelange Jan 26, 2023
076cff0
Bump timeout a bit
ddelange Jan 26, 2023
45f296f
Reduce timeout a bit
ddelange Jan 26, 2023
cd9af10
Amend when SIGKILL comes too late
ddelange Jan 26, 2023
4663a23
Amend
ddelange Jan 26, 2023
9f4213c
Deprecate re.search in test
ddelange Feb 1, 2023
8b06c0b
Merge branch 'main' of https://github.com/prefecthq/prefect into sigt…
ddelange Feb 1, 2023
e25ac43
Merge branch 'PrefectHQ:main' into patch-1
ddelange Feb 1, 2023
7014deb
Merge branch 'sigterm-testing' of https://github.com/ddelange/prefect…
ddelange Feb 1, 2023
9b6fe02
Merge branch 'main' into patch-1
ddelange Feb 3, 2023
8b1bd7d
Refactor tests with more comments
ddelange Feb 9, 2023
a006038
Merge branch 'main' of https://github.com/prefecthq/prefect into patch-1
ddelange Feb 9, 2023
1277ebb
Add extra sleep
ddelange Feb 9, 2023
d900655
Merge branch 'main' into patch-1
ddelange Feb 10, 2023
ab4c86f
Increase sleep before return
ddelange Feb 10, 2023
8151865
Merge branch 'main' into patch-1
ddelange Feb 13, 2023
2ece2c8
Merge branch 'main' of https://github.com/prefecthq/prefect into patch-1
ddelange Feb 22, 2023
8cba3d5
Merge branch 'patch-1' of https://github.com/ddelange/prefect into pa…
ddelange Feb 22, 2023
f884a5f
Make lint
ddelange Feb 22, 2023
447f870
Merge branch 'main' into patch-1
ddelange Feb 23, 2023
93bf35b
Merge leftover
ddelange Feb 23, 2023
0759da1
Add docstring
ddelange Feb 23, 2023
1634036
Handle SIGTERM received by agent gracefully
ddelange Feb 24, 2023
cca9d29
Typo
ddelange Feb 24, 2023
f0a8243
Merge branch 'main' into patch-1
ddelange Feb 24, 2023
34303aa
Merge branch 'patch-1' of https://github.com/ddelange/prefect into gr…
ddelange Feb 24, 2023
cdf2958
Typo
ddelange Feb 24, 2023
a91a102
Fix f-strings
ddelange Feb 24, 2023
a4fc116
Fix agent startup waiting
ddelange Feb 24, 2023
76b54d3
Amend docstring
ddelange Feb 24, 2023
8350ac9
Typo
ddelange Feb 24, 2023
1429b1f
Merge branch 'main' into patch-1
zanieb Feb 24, 2023
f115dbd
Merge branch 'main' into patch-1
ddelange Feb 24, 2023
b9a4d82
Merge branch 'main' into patch-1
ddelange Feb 24, 2023
d8a61ed
Merge branch 'main' of https://github.com/prefecthq/prefect into patch-1
ddelange Feb 27, 2023
4062d34
Allow more ports
ddelange Feb 27, 2023
094e661
Merge branch 'main' into patch-1
ddelange Feb 28, 2023
2f69318
Rename tests, increase sleep
ddelange Mar 1, 2023
80b403f
Merge branch 'patch-1' of https://github.com/ddelange/prefect into gr…
ddelange Mar 1, 2023
812619b
Increase sleep
ddelange Mar 1, 2023
f6df82d
Merge branch 'main' into patch-1
ddelange Mar 1, 2023
d1eb1d8
Increase verbosity
ddelange Mar 1, 2023
446eaff
Merge branch 'patch-1' of https://github.com/ddelange/prefect into gr…
ddelange Mar 1, 2023
82ed35f
Amend
ddelange Mar 1, 2023
b7a5f28
Merge branch 'patch-1' of https://github.com/ddelange/prefect into gr…
ddelange Mar 1, 2023
b3a74d1
Remove unused var
ddelange Mar 1, 2023
85e0178
Use ephemeral API
ddelange Mar 1, 2023
4be41f2
Merge branch 'main' into patch-1
ddelange Mar 1, 2023
8763521
Add back comment
ddelange Mar 1, 2023
d77a4c6
Merge branch 'patch-1' into graceful-agent
ddelange Mar 2, 2023
6bb831d
Merge branch 'main' of https://github.com/prefecthq/prefect into grac…
ddelange Mar 2, 2023
3341a95
Merge branch 'main' of https://github.com/prefecthq/prefect into grac…
ddelange Mar 3, 2023
62a7538
Move testing signals into with statement
ddelange Mar 3, 2023
40429cc
Merge branch 'main' into graceful-agent
ddelange Mar 4, 2023
b012b5f
Revert "Move testing signals into with statement"
ddelange Mar 4, 2023
1ca698e
Increase agent startup stability
ddelange Mar 4, 2023
cafdca8
Amend
ddelange Mar 4, 2023
f86d54c
Merge branch 'main' into graceful-agent
ddelange Mar 6, 2023
4bbf8a7
Merge branch 'main' into graceful-agent
ddelange Mar 8, 2023
484ae5b
Add startup timeout check
ddelange Mar 9, 2023
33df054
Force agent cli tests to run sequentially
ddelange Mar 9, 2023
45ae19f
Amend
ddelange Mar 9, 2023
913e5cc
Remove semaphore
ddelange Mar 9, 2023
56b8474
Run agent tests in the same xdist worker
ddelange Mar 9, 2023
7a6dbab
Merge branch 'main' into graceful-agent
ddelange Mar 10, 2023
244806d
Merge branch 'main' into graceful-agent
ddelange Mar 10, 2023
737f00a
Merge branch 'main' into graceful-agent
ddelange Mar 13, 2023
eb2f518
Merge branch 'main' into graceful-agent
ddelange Mar 14, 2023
372f5ad
Merge branch 'main' into graceful-agent
ddelange Mar 15, 2023
9008862
Attempt to reduce flakiness
ddelange Mar 15, 2023
2d98912
PR Suggestions
ddelange Mar 16, 2023
6dfa993
Merge branch 'main' into graceful-agent
ddelange Mar 16, 2023
6c35602
Merge branch 'main' into graceful-agent
ddelange Mar 16, 2023
c92a0c4
Merge branch 'main' into graceful-agent
ddelange Mar 16, 2023
91b10fc
Merge branch 'main' into graceful-agent
ddelange Mar 17, 2023
7f026e1
Merge branch 'main' into graceful-agent
ddelange Mar 18, 2023
89dc276
Merge branch 'main' into graceful-agent
ddelange Mar 20, 2023
a2d0412
Merge branch 'main' into graceful-agent
ddelange Mar 22, 2023
459651d
Add Aborted. in out
ddelange Mar 22, 2023
940b736
Merge branch 'main' into graceful-agent
ddelange Mar 23, 2023
b32fab1
Longer startup
ddelange Mar 23, 2023
4f79859
Merge branch 'main' into graceful-agent
ddelange Mar 23, 2023
f8c3a37
Merge branch 'main' into graceful-agent
ddelange Mar 23, 2023
ba1bcda
Merge branch 'main' into graceful-agent
ddelange Mar 24, 2023
a41605f
Bump SHUTDOWN_TIMEOUT
ddelange Mar 24, 2023
e0e9855
Merge branch 'main' into graceful-agent
ddelange Mar 24, 2023
0bcca8d
Merge branch 'main' into graceful-agent
ddelange Mar 25, 2023
54a6149
Merge branch 'main' into graceful-agent
ddelange Mar 27, 2023
8771503
Merge branch 'main' into graceful-agent
ddelange Mar 29, 2023
cf807bd
Merge branch 'main' into graceful-agent
ddelange Mar 29, 2023
66b5562
Merge branch 'main' into graceful-agent
ddelange Mar 30, 2023
b6528ca
Merge branch 'main' into graceful-agent
ddelange Mar 31, 2023
83ed2fc
Merge branch 'main' into graceful-agent
ddelange Apr 3, 2023
a99e11c
Merge branch 'main' into graceful-agent
ddelange Apr 3, 2023
8e47fc0
Merge branch 'main' into graceful-agent
ddelange Apr 4, 2023
1095b7b
Merge branch 'main' into graceful-agent
ddelange Apr 11, 2023
94c7cd4
Merge branch 'main' into graceful-agent
ddelange Apr 12, 2023
3f2a8cd
Merge branch 'main' into graceful-agent
ddelange Apr 14, 2023
7f8c7e7
Merge branch 'main' into graceful-agent
ddelange Apr 17, 2023
dbc3da7
Merge branch 'main' into graceful-agent
ddelange Apr 19, 2023
547126e
Merge branch 'main' into graceful-agent
ddelange Apr 20, 2023
e2d97fc
Merge branch 'main' into graceful-agent
ddelange Apr 22, 2023
484a46d
Merge branch 'main' into graceful-agent
ddelange Apr 24, 2023
16aeb6f
Merge branch 'main' into graceful-agent
ddelange Apr 26, 2023
eb4545c
Merge branch 'main' into graceful-agent
ddelange May 4, 2023
4174619
Merge branch 'main' into graceful-agent
ddelange May 8, 2023
ffe496a
Attempt another process.wait upon timeout
ddelange May 8, 2023
989f893
Merge branch 'main' into graceful-agent
ddelange May 9, 2023
f8f5968
Use hosted API server for tests
desertaxle May 10, 2023
7 changes: 7 additions & 0 deletions src/prefect/cli/agent.py
@@ -1,6 +1,7 @@
 """
 Command line interface for working with agent services
 """
+import os
 from functools import partial
 from typing import List
 from uuid import UUID
@@ -20,6 +21,7 @@
     PREFECT_AGENT_QUERY_INTERVAL,
     PREFECT_API_URL,
 )
+from prefect.utilities.processutils import setup_signal_handlers_agent
 from prefect.utilities.services import critical_service_loop

 agent_app = PrefectTyper(
@@ -161,6 +163,11 @@ async def start(
                 f"Starting v{prefect.__version__} agent with ephemeral API..."
             )

+    agent_process_id = os.getpid()
+    setup_signal_handlers_agent(
+        agent_process_id, "the Prefect agent", app.console.print
+    )
+
     async with PrefectAgent(
         work_queues=work_queues,
         work_queue_prefix=work_queue_prefix,
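To illustrate the effect of the change to `agent.py`, here is a minimal, POSIX-only sketch of a process that treats SIGTERM like Ctrl+C. It is not Prefect's code: the `CHILD` program, `on_sigterm`, and `terminate_child` are hypothetical names, and the child is a stand-in for a long-running service such as the agent.

```python
import signal
import subprocess
import sys

# Hypothetical child program standing in for a long-running service.
CHILD = """
import os, signal, time

def on_sigterm(signum, frame):
    # Treat SIGTERM like Ctrl+C by re-sending SIGINT to this process.
    os.kill(os.getpid(), signal.SIGINT)

# Make sure SIGINT raises KeyboardInterrupt even if the parent ignored it.
signal.signal(signal.SIGINT, signal.default_int_handler)
signal.signal(signal.SIGTERM, on_sigterm)

try:
    print("ready", flush=True)
    while True:
        time.sleep(0.1)
except KeyboardInterrupt:
    print("stopped gracefully", flush=True)
"""


def terminate_child() -> str:
    """Start the child, send it SIGTERM, and return what it printed afterwards."""
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD], stdout=subprocess.PIPE, text=True
    )
    # Wait for the "ready" line so the handlers are installed before signalling.
    assert proc.stdout.readline().strip() == "ready"
    proc.send_signal(signal.SIGTERM)
    proc.wait(timeout=10)
    return proc.stdout.read()
```

Because SIGTERM is converted into SIGINT, the child leaves its loop through the same `KeyboardInterrupt` path it would take on Ctrl+C, which is the kind of graceful exit the PR title describes.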
31 changes: 29 additions & 2 deletions src/prefect/utilities/processutils.py
@@ -320,12 +320,20 @@ def forward_signal_handler(
     """Forward subsequent signum events (e.g. interrupts) to respective signums."""
     current_signal, future_signals = signums[0], signums[1:]

+    # avoid RecursionError when setting up a direct signal forward to the same signal for the main pid
+    avoid_infinite_recursion = signum == current_signal and pid == os.getpid()
+    if avoid_infinite_recursion:
+        # store the vanilla handler so it can be temporarily restored below
+        original_handler = signal.getsignal(current_signal)
+
     def handler(*args):
         print_fn(
             f"Received {getattr(signum, 'name', signum)}. "
             f"Sending {getattr(current_signal, 'name', current_signal)} to"
             f" {process_name} (PID {pid})..."
         )
+        if avoid_infinite_recursion:
+            signal.signal(current_signal, original_handler)
         os.kill(pid, current_signal)
         if future_signals:
             forward_signal_handler(
@@ -345,12 +353,31 @@ def setup_signal_handlers_server(pid: int, process_name: str, print_fn: Callable
     setup_handler = partial(
         forward_signal_handler, pid, process_name=process_name, print_fn=print_fn
     )
-    # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
-    # https://bugs.python.org/issue26350
+    # when server receives a signal, it needs to be propagated to the uvicorn subprocess
     if sys.platform == "win32":
+        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
+        # https://bugs.python.org/issue26350
         setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT)
     else:
         # first interrupt: SIGTERM, second interrupt: SIGKILL
         setup_handler(signal.SIGINT, signal.SIGTERM, signal.SIGKILL)
         # forward first SIGTERM directly, send SIGKILL on subsequent SIGTERM
         setup_handler(signal.SIGTERM, signal.SIGTERM, signal.SIGKILL)
+
+
+def setup_signal_handlers_agent(pid: int, process_name: str, print_fn: Callable):
+    """Handle interrupts of the agent gracefully."""
+    setup_handler = partial(
+        forward_signal_handler, pid, process_name=process_name, print_fn=print_fn
+    )
+    # when agent receives SIGINT, it stops dequeueing new FlowRuns, and runs until the subprocesses finish
+    # the signal is not forwarded to subprocesses, so they can continue to run and hopefully still complete
+    if sys.platform == "win32":
+        # on Windows, use CTRL_BREAK_EVENT as SIGTERM is useless:
+        # https://bugs.python.org/issue26350
+        setup_handler(signal.SIGINT, signal.CTRL_BREAK_EVENT)
+    else:
+        # forward first SIGINT directly, send SIGKILL on subsequent interrupt
+        setup_handler(signal.SIGINT, signal.SIGINT, signal.SIGKILL)
+        # first SIGTERM: send SIGINT, send SIGKILL on subsequent SIGTERM
+        setup_handler(signal.SIGTERM, signal.SIGINT, signal.SIGKILL)
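The escalation pattern in `processutils.py` (first signal forwards one signal, the next one escalates to SIGKILL) can be sketched in isolation. The version below is a simplified, POSIX-only approximation, not the PR's implementation: `forward_chain` and its injectable `kill` parameter are hypothetical names added so the behaviour can be observed without signalling a real process, and logging is omitted.

```python
import os
import signal
from typing import Callable


def forward_chain(
    pid: int,
    signum: int,
    *targets: int,
    kill: Callable[[int, int], None] = os.kill,
) -> None:
    """On each `signum` received, send the next signal in `targets` to `pid`."""
    current, remaining = targets[0], targets[1:]

    # Forwarding a signal to our own PID unchanged would re-enter this handler,
    # so remember the previous handler and restore it just before re-sending.
    self_forward = signum == current and pid == os.getpid()
    previous = signal.getsignal(current) if self_forward else None

    def handler(*_args) -> None:
        if self_forward:
            signal.signal(current, previous)
        kill(pid, current)
        if remaining:
            # Re-register so the next `signum` escalates to the next target.
            forward_chain(pid, signum, *remaining, kill=kill)

    signal.signal(signum, handler)
```

Passing a recording function as `kill` (for example one that appends `(pid, signal)` pairs to a list) makes the order of forwarded signals easy to inspect; with the default `os.kill`, the signals are really delivered.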
170 changes: 170 additions & 0 deletions tests/cli/test_start_agent.py
@@ -0,0 +1,170 @@
+import os
+import signal
+import sys
+import tempfile
+
+import anyio
+import pytest
+
+from prefect.settings import get_current_settings
+from prefect.utilities.processutils import open_process
+
+POLL_INTERVAL = 0.5
+STARTUP_TIMEOUT = 20
+SHUTDOWN_TIMEOUT = 5
+
+
+async def safe_shutdown(process):
+    try:
+        with anyio.fail_after(SHUTDOWN_TIMEOUT):
+            await process.wait()
+    except TimeoutError:
+        # try twice in case process.wait() hangs
+        with anyio.fail_after(SHUTDOWN_TIMEOUT):
+            await process.wait()
+
+
+@pytest.fixture(scope="function")
+async def agent_process(use_hosted_api_server):
+    """
+    Runs an agent listening to all queues.
+    Yields:
+        The anyio.Process.
+    """
+    out = tempfile.TemporaryFile()  # capture output for test assertions
+
+    # Will connect to the same database as normal test clients
+    async with open_process(
+        command=[
+            "prefect",
+            "agent",
+            "start",
+            "--match=nonexist",
+        ],
+        stdout=out,
+        stderr=out,
+        env={**os.environ, **get_current_settings().to_environment_variables()},
+    ) as process:
+        process.out = out
+
+        for _ in range(int(STARTUP_TIMEOUT / POLL_INTERVAL)):
+            await anyio.sleep(POLL_INTERVAL)
+            if out.tell() > 400:
+                await anyio.sleep(2)
+                break
+
+        assert out.tell() > 400, "The agent did not start up in time"
+        assert process.returncode is None, "The agent failed to start up"
+
+        # Yield to the consuming tests
+        yield process
+
+        # Then shutdown the process
+        try:
+            process.terminate()
+        except ProcessLookupError:
+            pass
+        out.close()
+
+
+class TestAgentSignalForwarding:
+    @pytest.mark.skipif(
+        sys.platform == "win32",
+        reason="SIGTERM is only used in non-Windows environments",
+    )
+    async def test_sigint_sends_sigterm(self, agent_process):
+        agent_process.send_signal(signal.SIGINT)
+        await safe_shutdown(agent_process)
+        agent_process.out.seek(0)
+        out = agent_process.out.read().decode()
+
+        assert "Sending SIGINT" in out, (
+            "When sending a SIGINT, the main process should receive a SIGINT."
+            f" Output:\n{out}"
+        )
+        assert "Agent stopped!" in out, (
+            "When sending a SIGINT, the main process should shutdown gracefully."
+            f" Output:\n{out}"
+        )
+
+    @pytest.mark.skipif(
+        sys.platform == "win32",
+        reason="SIGTERM is only used in non-Windows environments",
+    )
+    async def test_sigterm_sends_sigterm_directly(self, agent_process):
+        agent_process.send_signal(signal.SIGTERM)
+        await safe_shutdown(agent_process)
+        agent_process.out.seek(0)
+        out = agent_process.out.read().decode()
+
+        assert "Sending SIGINT" in out, (
+            "When sending a SIGTERM, the main process should receive a SIGINT."
+            f" Output:\n{out}"
+        )
+        assert "Agent stopped!" in out, (
+            "When sending a SIGTERM, the main process should shutdown gracefully."
+            f" Output:\n{out}"
+        )
+
+    @pytest.mark.skipif(
+        sys.platform == "win32",
+        reason="SIGTERM is only used in non-Windows environments",
+    )
+    async def test_sigint_sends_sigterm_then_sigkill(self, agent_process):
+        agent_process.send_signal(signal.SIGINT)
+        await anyio.sleep(0.01)  # some time needed for the recursive signal handler
+        agent_process.send_signal(signal.SIGINT)
+        await safe_shutdown(agent_process)
+        agent_process.out.seek(0)
+        out = agent_process.out.read().decode()
+
+        assert (
+            # either the main PID is still waiting for shutdown, so forwards the SIGKILL
+            "Sending SIGKILL" in out
+            # or SIGKILL came too late, and the main PID is already closing
+            or "KeyboardInterrupt" in out
+            or "Agent stopped!" in out
+            or "Aborted." in out
+        ), (
+            "When sending two SIGINT shortly after each other, the main process should"
+            f" first receive a SIGINT and then a SIGKILL. Output:\n{out}"
+        )
+
+    @pytest.mark.skipif(
+        sys.platform == "win32",
+        reason="SIGTERM is only used in non-Windows environments",
+    )
+    async def test_sigterm_sends_sigterm_then_sigkill(self, agent_process):
+        agent_process.send_signal(signal.SIGTERM)
+        await anyio.sleep(0.01)  # some time needed for the recursive signal handler
+        agent_process.send_signal(signal.SIGTERM)
+        await safe_shutdown(agent_process)
+        agent_process.out.seek(0)
+        out = agent_process.out.read().decode()
+
+        assert (
+            # either the main PID is still waiting for shutdown, so forwards the SIGKILL
+            "Sending SIGKILL" in out
+            # or SIGKILL came too late, and the main PID is already closing
+            or "KeyboardInterrupt" in out
+            or "Agent stopped!" in out
+            or "Aborted." in out
+        ), (
+            "When sending two SIGTERM shortly after each other, the main process should"
+            f" first receive a SIGINT and then a SIGKILL. Output:\n{out}"
+        )
+
+    @pytest.mark.skipif(
+        sys.platform != "win32",
+        reason="CTRL_BREAK_EVENT is only defined in Windows",
+    )
+    async def test_sends_ctrl_break_win32(self, agent_process):
+        agent_process.send_signal(signal.SIGINT)
+        await safe_shutdown(agent_process)
+        agent_process.out.seek(0)
+        out = agent_process.out.read().decode()
+
+        assert "Sending CTRL_BREAK_EVENT" in out, (
+            "When sending a SIGINT, the main process should send a CTRL_BREAK_EVENT to"
+            f" the uvicorn subprocess. Output:\n{out}"
+        )
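The `safe_shutdown` helper in this test file waits for the process with a timeout and retries once. A rough standard-library equivalent might look like the sketch below. It swaps `anyio.fail_after` for `asyncio.wait_for`, and `wait_with_retry` and `demo` are hypothetical names; the short-lived child merely stands in for the agent process.

```python
import asyncio
import sys

SHUTDOWN_TIMEOUT = 5  # seconds; mirrors the constant in the test module


async def wait_with_retry(
    process: asyncio.subprocess.Process, attempts: int = 2
) -> int:
    """Wait for `process` to exit, retrying the wait if it times out."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(process.wait(), SHUTDOWN_TIMEOUT)
        except asyncio.TimeoutError:
            if attempt == attempts - 1:
                raise  # still running after the final attempt
    raise AssertionError("attempts must be at least 1")


async def demo() -> int:
    # Hypothetical short-lived child standing in for the agent process.
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "pass"
    )
    return await wait_with_retry(process)
```

Calling `asyncio.run(demo())` returns the child's exit code once it has terminated. If the child were still running after both attempts, the final `asyncio.TimeoutError` would propagate and fail the caller, which is the behaviour a test would want.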
3 changes: 2 additions & 1 deletion tests/cli/test_start_server.py
@@ -11,6 +11,7 @@
 from prefect.testing.fixtures import is_port_in_use
 from prefect.utilities.processutils import open_process

+POLL_INTERVAL = 0.5
 STARTUP_TIMEOUT = 20
 SHUTDOWN_TIMEOUT = 20

@@ -70,7 +71,7 @@ async def server_process():
                         if response.status_code == 200:
                             await anyio.sleep(0.5)  # extra sleep for less flakiness
                             break
-                    await anyio.sleep(0.1)
+                    await anyio.sleep(POLL_INTERVAL)
             if response:
                 response.raise_for_status()
             if not response:
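Both test modules now share the same idea: check readiness every `POLL_INTERVAL` seconds until a startup timeout expires. As a generic, synchronous sketch of that loop (the `poll_until` helper and its parameters are hypothetical and do not appear in the PR):

```python
import time
from typing import Callable


def poll_until(ready: Callable[[], bool], timeout: float, interval: float) -> bool:
    """Call `ready` every `interval` seconds until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

A larger interval means fewer readiness checks during startup, at the cost of noticing readiness up to one interval late.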