Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement!: Remove default encoding.codec where appropriate #5281

Merged
merged 18 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/components/sinks/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ components: sinks: aws_s3: components._aws & {
enabled: true
codec: {
enabled: true
default: "text"
default: null
enum: ["ndjson", "text"]
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/components/sinks/pulsar.cue
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ components: sinks: pulsar: {
enabled: true
codec: {
enabled: true
default: "text"
default: null
enum: ["text", "json"]
}
}
Expand Down
54 changes: 37 additions & 17 deletions src/sinks/aws_s3.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
config::{log_schema, DataType, SinkConfig, SinkContext, SinkDescription},
config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription},
rusoto::{self, RegionOrEndpoint},
serde::to_string,
sinks::util::{
encoding::{EncodingConfigWithDefault, EncodingConfiguration},
encoding::{EncodingConfig, EncodingConfiguration},
retries::RetryLogic,
sink::Response,
BatchConfig, BatchSettings, Buffer, Compression, Concurrency, PartitionBatchSink,
Expand Down Expand Up @@ -37,7 +37,7 @@ pub struct S3Sink {
client: S3Client,
}

#[derive(Deserialize, Serialize, Debug, Default, Clone)]
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
pub bucket: String,
Expand All @@ -49,11 +49,7 @@ pub struct S3SinkConfig {
options: S3Options,
#[serde(flatten)]
pub region: RegionOrEndpoint,
#[serde(
skip_serializing_if = "crate::serde::skip_serializing_if_default",
default
)]
pub encoding: EncodingConfigWithDefault<Encoding>,
pub encoding: EncodingConfig<Encoding>,
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,
#[serde(default)]
Expand Down Expand Up @@ -127,9 +123,7 @@ lazy_static! {

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum Encoding {
#[derivative(Default)]
Text,
Ndjson,
}
Expand All @@ -138,7 +132,25 @@ inventory::submit! {
SinkDescription::new::<S3SinkConfig>("aws_s3")
}

impl_generate_config_from_default!(S3SinkConfig);
impl GenerateConfig for S3SinkConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
bucket: "".to_owned(),
key_prefix: None,
filename_time_format: None,
filename_append_uuid: None,
filename_extension: None,
options: S3Options::default(),
region: RegionOrEndpoint::default(),
encoding: Encoding::Text.into(),
compression: Compression::gzip_default(),
batch: BatchConfig::default(),
request: TowerRequestConfig::default(),
assume_role: None,
})
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
Expand Down Expand Up @@ -377,7 +389,7 @@ impl RetryLogic for S3RetryLogic {
fn encode_event(
mut event: Event,
key_prefix: &Template,
encoding: &EncodingConfigWithDefault<Encoding>,
encoding: &EncodingConfig<Encoding>,
) -> Option<PartitionInnerBuffer<Vec<u8>, Bytes>> {
let key = key_prefix
.render_string(&event)
Expand Down Expand Up @@ -462,10 +474,12 @@ mod tests {

let key_prefix = Template::try_from("{{ key }}").unwrap();

let encoding_config = EncodingConfigWithDefault {
let encoding_config = EncodingConfig {
codec: Encoding::Ndjson,
schema: None,
only_fields: None,
except_fields: Some(vec!["key".into()]),
..Default::default()
timestamp_format: None,
};

let bytes = encode_event(event, &key_prefix, &encoding_config).unwrap();
Expand Down Expand Up @@ -706,16 +720,22 @@ mod integration_tests {
ensure_bucket(&client()).await;

S3SinkConfig {
key_prefix: Some(random_string(10) + "/date=%F/"),
bucket: BUCKET.to_string(),
key_prefix: Some(random_string(10) + "/date=%F/"),
filename_time_format: None,
filename_append_uuid: None,
filename_extension: None,
options: S3Options::default(),
region: RegionOrEndpoint::with_endpoint("http://localhost:4566".to_owned()),
encoding: Encoding::Text.into(),
compression: Compression::None,
batch: BatchConfig {
max_bytes: Some(batch_size),
timeout_secs: Some(5),
..Default::default()
},
region: RegionOrEndpoint::with_endpoint("http://localhost:4566".to_owned()),
..Default::default()
request: TowerRequestConfig::default(),
assume_role: None,
}
}

Expand Down
1 change: 0 additions & 1 deletion src/sinks/elasticsearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ mod tests {
let config = ElasticSearchConfig {
index: Some(String::from("{{ idx }}")),
encoding: EncodingConfigWithDefault {
codec: Encoding::Default,
except_fields: Some(vec!["idx".to_string(), "timestamp".to_string()]),
..Default::default()
},
Expand Down
22 changes: 6 additions & 16 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
event::Event,
internal_events::FileOpen,
sinks::util::{
encoding::{EncodingConfigWithDefault, EncodingConfiguration},
encoding::{EncodingConfig, EncodingConfiguration},
StreamSink,
},
template::Template,
Expand Down Expand Up @@ -34,11 +34,7 @@ use std::convert::TryFrom;
pub struct FileSinkConfig {
pub path: Template,
pub idle_timeout_secs: Option<u64>,
#[serde(
default,
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub encoding: EncodingConfigWithDefault<Encoding>,
pub encoding: EncodingConfig<Encoding>,
#[serde(
default,
skip_serializing_if = "crate::serde::skip_serializing_if_default"
Expand All @@ -55,7 +51,7 @@ impl GenerateConfig for FileSinkConfig {
toml::Value::try_from(Self {
path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
idle_timeout_secs: None,
encoding: Default::default(),
encoding: Encoding::Text.into(),
compression: Default::default(),
})
.unwrap()
Expand All @@ -69,12 +65,6 @@ pub enum Encoding {
Ndjson,
}

impl Default for Encoding {
fn default() -> Self {
Encoding::Text
}
}

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
Expand Down Expand Up @@ -157,7 +147,7 @@ impl SinkConfig for FileSinkConfig {
pub struct FileSink {
acker: Acker,
path: Template,
encoding: EncodingConfigWithDefault<Encoding>,
encoding: EncodingConfig<Encoding>,
idle_timeout: Duration,
files: ExpiringHashMap<Bytes, OutFile>,
compression: Compression,
Expand Down Expand Up @@ -320,7 +310,7 @@ async fn open_file(path: impl AsRef<std::path::Path>) -> std::io::Result<File> {
.await
}

pub fn encode_event(encoding: &EncodingConfigWithDefault<Encoding>, mut event: Event) -> Vec<u8> {
pub fn encode_event(encoding: &EncodingConfig<Encoding>, mut event: Event) -> Vec<u8> {
encoding.apply_rules(&mut event);
let log = event.into_log();
match encoding.codec() {
Expand All @@ -335,7 +325,7 @@ pub fn encode_event(encoding: &EncodingConfigWithDefault<Encoding>, mut event: E
async fn write_event_to_file(
file: &mut OutFile,
event: Event,
encoding: &EncodingConfigWithDefault<Encoding>,
encoding: &EncodingConfig<Encoding>,
) -> Result<(), std::io::Error> {
let mut buf = encode_event(encoding, event);
buf.push(b'\n');
Expand Down
53 changes: 33 additions & 20 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
use super::{default_host_key, Encoding};
use crate::{
config::{DataType, SinkConfig, SinkContext, SinkDescription},
config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription},
sinks::splunk_hec::HecSinkConfig,
sinks::util::{
encoding::EncodingConfigWithDefault, BatchConfig, Compression, TowerRequestConfig,
},
sinks::util::{encoding::EncodingConfig, BatchConfig, Compression, TowerRequestConfig},
sinks::{Healthcheck, VectorSink},
template::Template,
};
use serde::{Deserialize, Serialize};

const HOST: &str = "https://cloud.humio.com";

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct HumioLogsConfig {
pub(in crate::sinks::humio) token: String,
// Deprecated name
#[serde(alias = "host")]
pub(in crate::sinks::humio) endpoint: Option<String>,
pub(in crate::sinks::humio) source: Option<Template>,
#[serde(
skip_serializing_if = "crate::serde::skip_serializing_if_default",
default
)]
pub(in crate::sinks::humio) encoding: EncodingConfigWithDefault<Encoding>,
pub(in crate::sinks::humio) encoding: EncodingConfig<Encoding>,

pub(in crate::sinks::humio) event_type: Option<Template>,

Expand All @@ -44,7 +38,22 @@ inventory::submit! {
SinkDescription::new::<HumioLogsConfig>("humio_logs")
}

impl_generate_config_from_default!(HumioLogsConfig);
impl GenerateConfig for HumioLogsConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
token: "${HUMIO_TOKEN}".to_owned(),
endpoint: None,
source: None,
encoding: Encoding::Json.into(),
event_type: None,
host_key: default_host_key(),
compression: Compression::default(),
request: TowerRequestConfig::default(),
batch: BatchConfig::default(),
})
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "humio_logs")]
Expand All @@ -69,14 +78,16 @@ impl HumioLogsConfig {
HecSinkConfig {
token: self.token.clone(),
endpoint,
source: self.source.clone(),
host_key: self.host_key.clone(),
indexed_fields: vec![],
index: None,
sourcetype: self.event_type.clone(),
encoding: self.encoding.clone().transmute(),
source: self.source.clone(),
encoding: self.encoding.clone().into_encoding(),
compression: self.compression,
batch: self.batch,
request: self.request,
host_key: self.host_key.clone(),
..Default::default()
tls: None,
}
}
}
Expand Down Expand Up @@ -260,17 +271,19 @@ mod integration_tests {

/// create a new test config with the given ingest token
fn config(token: &str) -> super::HumioLogsConfig {
super::HumioLogsConfig {
endpoint: Some(HOST.to_string()),
HumioLogsConfig {
token: token.to_string(),
compression: Compression::None,
endpoint: Some(HOST.to_string()),
source: None,
encoding: Encoding::Json.into(),
event_type: None,
host_key: log_schema().host_key().to_string(),
compression: Compression::None,
request: TowerRequestConfig::default(),
batch: BatchConfig {
max_events: Some(1),
..Default::default()
},
host_key: log_schema().host_key().to_string(),
..Default::default()
}
}

Expand Down
10 changes: 2 additions & 8 deletions src/sinks/humio/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use super::{default_host_key, logs::HumioLogsConfig, Encoding};
use crate::{
config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription, TransformConfig},
sinks::util::{
encoding::EncodingConfigWithDefault, BatchConfig, Compression, TowerRequestConfig,
},
sinks::util::{encoding::EncodingConfig, BatchConfig, Compression, TowerRequestConfig},
sinks::{Healthcheck, VectorSink},
template::Template,
transforms::metric_to_log::MetricToLogConfig,
Expand All @@ -21,11 +19,7 @@ pub struct HumioMetricsConfig {
#[serde(alias = "host")]
pub(in crate::sinks::humio) endpoint: Option<String>,
source: Option<Template>,
#[serde(
skip_serializing_if = "crate::serde::skip_serializing_if_default",
default
)]
encoding: EncodingConfigWithDefault<Encoding>,
encoding: EncodingConfig<Encoding>,

event_type: Option<Template>,

Expand Down
4 changes: 1 addition & 3 deletions src/sinks/humio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ pub mod metrics;
use crate::sinks::splunk_hec;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum Encoding {
#[derivative(Default)]
Json,
Text,
}
Expand Down
Loading