Skip to content

Commit

Permalink
Give interactive processes a chance to gracefully shutdown (#14580)
Browse files Browse the repository at this point in the history
## Overview
This PR aims to add a graceful shutdown feature for interactive processes. 

Currently, when `pants run` receives a `SIGINT`, it send a `SIGKILL` to child processes which causes them to shutdown without an opportunity to run cleanup procedures (for instance, `try .. finally` and context managers in python). This can cause file handles to be left open, streams to not be flushed, etc. 

This PR changes the behavior to first send a SIGINT and wait for the process to gracefully terminate within a pre-determined amount of time. If the process does not shut itself down gracefully, a SIGKILL is sent.

This problem is outlined in greater detail in #13230

### Approach
This PR takes approach number (2) from #13230 (comment). This is was the lowest-lift implementation. It is slightly sub-optimal since it blocks in the `Drop` implementation, however, this should not cause deadlocks since it blocks for a maximum of a bounded time. Furthermore, in my benchmarking, this drop rarely runs, as most of the time the cancellation happens before drop and the drop becomes essentially a noop.

An alternative is to run our own reaping process using a separate routine launched with `tokio::spawn`. The main loop could await this reaping process before shutting down. This would be more complex but would allow us to avoid using blocking calls where we should be awaiting.

Long term a solution for `AsyncDrop` would solve this problem the most elegantly.

Fixes #13230.
  • Loading branch information
slessans authored Feb 28, 2022
1 parent 2ae0ee8 commit d5d7186
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 31 deletions.
93 changes: 88 additions & 5 deletions src/rust/engine/process_execution/src/children.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{thread, time};

use nix::sys::signal;
use nix::unistd::getpgid;
use nix::unistd::Pid;
use tokio::process::{Child, Command};

// We keep this relatively low to start to encourage interactivity. If users end up needing longer graceful
// shutdown periods, we can expose it as a per-process setting.
const GRACEFUL_SHUTDOWN_MAX_WAIT_TIME: time::Duration = time::Duration::from_secs(3);
const GRACEFUL_SHUTDOWN_POLL_TIME: time::Duration = time::Duration::from_millis(50);

/// A child process running in its own PGID, with a drop implementation that will kill that
/// PGID.
///
Expand Down Expand Up @@ -47,14 +53,91 @@ impl ManagedChild {
})
}

