From e66e285cbd944bcb65b1262fb91bc1913b1885a6 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Thu, 29 Jun 2023 14:10:41 -0600 Subject: [PATCH] chore(sinks): Drop the custom `SinkContext::default` implementation (#17804) The `fn SinkContext::new_test` is actually just `SinkContext::default` in disguise, so drop the custom function in favour of the auto-derived implementation. --- src/config/sink.rs | 12 +------- src/sinks/amqp/integration_tests.rs | 4 +-- src/sinks/appsignal/mod.rs | 2 +- .../aws_cloudwatch_logs/integration_tests.rs | 12 ++++---- .../integration_tests.rs | 4 +-- .../aws_kinesis/firehose/integration_tests.rs | 2 +- src/sinks/aws_kinesis/firehose/tests.rs | 4 +-- .../aws_kinesis/streams/integration_tests.rs | 4 +-- src/sinks/aws_s3/integration_tests.rs | 18 +++++------ src/sinks/axiom.rs | 2 +- src/sinks/azure_monitor_logs.rs | 6 ++-- src/sinks/clickhouse/integration_tests.rs | 12 ++++---- src/sinks/databend/integration_tests.rs | 2 +- src/sinks/elasticsearch/common.rs | 2 +- src/sinks/elasticsearch/integration_tests.rs | 6 ++-- src/sinks/gcp/chronicle_unstructured.rs | 2 +- src/sinks/gcp/cloud_storage.rs | 2 +- src/sinks/gcp/pubsub.rs | 4 +-- src/sinks/gcp/stackdriver_logs.rs | 4 +-- src/sinks/gcp/stackdriver_metrics.rs | 2 +- src/sinks/honeycomb.rs | 2 +- src/sinks/http.rs | 8 ++--- src/sinks/humio/logs.rs | 8 ++--- src/sinks/influxdb/logs.rs | 2 +- src/sinks/influxdb/metrics.rs | 4 +-- src/sinks/new_relic/tests.rs | 2 +- src/sinks/papertrail.rs | 2 +- src/sinks/prometheus/exporter.rs | 10 +++---- src/sinks/prometheus/remote_write.rs | 4 +-- src/sinks/socket.rs | 8 ++--- .../splunk_hec/logs/integration_tests.rs | 30 +++++++++---------- src/sinks/splunk_hec/logs/tests.rs | 2 +- .../splunk_hec/metrics/integration_tests.rs | 8 ++--- src/sinks/splunk_hec/metrics/tests.rs | 2 +- src/sinks/statsd/tests.rs | 2 +- src/sinks/util/test.rs | 2 +- src/sinks/vector/mod.rs | 4 +-- src/sinks/webhdfs/integration_tests.rs | 4 +-- src/sinks/websocket/sink.rs | 4 +-- src/sources/prometheus/remote_write.rs | 4 +-- src/sources/splunk_hec/mod.rs | 2 +- src/sources/vector/mod.rs | 2 +- 42 files changed, 106 insertions(+), 116 deletions(-) diff --git a/src/config/sink.rs b/src/config/sink.rs index c0d6aba694b07..bf865eec40463 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -235,7 +235,7 @@ pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync dyn_clone::clone_trait_object!(SinkConfig); -#[derive(Debug, Clone)] +#[derive(Clone, Debug, Default)] pub struct SinkContext { pub healthcheck: SinkHealthcheckOptions, pub globals: GlobalOptions, @@ -244,16 +244,6 @@ pub struct SinkContext { } impl SinkContext { - #[cfg(test)] - pub fn new_test() -> Self { - Self { - healthcheck: SinkHealthcheckOptions::default(), - globals: GlobalOptions::default(), - proxy: ProxyConfig::default(), - schema: schema::Options::default(), - } - } - pub const fn globals(&self) -> &GlobalOptions { &self.globals } diff --git a/src/sinks/amqp/integration_tests.rs b/src/sinks/amqp/integration_tests.rs index ba8a9ee557022..08db9c6e46b27 100644 --- a/src/sinks/amqp/integration_tests.rs +++ b/src/sinks/amqp/integration_tests.rs @@ -72,7 +72,7 @@ async fn amqp_happy_path() { .await .unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, healthcheck) = config.build(cx).await.unwrap(); healthcheck.await.expect("Health check failed"); @@ -153,7 +153,7 @@ async fn amqp_round_trip() { .await .unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (amqp_sink, healthcheck) = config.build(cx).await.unwrap(); healthcheck.await.expect("Health check failed"); diff --git a/src/sinks/appsignal/mod.rs b/src/sinks/appsignal/mod.rs index 32c1247192368..31cb7a8e2ecc2 100644 --- a/src/sinks/appsignal/mod.rs +++ b/src/sinks/appsignal/mod.rs @@ -251,7 +251,7 @@ mod test { .expect("config should be valid"); config.endpoint = mock_endpoint.to_string(); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let event = Event::Log(LogEvent::from("simple message")); diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index ece20f2fa1311..c0dbf0204260a 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -51,7 +51,7 @@ async fn cloudwatch_insert_log_event() { acknowledgements: Default::default(), }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let timestamp = chrono::Utc::now(); @@ -101,7 +101,7 @@ async fn cloudwatch_insert_log_events_sorted() { acknowledgements: Default::default(), }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let timestamp = chrono::Utc::now() - Duration::days(1); @@ -176,7 +176,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() { acknowledgements: Default::default(), }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let now = chrono::Utc::now(); @@ -255,7 +255,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { acknowledgements: Default::default(), }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let timestamp = chrono::Utc::now(); @@ -310,7 +310,7 @@ async fn cloudwatch_insert_log_event_batched() { acknowledgements: Default::default(), }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let timestamp = chrono::Utc::now(); @@ -360,7 +360,7 @@ async fn cloudwatch_insert_log_event_partitioned() { acknowledgements: Default::default(), }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let timestamp = chrono::Utc::now(); diff --git a/src/sinks/aws_cloudwatch_metrics/integration_tests.rs b/src/sinks/aws_cloudwatch_metrics/integration_tests.rs index 57d88e704dacd..3af1c28597fc3 100644 --- a/src/sinks/aws_cloudwatch_metrics/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_metrics/integration_tests.rs @@ -35,7 +35,7 @@ async fn cloudwatch_metrics_healthcheck() { #[tokio::test] async fn cloudwatch_metrics_put_data() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = config(); let client = config.create_client(&cx.globals.proxy).await.unwrap(); let sink = CloudWatchMetricsSvc::new(config, client).unwrap(); @@ -94,7 +94,7 @@ async fn cloudwatch_metrics_put_data() { #[tokio::test] async fn cloudwatch_metrics_namespace_partitioning() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = config(); let client = config.create_client(&cx.globals.proxy).await.unwrap(); let sink = CloudWatchMetricsSvc::new(config, client).unwrap(); diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 42d72009cace6..648d5082a1ad7 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -62,7 +62,7 @@ async fn firehose_put_records() { let config = KinesisFirehoseSinkConfig { batch, base }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); diff --git a/src/sinks/aws_kinesis/firehose/tests.rs b/src/sinks/aws_kinesis/firehose/tests.rs index 54c55d9efee1d..bb6b94729d344 100644 --- a/src/sinks/aws_kinesis/firehose/tests.rs +++ b/src/sinks/aws_kinesis/firehose/tests.rs @@ -39,7 +39,7 @@ async fn check_batch_size() { let config = KinesisFirehoseSinkConfig { batch, base }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let res = config.build(cx).await; assert_eq!( @@ -69,7 +69,7 @@ async fn check_batch_events() { let config = KinesisFirehoseSinkConfig { batch, base }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let res = config.build(cx).await; assert_eq!( diff --git a/src/sinks/aws_kinesis/streams/integration_tests.rs b/src/sinks/aws_kinesis/streams/integration_tests.rs index 6f25a733d7d08..a800ff2b2960a 100644 --- a/src/sinks/aws_kinesis/streams/integration_tests.rs +++ b/src/sinks/aws_kinesis/streams/integration_tests.rs @@ -52,7 +52,7 @@ fn kinesis_address() -> String { // base, // }; // -// let cx = SinkContext::new_test(); +// let cx = SinkContext::default(); // // let sink = config.build(cx).await.unwrap().0; // @@ -107,7 +107,7 @@ async fn kinesis_put_records_without_partition_key() { base, }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let sink = config.build(cx).await.unwrap().0; diff --git a/src/sinks/aws_s3/integration_tests.rs b/src/sinks/aws_s3/integration_tests.rs index e9ae49a4db181..5a5cfeb3248df 100644 --- a/src/sinks/aws_s3/integration_tests.rs +++ b/src/sinks/aws_s3/integration_tests.rs @@ -51,7 +51,7 @@ fn s3_address() -> String { #[tokio::test] async fn s3_insert_message_into_with_flat_key_prefix() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -85,7 +85,7 @@ async fn s3_insert_message_into_with_flat_key_prefix() { #[tokio::test] async fn s3_insert_message_into_with_folder_key_prefix() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -119,7 +119,7 @@ async fn s3_insert_message_into_with_folder_key_prefix() { #[tokio::test] async fn s3_insert_message_into_with_ssekms_key_id() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -156,7 +156,7 @@ async fn s3_insert_message_into_with_ssekms_key_id() { #[tokio::test] async fn s3_rotate_files_after_the_buffer_size_is_reached() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -213,7 +213,7 @@ async fn s3_gzip() { // to 1000, and using gzip compression. We test to ensure that all of the keys we end up // writing represent the sum total of the lines: we expect 3 batches, each of which should // have 1000 lines. - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -258,7 +258,7 @@ async fn s3_zstd() { // to 1000, and using zstd compression. We test to ensure that all of the keys we end up // writing represent the sum total of the lines: we expect 3 batches, each of which should // have 1000 lines. - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -303,7 +303,7 @@ async fn s3_zstd() { // https://github.com/localstack/localstack/issues/4166 #[tokio::test] async fn s3_insert_message_into_object_lock() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -357,7 +357,7 @@ async fn s3_insert_message_into_object_lock() { #[tokio::test] async fn acknowledges_failures() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); @@ -408,7 +408,7 @@ async fn s3_healthchecks_invalid_bucket() { #[tokio::test] async fn s3_flush_on_exhaustion() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let bucket = uuid::Uuid::new_v4().to_string(); create_bucket(&bucket, false).await; diff --git a/src/sinks/axiom.rs b/src/sinks/axiom.rs index b94c7f6e7f69c..54fa54025430a 100644 --- a/src/sinks/axiom.rs +++ b/src/sinks/axiom.rs @@ -162,7 +162,7 @@ mod integration_tests { assert!(!token.is_empty(), "$AXIOM_TOKEN required"); let dataset = env::var("AXIOM_DATASET").unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = AxiomConfig { url: Some(url.clone()), diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs index aef40146c8d2a..03ef6de56c267 100644 --- a/src/sinks/azure_monitor_logs.rs +++ b/src/sinks/azure_monitor_logs.rs @@ -459,7 +459,7 @@ mod tests { default_headers: HeaderMap::new(), }; - let context = SinkContext::new_test(); + let context = SinkContext::default(); let client = HttpClient::new(None, &context.proxy).expect("should not fail to create HTTP client"); @@ -617,7 +617,7 @@ mod tests { "#, ) .unwrap(); - if config.build(SinkContext::new_test()).await.is_ok() { + if config.build(SinkContext::default()).await.is_ok() { panic!("config.build failed to error"); } } @@ -657,7 +657,7 @@ mod tests { "#, ) .unwrap(); - if config.build(SinkContext::new_test()).await.is_ok() { + if config.build(SinkContext::default()).await.is_ok() { panic!("config.build failed to error"); } } diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index 1061b8ddf8e11..21acee5aecc15 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -63,7 +63,7 @@ async fn insert_events() { ) .await; - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (mut input_event, mut receiver) = make_event(); input_event @@ -114,7 +114,7 @@ async fn skip_unknown_fields() { .create_table(&table, "host String, timestamp String, message String") .await; - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (mut input_event, mut receiver) = make_event(); input_event.as_mut_log().insert("unknown", "mysteries"); @@ -167,7 +167,7 @@ async fn insert_events_unix_timestamps() { ) .await; - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (mut input_event, _receiver) = make_event(); @@ -235,7 +235,7 @@ timestamp_format = "unix""#, ) .await; - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (mut input_event, _receiver) = make_event(); @@ -298,7 +298,7 @@ async fn no_retry_on_incorrect_data() { .create_table(&table, "host String, timestamp String") .await; - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (input_event, mut receiver) = make_event(); @@ -340,7 +340,7 @@ async fn no_retry_on_incorrect_data_warp() { batch, ..Default::default() }; - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (input_event, mut receiver) = make_event(); diff --git a/src/sinks/databend/integration_tests.rs b/src/sinks/databend/integration_tests.rs index b8c647c6f7d2d..5438775a4d385 100644 --- a/src/sinks/databend/integration_tests.rs +++ b/src/sinks/databend/integration_tests.rs @@ -124,7 +124,7 @@ async fn insert_event_with_cfg(cfg: String, table: String, client: DatabendAPICl .unwrap(); let (config, _) = load_sink::(&cfg).unwrap(); - let (sink, _hc) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); let (input_event, mut receiver) = make_event(); run_and_assert_sink_compliance( diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index f1a90ac4c9576..d5c5b214b5449 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -238,7 +238,7 @@ impl ElasticsearchCommon { #[cfg(test)] pub async fn parse_single(config: &ElasticsearchConfig) -> crate::Result { let mut commons = - Self::parse_many(config, crate::config::SinkContext::new_test().proxy()).await?; + Self::parse_many(config, crate::config::SinkContext::default().proxy()).await?; assert_eq!(commons.len(), 1); Ok(commons.remove(0)) } diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index 733cc1acaca22..023371e6f4fe9 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -146,7 +146,7 @@ async fn structures_events_correctly() { .expect("Config error"); let base_url = common.base_url.clone(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _hc) = config.build(cx.clone()).await.unwrap(); let (batch, mut receiver) = BatchNotifier::new_with_receiver(); @@ -555,7 +555,7 @@ async fn run_insert_tests_with_config( }; let base_url = common.base_url.clone(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, healthcheck) = config .build(cx.clone()) .await @@ -639,7 +639,7 @@ async fn run_insert_tests_with_config( } async fn run_insert_tests_with_multiple_endpoints(config: &ElasticsearchConfig) { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let commons = ElasticsearchCommon::parse_many(config, cx.proxy()) .await .expect("Config error"); diff --git a/src/sinks/gcp/chronicle_unstructured.rs b/src/sinks/gcp/chronicle_unstructured.rs index ddf70f31e4e25..36a537cb548b6 100644 --- a/src/sinks/gcp/chronicle_unstructured.rs +++ b/src/sinks/gcp/chronicle_unstructured.rs @@ -541,7 +541,7 @@ mod integration_tests { log_type: &str, auth_path: &str, ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); config(log_type, auth_path).build(cx).await } diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index 5b172586ee0cf..ff4f6bb378a21 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -429,7 +429,7 @@ mod tests { async fn component_spec_compliance() { let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - let context = SinkContext::new_test(); + let context = SinkContext::default(); let tls = TlsSettings::default(); let client = diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index a950c3dc46dab..5d2dc458d499a 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -260,7 +260,7 @@ mod tests { encoding.codec = "json" "#}) .unwrap(); - if config.build(SinkContext::new_test()).await.is_ok() { + if config.build(SinkContext::default()).await.is_ok() { panic!("config.build failed to error"); } } @@ -302,7 +302,7 @@ mod integration_tests { } async fn config_build(topic: &str) -> (VectorSink, crate::sinks::Healthcheck) { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); config(topic).build(cx).await.expect("Building sink failed") } diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs index e79287429071d..fb24bce81a40b 100644 --- a/src/sinks/gcp/stackdriver_logs.rs +++ b/src/sinks/gcp/stackdriver_logs.rs @@ -451,7 +451,7 @@ mod tests { config.auth.api_key = Some("fake".to_string().into()); config.endpoint = mock_endpoint.to_string(); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let event = Event::Log(LogEvent::from("simple message")); @@ -659,7 +659,7 @@ mod tests { resource.namespace = "office" "#}) .unwrap(); - if config.build(SinkContext::new_test()).await.is_ok() { + if config.build(SinkContext::default()).await.is_ok() { panic!("config.build failed to error"); } } diff --git a/src/sinks/gcp/stackdriver_metrics.rs b/src/sinks/gcp/stackdriver_metrics.rs index c18a8318b7f79..faa6989b80bd5 100644 --- a/src/sinks/gcp/stackdriver_metrics.rs +++ b/src/sinks/gcp/stackdriver_metrics.rs @@ -295,7 +295,7 @@ mod tests { config.auth.api_key = Some("fake".to_string().into()); config.endpoint = mock_endpoint.to_string(); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let event = Event::Metric(Metric::new( diff --git a/src/sinks/honeycomb.rs b/src/sinks/honeycomb.rs index 559a1f4a5bf85..6cba7079a84e0 100644 --- a/src/sinks/honeycomb.rs +++ b/src/sinks/honeycomb.rs @@ -245,7 +245,7 @@ mod test { .expect("config should be valid"); config.endpoint = mock_endpoint.to_string(); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let event = Event::Log(LogEvent::from("simple message")); diff --git a/src/sinks/http.rs b/src/sinks/http.rs index 8f7d0fdf8b633..2b0b5e7b36f26 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -627,7 +627,7 @@ mod tests { "#; let config: HttpSinkConfig = toml::from_str(config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); _ = config.build(cx).await.unwrap(); } @@ -861,7 +861,7 @@ mod tests { let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); let (rx, trigger, server) = build_test_server(in_addr); @@ -922,7 +922,7 @@ mod tests { let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); let (rx, trigger, server) = build_test_server(in_addr); @@ -1029,7 +1029,7 @@ mod tests { ); let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); (in_addr, sink) diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 6e30e66f29ee9..b87446ff015f7 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -251,7 +251,7 @@ mod integration_tests { async fn humio_insert_message() { wait_ready().await; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let repo = create_repository().await; @@ -301,7 +301,7 @@ mod integration_tests { async fn humio_insert_source() { wait_ready().await; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let repo = create_repository().await; @@ -337,7 +337,7 @@ mod integration_tests { let mut config = config(&repo.default_ingest_token); config.event_type = Template::try_from("json".to_string()).ok(); - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let message = random_string(100); let mut event = LogEvent::from(message.clone()); @@ -363,7 +363,7 @@ mod integration_tests { { let config = config(&repo.default_ingest_token); - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let message = random_string(100); let event = LogEvent::from(message.clone()); diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index fe2e44b950368..ef901ff4e66aa 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -906,7 +906,7 @@ mod integration_tests { let now = Utc::now(); let measure = format!("vector-{}", now.timestamp_nanos()); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = InfluxDbLogsConfig { namespace: None, diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index 5c6f98856198a..68dd5182cadf1 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -995,7 +995,7 @@ mod integration_tests { crate::test_util::trace_init(); let database = onboarding_v1(url).await; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = InfluxDbConfig { endpoint: url.to_string(), @@ -1090,7 +1090,7 @@ mod integration_tests { let endpoint = address_v2(); onboarding_v2(&endpoint).await; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = InfluxDbConfig { endpoint, diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs index ae56843862f09..51be807831f56 100644 --- a/src/sinks/new_relic/tests.rs +++ b/src/sinks/new_relic/tests.rs @@ -28,7 +28,7 @@ async fn component_spec_compliance() { .expect("config should be valid"); config.override_uri = Some(mock_endpoint); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let event = Event::Log(LogEvent::from("simple message")); diff --git a/src/sinks/papertrail.rs b/src/sinks/papertrail.rs index f3d3afb55ace0..bf2c45c6710b1 100644 --- a/src/sinks/papertrail.rs +++ b/src/sinks/papertrail.rs @@ -213,7 +213,7 @@ mod tests { config.endpoint = mock_endpoint.into(); config.tls = Some(TlsEnableableConfig::default()); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let event = Event::Log(LogEvent::from("simple message")); diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index bb7e6c775b629..9ba90c8fb1610 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -894,7 +894,7 @@ mod tests { let mut receiver = BatchNotifier::apply_to(&mut events[..]); assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4); let sink_handle = tokio::spawn(run_and_assert_sink_compliance( sink, @@ -958,7 +958,7 @@ mod tests { let mut receiver = BatchNotifier::apply_to(&mut events[..]); assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4); let sink_handle = tokio::spawn(run_and_assert_sink_compliance( sink, @@ -1422,7 +1422,7 @@ mod integration_tests { flush_period_secs: Duration::from_secs(2), ..Default::default() }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let (name, event) = tests::create_metric_gauge(None, 123.4); let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4); @@ -1460,7 +1460,7 @@ mod integration_tests { flush_period_secs: Duration::from_secs(3), ..Default::default() }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let (tx, rx) = mpsc::unbounded_channel(); let input_events = UnboundedReceiverStream::new(rx); @@ -1517,7 +1517,7 @@ mod integration_tests { flush_period_secs: Duration::from_secs(3), ..Default::default() }; - let (sink, _) = config.build(SinkContext::new_test()).await.unwrap(); + let (sink, _) = config.build(SinkContext::default()).await.unwrap(); let (tx, rx) = mpsc::unbounded_channel(); let input_events = UnboundedReceiverStream::new(rx); diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs index 1418cc2627924..a1e9b792fed4d 100644 --- a/src/sinks/prometheus/remote_write.rs +++ b/src/sinks/prometheus/remote_write.rs @@ -624,7 +624,7 @@ mod tests { let config = format!("endpoint = \"http://{}/write\"\n{}", addr, config); let config: RemoteWriteConfig = toml::from_str(&config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); sink.run_events(events).await.unwrap(); @@ -709,7 +709,7 @@ mod integration_tests { assert_sink_compliance(&HTTP_SINK_TAGS, async { let database = onboarding_v1(url).await; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = RemoteWriteConfig { endpoint: format!("{}/api/v1/prom/write?db={}", url, database), diff --git a/src/sinks/socket.rs b/src/sinks/socket.rs index f22339497b47e..c6947f7658512 100644 --- a/src/sinks/socket.rs +++ b/src/sinks/socket.rs @@ -201,7 +201,7 @@ mod test { acknowledgements: Default::default(), }; - let context = SinkContext::new_test(); + let context = SinkContext::default(); assert_sink_compliance(&SINK_TAGS, async move { let (sink, _healthcheck) = config.build(context).await.unwrap(); @@ -256,7 +256,7 @@ mod test { let (lines, events) = random_lines_with_stream(10, 100, None); assert_sink_compliance(&SINK_TAGS, async move { - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); sink.run(events).await @@ -333,7 +333,7 @@ mod test { }), acknowledgements: Default::default(), }; - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let (mut sender, receiver) = mpsc::channel::>(0); let jh1 = tokio::spawn(async move { @@ -453,7 +453,7 @@ mod test { acknowledgements: Default::default(), }; - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let (_, events) = random_lines_with_stream(1000, 10000, None); diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index ece387f7f89d4..0510d2dbe9721 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -127,7 +127,7 @@ async fn config(encoding: EncodingConfig, indexed_fields: Vec) -> HecLog #[tokio::test] async fn splunk_insert_message() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = config(TextSerializerConfig::default().into(), vec![]).await; let (sink, _) = config.build(cx).await.unwrap(); @@ -147,7 +147,7 @@ async fn splunk_insert_message() { #[tokio::test] async fn splunk_insert_raw_message() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = HecLogsSinkConfig { endpoint_target: EndpointTarget::Raw, @@ -172,7 +172,7 @@ async fn splunk_insert_raw_message() { #[tokio::test] async fn splunk_insert_broken_token() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config(TextSerializerConfig::default().into(), vec![]).await; config.default_token = "BROKEN_TOKEN".to_string().into(); @@ -188,7 +188,7 @@ async fn splunk_insert_broken_token() { #[tokio::test] async fn splunk_insert_source() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config(TextSerializerConfig::default().into(), vec![]).await; config.source = Template::try_from("/var/log/syslog".to_string()).ok(); @@ -206,7 +206,7 @@ async fn splunk_insert_source() { #[tokio::test] async fn splunk_insert_index() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config(TextSerializerConfig::default().into(), vec![]).await; config.index = Template::try_from("custom_index".to_string()).ok(); @@ -223,7 +223,7 @@ async fn splunk_insert_index() { #[tokio::test] async fn splunk_index_is_interpolated() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let indexed_fields = vec!["asdf".to_string()]; let mut config = config(JsonSerializerConfig::default().into(), indexed_fields).await; @@ -244,7 +244,7 @@ async fn splunk_index_is_interpolated() { #[tokio::test] async fn splunk_insert_many() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = config(TextSerializerConfig::default().into(), vec![]).await; let (sink, _) = config.build(cx).await.unwrap(); @@ -257,7 +257,7 @@ async fn splunk_insert_many() { #[tokio::test] async fn splunk_custom_fields() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let indexed_fields = vec!["asdf".into()]; let config = config(JsonSerializerConfig::default().into(), indexed_fields).await; @@ -277,7 +277,7 @@ async fn splunk_custom_fields() { #[tokio::test] async fn splunk_hostname() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let indexed_fields = vec!["asdf".into()]; let config = config(JsonSerializerConfig::default().into(), indexed_fields).await; @@ -300,7 +300,7 @@ async fn splunk_hostname() { #[tokio::test] async fn splunk_sourcetype() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let indexed_fields = vec!["asdf".to_string()]; let mut config = config(JsonSerializerConfig::default().into(), indexed_fields).await; @@ -324,7 +324,7 @@ async fn splunk_sourcetype() { #[tokio::test] async fn splunk_configure_hostname() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = HecLogsSinkConfig { host_key: "roast".into(), @@ -355,7 +355,7 @@ async fn splunk_configure_hostname() { #[tokio::test] async fn splunk_indexer_acknowledgements() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let acknowledgements_config = HecClientAcknowledgementsConfig { query_interval: NonZeroU8::new(1).unwrap(), @@ -385,7 +385,7 @@ async fn splunk_indexer_acknowledgements() { #[tokio::test] async fn splunk_indexer_acknowledgements_disabled_on_server() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = config( JsonSerializerConfig::default().into(), @@ -414,7 +414,7 @@ async fn splunk_auto_extracted_timestamp() { .map(|version| !version.starts_with("7.")) .unwrap_or(true) { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = HecLogsSinkConfig { auto_extract_timestamp: Some(true), @@ -467,7 +467,7 @@ async fn splunk_non_auto_extracted_timestamp() { .map(|version| !version.starts_with("7.")) .unwrap_or(true) { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let config = HecLogsSinkConfig { auto_extract_timestamp: Some(false), diff --git a/src/sinks/splunk_hec/logs/tests.rs b/src/sinks/splunk_hec/logs/tests.rs index 5f6bf777d0d40..0e5d4956e36bf 100644 --- a/src/sinks/splunk_hec/logs/tests.rs +++ b/src/sinks/splunk_hec/logs/tests.rs @@ -218,7 +218,7 @@ async fn splunk_passthrough_token() { auto_extract_timestamp: None, endpoint_target: EndpointTarget::Event, }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); diff --git a/src/sinks/splunk_hec/metrics/integration_tests.rs b/src/sinks/splunk_hec/metrics/integration_tests.rs index 6dc7f4fbf769a..8227e6431a9db 100644 --- a/src/sinks/splunk_hec/metrics/integration_tests.rs +++ b/src/sinks/splunk_hec/metrics/integration_tests.rs @@ -70,7 +70,7 @@ fn get_counter(batch: BatchNotifier) -> Event { #[tokio::test] async fn splunk_insert_counter_metric() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config().await; config.index = Template::try_from("testmetrics".to_string()).ok(); @@ -93,7 +93,7 @@ async fn splunk_insert_counter_metric() { #[tokio::test] async fn splunk_insert_gauge_metric() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config().await; config.index = Template::try_from("testmetrics".to_string()).ok(); @@ -116,7 +116,7 @@ async fn splunk_insert_gauge_metric() { #[tokio::test] async fn splunk_insert_multiple_counter_metrics() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config().await; config.index = Template::try_from("testmetrics".to_string()).ok(); @@ -143,7 +143,7 @@ async fn splunk_insert_multiple_counter_metrics() { #[tokio::test] async fn splunk_insert_multiple_gauge_metrics() { - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let mut config = config().await; config.index = Template::try_from("testmetrics".to_string()).ok(); diff --git a/src/sinks/splunk_hec/metrics/tests.rs b/src/sinks/splunk_hec/metrics/tests.rs index dc05ff8bff1c9..7cfd251e307d9 100644 --- a/src/sinks/splunk_hec/metrics/tests.rs +++ b/src/sinks/splunk_hec/metrics/tests.rs @@ -330,7 +330,7 @@ async fn splunk_passthrough_token() { acknowledgements: Default::default(), default_namespace: None, }; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); diff --git a/src/sinks/statsd/tests.rs b/src/sinks/statsd/tests.rs index 4c30a46b78551..b2311bd8fbec9 100644 --- a/src/sinks/statsd/tests.rs +++ b/src/sinks/statsd/tests.rs @@ -71,7 +71,7 @@ async fn test_send_to_statsd() { ]; let (tx, rx) = mpsc::channel(1); - let context = SinkContext::new_test(); + let context = SinkContext::default(); assert_sink_compliance(&SINK_TAGS, async move { let (sink, _healthcheck) = config.build(context).await.unwrap(); diff --git a/src/sinks/util/test.rs b/src/sinks/util/test.rs index b5b1a2ae461bd..9f19903f954be 100644 --- a/src/sinks/util/test.rs +++ b/src/sinks/util/test.rs @@ -20,7 +20,7 @@ where for<'a> T: Deserialize<'a> + SinkConfig, { let sink_config: T = toml::from_str(config)?; - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); Ok((sink_config, cx)) } diff --git a/src/sinks/vector/mod.rs b/src/sinks/vector/mod.rs index 99a4c5bf8a2ab..03d77611ad239 100644 --- a/src/sinks/vector/mod.rs +++ b/src/sinks/vector/mod.rs @@ -77,7 +77,7 @@ mod tests { let config = format!(r#"address = "http://{}/""#, in_addr); let config: VectorConfig = toml::from_str(&config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); let (rx, trigger, server) = build_test_server_generic(in_addr, move || { @@ -121,7 +121,7 @@ mod tests { let config = format!(r#"address = "http://{}/""#, in_addr); let config: VectorConfig = toml::from_str(&config).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = config.build(cx).await.unwrap(); let (_rx, trigger, server) = build_test_server_generic(in_addr, move || { diff --git a/src/sinks/webhdfs/integration_tests.rs b/src/sinks/webhdfs/integration_tests.rs index c4b64a60deb4e..419c3a7629bd5 100644 --- a/src/sinks/webhdfs/integration_tests.rs +++ b/src/sinks/webhdfs/integration_tests.rs @@ -24,7 +24,7 @@ async fn hdfs_healthchecks_invalid_node_node() { // Point to an invalid endpoint let config = config("http://127.0.0.1:1", 10); let (_, health_check) = config - .build(SinkContext::new_test()) + .build(SinkContext::default()) .await .expect("config build must with success"); let result = health_check.await; @@ -36,7 +36,7 @@ async fn hdfs_healthchecks_invalid_node_node() { async fn hdfs_healthchecks_valid_node_node() { let config = config(&webhdfs_endpoint(), 10); let (_, health_check) = config - .build(SinkContext::new_test()) + .build(SinkContext::default()) .await .expect("config build must with success"); let result = health_check.await; diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 07ced35ba2a59..6acbf99967b43 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -483,7 +483,7 @@ mod tests { let mut receiver = create_count_receiver(addr, tls.clone(), true, None); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let (_lines, events) = random_lines_with_stream(10, 100, None); @@ -511,7 +511,7 @@ mod tests { ) { let mut receiver = create_count_receiver(addr, tls, false, auth); - let context = SinkContext::new_test(); + let context = SinkContext::default(); let (sink, _healthcheck) = config.build(context).await.unwrap(); let (lines, events) = random_lines_with_stream(10, 100, None); diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 3b27770ba995b..fb8b84a3d724b 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -205,7 +205,7 @@ mod test { ..Default::default() }; let (sink, _) = sink - .build(SinkContext::new_test()) + .build(SinkContext::default()) .await .expect("Error building config."); @@ -299,7 +299,7 @@ mod test { ..Default::default() }; let (sink, _) = sink - .build(SinkContext::new_test()) + .build(SinkContext::default()) .await .expect("Error building config."); diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 0c465ffaef97e..222ffc2487041 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -1290,7 +1290,7 @@ mod tests { auto_extract_timestamp: None, endpoint_target: Default::default(), } - .build(SinkContext::new_test()) + .build(SinkContext::default()) .await .unwrap() } diff --git a/src/sources/vector/mod.rs b/src/sources/vector/mod.rs index a6fdaa494d60e..44b0921d75bcd 100644 --- a/src/sources/vector/mod.rs +++ b/src/sources/vector/mod.rs @@ -299,7 +299,7 @@ mod tests { // but the sink side already does such a test and this is good // to ensure interoperability. let sink: SinkConfig = toml::from_str(vector_source_config_str).unwrap(); - let cx = SinkContext::new_test(); + let cx = SinkContext::default(); let (sink, _) = sink.build(cx).await.unwrap(); let (mut events, stream) = test_util::random_events_with_stream(100, 100, None);