Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interactive processes kill the process and its children on Ctrl+C. #13678

Merged
merged 2 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_ctrl_c(pantsd: bool) -> None:
dest = os.path.join(workdir, "dest.log")

# Start a pantsd run that will wait forever, then kill the pantsd client.
client_handle, _, _ = launch_waiter(
client_handle, _, _, _ = launch_waiter(
workdir=workdir, config=workunit_logger_config(dest, pantsd=pantsd)
)
client_pid = client_handle.process.pid
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ hashing = { path = "../hashing" }
libc = "0.2.39"
log = "0.4"
nails = "0.12"
nix = "0.20"
sha2 = "0.9"
shell-quote = "0.1.0"
store = { path = "../fs/store" }
Expand Down
84 changes: 84 additions & 0 deletions src/rust/engine/process_execution/src/children.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

use nix::sys::signal;
use nix::unistd::getpgid;
use nix::unistd::Pid;
use tokio::process::{Child, Command};

/// A child process running in its own PGID, with a drop implementation that will kill that
/// PGID.
///
/// TODO: If this API is useful, we should consider extending it to parented Nailgun processes
/// and to all local execution in general. It could also be adjusted for sending other posix
/// signals in sequence for https://github.com/pantsbuild/pants/issues/13230.
pub struct ManagedChild {
child: Child,
killed: AtomicBool,
}

impl ManagedChild {
pub fn spawn(mut command: Command) -> Result<Self, String> {
// Set `kill_on_drop` to encourage `tokio` to `wait` the process via its own "reaping"
// mechanism:
// see https://docs.rs/tokio/1.14.0/tokio/process/struct.Command.html#method.kill_on_drop
command.kill_on_drop(true);

// Adjust the Command to create its own PGID as it starts, to make it safe to kill the PGID
// later.
unsafe {
command.pre_exec(|| {
nix::unistd::setsid().map(|_pgid| ()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Could not create new pgid: {}", e),
)
})
});
};

// Then spawn.
let child = command
.spawn()
.map_err(|e| format!("Error executing interactive process: {}", e))?;
Ok(Self {
child,
killed: AtomicBool::new(false),
})
}

/// Kill the process's unique PGID or return an error if we don't have a PID or cannot kill.
pub fn kill_pgid(&mut self) -> Result<(), String> {
let pid = self.id().ok_or_else(|| "Process had no PID.".to_owned())?;
let pgid = getpgid(Some(Pid::from_raw(pid as i32)))
.map_err(|e| format!("Could not get process group id of child process: {}", e))?;
// Kill the negative PGID to kill the entire process group.
signal::kill(Pid::from_raw(-pgid.as_raw()), signal::Signal::SIGKILL)
.map_err(|e| format!("Failed to interrupt child process group: {}", e))?;
self.killed.store(true, Ordering::SeqCst);
Ok(())
}
}

impl Deref for ManagedChild {
type Target = Child;

fn deref(&self) -> &Child {
&self.child
}
}

impl DerefMut for ManagedChild {
fn deref_mut(&mut self) -> &mut Child {
&mut self.child
}
}

/// Implements drop by killing the process group.
impl Drop for ManagedChild {
fn drop(&mut self) {
if !self.killed.load(Ordering::SeqCst) {
let _ = self.kill_pgid();
}
}
}
19 changes: 7 additions & 12 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use std::convert::TryFrom;
use std::path::PathBuf;
use std::sync::Arc;

pub use log::Level;

use async_semaphore::AsyncSemaphore;
use async_trait::async_trait;
use concrete_time::{Duration, TimeSpan};
use fs::RelativePath;
use hashing::{Digest, EMPTY_FINGERPRINT};
use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
Expand All @@ -46,6 +47,8 @@ pub mod cache;
#[cfg(test)]
mod cache_tests;

pub mod children;

pub mod local;
#[cfg(test)]
mod local_tests;
Expand All @@ -64,9 +67,9 @@ pub mod named_caches;

extern crate uname;

