From 151dacf5b8b50afd3686eb4022ee487d2521a474 Mon Sep 17 00:00:00 2001 From: Lee Benson Date: Wed, 12 May 2021 10:47:31 +0100 Subject: [PATCH] feat(config): Initial `[provider]` implementation - `http` (#7166) * provider init Signed-off-by: Lee Benson * http wip Signed-off-by: Lee Benson * Update validation.rs * wip Signed-off-by: Lee Benson * wip * wip * wip Signed-off-by: Lee Benson * sources finished Signed-off-by: Lee Benson * provider reload Signed-off-by: Lee Benson * commentary Signed-off-by: Lee Benson * reload event Signed-off-by: Lee Benson * polling Signed-off-by: Lee Benson * logs * compile Signed-off-by: Lee Benson * fix error Signed-off-by: Lee Benson * clippy Signed-off-by: Lee Benson * more comments Signed-off-by: Lee Benson * Update Cargo.lock * fix url serde Signed-off-by: Lee Benson * fix event messages Signed-off-by: Lee Benson * fix default features Signed-off-by: Lee Benson * receiverstream Signed-off-by: Lee Benson * providerS Signed-off-by: Lee Benson * feature Signed-off-by: Lee Benson * fix windows signals Signed-off-by: Lee Benson * remove providers feature Signed-off-by: Lee Benson * wip * provider refactor Signed-off-by: Lee Benson * shutdown Signed-off-by: Lee Benson * broadcast Signed-off-by: Lee Benson * remove control Signed-off-by: Lee Benson * tweaks Signed-off-by: Lee Benson * -option Signed-off-by: Lee Benson * comment cleanup Signed-off-by: Lee Benson * fix merge Signed-off-by: Lee Benson * wip * Revert "wip" This reverts commit b62d9ec0c2c0936c6441c0213cc5a3476cc27511. * configbuilder Signed-off-by: Lee Benson * signal_rx Signed-off-by: Lee Benson * bytes Signed-off-by: Lee Benson * signal handling Signed-off-by: Lee Benson --- Cargo.lock | 1 + Cargo.toml | 2 +- src/app.rs | 128 +++++++++++++++++--------- src/config/builder.rs | 8 +- src/config/loading.rs | 30 ++++++- src/config/mod.rs | 6 +- src/config/provider.rs | 49 ++++++++++ src/config/validation.rs | 13 +++ src/lib.rs | 1 + src/providers/http.rs | 188 +++++++++++++++++++++++++++++++++++++++ src/providers/mod.rs | 6 ++ src/signal.rs | 114 ++++++++++++++++++++++-- 12 files changed, 489 insertions(+), 57 deletions(-) create mode 100644 src/config/provider.rs create mode 100644 src/providers/http.rs create mode 100644 src/providers/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 5a60b25f10d33..2434fbefb1cc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7593,6 +7593,7 @@ dependencies = [ "idna", "matches", "percent-encoding", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b6de227e72c28..0ffb8cf58d30b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -241,7 +241,7 @@ tokio-postgres = { version = "0.7.2", default-features = false, features = ["run toml = { version = "0.5.8", default-features = false } typetag = { version = "0.1.7", default-features = false } twox-hash = { version = "1.6.0", default-features = false } -url = { version = "2.2.2", default-features = false } +url = { version = "2.2.2", default-features = false, features = ["serde"] } uuid = { version = "0.8.2", default-features = false, features = ["serde", "v4"], optional = true } warp = { version = "0.3.1", default-features = false, optional = true } zstd = { version = "0.6", default-features = false } diff --git a/src/app.rs b/src/app.rs index 2cc9a2d2827c4..3bf8004dd421d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,19 +1,17 @@ use crate::{ cli::{handle_config_errors, Color, LogFormat, Opts, RootOpts, SubCommand}, - config, generate, heartbeat, list, metrics, signal, - signal::SignalTo, + config, generate, heartbeat, list, metrics, + signal::{self, SignalTo}, topology::{self, RunningTopology}, trace, unit_test, validate, }; use cfg_if::cfg_if; +use futures::StreamExt; use std::{collections::HashMap, path::PathBuf}; - use tokio::{ runtime::{self, Runtime}, sync::mpsc, }; - -use futures::StreamExt; use tokio_stream::wrappers::UnboundedReceiverStream; #[cfg(feature = "sources-host_metrics")] @@ -39,6 +37,8 @@ pub struct ApplicationConfig { pub graceful_crash: mpsc::UnboundedReceiver<()>, #[cfg(feature = "api")] pub api: config::api::Options, + pub signal_handler: signal::SignalHandler, + pub signal_rx: signal::SignalRx, } pub struct Application { @@ -110,6 +110,10 @@ impl Application { let require_healthy = root_opts.require_healthy; rt.block_on(async move { + // Signal handler for OS and provider messages. + let (mut signal_handler, signal_rx) = signal::SignalHandler::new(); + signal_handler.forever(signal::os_signals()); + if let Some(s) = sub_command { let code = match s { SubCommand::Validate(v) => validate::validate(&v, color).await, @@ -153,7 +157,9 @@ impl Application { config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?; let mut config = - config::load_from_paths(&config_paths).map_err(handle_config_errors)?; + config::load_from_paths_with_provider(&config_paths, &mut signal_handler) + .await + .map_err(handle_config_errors)?; if !config.healthchecks.enabled { info!("Health checks are disabled."); @@ -177,6 +183,8 @@ impl Application { graceful_crash, #[cfg(feature = "api")] api, + signal_handler, + signal_rx, }) }) }?; @@ -201,6 +209,9 @@ impl Application { #[cfg(feature = "api")] let api_config = self.config.api; + let mut signal_handler = self.config.signal_handler; + let mut signal_rx = self.config.signal_rx; + // Any internal_logs sources will have grabbed a copy of the // early buffer by this point and set up a subscriber. crate::trace::stop_buffering(); @@ -209,7 +220,7 @@ impl Application { emit!(VectorStarted); tokio::spawn(heartbeat::heartbeat()); - // Using cfg_if flattens nesting. + // Configure the API server, if applicable. cfg_if! ( if #[cfg(feature = "api")] { // Assigned to prevent the API terminating when falling out of scope. @@ -227,48 +238,81 @@ impl Application { } ); - let signals = signal::signals(); - tokio::pin!(signals); let mut sources_finished = topology.sources_finished(); let signal = loop { tokio::select! { - Some(signal) = signals.next() => { - if signal == SignalTo::Reload { - // Reload paths - config_paths = config::process_paths(&opts.config_paths_with_formats()).unwrap_or(config_paths); - // Reload config - let new_config = config::load_from_paths(&config_paths).map_err(handle_config_errors).ok(); - - if let Some(mut new_config) = new_config { - new_config.healthchecks.set_require_healthy(opts.require_healthy); - match topology - .reload_config_and_respawn(new_config) - .await - { - Ok(true) => { - #[cfg(feature = "api")] - // Pass the new config to the API server. - if let Some(ref api_server) = api_server { - api_server.update_config(topology.config()); + Some(signal) = signal_rx.recv() => { + match signal { + SignalTo::ReloadFromConfigBuilder(config_builder) => { + match config_builder.build().map_err(handle_config_errors) { + Ok(mut new_config) => { + new_config.healthchecks.set_require_healthy(opts.require_healthy); + match topology + .reload_config_and_respawn(new_config) + .await + { + Ok(true) => { + #[cfg(feature = "api")] + // Pass the new config to the API server. + if let Some(ref api_server) = api_server { + api_server.update_config(topology.config()); + } + + emit!(VectorReloaded { config_paths: &config_paths }) + }, + Ok(false) => emit!(VectorReloadFailed), + // Trigger graceful shutdown for what remains of the topology + Err(()) => { + emit!(VectorReloadFailed); + emit!(VectorRecoveryFailed); + break SignalTo::Shutdown; + } } - - emit!(VectorReloaded { config_paths: &config_paths }) + sources_finished = topology.sources_finished(); }, - Ok(false) => emit!(VectorReloadFailed), - // Trigger graceful shutdown for what remains of the topology - Err(()) => { - emit!(VectorReloadFailed); - emit!(VectorRecoveryFailed); - break SignalTo::Shutdown; + Err(_) => { + emit!(VectorConfigLoadFailed); + } + } + } + SignalTo::ReloadFromDisk => { + // Reload paths + config_paths = config::process_paths(&opts.config_paths_with_formats()).unwrap_or(config_paths); + // Reload config + let new_config = config::load_from_paths_with_provider(&config_paths, &mut signal_handler) + .await + .map_err(handle_config_errors).ok(); + + if let Some(mut new_config) = new_config { + new_config.healthchecks.set_require_healthy(opts.require_healthy); + match topology + .reload_config_and_respawn(new_config) + .await + { + Ok(true) => { + #[cfg(feature = "api")] + // Pass the new config to the API server. + if let Some(ref api_server) = api_server { + api_server.update_config(topology.config()); + } + + emit!(VectorReloaded { config_paths: &config_paths }) + }, + Ok(false) => emit!(VectorReloadFailed), + // Trigger graceful shutdown for what remains of the topology + Err(()) => { + emit!(VectorReloadFailed); + emit!(VectorRecoveryFailed); + break SignalTo::Shutdown; + } } + sources_finished = topology.sources_finished(); + } else { + emit!(VectorConfigLoadFailed); } - sources_finished = topology.sources_finished(); - } else { - emit!(VectorConfigLoadFailed); } - } else { - break signal; + _ => break signal, } } // Trigger graceful shutdown if a component crashed, or all sources have ended. @@ -283,7 +327,7 @@ impl Application { emit!(VectorStopped); tokio::select! { _ = topology.stop() => (), // Graceful shutdown finished - _ = signals.next() => { + _ = signal_rx.recv() => { // It is highly unlikely that this event will exit from topology. emit!(VectorQuit); // Dropping the shutdown future will immediately shut the server down @@ -295,7 +339,7 @@ impl Application { emit!(VectorQuit); drop(topology); } - SignalTo::Reload => unreachable!(), + _ => unreachable!(), } }); } diff --git a/src/config/builder.rs b/src/config/builder.rs index 05dea5b71cef7..e1c144481e6fc 100644 --- a/src/config/builder.rs +++ b/src/config/builder.rs @@ -1,8 +1,8 @@ #[cfg(feature = "api")] use super::api; use super::{ - compiler, default_data_dir, Config, GlobalOptions, HealthcheckOptions, SinkConfig, SinkOuter, - SourceConfig, TestDefinition, TransformConfig, TransformOuter, + compiler, default_data_dir, provider, Config, GlobalOptions, HealthcheckOptions, SinkConfig, + SinkOuter, SourceConfig, TestDefinition, TransformConfig, TransformOuter, }; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; @@ -25,6 +25,7 @@ pub struct ConfigBuilder { pub transforms: IndexMap, #[serde(default)] pub tests: Vec, + pub provider: Option>, } impl Clone for ConfigBuilder { @@ -49,6 +50,7 @@ impl From for ConfigBuilder { sources: c.sources, sinks: c.sinks, transforms: c.transforms, + provider: None, tests: c.tests, } } @@ -108,6 +110,8 @@ impl ConfigBuilder { errors.push(error); } + self.provider = with.provider; + if self.global.data_dir.is_none() || self.global.data_dir == default_data_dir() { self.global.data_dir = with.global.data_dir; } else if with.global.data_dir != default_data_dir() diff --git a/src/config/loading.rs b/src/config/loading.rs index 349a77cfd7c44..3cb384c3d6ba4 100644 --- a/src/config/loading.rs +++ b/src/config/loading.rs @@ -1,4 +1,5 @@ -use super::{builder::ConfigBuilder, format, vars, Config, Format, FormatHint}; +use super::{builder::ConfigBuilder, format, validation, vars, Config, Format, FormatHint}; +use crate::signal; use glob::glob; use lazy_static::lazy_static; use std::{ @@ -87,6 +88,31 @@ pub fn load_from_paths(config_paths: &[(PathBuf, FormatHint)]) -> Result Result> { + let (mut builder, load_warnings) = load_builder_from_paths(config_paths)?; + validation::check_provider(&builder)?; + signal_handler.clear(); + + // If there's a provider, overwrite the existing config builder with the remote variant. + if let Some(mut provider) = builder.provider { + builder = provider.build(signal_handler).await?; + debug!(message = "Provider configured.", provider = ?provider.provider_type()); + } + + let (new_config, build_warnings) = builder.build_with_warnings()?; + + for warning in load_warnings.into_iter().chain(build_warnings) { + warn!("{}", warning); + } + + Ok(new_config) +} + pub fn load_builder_from_paths( config_paths: &[(PathBuf, FormatHint)], ) -> Result<(ConfigBuilder, Vec), Vec> { @@ -158,7 +184,7 @@ fn open_config(path: &Path) -> Option { } } -fn load( +pub fn load( mut input: impl std::io::Read, format: FormatHint, ) -> Result<(ConfigBuilder, Vec), Vec> { diff --git a/src/config/mod.rs b/src/config/mod.rs index e9d4cf7640d6a..97ddda7ee047f 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -26,6 +26,7 @@ pub mod component; mod diff; pub mod format; mod loading; +pub mod provider; mod unit_test; mod validation; mod vars; @@ -35,8 +36,8 @@ pub use builder::ConfigBuilder; pub use diff::ConfigDiff; pub use format::{Format, FormatHint}; pub use loading::{ - load_builder_from_paths, load_from_paths, load_from_str, merge_path_lists, process_paths, - CONFIG_PATHS, + load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider, load_from_str, + merge_path_lists, process_paths, CONFIG_PATHS, }; pub use unit_test::build_unit_tests_main as build_unit_tests; pub use validation::warnings; @@ -204,7 +205,6 @@ macro_rules! impl_generate_config_from_default { }; } -#[async_trait::async_trait] #[async_trait] #[typetag::serde(tag = "type")] pub trait SourceConfig: core::fmt::Debug + Send + Sync { diff --git a/src/config/provider.rs b/src/config/provider.rs new file mode 100644 index 0000000000000..cb88432d62801 --- /dev/null +++ b/src/config/provider.rs @@ -0,0 +1,49 @@ +use super::{component::ExampleError, GenerateConfig}; +use crate::{providers, signal}; +use async_trait::async_trait; +use toml::Value; + +#[async_trait] +#[typetag::serde(tag = "type")] +pub trait ProviderConfig: core::fmt::Debug + Send + Sync + dyn_clone::DynClone { + /// Builds a provider, returning a string containing the config. It's passed a signals + /// channel to control reloading and shutdown, as applicable. + async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> providers::Result; + fn provider_type(&self) -> &'static str; +} + +dyn_clone::clone_trait_object!(ProviderConfig); + +/// Describes a provider plugin storing its type name and an optional example config. +pub struct ProviderDescription { + pub type_str: &'static str, + example_value: fn() -> Option, +} + +impl ProviderDescription +where + inventory::iter: + std::iter::IntoIterator, +{ + /// Creates a new provider plugin description. + /// Configuration example is generated by the `GenerateConfig` trait. + pub fn new(type_str: &'static str) -> Self { + Self { + type_str, + example_value: || Some(B::generate_config()), + } + } + + /// Returns an example config for a plugin identified by its type. + pub fn example(type_str: &str) -> Result { + inventory::iter:: + .into_iter() + .find(|t| t.type_str == type_str) + .ok_or_else(|| ExampleError::DoesNotExist { + type_str: type_str.to_owned(), + }) + .and_then(|t| (t.example_value)().ok_or(ExampleError::MissingExample)) + } +} + +inventory::collect!(ProviderDescription); diff --git a/src/config/validation.rs b/src/config/validation.rs index 8c68007efcde3..4fee59671ff76 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -1,6 +1,19 @@ use super::{builder::ConfigBuilder, DataType, Resource}; use std::collections::HashMap; +/// Check that provide + topology config aren't present in the same builder, which is an error. +pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec> { + if config.provider.is_some() + && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty()) + { + Err(vec![ + "No sources/transforms/sinks are allowed if provider config is present.".to_owned(), + ]) + } else { + Ok(()) + } +} + pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec> { let mut errors = vec![]; diff --git a/src/lib.rs b/src/lib.rs index c960c4b70c98a..b7cfa0826206f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ pub mod kubernetes; pub mod line_agg; pub mod list; pub(crate) mod pipeline; +pub mod providers; #[cfg(feature = "rusoto_core")] pub mod rusoto; pub mod serde; diff --git a/src/providers/http.rs b/src/providers/http.rs new file mode 100644 index 0000000000000..47a4d0d303688 --- /dev/null +++ b/src/providers/http.rs @@ -0,0 +1,188 @@ +use super::Result; +use crate::{ + config::{ + self, + provider::{ProviderConfig, ProviderDescription}, + }, + http::HttpClient, + signal, + tls::{TlsOptions, TlsSettings}, +}; +use async_stream::stream; +use bytes::Buf; +use futures::Stream; +use hyper::Body; +use indexmap::IndexMap; +use serde::{Deserialize, Serialize}; +use tokio::time; +use url::Url; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct RequestConfig { + #[serde(default)] + pub headers: IndexMap, +} + +impl Default for RequestConfig { + fn default() -> Self { + Self { + headers: IndexMap::new(), + } + } +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields, default)] +pub struct HttpConfig { + url: Option, + request: RequestConfig, + poll_interval_secs: u64, + #[serde(flatten)] + tls_options: Option, +} + +impl Default for HttpConfig { + fn default() -> Self { + Self { + url: None, + request: RequestConfig::default(), + poll_interval_secs: 30, + tls_options: None, + } + } +} + +/// Makes an HTTP request to the provided endpoint, returning the String body. +async fn http_request( + url: &Url, + tls_options: &Option, + headers: &IndexMap, +) -> std::result::Result { + let tls_settings = TlsSettings::from_options(tls_options).map_err(|_| "Invalid TLS options")?; + let http_client = HttpClient::::new(tls_settings).map_err(|_| "Invalid TLS settings")?; + + // Build HTTP request. + let mut builder = http::request::Builder::new().uri(url.to_string()); + + // Augment with headers. These may be required e.g. for authentication to + // private endpoints. + for (header, value) in headers.iter() { + builder = builder.header(header.as_str(), value.as_str()); + } + + let request = builder + .body(Body::empty()) + .map_err(|_| "Couldn't create HTTP request")?; + + info!( + message = "Attempting to retrieve configuration.", + url = ?url.as_str() + ); + + let response = http_client.send(request).await.map_err(|err| { + let message = "HTTP error"; + error!( + message = ?message, + error = ?err, + url = ?url.as_str()); + message + })?; + + info!(message = "Response received.", url = ?url.as_str()); + + hyper::body::to_bytes(response.into_body()) + .await + .map_err(|err| { + let message = "Error interpreting response."; + let cause = err.into_cause(); + error!( + message = ?message, + error = ?cause); + + message + }) +} + +/// Calls `http_request`, serializing the result to a `ConfigBuilder`. +async fn http_request_to_config_builder( + url: &Url, + tls_options: &Option, + headers: &IndexMap, +) -> Result { + let config_str = http_request(url, tls_options, headers) + .await + .map_err(|e| vec![e.to_owned()])?; + + let (config_builder, warnings) = config::load(config_str.chunk(), None)?; + + for warning in warnings.into_iter() { + warn!("{}", warning); + } + + Ok(config_builder) +} + +/// Polls the HTTP endpoint after/every `poll_interval_secs`, returning a stream of `ConfigBuilder`. +fn poll_http( + poll_interval_secs: u64, + url: Url, + tls_options: Option, + headers: IndexMap, +) -> impl Stream { + let duration = time::Duration::from_secs(poll_interval_secs); + let mut interval = time::interval_at(time::Instant::now() + duration, duration); + + stream! { + loop { + interval.tick().await; + + match http_request_to_config_builder(&url, &tls_options, &headers).await { + Ok(config_builder) => yield signal::SignalTo::ReloadFromConfigBuilder(config_builder), + Err(_) => return, + }; + + info!( + message = "HTTP provider is waiting.", + poll_interval_secs = ?poll_interval_secs, + url = ?url.as_str()); + } + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "http")] +impl ProviderConfig for HttpConfig { + async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> Result { + let url = self + .url + .take() + .ok_or_else(|| vec!["URL is required for the `http` provider.".to_owned()])?; + + let tls_options = self.tls_options.take(); + let poll_interval_secs = self.poll_interval_secs; + let request = self.request.clone(); + + let config_builder = + http_request_to_config_builder(&url, &tls_options, &request.headers).await?; + + // Poll for changes to remote configuration. + signal_handler.add(poll_http( + poll_interval_secs, + url, + tls_options, + request.headers.clone(), + )); + + Ok(config_builder) + } + + fn provider_type(&self) -> &'static str { + "http" + } +} + +inventory::submit! { + ProviderDescription::new::("http") +} + +impl_generate_config_from_default!(HttpConfig); diff --git a/src/providers/mod.rs b/src/providers/mod.rs new file mode 100644 index 0000000000000..e94a350fea589 --- /dev/null +++ b/src/providers/mod.rs @@ -0,0 +1,6 @@ +pub mod http; + +use super::config::ConfigBuilder; + +/// A provider returns a `ConfigBuilder` and config warnings, if successful. +pub type Result = std::result::Result>; diff --git a/src/signal.rs b/src/signal.rs index ce542ec10fd23..23b8e8f26e87f 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,18 +1,118 @@ -use futures::Stream; +use super::config::ConfigBuilder; +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::{Stream, StreamExt}; -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub type ShutdownTx = broadcast::Sender<()>; +pub type SignalTx = mpsc::Sender; +pub type SignalRx = mpsc::Receiver; + +#[derive(Debug)] +/// Control messages used by Vector to drive topology and shutdown events. pub enum SignalTo { - /// Signal to reload config. - Reload, + /// Signal to reload config from a string. + ReloadFromConfigBuilder(ConfigBuilder), + /// Signal to reload config from the filesystem. + ReloadFromDisk, /// Signal to shutdown process. Shutdown, /// Shutdown process immediately. Quit, } +/// SignalHandler is a general `ControlTo` message receiver and transmitter. It's used by +/// OS signals and providers to surface control events to the root of the application. +pub struct SignalHandler { + tx: SignalTx, + shutdown_txs: Vec, +} + +impl SignalHandler { + /// Create a new signal handler. We'll have space for 2 control messages at a time, to + /// ensure the channel isn't blocking. + pub fn new() -> (Self, SignalRx) { + let (tx, rx) = mpsc::channel(2); + let handler = Self { + tx, + shutdown_txs: vec![], + }; + + (handler, rx) + } + + /// Clones the transmitter. + pub fn clone_tx(&self) -> SignalTx { + self.tx.clone() + } + + /// Takes a stream who's elements are convertible to `SignalTo`, and spawns a permanent + /// task for transmitting to the receiver. + pub fn forever(&mut self, stream: S) + where + T: Into + Send + Sync, + S: Stream + 'static + Send, + { + let tx = self.tx.clone(); + + tokio::spawn(async move { + tokio::pin!(stream); + + while let Some(value) = stream.next().await { + if tx.send(value.into()).await.is_err() { + error!(message = "Couldn't send signal."); + break; + } + } + }); + } + + /// Takes a stream, sending to the underlying signal receiver. Returns a broadcast tx + /// channel which can be used by the caller to either subscribe to cancelation, or trigger + /// it. Useful for providers that may need to do both. + pub fn add(&mut self, stream: S) + where + T: Into + Send, + S: Stream + 'static + Send, + { + let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2); + let tx = self.tx.clone(); + + self.shutdown_txs.push(shutdown_tx); + + tokio::spawn(async move { + tokio::pin!(stream); + + loop { + tokio::select! { + biased; + + _ = shutdown_rx.recv() => break, + Some(value) = stream.next() => { + if tx.send(value.into()).await.is_err() { + error!(message = "Couldn't send signal."); + break; + } + } + else => { + error!(message = "Underlying stream is closed."); + break; + } + } + } + }); + } + + /// Shutdown active signal handlers. + pub fn clear(&mut self) { + for shutdown_tx in self.shutdown_txs.drain(..) { + // An error just means the channel was already shut down; safe to ignore. + let _ = shutdown_tx.send(()); + } + } +} + /// Signals from OS/user. #[cfg(unix)] -pub fn signals() -> impl Stream { +pub fn os_signals() -> impl Stream { use tokio::signal::unix::{signal, SignalKind}; let mut sigint = signal(SignalKind::interrupt()).expect("Signal handlers should not panic."); @@ -26,7 +126,7 @@ pub fn signals() -> impl Stream { _ = sigint.recv() => SignalTo::Shutdown, _ = sigterm.recv() => SignalTo::Shutdown, _ = sigquit.recv() => SignalTo::Quit, - _ = sighup.recv() => SignalTo::Reload, + _ = sighup.recv() => SignalTo::ReloadFromDisk, }; yield signal; } @@ -35,7 +135,7 @@ pub fn signals() -> impl Stream { /// Signals from OS/user. #[cfg(windows)] -pub fn signals() -> impl Stream { +pub fn os_signals() -> impl Stream { use futures::future::FutureExt; async_stream::stream! {