/// Kill the process's unique PGID or return an error if we don't have a PID or cannot kill.
pub fn kill_pgid(&mut self) -> Result<(), String> {
fn get_pgid(&self) -> Result<Pid, String> {
let pid = self.id().ok_or_else(|| "Process had no PID.".to_owned())?;
let pgid = getpgid(Some(Pid::from_raw(pid as i32)))
.map_err(|e| format!("Could not get process group id of child process: {}", e))?;
// Kill the negative PGID to kill the entire process group.
signal::kill(Pid::from_raw(-pgid.as_raw()), signal::Signal::SIGKILL)
Ok(pgid)
}

/// Send a signal to the child process group.
fn signal_pg<T: Into<Option<signal::Signal>>>(&mut self, signal: T) -> Result<(), String> {
let pgid = self.get_pgid()?;
// the negative PGID will signal the entire process group.
signal::kill(Pid::from_raw(-pgid.as_raw()), signal)
.map_err(|e| format!("Failed to interrupt child process group: {}", e))?;
Ok(())
}

/// Check if the child has exited.
///
/// This returns true if the child has exited with any return code, or false
/// if the child has not yet exited. An error indicated a system error checking
/// the result of the child process, and does not necessarily indicate that
/// has exited or not.
fn check_child_has_exited(&mut self) -> Result<bool, String> {
self
.child
.try_wait()
.map(|o| o.is_some())
.map_err(|e| e.to_string())
}

/// Synchronously wait for the child to exit.
///
/// This method will repeatedly poll the child process until it exits, an error occurrs
/// or the timeout is reached.
///
/// A return value of Ok(true) indicates that the child has terminated, Ok(false) indicates
/// that we reached the max_wait_duration while waiting for the child to terminate.
///
/// This method *will* block the current thread but will do so for a bounded amount of time.
fn wait_for_child_exit_sync(
&mut self,
max_wait_duration: time::Duration,
) -> Result<bool, String> {
let deadline = time::Instant::now() + max_wait_duration;
while time::Instant::now() <= deadline {
if self.check_child_has_exited()? {
return Ok(true);
}
thread::sleep(GRACEFUL_SHUTDOWN_POLL_TIME);
}
// if we get here we have timed-out
Ok(false)
}

/// Attempt to gracefully shutdown the process.
///
/// This will send a SIGINT to the process and give it a chance to shutdown gracefully. If the
/// process does not respond to the SIGINT within a fixed interval, a SIGKILL will be sent.
///
/// This method *will* block the current thread but will do so for a bounded amount of time.
pub fn graceful_shutdown_sync(&mut self) -> Result<(), String> {
self.signal_pg(signal::Signal::SIGINT)?;
match self.wait_for_child_exit_sync(GRACEFUL_SHUTDOWN_MAX_WAIT_TIME) {
Ok(true) => {
// process was gracefully shutdown
self.killed.store(true, Ordering::SeqCst);
Ok(())
}
Ok(false) => {
// we timed out waiting for the child to exit, so we need to kill it.
log::warn!(
"Timed out waiting for graceful shutdown of process group. Will try SIGKILL instead."
);
self.kill_pgid()
}
Err(e) => {
log::warn!("An error occurred while waiting for graceful shutdown of process group ({}). Will try SIGKILL instead.", e);
self.kill_pgid()
}
}
}

/// Kill the process's unique PGID or return an error if we don't have a PID or cannot kill.
fn kill_pgid(&mut self) -> Result<(), String> {
self.signal_pg(signal::Signal::SIGKILL)?;
self.killed.store(true, Ordering::SeqCst);
Ok(())
}
Expand All @@ -78,7 +161,7 @@ impl DerefMut for ManagedChild {
impl Drop for ManagedChild {
fn drop(&mut self) {
if !self.killed.load(Ordering::SeqCst) {
let _ = self.kill_pgid();
let _ = self.graceful_shutdown_sync();
}
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ fn interactive_process(
_ = session.cancelled() => {
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.kill_pgid() {
if let Err(e) = subprocess.graceful_shutdown_sync() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
Expand Down
60 changes: 40 additions & 20 deletions testprojects/src/python/coordinated_runs/waiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,53 @@
import time
from multiprocessing import Process

waiting_for_file = sys.argv[1]
pid_file = sys.argv[2]
child_pid_file = sys.argv[3]
attempts = 60


def run_child():
while True:
print("Child running...")
time.sleep(1)


child = Process(target=run_child, daemon=True)
child.start()
def main():
waiting_for_file = sys.argv[1]
pid_file = sys.argv[2]
child_pid_file = sys.argv[3]
cleanup_wait_time = int(sys.argv[4])
attempts = 60

with open(child_pid_file, "w") as pf:
pf.write(str(child.pid))
child = Process(target=run_child, daemon=True)
child.start()

with open(pid_file, "w") as pf:
pf.write(str(os.getpid()))
with open(child_pid_file, "w") as pf:
pf.write(str(child.pid))

try:
while not os.path.isfile(waiting_for_file):
if attempts <= 0:
raise Exception("File was never written.")
attempts -= 1
sys.stderr.write("Waiting for file {}\n".format(waiting_for_file))
time.sleep(1)
finally:
child.terminate()
with open(pid_file, "w") as pf:
pf.write(str(os.getpid()))

try:
while not os.path.isfile(waiting_for_file):
if attempts <= 0:
raise Exception("File was never written.")
attempts -= 1
sys.stderr.write("Waiting for file {}\n".format(waiting_for_file))
sys.stderr.flush()
time.sleep(1)

except KeyboardInterrupt:
sys.stderr.write("keyboard int received\n")
sys.stderr.flush()

finally:
sys.stderr.write("waiter cleaning up\n")
sys.stderr.flush()

child.terminate()
if cleanup_wait_time > 0:
time.sleep(cleanup_wait_time)

sys.stderr.write("waiter cleanup complete\n")
sys.stderr.flush()


if __name__ == "__main__":
main()
45 changes: 41 additions & 4 deletions tests/python/pants_test/pantsd/pantsd_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,15 +499,23 @@ def test_pantsd_invalidation_stale_sources(self):
rm_rf(test_path)

def _assert_pantsd_keyboardinterrupt_signal(
self, signum: int, regexps: list[str] | None = None
self,
signum: int,
regexps: list[str] | None = None,
not_regexps: list[str] | None = None,
cleanup_wait_time: int = 0,
):
"""Send a signal to the thin pailgun client and observe the error messaging.
:param signum: The signal to send.
:param regexps: Assert that all of these regexps match somewhere in stderr.
:param not_regexps: Assert that all of these regexps do not match somewhere in stderr.
:param cleanup_wait_time: passed throught to waiter, dictated how long simulated cleanup will take
"""
with self.pantsd_test_context() as (workdir, config, checker):
client_handle, waiter_pid, child_pid, _ = launch_waiter(workdir=workdir, config=config)
client_handle, waiter_pid, child_pid, _ = launch_waiter(
workdir=workdir, config=config, cleanup_wait_time=cleanup_wait_time
)
client_pid = client_handle.process.pid
waiter_process = psutil.Process(waiter_pid)
child_process = psutil.Process(waiter_pid)
Expand All @@ -516,6 +524,9 @@ def _assert_pantsd_keyboardinterrupt_signal(
assert child_process.is_running()
checker.assert_started()

# give time to enter the try/finally block in the child process
time.sleep(5)

# This should kill the client, which will cancel the run on the server, which will
# kill the waiting process and its child.
os.kill(client_pid, signum)
Expand All @@ -525,17 +536,43 @@ def _assert_pantsd_keyboardinterrupt_signal(
for regexp in regexps or []:
self.assertRegex(client_run.stderr, regexp)

for regexp in not_regexps or []:
self.assertNotRegex(client_run.stderr, regexp)

# pantsd should still be running, but the waiter process and child should have been
# killed.
time.sleep(5)
assert not waiter_process.is_running()
assert not child_process.is_running()
checker.assert_running()

def test_pantsd_sigint(self):
def test_pantsd_graceful_shutdown(self):
"""Test that SIGINT is propgated to child processes and they are given time to shutdown."""
self._assert_pantsd_keyboardinterrupt_signal(
signal.SIGINT,
regexps=[
"Interrupted by user.",
"keyboard int received",
"waiter cleaning up",
"waiter cleanup complete",
],
cleanup_wait_time=0,
)

def test_pantsd_graceful_shutdown_deadline(self):
"""Test that a child process that does not respond to SIGINT within 5 seconds, is forcibly
cleaned up with a SIGKILL."""
self._assert_pantsd_keyboardinterrupt_signal(
signal.SIGINT,
regexps=["Interrupted by user."],
regexps=[
"Interrupted by user.",
"keyboard int received",
"waiter cleaning up",
],
not_regexps=[
"waiter cleanup complete",
],
cleanup_wait_time=6,
)

def test_sigint_kills_request_waiting_for_lock(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def attempts(


def launch_waiter(
*, workdir: str, config: Mapping | None = None
*, workdir: str, config: Mapping | None = None, cleanup_wait_time: int = 0
) -> tuple[PantsJoinHandle, int, int, str]:
"""Launch a process that will wait forever for a file to be created.
Expand All @@ -74,6 +74,7 @@ def launch_waiter(
file_to_make,
waiter_pid_file,
child_pid_file,
str(cleanup_wait_time),
]
client_handle = run_pants_with_workdir_without_waiting(argv, workdir=workdir, config=config)
waiter_pid = -1
Expand Down

0 comments on commit d5d7186

Please sign in to comment.