From b62d9ec0c2c0936c6441c0213cc5a3476cc27511 Mon Sep 17 00:00:00 2001 From: Lee Benson Date: Mon, 10 May 2021 09:56:36 +0100 Subject: [PATCH] wip --- src/config/builder.rs | 108 ++++++++++++++++++++++++++++++++++++--- src/config/loading.rs | 47 +++++++++-------- src/config/provider.rs | 6 ++- src/config/validation.rs | 13 ----- src/providers/mod.rs | 4 +- 5 files changed, 135 insertions(+), 43 deletions(-) diff --git a/src/config/builder.rs b/src/config/builder.rs index e1c144481e6fc..4c3154ce89735 100644 --- a/src/config/builder.rs +++ b/src/config/builder.rs @@ -1,12 +1,44 @@ #[cfg(feature = "api")] use super::api; use super::{ - compiler, default_data_dir, provider, Config, GlobalOptions, HealthcheckOptions, SinkConfig, - SinkOuter, SourceConfig, TestDefinition, TransformConfig, TransformOuter, + compiler, default_data_dir, provider::ProviderConfig, Config, GlobalOptions, + HealthcheckOptions, SinkConfig, SinkOuter, SourceConfig, TestDefinition, TransformConfig, + TransformOuter, }; use indexmap::IndexMap; use serde::{Deserialize, Serialize}; +/// A "bootstrapping" config can either contain a [provider] key and top-level types, or +/// top-level types and topology. This directs serialization accordingly. +#[derive(Deserialize, Serialize, Debug)] +pub enum Builder { + Config(ConfigBuilder), + Provider(ProviderBuilder), +} + +impl Builder { + pub fn is_provider(&self) -> bool { + matches!(self, Self::Provider(_)) + } + + pub fn into_config_builder(self) -> ConfigBuilder { + match self { + Self::Config(builder) => builder, + Self::Provider(builder) => builder.into(), + } + } + + pub fn append(&mut self, with: Self) -> Result<(), Vec> { + match (self, with) { + (Self::Config(mut config_builder), Self::Config(with)) => config_builder.append(with), + (Self::Provider(mut provider_builder), Self::Provider(with)) => { + provider_builder.append(with) + } + _ => Err(vec!["Conflicting builder type".to_owned()]), + } + } +} + #[derive(Deserialize, Serialize, Debug, Default)] #[serde(deny_unknown_fields)] pub struct ConfigBuilder { @@ -25,7 +57,6 @@ pub struct ConfigBuilder { pub transforms: IndexMap, #[serde(default)] pub tests: Vec, - pub provider: Option>, } impl Clone for ConfigBuilder { @@ -50,7 +81,6 @@ impl From for ConfigBuilder { sources: c.sources, sinks: c.sinks, transforms: c.transforms, - provider: None, tests: c.tests, } } @@ -110,8 +140,6 @@ impl ConfigBuilder { errors.push(error); } - self.provider = with.provider; - if self.global.data_dir.is_none() || self.global.data_dir == default_data_dir() { self.global.data_dir = with.global.data_dir; } else if with.global.data_dir != default_data_dir() @@ -162,3 +190,71 @@ impl ConfigBuilder { Ok(()) } } + +#[derive(Deserialize, Serialize, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct ProviderBuilder { + #[serde(flatten)] + pub global: GlobalOptions, + #[cfg(feature = "api")] + #[serde(default)] + pub api: api::Options, + #[serde(default)] + pub healthchecks: HealthcheckOptions, + pub tests: Vec, + pub provider: Option>, +} + +impl ProviderBuilder { + pub fn append(&mut self, with: Self) -> Result<(), Vec> { + let mut errors = Vec::new(); + + #[cfg(feature = "api")] + if let Err(error) = self.api.merge(with.api) { + errors.push(error); + } + + if self.global.data_dir.is_none() || self.global.data_dir == default_data_dir() { + self.global.data_dir = with.global.data_dir; + } else if with.global.data_dir != default_data_dir() + && self.global.data_dir != with.global.data_dir + { + // If two configs both set 'data_dir' and have conflicting values + // we consider this an error. + errors.push("conflicting values for 'data_dir' found".to_owned()); + } + + // If the user has multiple config files, we must *merge* log schemas + // until we meet a conflict, then we are allowed to error. + if let Err(merge_errors) = self.global.log_schema.merge(&with.global.log_schema) { + errors.extend(merge_errors); + } + + self.healthchecks.merge(with.healthchecks); + + with.tests.iter().for_each(|wt| { + if self.tests.iter().any(|t| t.name == wt.name) { + errors.push(format!("duplicate test name found: {}", wt.name)); + } + }); + if !errors.is_empty() { + return Err(errors); + } + + self.tests.extend(with.tests); + + Ok(()) + } +} + +impl From for ConfigBuilder { + fn from(provider_builder: ProviderBuilder) -> Self { + Self { + global: provider_builder.global, + #[cfg(feature = "api")] + api: provider_builder.api, + healthchecks: provider_builder.healthchecks, + ..Self::default() + } + } +} diff --git a/src/config/loading.rs b/src/config/loading.rs index f5742bf745665..d089c5cc8ca10 100644 --- a/src/config/loading.rs +++ b/src/config/loading.rs @@ -1,4 +1,4 @@ -use super::{builder::ConfigBuilder, format, validation, vars, Config, Format, FormatHint}; +use super::{builder::Builder, format, validation, vars, Config, Format, FormatHint}; use crate::signal; use glob::glob; use lazy_static::lazy_static; @@ -95,36 +95,39 @@ pub async fn load_from_paths_with_provider( signal_handler: &mut signal::SignalHandler, ) -> Result> { let (builder, load_warnings) = load_builder_from_paths(config_paths)?; - validation::check_provider(&builder)?; - match builder.provider { - Some(mut provider) => match provider.build(signal_handler).await { - Ok(config) => { + let config_builder = match builder { + Builder::Provider(mut provider_builder) => match provider_builder.provider { + Some(mut provider) => { + provider + .build(signal_handler) + .await + .map_err(|err| vec![format!("provider error: {}", err)])?; debug!(message = "Provider configured.", provider = ?provider.provider_type()); - load_from_str(&config, None) + config_builder } - Err(err) => Err(vec![format!("provider error: {}", err)]), + None => provider_builder.into(), }, - _ => { - let (config, build_warnings) = builder.build_with_warnings()?; + Builder::Config(config_builder) => config_builder, + }; - // Trigger a shutdown in the signal handler, which will terminate any provider - // streams that may exist prior to loading this configuration to prevent any - // in-flight polling/retrieval. - signal_handler.trigger_shutdown(); + let (config, build_warnings) = config_builder.build_with_warnings()?; - for warning in load_warnings.into_iter().chain(build_warnings) { - warn!("{}", warning); - } + // Trigger a shutdown in the signal handler, which will terminate any provider + // streams that may exist prior to loading this configuration to prevent any + // in-flight polling/retrieval. + signal_handler.trigger_shutdown(); - Ok(config) - } + for warning in load_warnings.into_iter().chain(build_warnings) { + warn!("{}", warning); } + + Ok(config) } pub fn load_builder_from_paths( config_paths: &[(PathBuf, FormatHint)], -) -> Result<(ConfigBuilder, Vec), Vec> { +) -> Result<(Builder, Vec), Vec> { let mut inputs = Vec::new(); let mut errors = Vec::new(); @@ -145,7 +148,7 @@ pub fn load_builder_from_paths( pub fn load_from_str(input: &str, format: FormatHint) -> Result> { let (builder, load_warnings) = load_from_inputs(std::iter::once((input.as_bytes(), format)))?; - let (config, build_warnings) = builder.build_with_warnings()?; + let (config, build_warnings) = builder.into_config_builder().build_with_warnings()?; for warning in load_warnings.into_iter().chain(build_warnings) { warn!("{}", warning); @@ -156,7 +159,7 @@ pub fn load_from_str(input: &str, format: FormatHint) -> Result, -) -> Result<(ConfigBuilder, Vec), Vec> { +) -> Result<(Builder, Vec), Vec> { let mut config = Config::builder(); let mut errors = Vec::new(); let mut warnings = Vec::new(); @@ -196,7 +199,7 @@ fn open_config(path: &Path) -> Option { fn load( mut input: impl std::io::Read, format: FormatHint, -) -> Result<(ConfigBuilder, Vec), Vec> { +) -> Result<(Builder, Vec), Vec> { let mut source_string = String::new(); input .read_to_string(&mut source_string) diff --git a/src/config/provider.rs b/src/config/provider.rs index cb88432d62801..fcceb261ddd9b 100644 --- a/src/config/provider.rs +++ b/src/config/provider.rs @@ -1,4 +1,8 @@ -use super::{component::ExampleError, GenerateConfig}; +use super::{ + api, builder, component::ExampleError, GenerateConfig, GlobalOptions, HealthcheckOptions, + TestDefinition, +}; +use crate::config::ConfigBuilder; use crate::{providers, signal}; use async_trait::async_trait; use toml::Value; diff --git a/src/config/validation.rs b/src/config/validation.rs index 4fee59671ff76..8c68007efcde3 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -1,19 +1,6 @@ use super::{builder::ConfigBuilder, DataType, Resource}; use std::collections::HashMap; -/// Check that provide + topology config aren't present in the same builder, which is an error. -pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec> { - if config.provider.is_some() - && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty()) - { - Err(vec![ - "No sources/transforms/sinks are allowed if provider config is present.".to_owned(), - ]) - } else { - Ok(()) - } -} - pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec> { let mut errors = vec![]; diff --git a/src/providers/mod.rs b/src/providers/mod.rs index e0bf2430c04e2..239d69a75cebc 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -1,4 +1,6 @@ pub mod http; +use super::config::ConfigBuilder; + /// A provider returns an initial configuration string, if successful. -pub type Result = std::result::Result; +pub type Result = std::result::Result;