From 182caf34179629d547a3ef1882a26886eb001e9f Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 27 Nov 2020 17:59:50 +0300 Subject: [PATCH 01/16] remove not used Into impl Signed-off-by: Kirill Fomichev --- src/sinks/util/encoding/config.rs | 14 +++++++------- src/sinks/util/encoding/mod.rs | 2 +- src/sinks/util/encoding/with_default.rs | 22 +--------------------- 3 files changed, 9 insertions(+), 29 deletions(-) diff --git a/src/sinks/util/encoding/config.rs b/src/sinks/util/encoding/config.rs index e0a4fa46020e3..c41456a6115f4 100644 --- a/src/sinks/util/encoding/config.rs +++ b/src/sinks/util/encoding/config.rs @@ -46,16 +46,16 @@ impl EncodingConfiguration for EncodingConfig { } } -impl Into> for EncodingConfig +impl From> for EncodingConfig where E: Default + PartialEq, { - fn into(self) -> EncodingConfigWithDefault { - EncodingConfigWithDefault { - codec: self.codec, - only_fields: self.only_fields, - except_fields: self.except_fields, - timestamp_format: self.timestamp_format, + fn from(encoding: EncodingConfigWithDefault) -> Self { + Self { + codec: encoding.codec, + only_fields: encoding.only_fields, + except_fields: encoding.except_fields, + timestamp_format: encoding.timestamp_format, } } } diff --git a/src/sinks/util/encoding/mod.rs b/src/sinks/util/encoding/mod.rs index f9731f29c8846..3fae7f557a17a 100644 --- a/src/sinks/util/encoding/mod.rs +++ b/src/sinks/util/encoding/mod.rs @@ -39,7 +39,7 @@ use crate::{ Event, Result, }; use serde::{Deserialize, Serialize}; -use std::{collections::VecDeque, fmt::Debug}; +use std::collections::VecDeque; /// The behavior of a encoding configuration. pub trait EncodingConfiguration { diff --git a/src/sinks/util/encoding/with_default.rs b/src/sinks/util/encoding/with_default.rs index f649a2907de9e..5e1d7f3953313 100644 --- a/src/sinks/util/encoding/with_default.rs +++ b/src/sinks/util/encoding/with_default.rs @@ -91,26 +91,6 @@ where } } -impl Into> for EncodingConfigWithDefault -where - E: Default + PartialEq, -{ - fn into(self) -> EncodingConfig { - let Self { - codec, - only_fields, - except_fields, - timestamp_format, - } = self; - EncodingConfig { - codec, - only_fields, - except_fields, - timestamp_format, - } - } -} - impl From for EncodingConfigWithDefault { fn from(codec: E) -> Self { Self { @@ -190,7 +170,7 @@ where timestamp_format: inner.timestamp_format, }; - concrete.validate().map_err(serde::de::Error::custom)?; + concrete.validate().map_err(de::Error::custom)?; Ok(concrete) } } From 7824477a66ebd64754240d54dca360e2f404651e Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 27 Nov 2020 19:07:00 +0300 Subject: [PATCH 02/16] remove duplicated skip_serializing_if_default Signed-off-by: Kirill Fomichev --- src/sinks/new_relic_logs.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/sinks/new_relic_logs.rs b/src/sinks/new_relic_logs.rs index 9675a3dae2776..5e8ec00651a00 100644 --- a/src/sinks/new_relic_logs.rs +++ b/src/sinks/new_relic_logs.rs @@ -3,8 +3,8 @@ use crate::{ sinks::{ http::{HttpMethod, HttpSinkConfig}, util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, - BatchConfig, Compression, Concurrency, TowerRequestConfig, + encoding::EncodingConfigWithDefault, BatchConfig, Compression, Concurrency, + TowerRequestConfig, }, }, }; @@ -44,7 +44,10 @@ pub struct NewRelicLogsConfig { pub license_key: Option, pub insert_key: Option, pub region: Option, - #[serde(skip_serializing_if = "skip_serializing_if_default", default)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] pub encoding: EncodingConfigWithDefault, #[serde(default)] pub compression: Compression, @@ -77,13 +80,6 @@ impl From for crate::sinks::http::Encoding { } } -// There is another one of these in `util::encoding`, but this one is specialized for New Relic. -/// For encodings, answers "Is it possible to skip serializing this value, because it's the -/// default?" -pub(crate) fn skip_serializing_if_default(e: &EncodingConfigWithDefault) -> bool { - e.codec() == &Encoding::default() -} - #[async_trait::async_trait] #[typetag::serde(name = "new_relic_logs")] impl SinkConfig for NewRelicLogsConfig { @@ -162,9 +158,9 @@ mod tests { use super::*; use crate::{ config::SinkConfig, - event::Event, - sinks::util::{test::build_test_server, Concurrency}, + sinks::util::{encoding::EncodingConfiguration, test::build_test_server, Concurrency}, test_util::next_addr, + Event, }; use bytes::buf::BufExt; use futures::{stream, StreamExt}; From 4bee15be950f777563f297bbe22d8e7c138fc3eb Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 27 Nov 2020 23:49:05 +0300 Subject: [PATCH 03/16] use skip_serializing_if_default in EncodingConfig for consistency Signed-off-by: Kirill Fomichev --- src/sinks/util/encoding/config.rs | 7 ++++--- src/sinks/util/encoding/with_default.rs | 21 +++++---------------- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/src/sinks/util/encoding/config.rs b/src/sinks/util/encoding/config.rs index c41456a6115f4..452f832ee79c3 100644 --- a/src/sinks/util/encoding/config.rs +++ b/src/sinks/util/encoding/config.rs @@ -1,5 +1,6 @@ use crate::{ event::{PathComponent, PathIter}, + serde::skip_serializing_if_default, sinks::util::encoding::{ with_default::EncodingConfigWithDefault, EncodingConfiguration, TimestampFormat, }, @@ -22,11 +23,11 @@ use std::{ pub struct EncodingConfig { pub(crate) codec: E, // TODO(2410): Using PathComponents here is a hack for #2407, #2410 should fix this fully. - #[serde(default)] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] pub(crate) only_fields: Option>>, - #[serde(default)] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] pub(crate) except_fields: Option>, - #[serde(default)] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] pub(crate) timestamp_format: Option, } diff --git a/src/sinks/util/encoding/with_default.rs b/src/sinks/util/encoding/with_default.rs index 5e1d7f3953313..2c2e483b73254 100644 --- a/src/sinks/util/encoding/with_default.rs +++ b/src/sinks/util/encoding/with_default.rs @@ -1,5 +1,6 @@ use crate::{ event::{PathComponent, PathIter}, + serde::skip_serializing_if_default, sinks::util::encoding::{EncodingConfig, EncodingConfiguration, TimestampFormat}, }; use serde::{ @@ -19,29 +20,17 @@ use std::{ pub struct EncodingConfigWithDefault { /// The format of the encoding. // TODO: This is currently sink specific. - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] pub(crate) codec: E, /// Keep only the following fields of the message. (Items mutually exclusive with `except_fields`) - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] // TODO(2410): Using PathComponents here is a hack for #2407, #2410 should fix this fully. pub(crate) only_fields: Option>>, /// Remove the following fields of the message. (Items mutually exclusive with `only_fields`) - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] pub(crate) except_fields: Option>, /// Format for outgoing timestamps. - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] + #[serde(default, skip_serializing_if = "skip_serializing_if_default")] pub(crate) timestamp_format: Option, } From fddc54b3c58f4b36f547d6412d2bc2841e293701 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Sun, 29 Nov 2020 17:14:34 +0300 Subject: [PATCH 04/16] add encodings Signed-off-by: Kirill Fomichev --- benches/files.rs | 2 +- benches/http.rs | 4 +- src/sinks/aws_cloudwatch_logs/mod.rs | 9 +-- src/sinks/aws_kinesis_firehose.rs | 9 +-- src/sinks/aws_kinesis_streams.rs | 9 +-- src/sinks/aws_s3.rs | 60 ++++++++++++-------- src/sinks/aws_sqs.rs | 9 +-- src/sinks/azure_monitor_logs.rs | 12 +--- src/sinks/clickhouse.rs | 10 +--- src/sinks/console.rs | 9 +-- src/sinks/datadog/logs.rs | 4 +- src/sinks/elasticsearch.rs | 11 +--- src/sinks/file/mod.rs | 29 ++-------- src/sinks/gcp/cloud_storage.rs | 28 ++++------ src/sinks/gcp/pubsub.rs | 12 +--- src/sinks/gcp/stackdriver_logs.rs | 12 +--- src/sinks/http.rs | 12 +--- src/sinks/humio/logs.rs | 15 +++-- src/sinks/humio/metrics.rs | 5 +- src/sinks/humio/mod.rs | 21 ------- src/sinks/influxdb/logs.rs | 13 ++--- src/sinks/kafka.rs | 42 ++++++-------- src/sinks/logdna.rs | 10 +--- src/sinks/loki.rs | 15 ++--- src/sinks/nats.rs | 31 ++++------- src/sinks/new_relic_logs.rs | 20 +------ src/sinks/papertrail.rs | 4 +- src/sinks/pulsar.rs | 25 +++------ src/sinks/sematext/logs.rs | 5 +- src/sinks/socket.rs | 5 +- src/sinks/splunk_hec.rs | 55 ++++++++++-------- src/sinks/util/encoding/encodings.rs | 74 +++++++++++++++++++++++++ src/sinks/util/encoding/mod.rs | 3 + src/sinks/util/encoding/with_default.rs | 12 ---- src/sinks/util/mod.rs | 13 +---- src/sources/splunk_hec.rs | 22 ++++++-- src/topology/mod.rs | 34 ++++++++---- tests/syslog.rs | 2 +- 38 files changed, 294 insertions(+), 373 deletions(-) create mode 100644 src/sinks/util/encoding/encodings.rs diff --git a/benches/files.rs b/benches/files.rs index 074bfa6e727ca..f65a9fa9de0a4 100644 --- a/benches/files.rs +++ b/benches/files.rs @@ -54,7 +54,7 @@ fn benchmark_files_without_partitions(c: &mut Criterion) { sinks::file::FileSinkConfig { path: output.try_into().unwrap(), idle_timeout_secs: None, - encoding: sinks::file::Encoding::Text.into(), + encoding: sinks::util::encoding::EncodingTextNdjson::Text.into(), compression: sinks::file::Compression::None, }, ); diff --git a/benches/http.rs b/benches/http.rs index 11e303554f2f7..91faa392044c8 100644 --- a/benches/http.rs +++ b/benches/http.rs @@ -8,7 +8,7 @@ use std::net::SocketAddr; use tokio::runtime::Runtime; use vector::{ config, sinks, - sinks::util::Compression, + sinks::util::{encoding::EncodingTextJsonNdjson as Encoding, Compression}, sources, test_util::{next_addr, random_lines, runtime, send_lines, start_topology, wait_for_tcp}, Error, @@ -53,7 +53,7 @@ fn benchmark_http(c: &mut Criterion) { max_bytes: Some(num_lines * line_size), ..Default::default() }, - encoding: sinks::http::Encoding::Text.into(), + encoding: Encoding::Text.into(), request: Default::default(), tls: Default::default(), }, diff --git a/src/sinks/aws_cloudwatch_logs/mod.rs b/src/sinks/aws_cloudwatch_logs/mod.rs index c5b837f85564b..19af4c122eb64 100644 --- a/src/sinks/aws_cloudwatch_logs/mod.rs +++ b/src/sinks/aws_cloudwatch_logs/mod.rs @@ -5,7 +5,7 @@ use crate::{ event::{Event, LogEvent, Value}, rusoto::{self, RegionOrEndpoint}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, retries::{FixedRetryPolicy, RetryLogic}, BatchConfig, BatchSettings, Compression, EncodedLength, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig, TowerRequestSettings, VecBuffer, @@ -135,13 +135,6 @@ pub struct CloudwatchLogsPartitionSvc { client: CloudWatchLogsClient, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Json, -} - #[derive(Debug)] pub enum CloudwatchError { Put(RusotoError), diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs index c84cc9e2148ef..a1c0f81288ee3 100644 --- a/src/sinks/aws_kinesis_firehose.rs +++ b/src/sinks/aws_kinesis_firehose.rs @@ -3,7 +3,7 @@ use crate::{ event::Event, rusoto::{self, RegionOrEndpoint}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, retries::RetryLogic, sink::Response, BatchConfig, BatchSettings, Compression, EncodedLength, TowerRequestConfig, VecBuffer, @@ -56,13 +56,6 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Json, -} - inventory::submit! { SinkDescription::new::("aws_kinesis_firehose") } diff --git a/src/sinks/aws_kinesis_streams.rs b/src/sinks/aws_kinesis_streams.rs index 132fa876838cc..f5684588ac417 100644 --- a/src/sinks/aws_kinesis_streams.rs +++ b/src/sinks/aws_kinesis_streams.rs @@ -4,7 +4,7 @@ use crate::{ internal_events::AwsKinesisStreamsEventSent, rusoto::{self, RegionOrEndpoint}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, retries::RetryLogic, sink::Response, BatchConfig, BatchSettings, Compression, EncodedLength, TowerRequestConfig, VecBuffer, @@ -59,13 +59,6 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Json, -} - inventory::submit! { SinkDescription::new::("aws_kinesis_streams") } diff --git a/src/sinks/aws_s3.rs b/src/sinks/aws_s3.rs index a11d326f295d9..3ee2aa3e47b1c 100644 --- a/src/sinks/aws_s3.rs +++ b/src/sinks/aws_s3.rs @@ -1,9 +1,9 @@ use crate::{ - config::{log_schema, DataType, SinkConfig, SinkContext, SinkDescription}, + config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, rusoto::{self, RegionOrEndpoint}, serde::to_string, sinks::util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextNdjson as Encoding}, retries::RetryLogic, sink::Response, BatchConfig, BatchSettings, Buffer, Compression, Concurrency, PartitionBatchSink, @@ -37,7 +37,7 @@ pub struct S3Sink { client: S3Client, } -#[derive(Deserialize, Serialize, Debug, Default, Clone)] +#[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct S3SinkConfig { pub bucket: String, @@ -49,11 +49,7 @@ pub struct S3SinkConfig { options: S3Options, #[serde(flatten)] pub region: RegionOrEndpoint, - #[serde( - skip_serializing_if = "crate::serde::skip_serializing_if_default", - default - )] - pub encoding: EncodingConfigWithDefault, + pub encoding: EncodingConfig, #[serde(default = "Compression::gzip_default")] pub compression: Compression, #[serde(default)] @@ -125,20 +121,29 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Text, - Ndjson, -} - inventory::submit! { SinkDescription::new::("aws_s3") } -impl_generate_config_from_default!(S3SinkConfig); +impl GenerateConfig for S3SinkConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + bucket: "".to_owned(), + key_prefix: None, + filename_time_format: None, + filename_append_uuid: None, + filename_extension: None, + options: S3Options::default(), + region: RegionOrEndpoint::default(), + encoding: Encoding::Text.into(), + compression: Compression::Gzip(None), + batch: BatchConfig::default(), + request: TowerRequestConfig::default(), + assume_role: None, + }) + .unwrap() + } +} #[async_trait::async_trait] #[typetag::serde(name = "aws_s3")] @@ -377,7 +382,7 @@ impl RetryLogic for S3RetryLogic { fn encode_event( mut event: Event, key_prefix: &Template, - encoding: &EncodingConfigWithDefault, + encoding: &EncodingConfig, ) -> Option, Bytes>> { let key = key_prefix .render_string(&event) @@ -462,10 +467,11 @@ mod tests { let key_prefix = Template::try_from("{{ key }}").unwrap(); - let encoding_config = EncodingConfigWithDefault { + let encoding_config = EncodingConfig { codec: Encoding::Ndjson, + only_fields: None, except_fields: Some(vec!["key".into()]), - ..Default::default() + timestamp_format: None, }; let bytes = encode_event(event, &key_prefix, &encoding_config).unwrap(); @@ -706,16 +712,22 @@ mod integration_tests { ensure_bucket(&client()).await; S3SinkConfig { - key_prefix: Some(random_string(10) + "/date=%F/"), bucket: BUCKET.to_string(), + key_prefix: Some(random_string(10) + "/date=%F/"), + filename_time_format: None, + filename_append_uuid: None, + filename_extension: None, + options: S3Options::default(), + region: RegionOrEndpoint::with_endpoint("http://localhost:4566".to_owned()), + encoding: Encoding::Text.into(), compression: Compression::None, batch: BatchConfig { max_bytes: Some(batch_size), timeout_secs: Some(5), ..Default::default() }, - region: RegionOrEndpoint::with_endpoint("http://localhost:4566".to_owned()), - ..Default::default() + request: TowerRequestConfig::default(), + assume_role: None, } } diff --git a/src/sinks/aws_sqs.rs b/src/sinks/aws_sqs.rs index f15ab1f7b3f2d..43673460944e3 100644 --- a/src/sinks/aws_sqs.rs +++ b/src/sinks/aws_sqs.rs @@ -3,7 +3,7 @@ use crate::{ internal_events::{AwsSqsEventSent, AwsSqsMessageGroupIdMissingKeys}, rusoto, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, retries::RetryLogic, sink::Response, BatchSettings, EncodedLength, TowerRequestConfig, VecBuffer, @@ -71,13 +71,6 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Json, -} - inventory::submit! { SinkDescription::new::("aws_sqs") } diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs index 6ac975cc9084f..3353a72dcfb87 100644 --- a/src/sinks/azure_monitor_logs.rs +++ b/src/sinks/azure_monitor_logs.rs @@ -4,7 +4,9 @@ use crate::{ http::HttpClient, sinks::{ util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{ + EncodingConfigWithDefault, EncodingConfiguration, EncodingJson as Encoding, + }, http::{BatchedHttpSink, HttpSink}, BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig, }, @@ -49,14 +51,6 @@ pub struct AzureMonitorLogsConfig { pub tls: Option, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - lazy_static! { static ref REQUEST_DEFAULTS: TowerRequestConfig = TowerRequestConfig { ..Default::default() diff --git a/src/sinks/clickhouse.rs b/src/sinks/clickhouse.rs index 83eb43cc20508..1325853286659 100644 --- a/src/sinks/clickhouse.rs +++ b/src/sinks/clickhouse.rs @@ -3,7 +3,7 @@ use crate::{ event::Event, http::{Auth, HttpClient}, sinks::util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfigWithDefault, EncodingConfiguration, EncodingJson as Encoding}, http::{BatchedHttpSink, HttpRetryLogic, HttpSink}, retries::{RetryAction, RetryLogic}, BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig, @@ -53,14 +53,6 @@ inventory::submit! { impl_generate_config_from_default!(ClickhouseConfig); -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - #[async_trait::async_trait] #[typetag::serde(name = "clickhouse")] impl SinkConfig for ClickhouseConfig { diff --git a/src/sinks/console.rs b/src/sinks/console.rs index 65c4f458cb865..fd8cd559f6aea 100644 --- a/src/sinks/console.rs +++ b/src/sinks/console.rs @@ -4,7 +4,7 @@ use crate::{ event::Event, internal_events::{ConsoleEventProcessed, ConsoleFieldNotFound}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, StreamSink, }, }; @@ -35,13 +35,6 @@ pub struct ConsoleSinkConfig { pub encoding: EncodingConfig, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Json, -} - inventory::submit! { SinkDescription::new::("console") } diff --git a/src/sinks/datadog/logs.rs b/src/sinks/datadog/logs.rs index e27c744b9386b..98e9804046d45 100644 --- a/src/sinks/datadog/logs.rs +++ b/src/sinks/datadog/logs.rs @@ -6,9 +6,9 @@ use crate::{ util::{ batch::{Batch, BatchError}, encode_event, - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, http::{BatchedHttpSink, HttpSink}, - BatchConfig, BatchSettings, BoxedRawValue, Compression, Encoding, JsonArrayBuffer, + BatchConfig, BatchSettings, BoxedRawValue, Compression, JsonArrayBuffer, TowerRequestConfig, VecBuffer, }, Healthcheck, VectorSink, diff --git a/src/sinks/elasticsearch.rs b/src/sinks/elasticsearch.rs index 5fe29d1876caf..cce9bb2345380 100644 --- a/src/sinks/elasticsearch.rs +++ b/src/sinks/elasticsearch.rs @@ -6,7 +6,7 @@ use crate::{ internal_events::{ElasticSearchEventReceived, ElasticSearchMissingKeys}, rusoto::{self, region_from_endpoint, RegionOrEndpoint}, sinks::util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfigWithDefault, EncodingConfiguration, EncodingJson as Encoding}, http::{BatchedHttpSink, HttpSink}, retries::{RetryAction, RetryLogic}, BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig, @@ -69,14 +69,6 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")] pub enum ElasticSearchAuth { @@ -542,7 +534,6 @@ mod tests { let config = ElasticSearchConfig { index: Some(String::from("{{ idx }}")), encoding: EncodingConfigWithDefault { - codec: Encoding::Default, except_fields: Some(vec!["idx".to_string(), "timestamp".to_string()]), ..Default::default() }, diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 85123cd5b9dd7..fb35f9f84af0f 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -5,7 +5,7 @@ use crate::{ event::Event, internal_events::FileOpen, sinks::util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextNdjson as Encoding}, StreamSink, }, template::Template, @@ -34,11 +34,7 @@ use std::convert::TryFrom; pub struct FileSinkConfig { pub path: Template, pub idle_timeout_secs: Option, - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub encoding: EncodingConfigWithDefault, + pub encoding: EncodingConfig, #[serde( default, skip_serializing_if = "crate::serde::skip_serializing_if_default" @@ -55,26 +51,13 @@ impl GenerateConfig for FileSinkConfig { toml::Value::try_from(Self { path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(), idle_timeout_secs: None, - encoding: Default::default(), + encoding: Encoding::Text.into(), compression: Default::default(), }) .unwrap() } } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Ndjson, -} - -impl Default for Encoding { - fn default() -> Self { - Encoding::Text - } -} - #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum Compression { @@ -157,7 +140,7 @@ impl SinkConfig for FileSinkConfig { pub struct FileSink { acker: Acker, path: Template, - encoding: EncodingConfigWithDefault, + encoding: EncodingConfig, idle_timeout: Duration, files: ExpiringHashMap, compression: Compression, @@ -320,7 +303,7 @@ async fn open_file(path: impl AsRef) -> std::io::Result { .await } -pub fn encode_event(encoding: &EncodingConfigWithDefault, mut event: Event) -> Vec { +pub fn encode_event(encoding: &EncodingConfig, mut event: Event) -> Vec { encoding.apply_rules(&mut event); let log = event.into_log(); match encoding.codec() { @@ -335,7 +318,7 @@ pub fn encode_event(encoding: &EncodingConfigWithDefault, mut event: E async fn write_event_to_file( file: &mut OutFile, event: Event, - encoding: &EncodingConfigWithDefault, + encoding: &EncodingConfig, ) -> Result<(), std::io::Error> { let mut buf = encode_event(encoding, event); buf.push(b'\n'); diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index f31effa85bac1..13411f7e844b8 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -5,7 +5,7 @@ use crate::{ serde::to_string, sinks::{ util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextNdjson as Encoding}, retries::{RetryAction, RetryLogic}, BatchConfig, BatchSettings, Buffer, Compression, Concurrency, PartitionBatchSink, PartitionBuffer, PartitionInnerBuffer, ServiceBuilderExt, TowerRequestConfig, @@ -124,22 +124,6 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Copy)] -#[serde(rename_all = "snake_case")] -enum Encoding { - Text, - Ndjson, -} - -impl Encoding { - fn content_type(&self) -> &'static str { - match self { - Self::Text => "text/plain", - Self::Ndjson => "application/x-ndjson", - } - } -} - inventory::submit! { SinkDescription::new::(NAME) } @@ -349,7 +333,7 @@ impl RequestSettings { let acl = config .acl .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap()); - let content_type = HeaderValue::from_str(config.encoding.codec().content_type()).unwrap(); + let content_type = get_content_type(&config.encoding.codec()); let content_encoding = config .compression .content_encoding() @@ -388,6 +372,14 @@ impl RequestSettings { } } +fn get_content_type(encoding: &Encoding) -> HeaderValue { + let ctype = match encoding { + Encoding::Text => "text/plain", + Encoding::Ndjson => "application/x-ndjson", + }; + HeaderValue::from_str(ctype).unwrap() +} + // Make a header pair from a key-value string pair fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> { Ok(( diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index 54702bb8b0205..f0c19d7151c21 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -5,7 +5,9 @@ use crate::{ http::HttpClient, sinks::{ util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{ + EncodingConfigWithDefault, EncodingConfiguration, EncodingJson as Encoding, + }, http::{BatchedHttpSink, HttpSink}, BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig, }, @@ -54,14 +56,6 @@ fn default_skip_authentication() -> bool { false } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - inventory::submit! { SinkDescription::new::("gcp_pubsub") } diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs index c54ec695788b7..c842e5c817246 100644 --- a/src/sinks/gcp/stackdriver_logs.rs +++ b/src/sinks/gcp/stackdriver_logs.rs @@ -5,7 +5,9 @@ use crate::{ http::HttpClient, sinks::{ util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{ + EncodingConfigWithDefault, EncodingConfiguration, EncodingJson as Encoding, + }, http::{BatchedHttpSink, HttpSink}, BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig, }, @@ -61,14 +63,6 @@ struct StackdriverSink { severity_key: Option, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - #[derive(Clone, Debug, Derivative, Deserialize, Serialize)] #[derivative(Default)] pub enum StackdriverLogName { diff --git a/src/sinks/http.rs b/src/sinks/http.rs index 78f342dd37848..49e19eb266fbc 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -1,14 +1,14 @@ use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, - event::Event, http::{Auth, HttpClient}, sinks::util::{ buffer::compression::GZIP_DEFAULT, - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJsonNdjson as Encoding}, http::{BatchedHttpSink, HttpSink}, BatchConfig, BatchSettings, Buffer, Compression, Concurrency, TowerRequestConfig, UriSerde, }, tls::{TlsOptions, TlsSettings}, + Event, }; use flate2::write::GzEncoder; use futures::{future, FutureExt, SinkExt}; @@ -89,14 +89,6 @@ pub enum HttpMethod { Put, } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - Text, - Ndjson, - Json, -} - inventory::submit! { SinkDescription::new::("http") } diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index be7a8cfbb5f6b..8a8d075b1432d 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -1,9 +1,10 @@ -use super::{default_host_key, Encoding}; +use super::default_host_key; use crate::{ config::{DataType, SinkConfig, SinkContext, SinkDescription}, sinks::splunk_hec::HecSinkConfig, sinks::util::{ - encoding::EncodingConfigWithDefault, BatchConfig, Compression, TowerRequestConfig, + encoding::{EncodingConfigWithDefault, EncodingTextJsonDefaultJson as Encoding}, + BatchConfig, Compression, TowerRequestConfig, }, sinks::{Healthcheck, VectorSink}, template::Template, @@ -69,14 +70,16 @@ impl HumioLogsConfig { HecSinkConfig { token: self.token.clone(), endpoint, - source: self.source.clone(), + host_key: self.host_key.clone(), + indexed_fields: vec![], + index: None, sourcetype: self.event_type.clone(), - encoding: self.encoding.clone().transmute(), + source: self.source.clone(), + encoding: self.encoding.clone().without_default(), compression: self.compression, batch: self.batch, request: self.request, - host_key: self.host_key.clone(), - ..Default::default() + tls: None, } } } diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index 0063484e8ef38..7d5bc82c060b9 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -1,8 +1,9 @@ -use super::{default_host_key, logs::HumioLogsConfig, Encoding}; +use super::{default_host_key, logs::HumioLogsConfig}; use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription, TransformConfig}, sinks::util::{ - encoding::EncodingConfigWithDefault, BatchConfig, Compression, TowerRequestConfig, + encoding::{EncodingConfigWithDefault, EncodingTextJsonDefaultJson as Encoding}, + BatchConfig, Compression, TowerRequestConfig, }, sinks::{Healthcheck, VectorSink}, template::Template, diff --git a/src/sinks/humio/mod.rs b/src/sinks/humio/mod.rs index b74869309dab2..85d9c6beec8ba 100644 --- a/src/sinks/humio/mod.rs +++ b/src/sinks/humio/mod.rs @@ -1,27 +1,6 @@ pub mod logs; pub mod metrics; -use crate::sinks::splunk_hec; -use serde::{Deserialize, Serialize}; - -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Json, - Text, -} - -impl From for splunk_hec::Encoding { - fn from(v: Encoding) -> Self { - match v { - Encoding::Json => splunk_hec::Encoding::Json, - Encoding::Text => splunk_hec::Encoding::Text, - } - } -} - fn default_host_key() -> String { crate::config::LogSchema::default().host_key().to_string() } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index ef7f4bdf2adec..fcb45d4c12459 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -9,7 +9,10 @@ use crate::{ }, util::{ encode_namespace, - encoding::{EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{ + EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration, + EncodingText as Encoding, + }, http::{BatchedHttpSink, HttpSink}, BatchConfig, BatchSettings, Buffer, Compression, TowerRequestConfig, }, @@ -63,14 +66,6 @@ lazy_static! { }; } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - inventory::submit! { SinkDescription::new::("influxdb_logs") } diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs index d446158f429d3..446eb7b761d03 100644 --- a/src/sinks/kafka.rs +++ b/src/sinks/kafka.rs @@ -5,7 +5,7 @@ use crate::{ kafka::{KafkaAuthConfig, KafkaCompression}, serde::to_string, sinks::util::{ - encoding::{EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, BatchConfig, }, template::{Template, TemplateError}, @@ -42,12 +42,12 @@ enum BuildError { TopicTemplate { source: TemplateError }, } -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct KafkaSinkConfig { bootstrap_servers: String, topic: String, key_field: Option, - encoding: EncodingConfigWithDefault, + encoding: EncodingConfig, /// These batching options will **not** override librdkafka_options values. #[serde(default)] batch: BatchConfig, @@ -71,15 +71,6 @@ fn default_message_timeout_ms() -> u64 { 300000 // default in librdkafka } -#[derive(Clone, Copy, Debug, Derivative, Deserialize, Serialize, Eq, PartialEq)] -#[derivative(Default)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - #[derivative(Default)] - Text, - Json, -} - pub struct KafkaSink { producer: Arc, topic: Template, @@ -202,7 +193,7 @@ impl KafkaSink { producer: Arc::new(producer), topic: Template::try_from(config.topic).context(TopicTemplate)?, key_field: config.key_field, - encoding: config.encoding.into(), + encoding: config.encoding, flush_signal: Arc::new(Notify::new()), delivery_fut: FuturesUnordered::new(), in_flight: FuturesUnordered::new(), @@ -441,12 +432,12 @@ mod tests { let (key, bytes) = encode_event( event, &Some("key".into()), - &EncodingConfigWithDefault { + &EncodingConfig { codec: Encoding::Json, + only_fields: None, except_fields: Some(vec!["key".into()]), - ..Default::default() - } - .into(), + timestamp_format: None, + }, ); let map: BTreeMap = serde_json::from_slice(&bytes[..]).unwrap(); @@ -480,12 +471,14 @@ mod integration_test { let config = KafkaSinkConfig { bootstrap_servers: "localhost:9091".into(), topic: topic.clone(), - compression: KafkaCompression::None, - encoding: EncodingConfigWithDefault::from(Encoding::Text), key_field: None, + encoding: EncodingConfig::from(Encoding::Text), + batch: BatchConfig::default(), + compression: KafkaCompression::None, + auth: KafkaAuthConfig::default(), socket_timeout_ms: 60000, message_timeout_ms: 300000, - ..Default::default() + librdkafka_options: HashMap::new(), }; super::healthcheck(config).await.unwrap(); @@ -525,7 +518,7 @@ mod integration_test { bootstrap_servers: "localhost:9091".to_string(), topic: format!("{}-%Y%m%d", topic), compression: KafkaCompression::None, - encoding: EncodingConfigWithDefault::from(Encoding::Text), + encoding: Encoding::Text.into(), key_field: None, auth: KafkaAuthConfig { sasl: None, @@ -650,13 +643,14 @@ mod integration_test { let config = KafkaSinkConfig { bootstrap_servers: server.to_string(), topic: format!("{}-%Y%m%d", topic), - compression, - encoding: EncodingConfigWithDefault::from(Encoding::Text), key_field: None, + encoding: EncodingConfig::from(Encoding::Text), + batch: BatchConfig::default(), + compression, auth: kafka_auth.clone(), socket_timeout_ms: 60000, message_timeout_ms: 300000, - ..Default::default() + librdkafka_options: HashMap::new(), }; let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d")); let (acker, ack_counter) = Acker::new_for_testing(); diff --git a/src/sinks/logdna.rs b/src/sinks/logdna.rs index 8fff4509a6bf2..672349cabf755 100644 --- a/src/sinks/logdna.rs +++ b/src/sinks/logdna.rs @@ -3,7 +3,7 @@ use crate::{ event::Event, http::{Auth, HttpClient}, sinks::util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfigWithDefault, EncodingConfiguration, EncodingJson as Encoding}, http::{HttpSink, PartitionHttpSink}, BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig, UriSerde, @@ -64,14 +64,6 @@ impl GenerateConfig for LogdnaConfig { } } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - #[async_trait::async_trait] #[typetag::serde(name = "logdna")] impl SinkConfig for LogdnaConfig { diff --git a/src/sinks/loki.rs b/src/sinks/loki.rs index 8a2512fd3c151..208478abbbea3 100644 --- a/src/sinks/loki.rs +++ b/src/sinks/loki.rs @@ -18,7 +18,10 @@ use crate::{ http::{Auth, HttpClient}, sinks::util::{ buffer::loki::{LokiBuffer, LokiEvent, LokiRecord}, - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{ + EncodingConfigWithDefault, EncodingConfiguration, + EncodingTextJsonDefaultJson as Encoding, + }, http::{HttpSink, PartitionHttpSink}, BatchConfig, BatchSettings, PartitionBuffer, PartitionInnerBuffer, TowerRequestConfig, UriSerde, @@ -26,7 +29,6 @@ use crate::{ template::Template, tls::{TlsOptions, TlsSettings}, }; -use derivative::Derivative; use futures::{FutureExt, SinkExt}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -57,15 +59,6 @@ pub struct LokiConfig { tls: Option, } -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -enum Encoding { - #[derivative(Default)] - Json, - Text, -} - inventory::submit! { SinkDescription::new::("loki") } diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 77b1e968b0bcf..3f6e934e8b83f 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -2,11 +2,13 @@ use crate::{ buffers::Acker, config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, emit, - event::Event, internal_events::{NatsEventMissingKeys, NatsEventSendFail, NatsEventSendSuccess}, - sinks::util::encoding::{EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration}, - sinks::util::StreamSink, + sinks::util::{ + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, + StreamSink, + }, template::{Template, TemplateError}, + Event, }; use async_trait::async_trait; use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt}; @@ -24,9 +26,9 @@ enum BuildError { * Code dealing with the SinkConfig struct. */ -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct NatsSinkConfig { - encoding: EncodingConfigWithDefault, + encoding: EncodingConfig, #[serde(default = "default_name")] name: String, subject: String, @@ -37,15 +39,6 @@ fn default_name() -> String { String::from("vector") } -#[derive(Clone, Copy, Debug, Derivative, Deserialize, Serialize, Eq, PartialEq)] -#[derivative(Default)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - #[derivative(Default)] - Text, - Json, -} - inventory::submit! { SinkDescription::new::("nats") } @@ -127,13 +120,11 @@ pub struct NatsSink { impl NatsSink { fn new(config: NatsSinkConfig, acker: Acker) -> crate::Result { Ok(NatsSink { - acker, options: (&config).into(), + encoding: config.encoding, subject: Template::try_from(config.subject).context(SubjectTemplate)?, url: config.url, - - // DEV: the following causes a move; needs to be last. - encoding: config.encoding.into(), + acker, }) } } @@ -258,10 +249,10 @@ mod integration_tests { let subject = format!("test-{}", random_string(10)); let cnf = NatsSinkConfig { - encoding: EncodingConfigWithDefault::from(Encoding::Text), + encoding: EncodingConfig::from(Encoding::Text), + name: "".to_owned(), subject: subject.clone(), url: "nats://127.0.0.1:4222".to_owned(), - ..Default::default() }; // Establish the consumer subscription. diff --git a/src/sinks/new_relic_logs.rs b/src/sinks/new_relic_logs.rs index 5e8ec00651a00..d117405f96f7e 100644 --- a/src/sinks/new_relic_logs.rs +++ b/src/sinks/new_relic_logs.rs @@ -3,8 +3,8 @@ use crate::{ sinks::{ http::{HttpMethod, HttpSinkConfig}, util::{ - encoding::EncodingConfigWithDefault, BatchConfig, Compression, Concurrency, - TowerRequestConfig, + encoding::{EncodingConfigWithDefault, EncodingJson as Encoding}, + BatchConfig, Compression, Concurrency, TowerRequestConfig, }, }, }; @@ -64,22 +64,6 @@ inventory::submit! { impl_generate_config_from_default!(NewRelicLogsConfig); -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Json, -} - -impl From for crate::sinks::http::Encoding { - fn from(v: Encoding) -> crate::sinks::http::Encoding { - match v { - Encoding::Json => crate::sinks::http::Encoding::Json, - } - } -} - #[async_trait::async_trait] #[typetag::serde(name = "new_relic_logs")] impl SinkConfig for NewRelicLogsConfig { diff --git a/src/sinks/papertrail.rs b/src/sinks/papertrail.rs index d43fffc555071..e2fb9e73d9df4 100644 --- a/src/sinks/papertrail.rs +++ b/src/sinks/papertrail.rs @@ -1,9 +1,9 @@ use crate::{ config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, sinks::util::{ - encoding::{EncodingConfig, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, tcp::TcpSinkConfig, - Encoding, UriSerde, + UriSerde, }, tls::TlsConfig, Event, diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 8aa3d9c97b236..e7267dc7af554 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -2,7 +2,7 @@ use crate::{ buffers::Acker, config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, event::Event, - sinks::util::encoding::{EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration}, + sinks::util::encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, }; use futures::{future::BoxFuture, ready, stream::FuturesUnordered, FutureExt, Sink, Stream}; use pulsar::{ @@ -29,7 +29,7 @@ pub struct PulsarSinkConfig { #[serde(alias = "address")] endpoint: String, topic: String, - encoding: EncodingConfigWithDefault, + encoding: EncodingConfig, auth: Option, } @@ -39,15 +39,6 @@ pub struct AuthConfig { token: String, // } -#[derive(Clone, Copy, Debug, Derivative, Deserialize, Serialize, Eq, PartialEq)] -#[derivative(Default)] -#[serde(rename_all = "snake_case")] -pub enum Encoding { - #[derivative(Default)] - Text, - Json, -} - type PulsarProducer = Producer; type BoxedPulsarProducer = Box; @@ -96,7 +87,7 @@ impl SinkConfig for PulsarSinkConfig { .create_pulsar_producer() .await .context(CreatePulsarSink)?; - let sink = PulsarSink::new(producer, self.encoding.clone().into(), cx.acker()); + let sink = PulsarSink::new(producer, self.encoding.clone(), cx.acker()); let producer = self .create_pulsar_producer() @@ -290,12 +281,12 @@ mod tests { let event = encode_event( evt, - &EncodingConfigWithDefault { + &EncodingConfig { codec: Encoding::Json, + only_fields: None, except_fields: Some(vec!["key".into()]), - ..Default::default() - } - .into(), + timestamp_format: None, + }, ) .unwrap(); @@ -343,7 +334,7 @@ mod integration_tests { let (acker, ack_counter) = Acker::new_for_testing(); let producer = cnf.create_pulsar_producer().await.unwrap(); - let sink = PulsarSink::new(producer, cnf.encoding.clone().into(), acker); + let sink = PulsarSink::new(producer, cnf.encoding.clone(), acker); events.map(Ok).forward(sink).await.unwrap(); assert_eq!( diff --git a/src/sinks/sematext/logs.rs b/src/sinks/sematext/logs.rs index 6aa644539406c..a8b84b1e704cc 100644 --- a/src/sinks/sematext/logs.rs +++ b/src/sinks/sematext/logs.rs @@ -1,9 +1,10 @@ use super::Region; use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, - sinks::elasticsearch::{ElasticSearchConfig, Encoding}, + sinks::elasticsearch::ElasticSearchConfig, sinks::util::{ - encoding::EncodingConfigWithDefault, BatchConfig, Compression, TowerRequestConfig, + encoding::{EncodingConfigWithDefault, EncodingJson as Encoding}, + BatchConfig, Compression, TowerRequestConfig, }, sinks::{Healthcheck, VectorSink}, Event, diff --git a/src/sinks/socket.rs b/src/sinks/socket.rs index edfba6dd3511a..3ac8f55194dab 100644 --- a/src/sinks/socket.rs +++ b/src/sinks/socket.rs @@ -3,7 +3,10 @@ use crate::sinks::util::unix::UnixSinkConfig; use crate::{ config::{DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, sinks::util::{ - encode_event, encoding::EncodingConfig, tcp::TcpSinkConfig, udp::UdpSinkConfig, Encoding, + encode_event, + encoding::{EncodingConfig, EncodingTextJson as Encoding}, + tcp::TcpSinkConfig, + udp::UdpSinkConfig, }, tls::TlsConfig, }; diff --git a/src/sinks/splunk_hec.rs b/src/sinks/splunk_hec.rs index 7725e89466cbc..92914c28071ed 100644 --- a/src/sinks/splunk_hec.rs +++ b/src/sinks/splunk_hec.rs @@ -1,5 +1,5 @@ use crate::{ - config::{log_schema, DataType, SinkConfig, SinkContext, SinkDescription}, + config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription}, event::{Event, LogEvent, Value}, http::HttpClient, internal_events::{ @@ -7,7 +7,7 @@ use crate::{ SplunkSourceTypeMissingKeys, }, sinks::util::{ - encoding::{EncodingConfigWithDefault, EncodingConfiguration}, + encoding::{EncodingConfig, EncodingConfiguration, EncodingTextJson as Encoding}, http::{BatchedHttpSink, HttpSink}, BatchConfig, BatchSettings, Buffer, Compression, Concurrency, TowerRequestConfig, }, @@ -29,7 +29,7 @@ pub enum BuildError { UriMissingScheme, } -#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[derive(Deserialize, Serialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct HecSinkConfig { pub token: String, @@ -43,11 +43,7 @@ pub struct HecSinkConfig { pub index: Option, pub sourcetype: Option