Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
leebenson committed May 10, 2021
1 parent 3e93f63 commit b62d9ec
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 43 deletions.
108 changes: 102 additions & 6 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
#[cfg(feature = "api")]
use super::api;
use super::{
compiler, default_data_dir, provider, Config, GlobalOptions, HealthcheckOptions, SinkConfig,
SinkOuter, SourceConfig, TestDefinition, TransformConfig, TransformOuter,
compiler, default_data_dir, provider::ProviderConfig, Config, GlobalOptions,
HealthcheckOptions, SinkConfig, SinkOuter, SourceConfig, TestDefinition, TransformConfig,
TransformOuter,
};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

/// A "bootstrapping" config can either contain a [provider] key and top-level types, or
/// top-level types and topology. This directs serialization accordingly.
#[derive(Deserialize, Serialize, Debug)]
pub enum Builder {
Config(ConfigBuilder),
Provider(ProviderBuilder),
}

impl Builder {
pub fn is_provider(&self) -> bool {
matches!(self, Self::Provider(_))
}

pub fn into_config_builder(self) -> ConfigBuilder {
match self {
Self::Config(builder) => builder,
Self::Provider(builder) => builder.into(),
}
}

pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
match (self, with) {
(Self::Config(mut config_builder), Self::Config(with)) => config_builder.append(with),
(Self::Provider(mut provider_builder), Self::Provider(with)) => {
provider_builder.append(with)
}
_ => Err(vec!["Conflicting builder type".to_owned()]),
}
}
}

#[derive(Deserialize, Serialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ConfigBuilder {
Expand All @@ -25,7 +57,6 @@ pub struct ConfigBuilder {
pub transforms: IndexMap<String, TransformOuter>,
#[serde(default)]
pub tests: Vec<TestDefinition>,
pub provider: Option<Box<dyn provider::ProviderConfig>>,
}

impl Clone for ConfigBuilder {
Expand All @@ -50,7 +81,6 @@ impl From<Config> for ConfigBuilder {
sources: c.sources,
sinks: c.sinks,
transforms: c.transforms,
provider: None,
tests: c.tests,
}
}
Expand Down Expand Up @@ -110,8 +140,6 @@ impl ConfigBuilder {
errors.push(error);
}

self.provider = with.provider;

if self.global.data_dir.is_none() || self.global.data_dir == default_data_dir() {
self.global.data_dir = with.global.data_dir;
} else if with.global.data_dir != default_data_dir()
Expand Down Expand Up @@ -162,3 +190,71 @@ impl ConfigBuilder {
Ok(())
}
}

#[derive(Deserialize, Serialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ProviderBuilder {
#[serde(flatten)]
pub global: GlobalOptions,
#[cfg(feature = "api")]
#[serde(default)]
pub api: api::Options,
#[serde(default)]
pub healthchecks: HealthcheckOptions,
pub tests: Vec<TestDefinition>,
pub provider: Option<Box<dyn ProviderConfig>>,
}

impl ProviderBuilder {
pub fn append(&mut self, with: Self) -> Result<(), Vec<String>> {
let mut errors = Vec::new();

#[cfg(feature = "api")]
if let Err(error) = self.api.merge(with.api) {
errors.push(error);
}

if self.global.data_dir.is_none() || self.global.data_dir == default_data_dir() {
self.global.data_dir = with.global.data_dir;
} else if with.global.data_dir != default_data_dir()
&& self.global.data_dir != with.global.data_dir
{
// If two configs both set 'data_dir' and have conflicting values
// we consider this an error.
errors.push("conflicting values for 'data_dir' found".to_owned());
}

// If the user has multiple config files, we must *merge* log schemas
// until we meet a conflict, then we are allowed to error.
if let Err(merge_errors) = self.global.log_schema.merge(&with.global.log_schema) {
errors.extend(merge_errors);
}

self.healthchecks.merge(with.healthchecks);

with.tests.iter().for_each(|wt| {
if self.tests.iter().any(|t| t.name == wt.name) {
errors.push(format!("duplicate test name found: {}", wt.name));
}
});
if !errors.is_empty() {
return Err(errors);
}

self.tests.extend(with.tests);

Ok(())
}
}

