Skip to content

Commit

Permalink
Add a process wrapper to kill the entire process group on Drop.
Browse files Browse the repository at this point in the history
  • Loading branch information
stuhood committed Nov 19, 2021
1 parent 9393470 commit 4ccc1eb
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ hashing = { path = "../hashing" }
libc = "0.2.39"
log = "0.4"
nails = "0.12"
nix = "0.20"
sha2 = "0.9"
shell-quote = "0.1.0"
store = { path = "../fs/store" }
Expand Down
83 changes: 83 additions & 0 deletions src/rust/engine/process_execution/src/children.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

use nix::sys::signal;
use nix::unistd::getpgid;
use nix::unistd::Pid;
use tokio::process::{Child, Command};

/// A child process running in its own PGID, with a drop implementation that will kill that
/// PGID.
///
/// TODO: This struct should be extended to allow for sending other lifecycle signals in
/// https://github.com/pantsbuild/pants/issues/13230
pub struct ManagedChild {
child: Child,
killed: AtomicBool,
}

impl ManagedChild {
pub fn spawn(mut command: Command) -> Result<Self, String> {
// Set `kill_on_drop` to encourage `tokio` to `wait` the process via its own "reaping"
// mechanism:
// see https://docs.rs/tokio/1.14.0/tokio/process/struct.Command.html#method.kill_on_drop
command.kill_on_drop(true);

// Adjust the Command to create its own PGID as it starts, to make it safe to kill the PGID
// later.
unsafe {
command.pre_exec(|| {
nix::unistd::setsid().map(|_pgid| ()).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Could not create new pgid: {}", e),
)
})
});
};

// Then spawn.
let child = command
.spawn()
.map_err(|e| format!("Error executing interactive process: {}", e))?;
Ok(Self {
child,
killed: AtomicBool::new(false),
})
}

/// Kill the process's unique PGID or return an error if we don't have a PID or cannot kill.
pub fn kill_pgid(&mut self) -> Result<(), String> {
let pid = self.id().ok_or_else(|| "Process had no PID.".to_owned())?;
// Kill the negative PGID to kill the entire process group.
let pgid = getpgid(Some(Pid::from_raw(pid as i32)))
.map_err(|e| format!("Could not get process group id of child process: {}", e))?;
signal::kill(Pid::from_raw(-pgid.as_raw()), signal::Signal::SIGKILL)
.map_err(|e| format!("Failed to interrupt child process group: {}", e))?;
self.killed.store(true, Ordering::SeqCst);
Ok(())
}
}

impl Deref for ManagedChild {
type Target = Child;

fn deref(&self) -> &Child {
&self.child
}
}

impl DerefMut for ManagedChild {
fn deref_mut(&mut self) -> &mut Child {
&mut self.child
}
}

/// Implements drop by killing the process group.
impl Drop for ManagedChild {
fn drop(&mut self) {
if !self.killed.load(Ordering::SeqCst) {
let _ = self.kill_pgid();
}
}
}
19 changes: 7 additions & 12 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use std::convert::TryFrom;
use std::path::PathBuf;
use std::sync::Arc;

pub use log::Level;

use async_semaphore::AsyncSemaphore;
use async_trait::async_trait;
use concrete_time::{Duration, TimeSpan};
use fs::RelativePath;
use hashing::{Digest, EMPTY_FINGERPRINT};
use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
Expand All @@ -46,6 +47,8 @@ pub mod cache;
#[cfg(test)]
mod cache_tests;

pub mod children;

pub mod local;
#[cfg(test)]
mod local_tests;
Expand All @@ -64,9 +67,9 @@ pub mod named_caches;

extern crate uname;

pub use crate::children::ManagedChild;
pub use crate::named_caches::{CacheDest, CacheName, NamedCaches};
use concrete_time::{Duration, TimeSpan};
use fs::RelativePath;
pub use crate::remote_cache::RemoteCacheWarningsBehavior;

#[derive(PartialOrd, Ord, Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[allow(non_camel_case_types)]
Expand Down Expand Up @@ -660,13 +663,5 @@ impl From<Box<BoundedCommandRunner>> for Arc<dyn CommandRunner> {
}
}

#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum RemoteCacheWarningsBehavior {
Ignore,
FirstOnly,
Backoff,
}

#[cfg(test)]
mod tests;
10 changes: 9 additions & 1 deletion src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ use workunit_store::{
use crate::remote::make_execute_request;
use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process,
ProcessCacheScope, ProcessMetadata, RemoteCacheWarningsBehavior,
ProcessCacheScope, ProcessMetadata,
};

#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum RemoteCacheWarningsBehavior {
Ignore,
FirstOnly,
Backoff,
}

/// This `CommandRunner` implementation caches results remotely using the Action Cache service
/// of the Remote Execution API.
///
Expand Down
22 changes: 13 additions & 9 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use fs::{safe_create_dir_all_ioerror, PreparedPathGlobs, RelativePath};
use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
use hashing::{Digest, EMPTY_DIGEST};
use indexmap::IndexMap;
use process_execution::{CacheDest, CacheName, NamedCaches};
use process_execution::{CacheDest, CacheName, ManagedChild, NamedCaches};
use pyo3::{PyAny, PyRef, Python};
use stdio::TryCloneAsFile;
use store::{SnapshotOps, SubsetParams};
Expand Down Expand Up @@ -677,8 +677,6 @@ fn interactive_process(
command.env_clear();
command.envs(env);

command.kill_on_drop(true);

let exit_status = session.clone()
.with_console_ui_disabled(async move {
// Once any UI is torn down, grab exclusive access to the console.
Expand All @@ -705,18 +703,24 @@ fn interactive_process(
.try_clone_as_file()
.map_err(|e| format!("Couldn't clone stderr: {}", e))?,
));
let mut subprocess = command
.spawn()
.map_err(|e| format!("Error executing interactive process: {}", e))?;
let mut subprocess = ManagedChild::spawn(command)?;
tokio::select! {
_ = session.cancelled() => {
// The Session was cancelled: kill the process, and then wait for it to exit (to avoid
// zombies).
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
log::warn!("Session was cancelled! Killing {:?} and waiting...", subprocess.id());
// The Session was cancelled: attempt to kill the process group / process, and
// then wait for it to exit (to avoid zombies).
if let Err(e) = subprocess.kill_pgid() {
// Failed to kill the PGID: try the non-group form.
log::warn!("Failed to kill spawned process group ({}). Will try killing only the top process.\n\
This is unexpected: please file an issue about this problem at \
[https://github.com/pantsbuild/pants/issues/new]", e);
subprocess.kill().map_err(|e| format!("Failed to interrupt child process: {}", e)).await?;
};
subprocess.wait().await.map_err(|e| e.to_string())
}
exit_status = subprocess.wait() => {
// The process exited.
log::warn!("Process exited.");
exit_status.map_err(|e| e.to_string())
}
}
Expand Down

0 comments on commit 4ccc1eb

Please sign in to comment.