Skip to content

Commit

Permalink
feat(config): Initial [provider] implementation - http (#7166)
Browse files Browse the repository at this point in the history
* provider init

Signed-off-by: Lee Benson <[email protected]>

* http wip

Signed-off-by: Lee Benson <[email protected]>

* Update validation.rs

* wip

Signed-off-by: Lee Benson <[email protected]>

* wip

* wip

* wip

Signed-off-by: Lee Benson <[email protected]>

* sources finished

Signed-off-by: Lee Benson <[email protected]>

* provider reload

Signed-off-by: Lee Benson <[email protected]>

* commentary

Signed-off-by: Lee Benson <[email protected]>

* reload event

Signed-off-by: Lee Benson <[email protected]>

* polling

Signed-off-by: Lee Benson <[email protected]>

* logs

* compile

Signed-off-by: Lee Benson <[email protected]>

* fix error

Signed-off-by: Lee Benson <[email protected]>

* clippy

Signed-off-by: Lee Benson <[email protected]>

* more comments

Signed-off-by: Lee Benson <[email protected]>

* Update Cargo.lock

* fix url serde

Signed-off-by: Lee Benson <[email protected]>

* fix event messages

Signed-off-by: Lee Benson <[email protected]>

* fix default features

Signed-off-by: Lee Benson <[email protected]>

* receiverstream

Signed-off-by: Lee Benson <[email protected]>

* providerS

Signed-off-by: Lee Benson <[email protected]>

* feature

Signed-off-by: Lee Benson <[email protected]>

* fix windows signals

Signed-off-by: Lee Benson <[email protected]>

* remove providers feature

Signed-off-by: Lee Benson <[email protected]>

* wip

* provider refactor

Signed-off-by: Lee Benson <[email protected]>

* shutdown

Signed-off-by: Lee Benson <[email protected]>

* broadcast

Signed-off-by: Lee Benson <[email protected]>

* remove control

Signed-off-by: Lee Benson <[email protected]>

* tweaks

Signed-off-by: Lee Benson <[email protected]>

* -option

Signed-off-by: Lee Benson <[email protected]>

* comment cleanup

Signed-off-by: Lee Benson <[email protected]>

* fix merge

Signed-off-by: Lee Benson <[email protected]>

* wip

* Revert "wip"

This reverts commit b62d9ec.

* configbuilder

Signed-off-by: Lee Benson <[email protected]>

* signal_rx

Signed-off-by: Lee Benson <[email protected]>

* bytes

Signed-off-by: Lee Benson <[email protected]>

* signal handling

Signed-off-by: Lee Benson <[email protected]>
  • Loading branch information
leebenson authored May 12, 2021
1 parent 838bdb2 commit 151dacf
Show file tree
Hide file tree
Showing 12 changed files with 489 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ tokio-postgres = { version = "0.7.2", default-features = false, features = ["run
toml = { version = "0.5.8", default-features = false }
typetag = { version = "0.1.7", default-features = false }
twox-hash = { version = "1.6.0", default-features = false }
url = { version = "2.2.2", default-features = false }
url = { version = "2.2.2", default-features = false, features = ["serde"] }
uuid = { version = "0.8.2", default-features = false, features = ["serde", "v4"], optional = true }
warp = { version = "0.3.1", default-features = false, optional = true }
zstd = { version = "0.6", default-features = false }
Expand Down
128 changes: 86 additions & 42 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::{
cli::{handle_config_errors, Color, LogFormat, Opts, RootOpts, SubCommand},
config, generate, heartbeat, list, metrics, signal,
signal::SignalTo,
config, generate, heartbeat, list, metrics,
signal::{self, SignalTo},
topology::{self, RunningTopology},
trace, unit_test, validate,
};
use cfg_if::cfg_if;
use futures::StreamExt;
use std::{collections::HashMap, path::PathBuf};

use tokio::{
runtime::{self, Runtime},
sync::mpsc,
};

use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[cfg(feature = "sources-host_metrics")]
Expand All @@ -39,6 +37,8 @@ pub struct ApplicationConfig {
pub graceful_crash: mpsc::UnboundedReceiver<()>,
#[cfg(feature = "api")]
pub api: config::api::Options,
pub signal_handler: signal::SignalHandler,
pub signal_rx: signal::SignalRx,
}

pub struct Application {
Expand Down Expand Up @@ -110,6 +110,10 @@ impl Application {
let require_healthy = root_opts.require_healthy;

rt.block_on(async move {
// Signal handler for OS and provider messages.
let (mut signal_handler, signal_rx) = signal::SignalHandler::new();
signal_handler.forever(signal::os_signals());

if let Some(s) = sub_command {
let code = match s {
SubCommand::Validate(v) => validate::validate(&v, color).await,
Expand Down Expand Up @@ -153,7 +157,9 @@ impl Application {
config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?;

let mut config =
config::load_from_paths(&config_paths).map_err(handle_config_errors)?;
config::load_from_paths_with_provider(&config_paths, &mut signal_handler)
.await
.map_err(handle_config_errors)?;

if !config.healthchecks.enabled {
info!("Health checks are disabled.");
Expand All @@ -177,6 +183,8 @@ impl Application {
graceful_crash,
#[cfg(feature = "api")]
api,
signal_handler,
signal_rx,
})
})
}?;
Expand All @@ -201,6 +209,9 @@ impl Application {
#[cfg(feature = "api")]
let api_config = self.config.api;

let mut signal_handler = self.config.signal_handler;
let mut signal_rx = self.config.signal_rx;

// Any internal_logs sources will have grabbed a copy of the
// early buffer by this point and set up a subscriber.
crate::trace::stop_buffering();
Expand All @@ -209,7 +220,7 @@ impl Application {
emit!(VectorStarted);
tokio::spawn(heartbeat::heartbeat());

// Using cfg_if flattens nesting.
// Configure the API server, if applicable.
cfg_if! (
if #[cfg(feature = "api")] {
// Assigned to prevent the API terminating when falling out of scope.
Expand All @@ -227,48 +238,81 @@ impl Application {
}
);

let signals = signal::signals();
tokio::pin!(signals);
let mut sources_finished = topology.sources_finished();

let signal = loop {
tokio::select! {
Some(signal) = signals.next() => {
if signal == SignalTo::Reload {
// Reload paths
config_paths = config::process_paths(&opts.config_paths_with_formats()).unwrap_or(config_paths);
// Reload config
let new_config = config::load_from_paths(&config_paths).map_err(handle_config_errors).ok();

if let Some(mut new_config) = new_config {
new_config.healthchecks.set_require_healthy(opts.require_healthy);
match topology
.reload_config_and_respawn(new_config)
.await
{
Ok(true) => {
#[cfg(feature = "api")]
// Pass the new config to the API server.
if let Some(ref api_server) = api_server {
api_server.update_config(topology.config());
Some(signal) = signal_rx.recv() => {
match signal {
SignalTo::ReloadFromConfigBuilder(config_builder) => {
match config_builder.build().map_err(handle_config_errors) {
Ok(mut new_config) => {
new_config.healthchecks.set_require_healthy(opts.require_healthy);
match topology
.reload_config_and_respawn(new_config)
.await
{
Ok(true) => {
#[cfg(feature = "api")]
// Pass the new config to the API server.
if let Some(ref api_server) = api_server {
api_server.update_config(topology.config());
}

emit!(VectorReloaded { config_paths: &config_paths })
},
Ok(false) => emit!(VectorReloadFailed),
// Trigger graceful shutdown for what remains of the topology
Err(()) => {
emit!(VectorReloadFailed);
emit!(VectorRecoveryFailed);
break SignalTo::Shutdown;
}
}

emit!(VectorReloaded { config_paths: &config_paths })
sources_finished = topology.sources_finished();
},
Ok(false) => emit!(VectorReloadFailed),
// Trigger graceful shutdown for what remains of the topology
Err(()) => {
emit!(VectorReloadFailed);
emit!(VectorRecoveryFailed);
break SignalTo::Shutdown;
Err(_) => {
emit!(VectorConfigLoadFailed);
}
}
}
SignalTo::ReloadFromDisk => {
// Reload paths
config_paths = config::process_paths(&opts.config_paths_with_formats()).unwrap_or(config_paths);
// Reload config
let new_config = config::load_from_paths_with_provider(&config_paths, &mut signal_handler)
.await
.map_err(handle_config_errors).ok();

if let Some(mut new_config) = new_config {
new_config.healthchecks.set_require_healthy(opts.require_healthy);
match topology
.reload_config_and_respawn(new_config)
.await
{
Ok(true) => {
#[cfg(feature = "api")]
// Pass the new config to the API server.
if let Some(ref api_server) = api_server {
api_server.update_config(topology.config());
}

emit!(VectorReloaded { config_paths: &config_paths })
},
Ok(false) => emit!(VectorReloadFailed),
// Trigger graceful shutdown for what remains of the topology
Err(()) => {
emit!(VectorReloadFailed);
emit!(VectorRecoveryFailed);
break SignalTo::Shutdown;
}
}
sources_finished = topology.sources_finished();
} else {
emit!(VectorConfigLoadFailed);
}
sources_finished = topology.sources_finished();
} else {
emit!(VectorConfigLoadFailed);
}
} else {
break signal;
_ => break signal,
}
}
// Trigger graceful shutdown if a component crashed, or all sources have ended.
Expand All @@ -283,7 +327,7 @@ impl Application {
emit!(VectorStopped);
tokio::select! {
_ = topology.stop() => (), // Graceful shutdown finished
_ = signals.next() => {
_ = signal_rx.recv() => {
// It is highly unlikely that this event will exit from topology.
emit!(VectorQuit);
// Dropping the shutdown future will immediately shut the server down
Expand All @@ -295,7 +339,7 @@ impl Application {
emit!(VectorQuit);
drop(topology);
}
SignalTo::Reload => unreachable!(),
_ => unreachable!(),
}
});
}
Expand Down
8 changes: 6 additions & 2 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#[cfg(feature = "api")]
use super::api;
use super::{
compiler, default_data_dir, Config, GlobalOptions, HealthcheckOptions, SinkConfig, SinkOuter,
SourceConfig, TestDefinition, TransformConfig, TransformOuter,
compiler, default_data_dir, provider, Config, GlobalOptions, HealthcheckOptions, SinkConfig,
SinkOuter, SourceConfig, TestDefinition, TransformConfig, TransformOuter,
};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
Expand All @@ -25,6 +25,7 @@ pub struct ConfigBuilder {
pub transforms: IndexMap<String, TransformOuter>,
#[serde(default)]
pub tests: Vec<TestDefinition>,
pub provider: Option<Box<dyn provider::ProviderConfig>>,
}

impl Clone for ConfigBuilder {
Expand All @@ -49,6 +50,7 @@ impl From<Config> for ConfigBuilder {
sources: c.sources,
sinks: c.sinks,
transforms: c.transforms,
provider: None,
tests: c.tests,
}
}
Expand Down Expand Up @@ -108,6 +110,8 @@ impl ConfigBuilder {
errors.push(error);
}

self.provider = with.provider;

if self.global.data_dir.is_none() || self.global.data_dir == default_data_dir() {
self.global.data_dir = with.global.data_dir;
} else if with.global.data_dir != default_data_dir()
Expand Down
30 changes: 28 additions & 2 deletions src/config/loading.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{builder::ConfigBuilder, format, vars, Config, Format, FormatHint};
use super::{builder::ConfigBuilder, format, validation, vars, Config, Format, FormatHint};
use crate::signal;
use glob::glob;
use lazy_static::lazy_static;
use std::{
Expand Down Expand Up @@ -87,6 +88,31 @@ pub fn load_from_paths(config_paths: &[(PathBuf, FormatHint)]) -> Result<Config,
Ok(config)
}

/// Loads a configuration from paths. If a provider is present in the builder, the config is
/// used as bootstrapping for a remote source. Otherwise, provider instantiation is skipped.
pub async fn load_from_paths_with_provider(
config_paths: &[(PathBuf, FormatHint)],
signal_handler: &mut signal::SignalHandler,
) -> Result<Config, Vec<String>> {
let (mut builder, load_warnings) = load_builder_from_paths(config_paths)?;
validation::check_provider(&builder)?;
signal_handler.clear();

// If there's a provider, overwrite the existing config builder with the remote variant.
if let Some(mut provider) = builder.provider {
builder = provider.build(signal_handler).await?;
debug!(message = "Provider configured.", provider = ?provider.provider_type());
}

let (new_config, build_warnings) = builder.build_with_warnings()?;

for warning in load_warnings.into_iter().chain(build_warnings) {
warn!("{}", warning);
}

Ok(new_config)
}

pub fn load_builder_from_paths(
config_paths: &[(PathBuf, FormatHint)],
) -> Result<(ConfigBuilder, Vec<String>), Vec<String>> {
Expand Down Expand Up @@ -158,7 +184,7 @@ fn open_config(path: &Path) -> Option<File> {
}
}

fn load(
pub fn load(
mut input: impl std::io::Read,
format: FormatHint,
) -> Result<(ConfigBuilder, Vec<String>), Vec<String>> {
Expand Down
6 changes: 3 additions & 3 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod component;
mod diff;
pub mod format;
mod loading;
pub mod provider;
mod unit_test;
mod validation;
mod vars;
Expand All @@ -35,8 +36,8 @@ pub use builder::ConfigBuilder;
pub use diff::ConfigDiff;
pub use format::{Format, FormatHint};
pub use loading::{
load_builder_from_paths, load_from_paths, load_from_str, merge_path_lists, process_paths,
CONFIG_PATHS,
load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider, load_from_str,
merge_path_lists, process_paths, CONFIG_PATHS,
};
pub use unit_test::build_unit_tests_main as build_unit_tests;
pub use validation::warnings;
Expand Down Expand Up @@ -204,7 +205,6 @@ macro_rules! impl_generate_config_from_default {
};
}

#[async_trait::async_trait]
#[async_trait]
#[typetag::serde(tag = "type")]
pub trait SourceConfig: core::fmt::Debug + Send + Sync {
Expand Down
49 changes: 49 additions & 0 deletions src/config/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use super::{component::ExampleError, GenerateConfig};
use crate::{providers, signal};
use async_trait::async_trait;
use toml::Value;

#[async_trait]
#[typetag::serde(tag = "type")]
pub trait ProviderConfig: core::fmt::Debug + Send + Sync + dyn_clone::DynClone {
/// Builds a provider, returning a string containing the config. It's passed a signals
/// channel to control reloading and shutdown, as applicable.
async fn build(&mut self, signal_handler: &mut signal::SignalHandler) -> providers::Result;
fn provider_type(&self) -> &'static str;
}

dyn_clone::clone_trait_object!(ProviderConfig);

/// Describes a provider plugin storing its type name and an optional example config.
pub struct ProviderDescription {
pub type_str: &'static str,
example_value: fn() -> Option<Value>,
}

impl ProviderDescription
where
inventory::iter<ProviderDescription>:
std::iter::IntoIterator<Item = &'static ProviderDescription>,
{
/// Creates a new provider plugin description.
/// Configuration example is generated by the `GenerateConfig` trait.
pub fn new<B: GenerateConfig>(type_str: &'static str) -> Self {
Self {
type_str,
example_value: || Some(B::generate_config()),
}
}

/// Returns an example config for a plugin identified by its type.
pub fn example(type_str: &str) -> Result<Value, ExampleError> {
inventory::iter::<ProviderDescription>
.into_iter()
.find(|t| t.type_str == type_str)
.ok_or_else(|| ExampleError::DoesNotExist {
type_str: type_str.to_owned(),
})
.and_then(|t| (t.example_value)().ok_or(ExampleError::MissingExample))
}
}

inventory::collect!(ProviderDescription);
Loading

0 comments on commit 151dacf

Please sign in to comment.