From 45f7fa4d73d8b7d68841fb1317779909cfc21aa2 Mon Sep 17 00:00:00 2001 From: Carlo Kok Date: Fri, 5 Jan 2024 21:17:06 +0100 Subject: [PATCH 1/5] feat(up): Spawn multiple trigger commands Signed-off-by: Carlo Kok --- crates/app/src/lib.rs | 17 +++++++++++ crates/http/src/trigger.rs | 2 -- crates/redis/src/lib.rs | 14 +++++++-- crates/trigger-http/src/lib.rs | 13 ++++++-- crates/trigger/src/lib.rs | 4 +++ examples/spin-timer/src/lib.rs | 2 +- src/commands/up.rs | 54 +++++++++++++++++++++------------- src/commands/up/app_source.rs | 5 ++-- 8 files changed, 81 insertions(+), 30 deletions(-) diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index e030d7b824..2ac39ce040 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -207,6 +207,23 @@ impl<'a, L> App<'a, L> { .map(|locked| AppTrigger { app: self, locked }) } + /// Returns the trigger metadata for a specific trigger type. + pub fn get_trigger_metadata<'this, T: Deserialize<'this> + Default>( + &'this self, + trigger_type: &'a str, + ) -> Result> { + self.locked.metadata.get("triggers").map_or(Ok(None), |t| { + t.get(trigger_type) + .map(T::deserialize) + .transpose() + .map_err(|err| { + Error::MetadataError(format!( + "invalid metadata value for {trigger_type:?}: {err:?}" + )) + }) + }) + } + /// Returns an iterator of [`AppTrigger`]s defined for this app with /// the given `trigger_type`. pub fn triggers_with_type( diff --git a/crates/http/src/trigger.rs b/crates/http/src/trigger.rs index a8c713d00d..ca53b33544 100644 --- a/crates/http/src/trigger.rs +++ b/crates/http/src/trigger.rs @@ -7,8 +7,6 @@ pub const METADATA_KEY: MetadataKey = MetadataKey::new("trigger"); #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] pub struct Metadata { - // The type of trigger which should always been "http" in this case - pub r#type: String, // The based url #[serde(default = "default_base")] pub base: String, diff --git a/crates/redis/src/lib.rs b/crates/redis/src/lib.rs index 3fb789098f..44850b24b3 100644 --- a/crates/redis/src/lib.rs +++ b/crates/redis/src/lib.rs @@ -44,7 +44,6 @@ pub struct RedisTriggerConfig { #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] struct TriggerMetadata { - r#type: String, address: String, } @@ -56,7 +55,18 @@ impl TriggerExecutor for RedisTrigger { type RunConfig = NoArgs; async fn new(engine: TriggerAppEngine) -> Result { - let address = engine.app().require_metadata(TRIGGER_METADATA_KEY)?.address; + let address = engine + .app() + .get_metadata(TRIGGER_METADATA_KEY)? + .map_or_else( + || { + engine + .trigger_metadata::() + .unwrap_or_default() + .map_or(String::new(), |f| f.address) + }, + |t| t.address, + ); let mut channel_components: HashMap> = HashMap::new(); diff --git a/crates/trigger-http/src/lib.rs b/crates/trigger-http/src/lib.rs index ddabf57afe..55d05ecd34 100644 --- a/crates/trigger-http/src/lib.rs +++ b/crates/trigger-http/src/lib.rs @@ -96,8 +96,17 @@ impl TriggerExecutor for HttpTrigger { async fn new(engine: TriggerAppEngine) -> Result { let mut base = engine .app() - .require_metadata(spin_http::trigger::METADATA_KEY)? - .base; + .get_metadata(spin_http::trigger::METADATA_KEY)? + .map_or_else( + || { + engine + .trigger_metadata::() + .unwrap_or_default() + .map_or(String::new(), |f| f.base) + }, + |t| t.base, + ); + if !base.starts_with('/') { base = format!("/{base}"); } diff --git a/crates/trigger/src/lib.rs b/crates/trigger/src/lib.rs index 83897e16b6..058a132941 100644 --- a/crates/trigger/src/lib.rs +++ b/crates/trigger/src/lib.rs @@ -287,6 +287,10 @@ impl TriggerAppEngine { self.app.borrowed() } + pub fn trigger_metadata(&self) -> spin_app::Result> { + self.app().get_trigger_metadata(Executor::TRIGGER_TYPE) + } + /// Returns AppTriggers and typed TriggerConfigs for this executor type. pub fn trigger_configs(&self) -> impl Iterator { self.app() diff --git a/examples/spin-timer/src/lib.rs b/examples/spin-timer/src/lib.rs index 729a7c3df8..1177c36e27 100644 --- a/examples/spin-timer/src/lib.rs +++ b/examples/spin-timer/src/lib.rs @@ -45,7 +45,7 @@ pub struct TimerTriggerConfig { interval_secs: u64, } -const TRIGGER_METADATA_KEY: MetadataKey = MetadataKey::new("trigger"); +const TRIGGER_METADATA_KEY: MetadataKey = MetadataKey::new("triggers"); #[async_trait] impl TriggerExecutor for TimerTrigger { diff --git a/src/commands/up.rs b/src/commands/up.rs index 046ac06b27..06376b809b 100644 --- a/src/commands/up.rs +++ b/src/commands/up.rs @@ -16,6 +16,8 @@ use spin_oci::OciLoader; use spin_trigger::cli::{SPIN_LOCAL_APP_DIR, SPIN_LOCKED_URL, SPIN_WORKING_DIR}; use tempfile::TempDir; +use futures::future::try_join_all; + use crate::opts::*; use self::app_source::{AppSource, ResolvedAppSource}; @@ -154,7 +156,11 @@ impl UpCommand { .with_context(|| format!("Couldn't find trigger executor for {app_source}"))?; if self.help { - return self.run_trigger(trigger_cmd, None).await; + for cmd in trigger_cmd + { + self.run_trigger(cmd.clone(), None).await?; + } + return Ok(()); } let mut locked_app = self @@ -165,13 +171,16 @@ impl UpCommand { let local_app_dir = app_source.local_app_dir().map(Into::into); - let run_opts = RunTriggerOpts { - locked_app, - working_dir, - local_app_dir, - }; + try_join_all(trigger_cmd.iter().map(|cmd| { + let run_opts = RunTriggerOpts { + locked_app: locked_app.clone(), + working_dir: working_dir.clone(), + local_app_dir: local_app_dir.clone(), + }; - self.run_trigger(trigger_cmd, Some(run_opts)).await + self.run_trigger(cmd.clone(), Some(run_opts)) + })) + .await.map(|_| ()) } fn get_canonical_working_dir(&self) -> Result { @@ -191,13 +200,14 @@ impl UpCommand { } async fn run_trigger( - self, + &self, trigger_cmd: Vec, opts: Option, ) -> Result<(), anyhow::Error> { // The docs for `current_exe` warn that this may be insecure because it could be executed // via hard-link. I think it should be fine as long as we aren't `setuid`ing this binary. - let mut cmd = std::process::Command::new(std::env::current_exe().unwrap()); + + let mut cmd = tokio::process::Command::new(std::env::current_exe().unwrap()); cmd.args(&trigger_cmd); if let Some(RunTriggerOpts { @@ -235,7 +245,7 @@ impl UpCommand { })?; } - let status = child.wait()?; + let status = child.wait().await?; if status.success() { Ok(()) } else { @@ -424,16 +434,20 @@ fn trigger_command(trigger_type: &str) -> Vec { vec!["trigger".to_owned(), trigger_type.to_owned()] } -fn trigger_command_for_resolved_app_source(resolved: &ResolvedAppSource) -> Result> { - let trigger_type = resolved.trigger_type()?; - - match trigger_type { - "http" | "redis" => Ok(trigger_command(trigger_type)), - _ => { - let cmd = resolve_trigger_plugin(trigger_type)?; - Ok(vec![cmd]) - } - } +fn trigger_command_for_resolved_app_source( + resolved: &ResolvedAppSource, +) -> Result>> { + let trigger_type = resolved.trigger_types()?; + trigger_type + .iter() + .map(|&t| match t { + "http" | "redis" => Ok(trigger_command(t)), + _ => { + let cmd = resolve_trigger_plugin(t)?; + Ok(vec![cmd]) + } + }) + .collect() } #[cfg(test)] diff --git a/src/commands/up/app_source.rs b/src/commands/up/app_source.rs index 3a6b7ae23f..88431ce76f 100644 --- a/src/commands/up/app_source.rs +++ b/src/commands/up/app_source.rs @@ -83,7 +83,7 @@ pub enum ResolvedAppSource { } impl ResolvedAppSource { - pub fn trigger_type(&self) -> anyhow::Result<&str> { + pub fn trigger_types(&self) -> anyhow::Result> { let types = match self { ResolvedAppSource::File { manifest, .. } => { manifest.triggers.keys().collect::>() @@ -96,7 +96,6 @@ impl ResolvedAppSource { }; ensure!(!types.is_empty(), "no triggers in app"); - ensure!(types.len() == 1, "multiple trigger types not yet supported"); - Ok(types.into_iter().next().unwrap()) + Ok(types.into_iter().map(|t| t.as_str()).collect()) } } From 9b0ce7d8e8b64ed7416ee5df860ea2a83b076ccd Mon Sep 17 00:00:00 2001 From: Carlo Kok Date: Thu, 11 Jan 2024 08:59:39 +0100 Subject: [PATCH 2/5] fix(up): Linting issue on non-Windows platform only Signed-off-by: Carlo Kok --- src/commands/up.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/up.rs b/src/commands/up.rs index 06376b809b..e6fb40f7bf 100644 --- a/src/commands/up.rs +++ b/src/commands/up.rs @@ -237,7 +237,7 @@ impl UpCommand { #[cfg(not(windows))] { // https://github.com/nix-rust/nix/issues/656 - let pid = nix::unistd::Pid::from_raw(child.id() as i32); + let pid = nix::unistd::Pid::from_raw(child.id() as u32); ctrlc::set_handler(move || { if let Err(err) = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM) { tracing::warn!("Failed to kill trigger handler process: {:?}", err) From 3efbc04d9226d15a0c8597d52eb36e497d226164 Mon Sep 17 00:00:00 2001 From: itowlson Date: Wed, 10 Jan 2024 15:16:31 +1300 Subject: [PATCH 3/5] Update trigger timer to multi-friendly lockfile layout Signed-off-by: itowlson --- examples/spin-timer/src/lib.rs | 13 ++++++++++--- examples/spin-timer/trigger-timer.json | 2 +- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/examples/spin-timer/src/lib.rs b/examples/spin-timer/src/lib.rs index 1177c36e27..b7750d0ab0 100644 --- a/examples/spin-timer/src/lib.rs +++ b/examples/spin-timer/src/lib.rs @@ -29,11 +29,16 @@ pub struct TimerTrigger { component_timings: HashMap, } -// Application settings (raw serialization format) +// Picks out the timer entry from the application-level trigger settings +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +struct TriggerMetadataParent { + timer: Option, +} + +// Application-level settings (raw serialization format) #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(deny_unknown_fields)] struct TriggerMetadata { - r#type: String, speedup: Option, } @@ -45,7 +50,7 @@ pub struct TimerTriggerConfig { interval_secs: u64, } -const TRIGGER_METADATA_KEY: MetadataKey = MetadataKey::new("triggers"); +const TRIGGER_METADATA_KEY: MetadataKey = MetadataKey::new("triggers"); #[async_trait] impl TriggerExecutor for TimerTrigger { @@ -61,6 +66,8 @@ impl TriggerExecutor for TimerTrigger { let speedup = engine .app() .require_metadata(TRIGGER_METADATA_KEY)? + .timer + .unwrap_or_default() .speedup .unwrap_or(1); diff --git a/examples/spin-timer/trigger-timer.json b/examples/spin-timer/trigger-timer.json index b55d431913..78f7186579 100644 --- a/examples/spin-timer/trigger-timer.json +++ b/examples/spin-timer/trigger-timer.json @@ -3,7 +3,7 @@ "description": "Run Spin components at timed intervals", "homepage": "https://github.com/fermyon/spin/tree/main/examples/spin-timer", "version": "0.1.0", - "spinCompatibility": ">=2.0", + "spinCompatibility": ">=2.2", "license": "Apache-2.0", "packages": [ { From 71c35c6fee7434e8074975869185c3f78599d4bf Mon Sep 17 00:00:00 2001 From: itowlson Date: Fri, 12 Jan 2024 13:14:04 +1300 Subject: [PATCH 4/5] Handle Ctrl+C and other exit scenarios in multi-trigger Signed-off-by: itowlson --- src/commands/up.rs | 129 +++++++++++++++++++++++++++++---------------- 1 file changed, 85 insertions(+), 44 deletions(-) diff --git a/src/commands/up.rs b/src/commands/up.rs index e6fb40f7bf..2cbd818149 100644 --- a/src/commands/up.rs +++ b/src/commands/up.rs @@ -8,6 +8,7 @@ use std::{ use anyhow::{anyhow, bail, Context, Result}; use clap::{CommandFactory, Parser}; +use itertools::Itertools; use reqwest::Url; use spin_app::locked::LockedApp; use spin_common::ui::quoted_path; @@ -16,7 +17,7 @@ use spin_oci::OciLoader; use spin_trigger::cli::{SPIN_LOCAL_APP_DIR, SPIN_LOCKED_URL, SPIN_WORKING_DIR}; use tempfile::TempDir; -use futures::future::try_join_all; +use futures::StreamExt; use crate::opts::*; @@ -130,9 +131,11 @@ impl UpCommand { if app_source == AppSource::None { if self.help { - return self - .run_trigger(trigger_command(HELP_ARGS_ONLY_TRIGGER_TYPE), None) - .await; + let mut child = self + .start_trigger(trigger_command(HELP_ARGS_ONLY_TRIGGER_TYPE), None) + .await?; + let _ = child.wait().await?; + return Ok(()); } else { bail!("Default file '{DEFAULT_MANIFEST_FILE}' not found. Run `spin up --from `, or `spin up --help` for usage."); } @@ -152,13 +155,13 @@ impl UpCommand { let resolved_app_source = self.resolve_app_source(&app_source, &working_dir).await?; - let trigger_cmd = trigger_command_for_resolved_app_source(&resolved_app_source) + let trigger_cmds = trigger_command_for_resolved_app_source(&resolved_app_source) .with_context(|| format!("Couldn't find trigger executor for {app_source}"))?; if self.help { - for cmd in trigger_cmd - { - self.run_trigger(cmd.clone(), None).await?; + for cmd in trigger_cmds { + let mut help_process = self.start_trigger(cmd.clone(), None).await?; + _ = help_process.wait().await; } return Ok(()); } @@ -168,19 +171,35 @@ impl UpCommand { .await?; self.update_locked_app(&mut locked_app); + let locked_url = self.write_locked_app(&locked_app, &working_dir).await?; let local_app_dir = app_source.local_app_dir().map(Into::into); - try_join_all(trigger_cmd.iter().map(|cmd| { - let run_opts = RunTriggerOpts { - locked_app: locked_app.clone(), - working_dir: working_dir.clone(), - local_app_dir: local_app_dir.clone(), - }; + let run_opts = RunTriggerOpts { + locked_url, + working_dir, + local_app_dir, + }; + + let mut trigger_processes = self.start_trigger_processes(trigger_cmds, run_opts).await?; + + set_kill_on_ctrl_c(&trigger_processes)?; + + let mut trigger_tasks = trigger_processes + .iter_mut() + .map(|ch| ch.wait()) + .collect::>(); + + let first_to_finish = trigger_tasks.next().await; - self.run_trigger(cmd.clone(), Some(run_opts)) - })) - .await.map(|_| ()) + if let Some(process_result) = first_to_finish { + let status = process_result?; + if !status.success() { + return Err(crate::subprocess::ExitStatusError::new(status).into()); + } + } + + Ok(()) } fn get_canonical_working_dir(&self) -> Result { @@ -199,25 +218,40 @@ impl UpCommand { Ok(working_dir_holder) } - async fn run_trigger( + async fn start_trigger_processes( + self, + trigger_cmds: Vec>, + run_opts: RunTriggerOpts, + ) -> anyhow::Result> { + let mut trigger_processes = Vec::with_capacity(trigger_cmds.len()); + + for cmd in trigger_cmds { + let child = self + .start_trigger(cmd.clone(), Some(run_opts.clone())) + .await + .context("Failed to start trigger process")?; + trigger_processes.push(child); + } + + Ok(trigger_processes) + } + + async fn start_trigger( &self, trigger_cmd: Vec, opts: Option, - ) -> Result<(), anyhow::Error> { + ) -> Result { // The docs for `current_exe` warn that this may be insecure because it could be executed // via hard-link. I think it should be fine as long as we aren't `setuid`ing this binary. - let mut cmd = tokio::process::Command::new(std::env::current_exe().unwrap()); cmd.args(&trigger_cmd); if let Some(RunTriggerOpts { - locked_app, + locked_url, working_dir, local_app_dir, }) = opts { - let locked_url = self.write_locked_app(&locked_app, &working_dir).await?; - cmd.env(SPIN_LOCKED_URL, locked_url) .env(SPIN_WORKING_DIR, &working_dir) .args(&self.trigger_args); @@ -225,32 +259,16 @@ impl UpCommand { if let Some(local_app_dir) = local_app_dir { cmd.env(SPIN_LOCAL_APP_DIR, local_app_dir); } + + cmd.kill_on_drop(true); } else { cmd.arg("--help-args-only"); } tracing::trace!("Running trigger executor: {:?}", cmd); - let mut child = cmd.spawn().context("Failed to execute trigger")?; - - // Terminate trigger executor if `spin up` itself receives a termination signal - #[cfg(not(windows))] - { - // https://github.com/nix-rust/nix/issues/656 - let pid = nix::unistd::Pid::from_raw(child.id() as u32); - ctrlc::set_handler(move || { - if let Err(err) = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM) { - tracing::warn!("Failed to kill trigger handler process: {:?}", err) - } - })?; - } - - let status = child.wait().await?; - if status.success() { - Ok(()) - } else { - Err(crate::subprocess::ExitStatusError::new(status).into()) - } + let child = cmd.spawn().context("Failed to execute trigger")?; + Ok(child) } fn app_source(&self) -> AppSource { @@ -368,8 +386,31 @@ impl UpCommand { } } +#[cfg(windows)] +fn set_kill_on_ctrl_c(trigger_processes: &Vec) -> Result<(), anyhow::Error> { + Ok(()) +} + +#[cfg(not(windows))] +fn set_kill_on_ctrl_c(trigger_processes: &[tokio::process::Child]) -> Result<(), anyhow::Error> { + // https://github.com/nix-rust/nix/issues/656 + let pids = trigger_processes + .iter() + .flat_map(|child| child.id().map(|id| nix::unistd::Pid::from_raw(id as i32))) + .collect_vec(); + ctrlc::set_handler(move || { + for pid in &pids { + if let Err(err) = nix::sys::signal::kill(*pid, nix::sys::signal::SIGTERM) { + tracing::warn!("Failed to kill trigger handler process: {:?}", err) + } + } + })?; + Ok(()) +} + +#[derive(Clone)] struct RunTriggerOpts { - locked_app: LockedApp, + locked_url: String, working_dir: PathBuf, local_app_dir: Option, } From b8490dfa76350ba60c490e3bf6fbbfcc7c2a7145 Mon Sep 17 00:00:00 2001 From: Lann Martin Date: Fri, 12 Jan 2024 09:33:43 -0500 Subject: [PATCH 5/5] app: Move backward-compat logic into get_trigger_metadata Signed-off-by: Lann Martin --- crates/app/src/lib.rs | 34 ++++++++++++++++++++++++---------- crates/redis/src/lib.rs | 17 +++-------------- crates/trigger-http/src/lib.rs | 14 +++----------- 3 files changed, 30 insertions(+), 35 deletions(-) diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 2ac39ce040..1fbc7cdf7d 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -7,6 +7,7 @@ #![deny(missing_docs)] mod host_component; +use serde_json::Value; pub use spin_locked_app::locked; pub use spin_locked_app::values; pub use spin_locked_app::{Error, MetadataKey, Result}; @@ -212,16 +213,29 @@ impl<'a, L> App<'a, L> { &'this self, trigger_type: &'a str, ) -> Result> { - self.locked.metadata.get("triggers").map_or(Ok(None), |t| { - t.get(trigger_type) - .map(T::deserialize) - .transpose() - .map_err(|err| { - Error::MetadataError(format!( - "invalid metadata value for {trigger_type:?}: {err:?}" - )) - }) - }) + let Some(value) = self.get_trigger_metadata_value(trigger_type) else { + return Ok(None); + }; + let metadata = T::deserialize(value).map_err(|err| { + Error::MetadataError(format!( + "invalid metadata value for {trigger_type:?}: {err:?}" + )) + })?; + Ok(Some(metadata)) + } + + fn get_trigger_metadata_value(&self, trigger_type: &str) -> Option { + if let Some(trigger_configs) = self.locked.metadata.get("triggers") { + // New-style: `{"triggers": {"": {...}}}` + trigger_configs.get(trigger_type).cloned() + } else if self.locked.metadata["trigger"]["type"] == trigger_type { + // Old-style: `{"trigger": {"type": "", ...}}` + let mut meta = self.locked.metadata["trigger"].clone(); + meta.as_object_mut().unwrap().remove("type"); + Some(meta) + } else { + None + } } /// Returns an iterator of [`AppTrigger`]s defined for this app with diff --git a/crates/redis/src/lib.rs b/crates/redis/src/lib.rs index 44850b24b3..10d032b908 100644 --- a/crates/redis/src/lib.rs +++ b/crates/redis/src/lib.rs @@ -8,14 +8,11 @@ use anyhow::{anyhow, Context, Result}; use futures::{future::join_all, StreamExt}; use redis::{Client, ConnectionLike}; use serde::{de::IgnoredAny, Deserialize, Serialize}; -use spin_app::MetadataKey; use spin_core::async_trait; use spin_trigger::{cli::NoArgs, TriggerAppEngine, TriggerExecutor}; use crate::spin::SpinRedisExecutor; -const TRIGGER_METADATA_KEY: MetadataKey = MetadataKey::new("trigger"); - pub(crate) type RuntimeData = (); pub(crate) type Store = spin_core::Store; @@ -56,17 +53,9 @@ impl TriggerExecutor for RedisTrigger { async fn new(engine: TriggerAppEngine) -> Result { let address = engine - .app() - .get_metadata(TRIGGER_METADATA_KEY)? - .map_or_else( - || { - engine - .trigger_metadata::() - .unwrap_or_default() - .map_or(String::new(), |f| f.address) - }, - |t| t.address, - ); + .trigger_metadata::()? + .unwrap_or_default() + .address; let mut channel_components: HashMap> = HashMap::new(); diff --git a/crates/trigger-http/src/lib.rs b/crates/trigger-http/src/lib.rs index 55d05ecd34..a414d6a765 100644 --- a/crates/trigger-http/src/lib.rs +++ b/crates/trigger-http/src/lib.rs @@ -95,17 +95,9 @@ impl TriggerExecutor for HttpTrigger { async fn new(engine: TriggerAppEngine) -> Result { let mut base = engine - .app() - .get_metadata(spin_http::trigger::METADATA_KEY)? - .map_or_else( - || { - engine - .trigger_metadata::() - .unwrap_or_default() - .map_or(String::new(), |f| f.base) - }, - |t| t.base, - ); + .trigger_metadata::()? + .unwrap_or_default() + .base; if !base.starts_with('/') { base = format!("/{base}");