Handle SIGTERM received by agent gracefully #8691
Merged
@@ -0,0 +1,170 @@
```python
import os
import signal
import sys
import tempfile

import anyio
import pytest

from prefect.settings import get_current_settings
from prefect.utilities.processutils import open_process

POLL_INTERVAL = 0.5
STARTUP_TIMEOUT = 20
SHUTDOWN_TIMEOUT = 20


@pytest.fixture(scope="function")
async def agent_process():
    """
    Runs an agent listening to all queues.
    Yields:
        The anyio.Process.
    """
    out = tempfile.TemporaryFile()  # capture output for test assertions

    # Will connect to the same database as normal test clients
    async with open_process(
        command=[
            "prefect",
            "agent",
            "start",
            "--match=nonexist",
            "--api=",  # ephemeral API
        ],
        stdout=out,
        stderr=out,
        env={**os.environ, **get_current_settings().to_environment_variables()},
    ) as process:
        process.out = out

        for _ in range(int(STARTUP_TIMEOUT / POLL_INTERVAL)):
            await anyio.sleep(POLL_INTERVAL)
            if out.tell() > 200:
                await anyio.sleep(0.5)
                break

        assert out.tell() > 200, "The agent did not start up in time"
        assert process.returncode is None, "The agent failed to start up"

        # Yield to the consuming tests
        yield process

        # Then shutdown the process
        try:
            process.terminate()
        except ProcessLookupError:
            pass
        out.close()


class TestAgentSignalForwarding:
    # run these tests sequentially in the same xdist worker to fix flakiness
    @pytest.mark.xdist_group(name="test_start_agent")
    @pytest.mark.skipif(
        sys.platform == "win32",
        reason="SIGTERM is only used in non-Windows environments",
    )
    async def test_sigint_sends_sigterm(self, agent_process):
        agent_process.send_signal(signal.SIGINT)
        with anyio.fail_after(SHUTDOWN_TIMEOUT):
            await agent_process.wait()
        agent_process.out.seek(0)
        out = agent_process.out.read().decode()

        assert "Sending SIGINT" in out, (
            "When sending a SIGINT, the main process should receive a SIGINT."
            f" Output:\n{out}"
        )
        assert "Agent stopped!" in out, (
            "When sending a SIGINT, the main process should shutdown gracefully."
            f" Output:\n{out}"
        )

    @pytest.mark.xdist_group(name="test_start_agent")
    @pytest.mark.skipif(
        sys.platform == "win32",
        reason="SIGTERM is only used in non-Windows environments",
    )
    async def test_sigterm_sends_sigterm_directly(self, agent_process):
        agent_process.send_signal(signal.SIGTERM)
        with anyio.fail_after(SHUTDOWN_TIMEOUT):
            await agent_process.wait()
        agent_process.out.seek(0)
        out = agent_process.out.read().decode()

        assert "Sending SIGINT" in out, (
            "When sending a SIGTERM, the main process should receive a SIGINT."
            f" Output:\n{out}"
        )
        assert "Agent stopped!" in out, (
            "When sending a SIGTERM, the main process should shutdown gracefully."
            f" Output:\n{out}"
        )

    @pytest.mark.xdist_group(name="test_start_agent")
    @pytest.mark.skipif(
        sys.platform == "win32",
        reason="SIGTERM is only used in non-Windows environments",
    )
    async def test_sigint_sends_sigterm_then_sigkill(self, agent_process):
        agent_process.send_signal(signal.SIGINT)
        await anyio.sleep(0.01)  # some time needed for the recursive signal handler
        agent_process.send_signal(signal.SIGINT)
        with anyio.fail_after(SHUTDOWN_TIMEOUT):
            await agent_process.wait()
        agent_process.out.seek(0)
        out = agent_process.out.read().decode()

        assert (
            # either the main PID is still waiting for shutdown, so forwards the SIGKILL
            "Sending SIGKILL" in out
            # or SIGKILL came too late, and the main PID is already closing
            or "KeyboardInterrupt" in out
            or "Agent stopped!" in out
        ), (
            "When sending two SIGINT shortly after each other, the main process should"
            f" first receive a SIGINT and then a SIGKILL. Output:\n{out}"
        )

    @pytest.mark.xdist_group(name="test_start_agent")
    @pytest.mark.skipif(
        sys.platform == "win32",
        reason="SIGTERM is only used in non-Windows environments",
    )
    async def test_sigterm_sends_sigterm_then_sigkill(self, agent_process):
        agent_process.send_signal(signal.SIGTERM)
        await anyio.sleep(0.01)  # some time needed for the recursive signal handler
        agent_process.send_signal(signal.SIGTERM)
        with anyio.fail_after(SHUTDOWN_TIMEOUT):
            await agent_process.wait()
        agent_process.out.seek(0)
        out = agent_process.out.read().decode()

        assert (
            # either the main PID is still waiting for shutdown, so forwards the SIGKILL
            "Sending SIGKILL" in out
            # or SIGKILL came too late, and the main PID is already closing
            or "KeyboardInterrupt" in out
            or "Agent stopped!" in out
        ), (
            "When sending two SIGTERM shortly after each other, the main process should"
            f" first receive a SIGINT and then a SIGKILL. Output:\n{out}"
        )

    @pytest.mark.xdist_group(name="test_start_agent")
    @pytest.mark.skipif(
        sys.platform != "win32",
        reason="CTRL_BREAK_EVENT is only defined in Windows",
    )
    async def test_sends_ctrl_break_win32(self, agent_process):
        agent_process.send_signal(signal.SIGINT)
        with anyio.fail_after(SHUTDOWN_TIMEOUT):
            await agent_process.wait()
        agent_process.out.seek(0)
        out = agent_process.out.read().decode()

        assert "Sending CTRL_BREAK_EVENT" in out, (
            "When sending a SIGINT, the main process should send a CTRL_BREAK_EVENT to"
            f" the uvicorn subprocess. Output:\n{out}"
        )
```
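The POSIX tests in this file assert on log lines ("Sending SIGINT", "Sending SIGKILL") rather than on the handler itself, which this diff view does not show. As a rough illustration of the escalation those assertions expect, here is a minimal standard-library sketch. It is not Prefect's implementation: `CHILD_CODE`, `make_escalating_handler`, and `run_demo` are hypothetical names, the child is a stand-in script rather than an agent, and the code assumes a POSIX system with the handler installed from the main thread.

```python
import os
import signal
import subprocess
import sys

# Hypothetical stand-in for the supervised process (not a Prefect agent).
# It exits cleanly when SIGINT is delivered as KeyboardInterrupt.
CHILD_CODE = """
import signal, time
signal.signal(signal.SIGINT, signal.default_int_handler)
try:
    print("child ready", flush=True)
    time.sleep(30)
except KeyboardInterrupt:
    print("child stopped", flush=True)
"""


def make_escalating_handler(child, log):
    """Build a handler: the first signal forwards SIGINT, a repeat sends SIGKILL."""
    received = 0

    def handler(signum, frame):
        nonlocal received
        received += 1
        if received == 1:
            log.append("Sending SIGINT")
            child.send_signal(signal.SIGINT)
        else:
            log.append("Sending SIGKILL")
            child.kill()

    return handler


def run_demo():
    """Start the child, deliver one SIGTERM to this process, and collect the result."""
    log = []
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
        text=True,
    )
    child.stdout.readline()  # block until the child has printed "child ready"
    previous = signal.signal(signal.SIGTERM, make_escalating_handler(child, log))
    try:
        os.kill(os.getpid(), signal.SIGTERM)  # simulate an external SIGTERM
        child.wait(timeout=20)
        output = child.stdout.read()
    finally:
        signal.signal(signal.SIGTERM, previous)  # restore the earlier handler
        if child.poll() is None:
            child.kill()
    return log, output, child.returncode
```

Calling `run_demo()` sends a single SIGTERM, so only the graceful branch runs. Delivering a second signal before the child exits would take the `child.kill()` branch, which is the race the `*_then_sigkill` tests tolerate by accepting several possible outputs.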
@ddelange interesting — I thought we were doing xdist grouping by module already?
oh interesting, in that case this should have no effect because the idea here was to not start multiple agent subprocesses concurrently. will try removing again
We use
in CI
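For context on the grouping question in this thread: in pytest-xdist, the `xdist_group` mark only influences scheduling when the run uses `--dist loadgroup`, while `--dist loadfile` keeps all tests from one file on the same worker. The sketch below shows the mark applied once at class level instead of per test. The class and test names are hypothetical; only the group name is taken from the diff, and nothing here states which mode this project's CI actually uses.

```python
import pytest


# Hypothetical test class; the group name is copied from the diff.
# With `--dist loadgroup`, every test carrying the same xdist_group name
# is sent to the same worker, so the tests do not run concurrently.
@pytest.mark.xdist_group(name="test_start_agent")
class TestSharesOneWorker:
    def test_first(self):
        assert True

    def test_second(self):
        assert True
```

A run such as `pytest -n 2 --dist loadgroup` would honor the mark. Under other distribution modes the mark is simply ignored, which matches the observation above that it should have no effect if tests are already grouped by module.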