pub use crate::children::ManagedChild;
pub use crate::named_caches::{CacheDest, CacheName, NamedCaches};
use concrete_time::{Duration, TimeSpan};
use fs::RelativePath;
pub use crate::remote_cache::RemoteCacheWarningsBehavior;

#[derive(PartialOrd, Ord, Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -660,13 +663,5 @@ impl From<Box<BoundedCommandRunner>> for Arc<dyn CommandRunner> {
}
}

#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum RemoteCacheWarningsBehavior {
Ignore,
FirstOnly,
Backoff,
}

#[cfg(test)]
mod tests;
10 changes: 9 additions & 1 deletion src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ use workunit_store::{
use crate::remote::make_execute_request;
use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessCacheScope, ProcessMetadata, RemoteCacheWarningsBehavior,
ProcessCacheScope, ProcessMetadata,
};

#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum RemoteCacheWarningsBehavior {
Ignore,
FirstOnly,
Backoff,
}

/// This `CommandRunner` implementation caches results remotely using the Action Cache service
/// of the Remote Execution API.
///
Expand Down
20 changes: 11 additions & 9 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use fs::{safe_create_dir_all_ioerror, PreparedPathGlobs, RelativePath};
use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
use hashing::{Digest, EMPTY_DIGEST};
use indexmap::IndexMap;
use process_execution::{CacheDest, CacheName, NamedCaches};
use process_execution::{CacheDest, CacheName, ManagedChild, NamedCaches};
use pyo3::{PyAny, PyRef, Python};
use stdio::TryCloneAsFile;
use store::{SnapshotOps, SubsetParams};
Expand Down Expand Up @@ -677,8 +677,6 @@ fn interactive_process(
command.env_clear();
command.envs(env);

command.kill_on_drop(true);

let exit_status = session.clone()
.with_console_ui_disabled(async move {
// Once any UI is torn down, grab exclusive access to the console.
Expand All @@ -705,14 +703,18 @@ fn interactive_process(
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
));
let mut subprocess = command
.spawn()
.map_err(|e| format!("Error executing interactive process: {}", e))?;
let mut subprocess = ManagedChild::spawn(command)?;
tokio::select! {
_ = session.cancelled() => {
// The Session was cancelled: kill the process, and then wait for it to exit (to avoid
// zombies).
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.kill_pgid() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
[https://github.com/pantsbuild/pants/issues/new]", e);
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
};
subprocess.wait().await.map_err(|e| e.to_string())
}
exit_status = subprocess.wait() => {
Expand Down
32 changes: 26 additions & 6 deletions testprojects/src/python/coordinated_runs/waiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,35 @@
import os
import sys
import time
from multiprocessing import Process

waiting_for_file = sys.argv[1]
pid_file = sys.argv[2]
child_pid_file = sys.argv[3]
attempts = 60


def run_child():
while True:
print("Child running...")
time.sleep(1)


child = Process(target=run_child, daemon=True)
child.start()

with open(child_pid_file, "w") as pf:
pf.write(str(child.pid))

with open(pid_file, "w") as pf:
pf.write(str(os.getpid()))
while not os.path.isfile(waiting_for_file):
if attempts <= 0:
raise Exception("File was never written.")
attempts -= 1
sys.stderr.write("Waiting for file {}\n".format(waiting_for_file))
time.sleep(1)

try:
while not os.path.isfile(waiting_for_file):
if attempts <= 0:
raise Exception("File was never written.")
attempts -= 1
sys.stderr.write("Waiting for file {}\n".format(waiting_for_file))
time.sleep(1)
finally:
child.terminate()
34 changes: 9 additions & 25 deletions tests/python/pants_test/pantsd/pantsd_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,26 +498,6 @@ def test_pantsd_invalidation_stale_sources(self):
finally:
rm_rf(test_path)

@unittest.skip("TODO https://github.com/pantsbuild/pants/issues/7654")
def test_pantsd_parse_exception_success(self):
# This test covers the case described in #6426, where a run that is failing fast due to an
# exception can race other completing work. We expect all runs to fail due to the error
# that has been introduced, but none of them should hang.
test_path = "testprojects/3rdparty/this_is_definitely_not_a_valid_directory"
test_build_file = os.path.join(test_path, "BUILD")
invalid_symbol = "this_is_definitely_not_a_valid_symbol"

try:
safe_mkdir(test_path, clean=True)
safe_file_dump(test_build_file, f"{invalid_symbol}()")
for _ in range(3):
with self.pantsd_run_context(success=False) as ctx:
result = ctx.runner(["list", "testprojects::"])
ctx.checker.assert_started()
self.assertIn(invalid_symbol, result.stderr)
finally:
rm_rf(test_path)

def _assert_pantsd_keyboardinterrupt_signal(
self, signum: int, regexps: list[str] | None = None
):
Expand All @@ -527,25 +507,29 @@ def _assert_pantsd_keyboardinterrupt_signal(
:param regexps: Assert that all of these regexps match somewhere in stderr.
"""
with self.pantsd_test_context() as (workdir, config, checker):
client_handle, waiter_process_pid, _ = launch_waiter(workdir=workdir, config=config)
client_handle, waiter_pid, child_pid, _ = launch_waiter(workdir=workdir, config=config)
client_pid = client_handle.process.pid
waiter_process = psutil.Process(waiter_process_pid)
waiter_process = psutil.Process(waiter_pid)
child_process = psutil.Process(waiter_pid)

assert waiter_process.is_running()
assert child_process.is_running()
checker.assert_started()

# This should kill the client, which will cancel the run on the server, which will
# kill the waiting process.
# kill the waiting process and its child.
os.kill(client_pid, signum)
client_run = client_handle.join()
client_run.assert_failure()

for regexp in regexps or []:
self.assertRegex(client_run.stderr, regexp)

# pantsd should still be running, but the waiter process should have been killed.
# pantsd should still be running, but the waiter process and child should have been
# killed.
time.sleep(5)
assert not waiter_process.is_running()
assert not child_process.is_running()
checker.assert_running()

def test_pantsd_sigint(self):
Expand All @@ -560,7 +544,7 @@ def test_sigint_kills_request_waiting_for_lock(self):
config = {"GLOBAL": {"pantsd_timeout_when_multiple_invocations": -1, "level": "debug"}}
with self.pantsd_test_context(extra_config=config) as (workdir, config, checker):
# Run a process that will wait forever.
first_run_handle, _, file_to_create = launch_waiter(workdir=workdir, config=config)
first_run_handle, _, _, file_to_create = launch_waiter(workdir=workdir, config=config)

checker.assert_started()
checker.assert_running()
Expand Down
14 changes: 9 additions & 5 deletions tests/python/pants_test/pantsd/pantsd_integration_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,34 @@ def attempts(

def launch_waiter(
*, workdir: str, config: Mapping | None = None
) -> tuple[PantsJoinHandle, int, str]:
) -> tuple[PantsJoinHandle, int, int, str]:
"""Launch a process that will wait forever for a file to be created.

Returns the pid of the pants client, the pid of the waiting child process, and the file to
create to cause the waiting child to exit.
Returns the pants client handle, the pid of the waiting process, the pid of a child of the
waiting process, and the file to create to cause the waiting child to exit.
"""
file_to_make = os.path.join(workdir, "some_magic_file")
waiter_pid_file = os.path.join(workdir, "pid_file")
child_pid_file = os.path.join(workdir, "child_pid_file")

argv = [
"run",
"testprojects/src/python/coordinated_runs:waiter",
"--",
file_to_make,
waiter_pid_file,
child_pid_file,
]
client_handle = run_pants_with_workdir_without_waiting(argv, workdir=workdir, config=config)
waiter_pid = -1
for _ in attempts("The waiter process should have written its pid."):
waiter_pid_str = maybe_read_file(waiter_pid_file)
if waiter_pid_str:
child_pid_str = maybe_read_file(child_pid_file)
if waiter_pid_str and child_pid_str:
waiter_pid = int(waiter_pid_str)
child_pid = int(child_pid_str)
break
return client_handle, waiter_pid, file_to_make
return client_handle, waiter_pid, child_pid, file_to_make


class PantsDaemonMonitor(ProcessManager):
Expand Down