impl From<ProviderBuilder> for ConfigBuilder {
fn from(provider_builder: ProviderBuilder) -> Self {
Self {
global: provider_builder.global,
#[cfg(feature = "api")]
api: provider_builder.api,
healthchecks: provider_builder.healthchecks,
..Self::default()
}
}
}
47 changes: 25 additions & 22 deletions src/config/loading.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{builder::ConfigBuilder, format, validation, vars, Config, Format, FormatHint};
use super::{builder::Builder, format, validation, vars, Config, Format, FormatHint};
use crate::signal;
use glob::glob;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -95,36 +95,39 @@ pub async fn load_from_paths_with_provider(
signal_handler: &mut signal::SignalHandler,
) -> Result<Config, Vec<String>> {
let (builder, load_warnings) = load_builder_from_paths(config_paths)?;
validation::check_provider(&builder)?;

match builder.provider {
Some(mut provider) => match provider.build(signal_handler).await {
Ok(config) => {
let config_builder = match builder {
Builder::Provider(mut provider_builder) => match provider_builder.provider {
Some(mut provider) => {
provider
.build(signal_handler)
.await
.map_err(|err| vec![format!("provider error: {}", err)])?;
debug!(message = "Provider configured.", provider = ?provider.provider_type());
load_from_str(&config, None)
config_builder
}
Err(err) => Err(vec![format!("provider error: {}", err)]),
None => provider_builder.into(),
},
_ => {
let (config, build_warnings) = builder.build_with_warnings()?;
Builder::Config(config_builder) => config_builder,
};

// Trigger a shutdown in the signal handler, which will terminate any provider
// streams that may exist prior to loading this configuration to prevent any
// in-flight polling/retrieval.
signal_handler.trigger_shutdown();
let (config, build_warnings) = config_builder.build_with_warnings()?;

for warning in load_warnings.into_iter().chain(build_warnings) {
warn!("{}", warning);
}
// Trigger a shutdown in the signal handler, which will terminate any provider
// streams that may exist prior to loading this configuration to prevent any
// in-flight polling/retrieval.
signal_handler.trigger_shutdown();

Ok(config)
}
for warning in load_warnings.into_iter().chain(build_warnings) {
warn!("{}", warning);
}

Ok(config)
}

pub fn load_builder_from_paths(
config_paths: &[(PathBuf, FormatHint)],
) -> Result<(ConfigBuilder, Vec<String>), Vec<String>> {
) -> Result<(Builder, Vec<String>), Vec<String>> {
let mut inputs = Vec::new();
let mut errors = Vec::new();

Expand All @@ -145,7 +148,7 @@ pub fn load_builder_from_paths(

pub fn load_from_str(input: &str, format: FormatHint) -> Result<Config, Vec<String>> {
let (builder, load_warnings) = load_from_inputs(std::iter::once((input.as_bytes(), format)))?;
let (config, build_warnings) = builder.build_with_warnings()?;
let (config, build_warnings) = builder.into_config_builder().build_with_warnings()?;

for warning in load_warnings.into_iter().chain(build_warnings) {
warn!("{}", warning);
Expand All @@ -156,7 +159,7 @@ pub fn load_from_str(input: &str, format: FormatHint) -> Result<Config, Vec<Stri

fn load_from_inputs(
inputs: impl IntoIterator<Item = (impl std::io::Read, FormatHint)>,
) -> Result<(ConfigBuilder, Vec<String>), Vec<String>> {
) -> Result<(Builder, Vec<String>), Vec<String>> {
let mut config = Config::builder();
let mut errors = Vec::new();
let mut warnings = Vec::new();
Expand Down Expand Up @@ -196,7 +199,7 @@ fn open_config(path: &Path) -> Option<File> {
fn load(
mut input: impl std::io::Read,
format: FormatHint,
) -> Result<(ConfigBuilder, Vec<String>), Vec<String>> {
) -> Result<(Builder, Vec<String>), Vec<String>> {
let mut source_string = String::new();
input
.read_to_string(&mut source_string)
Expand Down
6 changes: 5 additions & 1 deletion src/config/provider.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use super::{component::ExampleError, GenerateConfig};
use super::{
api, builder, component::ExampleError, GenerateConfig, GlobalOptions, HealthcheckOptions,
TestDefinition,
};
use crate::config::ConfigBuilder;
use crate::{providers, signal};
use async_trait::async_trait;
use toml::Value;
Expand Down
13 changes: 0 additions & 13 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
use super::{builder::ConfigBuilder, DataType, Resource};
use std::collections::HashMap;

/// Check that provide + topology config aren't present in the same builder, which is an error.
pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
if config.provider.is_some()
&& (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
{
Err(vec![
"No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
])
} else {
Ok(())
}
}

pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let mut errors = vec![];

Expand Down
4 changes: 3 additions & 1 deletion src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod http;

use super::config::ConfigBuilder;

/// A provider returns an initial configuration string, if successful.
pub type Result = std::result::Result<String, &'static str>;
pub type Result = std::result::Result<ConfigBuilder, &'static str>;

0 comments on commit b62d9ec

Please sign in to comment.