From 812929b1761355e2209ce33b3fc439d9b8b0d182 Mon Sep 17 00:00:00 2001 From: Dominic Burkart Date: Fri, 30 Jun 2023 16:44:33 +0200 Subject: [PATCH] feat(internal telemetry at shutdown): close internal sources after external ones (#17741) We would like to close the internal logs, metrics, and trace sources sent from Vector as late as possible during shutdown to facilitate debugging. In this PR, we wait until all other sources are shut down before shutting down internal telemetry sources. This means that shutdown may be a bit longer, but we will have better observability on the shutdown process. issue: https://github.com/vectordotdev/vector/issues/15912 --- lib/vector-common/src/shutdown.rs | 33 ++++++++++++++++++++----------- src/config/source.rs | 2 +- src/sources/socket/mod.rs | 2 +- src/sources/util/framestream.rs | 2 +- src/topology/builder.rs | 7 +++++-- 5 files changed, 30 insertions(+), 16 deletions(-) diff --git a/lib/vector-common/src/shutdown.rs b/lib/vector-common/src/shutdown.rs index 79f58978bd6e9..d1b35be5c6fb7 100644 --- a/lib/vector-common/src/shutdown.rs +++ b/lib/vector-common/src/shutdown.rs @@ -107,9 +107,11 @@ impl ShutdownSignal { } } +type IsInternal = bool; + #[derive(Debug, Default)] pub struct SourceShutdownCoordinator { - shutdown_begun_triggers: HashMap, + shutdown_begun_triggers: HashMap, shutdown_force_triggers: HashMap, shutdown_complete_tripwires: HashMap, } @@ -121,13 +123,14 @@ impl SourceShutdownCoordinator { pub fn register_source( &mut self, id: &ComponentKey, + internal: bool, ) -> (ShutdownSignal, impl Future) { let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new(); let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new(); let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new(); self.shutdown_begun_triggers - .insert(id.clone(), shutdown_begun_trigger); + .insert(id.clone(), (internal, shutdown_begun_trigger)); self.shutdown_force_triggers .insert(id.clone(), force_shutdown_trigger); self.shutdown_complete_tripwires @@ -201,13 +204,14 @@ impl SourceShutdownCoordinator { /// Panics if this coordinator has had its triggers removed (ie /// has been taken over with `Self::takeover_source`). pub fn shutdown_all(self, deadline: Option) -> impl Future { - let mut complete_futures = Vec::new(); + let mut internal_sources_complete_futures = Vec::new(); + let mut external_sources_complete_futures = Vec::new(); let shutdown_begun_triggers = self.shutdown_begun_triggers; let mut shutdown_complete_tripwires = self.shutdown_complete_tripwires; let mut shutdown_force_triggers = self.shutdown_force_triggers; - for (id, trigger) in shutdown_begun_triggers { + for (id, (internal, trigger)) in shutdown_begun_triggers { trigger.cancel(); let shutdown_complete_tripwire = @@ -229,10 +233,16 @@ impl SourceShutdownCoordinator { deadline, ); - complete_futures.push(source_complete); + if internal { + internal_sources_complete_futures.push(source_complete); + } else { + external_sources_complete_futures.push(source_complete); + } } - futures::future::join_all(complete_futures).map(|_| ()) + futures::future::join_all(external_sources_complete_futures) + .then(|_| futures::future::join_all(internal_sources_complete_futures)) + .map(|_| ()) } /// Sends the signal to the given source to begin shutting down. Returns a future that resolves @@ -250,11 +260,12 @@ impl SourceShutdownCoordinator { id: &ComponentKey, deadline: Instant, ) -> impl Future { - let begin_shutdown_trigger = self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| { - panic!( + let (_, begin_shutdown_trigger) = + self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| { + panic!( "shutdown_begun_trigger for source \"{id}\" not found in the ShutdownCoordinator" ) - }); + }); // This is what actually triggers the source to begin shutting down. begin_shutdown_trigger.cancel(); @@ -336,7 +347,7 @@ mod test { let mut shutdown = SourceShutdownCoordinator::default(); let id = ComponentKey::from("test"); - let (shutdown_signal, _) = shutdown.register_source(&id); + let (shutdown_signal, _) = shutdown.register_source(&id, false); let deadline = Instant::now() + Duration::from_secs(1); let shutdown_complete = shutdown.shutdown_source(&id, deadline); @@ -352,7 +363,7 @@ mod test { let mut shutdown = SourceShutdownCoordinator::default(); let id = ComponentKey::from("test"); - let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id); + let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id, false); let deadline = Instant::now() + Duration::from_secs(1); let shutdown_complete = shutdown.shutdown_source(&id, deadline); diff --git a/src/config/source.rs b/src/config/source.rs index 1353c18c05dc4..5e53ebbad1725 100644 --- a/src/config/source.rs +++ b/src/config/source.rs @@ -143,7 +143,7 @@ impl SourceContext { out: SourceSender, ) -> (Self, crate::shutdown::SourceShutdownCoordinator) { let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default(); - let (shutdown_signal, _) = shutdown.register_source(key); + let (shutdown_signal, _) = shutdown.register_source(key, false); ( Self { key: key.clone(), diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 58ef30c3fcf99..d381b65cecfdd 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -872,7 +872,7 @@ mod test { source_id: &ComponentKey, shutdown: &mut SourceShutdownCoordinator, ) -> (SocketAddr, JoinHandle>) { - let (shutdown_signal, _) = shutdown.register_source(source_id); + let (shutdown_signal, _) = shutdown.register_source(source_id, false); init_udp_inner(sender, source_id, shutdown_signal, None, false).await } diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 194dd48432074..8bae468a85241 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -728,7 +728,7 @@ mod test { let source_id = ComponentKey::from(source_id); let socket_path = frame_handler.socket_path(); let mut shutdown = SourceShutdownCoordinator::default(); - let (shutdown_signal, _) = shutdown.register_source(&source_id); + let (shutdown_signal, _) = shutdown.register_source(&source_id, false); let server = build_framestream_unix_source(frame_handler, shutdown_signal, pipeline) .expect("Failed to build framestream unix source."); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index b7ace14acd57b..250754aeddfae 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -71,6 +71,8 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy = Lazy::new(|| { .unwrap_or_else(crate::num_threads) }); +const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"]; + /// Builds only the new pieces, and doesn't check their topology. pub async fn build_pieces( config: &super::Config, @@ -313,8 +315,9 @@ impl<'a> Builder<'a> { let pipeline = builder.build(); - let (shutdown_signal, force_shutdown_tripwire) = - self.shutdown_coordinator.register_source(key); + let (shutdown_signal, force_shutdown_tripwire) = self + .shutdown_coordinator + .register_source(key, INTERNAL_SOURCES.contains(&typetag)); let context = SourceContext { key: key.clone(),