Skip to content

Commit

Permalink
fix(task): forward signals to spawned sub-processes on unix (#27141)
Browse files Browse the repository at this point in the history
Closes #18445
  • Loading branch information
dsherret authored Nov 29, 2024
1 parent 8626ec7 commit f624860
Show file tree
Hide file tree
Showing 15 changed files with 589 additions and 287 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ deno_path_util.workspace = true
deno_resolver.workspace = true
deno_runtime = { workspace = true, features = ["include_js_files_for_snapshotting"] }
deno_semver.workspace = true
deno_task_shell = "=0.18.1"
deno_task_shell = "=0.20.1"
deno_telemetry.workspace = true
deno_terminal.workspace = true
libsui = "0.5.0"
Expand Down
14 changes: 4 additions & 10 deletions cli/lsp/tsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,16 +1297,10 @@ impl TsServer {
{
// When an LSP request is cancelled by the client, the future this is being
// executed under and any local variables here will be dropped at the next
// await point. To pass on that cancellation to the TS thread, we make this
// wrapper which cancels the request's token on drop.
struct DroppableToken(CancellationToken);
impl Drop for DroppableToken {
fn drop(&mut self) {
self.0.cancel();
}
}
// await point. To pass on that cancellation to the TS thread, we use drop_guard
// which cancels the request's token on drop.
let token = token.child_token();
let droppable_token = DroppableToken(token.clone());
let droppable_token = token.clone().drop_guard();
let (tx, mut rx) = oneshot::channel::<Result<String, AnyError>>();
let change = self.pending_change.lock().take();

Expand All @@ -1320,7 +1314,7 @@ impl TsServer {
tokio::select! {
value = &mut rx => {
let value = value??;
drop(droppable_token);
droppable_token.disarm();
Ok(serde_json::from_str(&value)?)
}
_ = token.cancelled() => {
Expand Down
25 changes: 25 additions & 0 deletions cli/npm/managed/resolvers/common/lifecycle_scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use deno_npm::resolution::NpmResolutionSnapshot;
use deno_runtime::deno_io::FromRawIoHandle;
use deno_semver::package::PackageNv;
use deno_semver::Version;
use deno_task_shell::KillSignal;
use std::borrow::Cow;
use std::collections::HashSet;
use std::rc::Rc;
Expand Down Expand Up @@ -155,6 +156,29 @@ impl<'a> LifecycleScripts<'a> {
packages: &[NpmResolutionPackage],
root_node_modules_dir_path: &Path,
progress_bar: &ProgressBar,
) -> Result<(), AnyError> {
let kill_signal = KillSignal::default();
let _drop_signal = kill_signal.clone().drop_guard();
// we don't run with signals forwarded because once signals
// are setup then they're process wide.
self
.finish_with_cancellation(
snapshot,
packages,
root_node_modules_dir_path,
progress_bar,
kill_signal,
)
.await
}

async fn finish_with_cancellation(
self,
snapshot: &NpmResolutionSnapshot,
packages: &[NpmResolutionPackage],
root_node_modules_dir_path: &Path,
progress_bar: &ProgressBar,
kill_signal: KillSignal,
) -> Result<(), AnyError> {
self.warn_not_run_scripts()?;
let get_package_path =
Expand Down Expand Up @@ -246,6 +270,7 @@ impl<'a> LifecycleScripts<'a> {
stderr: TaskStdio::piped(),
stdout: TaskStdio::piped(),
}),
kill_signal: kill_signal.clone(),
},
)
.await?;
Expand Down
89 changes: 86 additions & 3 deletions cli/task_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use deno_runtime::deno_node::NodeResolver;
use deno_semver::package::PackageNv;
use deno_task_shell::ExecutableCommand;
use deno_task_shell::ExecuteResult;
use deno_task_shell::KillSignal;
use deno_task_shell::ShellCommand;
use deno_task_shell::ShellCommandContext;
use deno_task_shell::ShellPipeReader;
Expand All @@ -22,6 +23,7 @@ use lazy_regex::Lazy;
use regex::Regex;
use tokio::task::JoinHandle;
use tokio::task::LocalSet;
use tokio_util::sync::CancellationToken;

use crate::npm::CliNpmResolver;
use crate::npm::InnerCliNpmResolverRef;
Expand All @@ -45,9 +47,11 @@ impl TaskStdio {
pub fn stdout() -> Self {
Self(None, ShellPipeWriter::stdout())
}

pub fn stderr() -> Self {
Self(None, ShellPipeWriter::stderr())
}

pub fn piped() -> Self {
let (r, w) = deno_task_shell::pipe();
Self(Some(r), w)
Expand All @@ -62,8 +66,8 @@ pub struct TaskIo {
impl Default for TaskIo {
fn default() -> Self {
Self {
stderr: TaskStdio::stderr(),
stdout: TaskStdio::stdout(),
stderr: TaskStdio::stderr(),
}
}
}
Expand All @@ -78,6 +82,7 @@ pub struct RunTaskOptions<'a> {
pub custom_commands: HashMap<String, Rc<dyn ShellCommand>>,
pub root_node_modules_dir: Option<&'a Path>,
pub stdio: Option<TaskIo>,
pub kill_signal: KillSignal,
}

pub type TaskCustomCommands = HashMap<String, Rc<dyn ShellCommand>>;
Expand All @@ -96,8 +101,12 @@ pub async fn run_task(
.with_context(|| format!("Error parsing script '{}'.", opts.task_name))?;
let env_vars =
prepare_env_vars(opts.env_vars, opts.init_cwd, opts.root_node_modules_dir);
let state =
deno_task_shell::ShellState::new(env_vars, opts.cwd, opts.custom_commands);
let state = deno_task_shell::ShellState::new(
env_vars,
opts.cwd,
opts.custom_commands,
opts.kill_signal,
);
let stdio = opts.stdio.unwrap_or_default();
let (
TaskStdio(stdout_read, stdout_write),
Expand Down Expand Up @@ -537,6 +546,80 @@ fn resolve_managed_npm_commands(
Ok(result)
}

/// Runs a deno task future forwarding any signals received
/// to the process.
///
/// Signal listeners and ctrl+c listening will be setup.
pub async fn run_future_forwarding_signals<TOutput>(
kill_signal: KillSignal,
future: impl std::future::Future<Output = TOutput>,
) -> TOutput {
fn spawn_future_with_cancellation(
future: impl std::future::Future<Output = ()> + 'static,
token: CancellationToken,
) {
deno_core::unsync::spawn(async move {
tokio::select! {
_ = future => {}
_ = token.cancelled() => {}
}
});
}

let token = CancellationToken::new();
let _token_drop_guard = token.clone().drop_guard();
let _drop_guard = kill_signal.clone().drop_guard();

spawn_future_with_cancellation(
listen_ctrl_c(kill_signal.clone()),
token.clone(),
);
#[cfg(unix)]
spawn_future_with_cancellation(
listen_and_forward_all_signals(kill_signal),
token,
);

future.await
}

async fn listen_ctrl_c(kill_signal: KillSignal) {
while let Ok(()) = tokio::signal::ctrl_c().await {
kill_signal.send(deno_task_shell::SignalKind::SIGINT)
}
}

#[cfg(unix)]
async fn listen_and_forward_all_signals(kill_signal: KillSignal) {
use deno_core::futures::FutureExt;
use deno_runtime::signal::SIGNAL_NUMS;

// listen and forward every signal we support
let mut futures = Vec::with_capacity(SIGNAL_NUMS.len());
for signo in SIGNAL_NUMS.iter().copied() {
if signo == libc::SIGKILL || signo == libc::SIGSTOP {
continue; // skip, can't listen to these
}

let kill_signal = kill_signal.clone();
futures.push(
async move {
let Ok(mut stream) = tokio::signal::unix::signal(
tokio::signal::unix::SignalKind::from_raw(signo),
) else {
return;
};
let signal_kind: deno_task_shell::SignalKind = signo.into();
while let Some(()) = stream.recv().await {
kill_signal.send(signal_kind);
}
}
.boxed_local(),
)
}
futures::future::join_all(futures).await;
}

#[cfg(test)]
mod test {

Expand Down
Loading

0 comments on commit f624860

Please sign in to comment.