From 42d88572cc81d0cb58ce94d6e75d7029dc91db6e Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Tue, 4 Jul 2023 11:25:34 +0100
Subject: [PATCH 01/12] Test loki sink for data volume tags

Signed-off-by: Stephen Wakely
---
 src/sinks/loki/integration_tests.rs | 35 +++++++++++++++++++++++++++--
 1 file changed, 33 insertions(+), 2 deletions(-)

diff --git a/src/sinks/loki/integration_tests.rs b/src/sinks/loki/integration_tests.rs
index 245c8ebcb2c7b..bce55e8952a7c 100644
--- a/src/sinks/loki/integration_tests.rs
+++ b/src/sinks/loki/integration_tests.rs
@@ -7,7 +7,7 @@ use futures::stream;
 use lookup::owned_value_path;
 use vector_common::encode_logfmt;
 use vector_core::{
-    config::LogNamespace,
+    config::{init_telemetry, LogNamespace, Tags, Telemetry},
     event::{BatchNotifier, BatchStatus, Event, LogEvent},
 };
 use vrl::value::{kind::Collection, Kind};
@@ -20,7 +20,10 @@ use crate::{
     sinks::{util::test::load_sink, VectorSink},
     template::Template,
     test_util::{
-        components::{run_and_assert_sink_compliance, SINK_TAGS},
+        components::{
+            run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+            DATA_VOLUME_SINK_TAGS, SINK_TAGS,
+        },
         generate_events_with_stream, generate_lines_with_stream, random_lines,
     },
 };
@@ -151,6 +154,34 @@ async fn text() {
     }
 }
 
+#[tokio::test]
+async fn data_volume_tags() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    let (stream, sink) = build_sink("text", false).await;
+
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+    let (lines, events) = generate_lines_with_stream(line_generator, 10, Some(batch));
+    run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS).await;
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    tokio::time::sleep(tokio::time::Duration::new(1, 0)).await;
+
+    let (_, outputs) = fetch_stream(stream.to_string(), "default").await;
+    assert_eq!(lines.len(), outputs.len());
+    for (i, output) in outputs.iter().enumerate() {
+        assert_eq!(output, &lines[i]);
+    }
+}
+
 #[tokio::test]
 async fn namespaced_timestamp() {
     let (stream, sink) = build_sink("json", true).await;

From 0d8fedb9559d5c429f1d9d84de2e161f112249ee Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Wed, 12 Jul 2023 13:46:27 +0100
Subject: [PATCH 02/12] Add new relic test

Signed-off-by: Stephen Wakely
---
 src/sinks/new_relic/tests.rs | 39 +++++++++++++++++++++++++++++++++---
 1 file changed, 36 insertions(+), 3 deletions(-)

diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs
index 51be807831f56..f977b58f0177c 100644
--- a/src/sinks/new_relic/tests.rs
+++ b/src/sinks/new_relic/tests.rs
@@ -3,13 +3,17 @@ use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
 use chrono::{DateTime, Utc};
 use futures::{future::ready, stream};
 use serde::Deserialize;
+use vector_core::config::{init_telemetry, Tags, Telemetry};
 
 use super::*;
 use crate::{
     config::{GenerateConfig, SinkConfig, SinkContext},
     event::{Event, LogEvent, Metric, MetricKind, MetricValue, Value},
     test_util::{
-        components::{run_and_assert_sink_compliance, SINK_TAGS},
+        components::{
+            run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+            DATA_VOLUME_SINK_TAGS, SINK_TAGS,
+        },
         http::{always_200_response, spawn_blackhole_http_server},
     },
 };
@@ -19,8 +23,7 @@ fn generate_config() {
     crate::test_util::test_generate_config::<NewRelicConfig>();
 }
 
-#[tokio::test]
-async fn component_spec_compliance() {
+async fn sink() -> (VectorSink, Event) {
     let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
 
     let config = NewRelicConfig::generate_config().to_string();
@@ -32,9 +35,39 @@ async fn component_spec_compliance() {
     let (sink, _healthcheck) = config.build(context).await.unwrap();
 
     let event = Event::Log(LogEvent::from("simple message"));
+
+    (sink, event)
+}
+
+#[tokio::test]
+async fn component_spec_compliance() {
+    let (sink, event) = sink().await;
     run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
 }
 
+#[tokio::test]
+async fn component_spec_compliance_data_volume() {
+    // We need to configure Vector to emit the service and source tags.
+    // The default is to not emit these.
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    let (sink, event) = sink().await;
+    run_and_assert_data_volume_sink_compliance(
+        sink,
+        stream::once(ready(event)),
+        &DATA_VOLUME_SINK_TAGS,
+    )
+    .await;
+}
+
 #[test]
 fn generate_event_api_model() {
     // Without message field

From c0e87a185e081fe48b31d2161baa09bd100e7a51 Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Wed, 12 Jul 2023 15:37:23 +0100
Subject: [PATCH 03/12] Add elastic search test

Signed-off-by: Stephen Wakely
---
 .../datadog/metrics/integration_tests.rs     |  65 ++++++-----
 src/sinks/elasticsearch/integration_tests.rs | 105 +++++++++++++-----
 2 files changed, 112 insertions(+), 58 deletions(-)

diff --git a/src/sinks/datadog/metrics/integration_tests.rs b/src/sinks/datadog/metrics/integration_tests.rs
index 572b304db8f3e..bd03e6e39671c 100644
--- a/src/sinks/datadog/metrics/integration_tests.rs
+++ b/src/sinks/datadog/metrics/integration_tests.rs
@@ -11,7 +11,10 @@ use crate::{
     config::SinkConfig,
     sinks::util::test::{build_test_server_status, load_sink},
     test_util::{
-        components::{assert_sink_compliance, SINK_TAGS},
+        components::{
+            assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
+            SINK_TAGS,
+        },
         map_event_batch_stream, next_addr,
     },
 };
@@ -168,35 +171,41 @@ async fn smoke() {
     }
 }
 
-#[tokio::test]
-async fn real_endpoint() {
-    assert_sink_compliance(&SINK_TAGS, async {
-        let config = indoc! {r#"
+async fn run_sink() {
+    let config = indoc! {r#"
{r#" default_api_key = "${TEST_DATADOG_API_KEY}" default_namespace = "fake.test.integration" "#}; - let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap(); - assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required"); - let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key); - let (config, cx) = load_sink::(config.as_str()).unwrap(); - - let (sink, _) = config.build(cx).await.unwrap(); - let (batch, receiver) = BatchNotifier::new_with_receiver(); - let events: Vec<_> = (0..10) - .map(|index| { - Event::Metric(Metric::new( - "counter", - MetricKind::Absolute, - MetricValue::Counter { - value: index as f64, - }, - )) - }) - .collect(); - let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); + let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap(); + assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required"); + let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key); + let (config, cx) = load_sink::(config.as_str()).unwrap(); - sink.run(stream).await.unwrap(); - assert_eq!(receiver.await, BatchStatus::Delivered); - }) - .await; + let (sink, _) = config.build(cx).await.unwrap(); + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let events: Vec<_> = (0..10) + .map(|index| { + Event::Metric(Metric::new( + "counter", + MetricKind::Absolute, + MetricValue::Counter { + value: index as f64, + }, + )) + }) + .collect(); + let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); + + sink.run(stream).await.unwrap(); + assert_eq!(receiver.await, BatchStatus::Delivered); +} + +#[tokio::test] +async fn real_endpoint() { + assert_sink_compliance(&SINK_TAGS, async { run_sink().await }).await; +} + +#[tokio::test] +async fn data_volume_tags() { + assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async { run_sink().await }).await; } diff --git a/src/sinks/elasticsearch/integration_tests.rs b/src/sinks/elasticsearch/integration_tests.rs index 023371e6f4fe9..5860576522a94 100644 --- a/src/sinks/elasticsearch/integration_tests.rs +++ b/src/sinks/elasticsearch/integration_tests.rs @@ -8,7 +8,7 @@ use futures::{future::ready, stream}; use http::{Request, StatusCode}; use serde_json::{json, Value}; use vector_core::{ - config::log_schema, + config::{init_telemetry, log_schema, Tags, Telemetry}, event::{BatchNotifier, BatchStatus, Event, LogEvent}, }; @@ -23,8 +23,8 @@ use crate::{ }, test_util::{ components::{ - run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS, - HTTP_SINK_TAGS, + run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance, + run_and_assert_sink_error, COMPONENT_ERROR_TAGS, DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS, }, random_events_with_stream, random_string, trace_init, }, @@ -288,7 +288,25 @@ async fn insert_events_over_http() { batch: batch_settings(), ..Default::default() }, - false, + TestType::Normal, + BatchStatus::Delivered, + ) + .await; +} + +#[tokio::test] +async fn insert_events_with_data_volume() { + trace_init(); + + run_insert_tests( + ElasticsearchConfig { + endpoints: vec![http_server()], + doc_type: "log_lines".into(), + compression: Compression::None, + batch: batch_settings(), + ..Default::default() + }, + TestType::DataVolume, BatchStatus::Delivered, ) .await; @@ -306,7 +324,7 @@ async fn insert_events_over_http_with_gzip_compression() { batch: batch_settings(), ..Default::default() }, - false, + TestType::Normal, BatchStatus::Delivered, ) .await; @@ -332,7 +350,7 @@ async fn insert_events_over_https() { batch: 
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
     .await;
@@ -355,7 +373,7 @@ async fn insert_events_on_aws() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
     .await;
@@ -379,7 +397,7 @@ async fn insert_events_on_aws_with_compression() {
             batch: batch_settings(),
             ..Default::default()
         },
-        false,
+        TestType::Normal,
         BatchStatus::Delivered,
     )
     .await;
@@ -397,7 +415,7 @@ async fn insert_events_with_failure() {
             batch: batch_settings(),
             ..Default::default()
         },
-        true,
+        TestType::Error,
         BatchStatus::Rejected,
     )
     .await;
@@ -415,7 +433,7 @@ async fn insert_events_with_failure_and_gzip_compression() {
             batch: batch_settings(),
             ..Default::default()
         },
-        true,
+        TestType::Error,
         BatchStatus::Rejected,
     )
     .await;
@@ -449,7 +467,7 @@ async fn insert_events_in_data_stream() {
         .await
        .expect("Data stream creation error");
 
-    run_insert_tests_with_config(&cfg, false, BatchStatus::Delivered).await;
+    run_insert_tests_with_config(&cfg, TestType::Normal, BatchStatus::Delivered).await;
 }
 
 #[tokio::test]
@@ -512,14 +530,26 @@ async fn distributed_insert_events_failover() {
 
 async fn run_insert_tests(
     mut config: ElasticsearchConfig,
-    break_events: bool,
+    test_type: TestType,
     status: BatchStatus,
 ) {
+    if test_type == TestType::DataVolume {
+        init_telemetry(
+            Telemetry {
+                tags: Tags {
+                    emit_service: true,
+                    emit_source: true,
+                },
+            },
+            true,
+        );
+    }
+
     config.bulk = BulkConfig {
         index: gen_index(),
         ..Default::default()
     };
-    run_insert_tests_with_config(&config, break_events, status).await;
+    run_insert_tests_with_config(&config, test_type, status).await;
 }
 
 fn create_http_client() -> reqwest::Client {
@@ -537,9 +567,16 @@ fn create_http_client() -> reqwest::Client {
         .expect("Could not build HTTP client")
 }
 
+#[derive(Eq, PartialEq)]
+enum TestType {
+    Error,
+    DataVolume,
+    Normal,
+}
+
 async fn run_insert_tests_with_config(
     config: &ElasticsearchConfig,
-    break_events: bool,
+    test_type: TestType,
     batch_status: BatchStatus,
 ) {
     let common = ElasticsearchCommon::parse_single(config)
@@ -565,22 +602,30 @@ async fn run_insert_tests_with_config(
     let (batch, mut receiver) = BatchNotifier::new_with_receiver();
     let (input, events) = random_events_with_stream(100, 100, Some(batch));
 
-    if break_events {
-        // Break all but the first event to simulate some kind of partial failure
-        let mut doit = false;
-        let events = events.map(move |mut events| {
-            if doit {
-                events.iter_logs_mut().for_each(|log| {
-                    log.insert("_type", 1);
-                });
-            }
-            doit = true;
-            events
-        });
+    match test_type {
+        TestType::Error => {
+            // Break all but the first event to simulate some kind of partial failure
+            let mut doit = false;
+            let events = events.map(move |mut events| {
+                if doit {
+                    events.iter_logs_mut().for_each(|log| {
+                        log.insert("_type", 1);
+                    });
+                }
+                doit = true;
+                events
+            });
+
+            run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
+        }
 
-        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
-    } else {
-        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+        TestType::DataVolume => {
+            run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS).await;
+        }
+
+        TestType::Normal => {
+            run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+        }
     }
 
     assert_eq!(receiver.try_recv(), Ok(batch_status));
@@ -607,7 +652,7 @@ async fn run_insert_tests_with_config(
         .or_else(|| response["hits"]["total"].as_u64())
         .expect("Elasticsearch response does not include hits->total nor hits->total->value");
include hits->total nor hits->total->value"); - if break_events { + if test_type == TestType::Error { assert_ne!(input.len() as u64, total); } else { assert_eq!(input.len() as u64, total); From c743937e5cd7aeba915de0256caf40d3d51fb1f5 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 12 Jul 2023 16:07:23 +0100 Subject: [PATCH 04/12] Add splunk logs tests Signed-off-by: Stephen Wakely --- .../splunk_hec/logs/integration_tests.rs | 81 ++++++++++++++++++- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs index 0510d2dbe9721..17eec43316ecf 100644 --- a/src/sinks/splunk_hec/logs/integration_tests.rs +++ b/src/sinks/splunk_hec/logs/integration_tests.rs @@ -6,7 +6,10 @@ use futures::{future::ready, stream}; use lookup::lookup_v2::OptionalValuePath; use serde_json::Value as JsonValue; use tokio::time::{sleep, Duration}; -use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; +use vector_core::{ + config::{init_telemetry, Tags, Telemetry}, + event::{BatchNotifier, BatchStatus, Event, LogEvent}, +}; use crate::{ codecs::EncodingConfig, @@ -25,7 +28,10 @@ use crate::{ }, template::Template, test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + components::{ + run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance, + DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS, + }, random_lines_with_stream, random_string, }, }; @@ -145,6 +151,45 @@ async fn splunk_insert_message() { assert!(entry.get("message").is_none()); } +fn enable_telemetry() { + init_telemetry( + Telemetry { + tags: Tags { + emit_service: true, + emit_source: true, + }, + }, + true, + ); +} + +#[tokio::test] +async fn splunk_insert_message_data_volume() { + enable_telemetry(); + + let cx = SinkContext::default(); + + let config = config(TextSerializerConfig::default().into(), vec![]).await; + let (sink, _) = config.build(cx).await.unwrap(); + + let message = random_string(100); + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let event = LogEvent::from(message.clone()).with_batch_notifier(&batch); + drop(batch); + run_and_assert_data_volume_sink_compliance( + sink, + stream::once(ready(event)), + &DATA_VOLUME_SINK_TAGS, + ) + .await; + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let entry = find_entry(message.as_str()).await; + + assert_eq!(message, entry["_raw"].as_str().unwrap()); + assert!(entry.get("message").is_none()); +} + #[tokio::test] async fn splunk_insert_raw_message() { let cx = SinkContext::default(); @@ -170,6 +215,38 @@ async fn splunk_insert_raw_message() { assert!(entry.get("message").is_none()); } +#[tokio::test] +async fn splunk_insert_raw_message_data_volume() { + enable_telemetry(); + + let cx = SinkContext::default(); + + let config = HecLogsSinkConfig { + endpoint_target: EndpointTarget::Raw, + source: Some(Template::try_from("zork").unwrap()), + ..config(TextSerializerConfig::default().into(), vec![]).await + }; + let (sink, _) = config.build(cx).await.unwrap(); + + let message = random_string(100); + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let event = LogEvent::from(message.clone()).with_batch_notifier(&batch); + drop(batch); + run_and_assert_data_volume_sink_compliance( + sink, + stream::once(ready(event)), + &DATA_VOLUME_SINK_TAGS, + ) + .await; + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let entry = find_entry(message.as_str()).await; + + assert_eq!(message, 
entry["_raw"].as_str().unwrap()); + assert_eq!("zork", entry[SOURCE_FIELD].as_str().unwrap()); + assert!(entry.get("message").is_none()); +} + #[tokio::test] async fn splunk_insert_broken_token() { let cx = SinkContext::default(); From 911592cdfec0819cb2f8b66bd81d1270ec704dae Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 12 Jul 2023 16:18:21 +0100 Subject: [PATCH 05/12] Add splunk metrics tests Signed-off-by: Stephen Wakely --- .../splunk_hec/metrics/integration_tests.rs | 54 +++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/src/sinks/splunk_hec/metrics/integration_tests.rs b/src/sinks/splunk_hec/metrics/integration_tests.rs index 8227e6431a9db..c4d7364736493 100644 --- a/src/sinks/splunk_hec/metrics/integration_tests.rs +++ b/src/sinks/splunk_hec/metrics/integration_tests.rs @@ -2,8 +2,11 @@ use std::convert::TryFrom; use futures::{future::ready, stream}; use serde_json::Value as JsonValue; -use vector_core::event::{BatchNotifier, BatchStatus, Event, MetricValue}; -use vector_core::metric_tags; +use vector_core::{ + config::{init_telemetry, Tags, Telemetry}, + event::{BatchNotifier, BatchStatus, Event, MetricValue}, + metric_tags, +}; use super::config::HecMetricsSinkConfig; use crate::{ @@ -16,7 +19,10 @@ use crate::{ util::{BatchConfig, Compression, TowerRequestConfig}, }, template::Template, - test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + test_util::components::{ + run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance, + DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS, + }, }; const USERNAME: &str = "admin"; @@ -91,6 +97,48 @@ async fn splunk_insert_counter_metric() { ); } +fn enable_telemetry() { + init_telemetry( + Telemetry { + tags: Tags { + emit_service: true, + emit_source: true, + }, + }, + true, + ); +} + +#[tokio::test] +async fn splunk_insert_counter_metric_data_volume() { + enable_telemetry(); + + let cx = SinkContext::default(); + + let mut config = config().await; + config.index = Template::try_from("testmetrics".to_string()).ok(); + let (sink, _) = config.build(cx).await.unwrap(); + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let event = get_counter(batch.clone()); + drop(batch); + run_and_assert_data_volume_sink_compliance( + sink, + stream::once(ready(event)), + &DATA_VOLUME_SINK_TAGS, + ) + .await; + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + assert!( + metric_dimensions_exist( + "example-counter", + &["host", "source", "sourcetype", "tag_counter_test"], + ) + .await + ); +} + #[tokio::test] async fn splunk_insert_gauge_metric() { let cx = SinkContext::default(); From 5623fd7018a33e63acd0a1cd4bdd970967cde999 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 13 Jul 2023 14:20:34 +0100 Subject: [PATCH 06/12] Add vector test Signed-off-by: Stephen Wakely --- src/sinks/vector/mod.rs | 48 +++++++++++++++++++++++++++++++++---- src/sinks/vector/service.rs | 3 +++ 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/sinks/vector/mod.rs b/src/sinks/vector/mod.rs index 03d77611ad239..e3a6b2b1b918a 100644 --- a/src/sinks/vector/mod.rs +++ b/src/sinks/vector/mod.rs @@ -45,7 +45,10 @@ mod tests { use http::request::Parts; use hyper::Method; use prost::Message; - use vector_core::event::{BatchNotifier, BatchStatus}; + use vector_core::{ + config::{init_telemetry, Tags, Telemetry}, + event::{BatchNotifier, BatchStatus}, + }; use super::config::with_default_scheme; use super::*; @@ -55,7 +58,10 @@ mod tests { proto::vector as proto, 
         sinks::util::test::build_test_server_generic,
         test_util::{
-            components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
+            components::{
+                run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance,
+                DATA_VOLUME_SINK_TAGS, HTTP_SINK_TAGS,
+            },
             next_addr, random_lines_with_stream,
         },
     };
@@ -68,8 +74,12 @@ fn generate_config() {
         crate::test_util::test_generate_config::<VectorConfig>();
     }
 
-    #[tokio::test]
-    async fn deliver_message() {
+    enum TestType {
+        Normal,
+        DataVolume,
+    }
+
+    async fn run_sink_test(test_type: TestType) {
         let num_lines = 10;
 
         let in_addr = next_addr();
@@ -93,7 +103,15 @@ async fn deliver_message() {
 
         let (batch, mut receiver) = BatchNotifier::new_with_receiver();
         let (input_lines, events) = random_lines_with_stream(8, num_lines, Some(batch));
-        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+        match test_type {
+            TestType::Normal => run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await,
+
+            TestType::DataVolume => {
+                run_and_assert_data_volume_sink_compliance(sink, events, &DATA_VOLUME_SINK_TAGS)
+                    .await
+            }
+        }
+
         drop(trigger);
 
         assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
@@ -112,6 +130,26 @@ async fn deliver_message() {
         assert_eq!(input_lines, output_lines);
     }
 
+    #[tokio::test]
+    async fn deliver_message() {
+        run_sink_test(TestType::Normal).await;
+    }
+
+    #[tokio::test]
+    async fn data_volume_tags() {
+        init_telemetry(
+            Telemetry {
+                tags: Tags {
+                    emit_service: true,
+                    emit_source: true,
+                },
+            },
+            true,
+        );
+
+        run_sink_test(TestType::DataVolume).await;
+    }
+
     #[tokio::test]
     async fn acknowledges_error() {
         let num_lines = 10;

diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs
index 5277a408634ee..fdf60393691bb 100644
--- a/src/sinks/vector/service.rs
+++ b/src/sinks/vector/service.rs
@@ -119,6 +119,9 @@ impl Service<VectorRequest> for VectorService {
                     protocol: &service.protocol,
                     endpoint: &service.endpoint,
                 });
+
+                dbg!(&events_byte_size);
+
                 VectorResponse { events_byte_size }
             })
             .map_err(|source| VectorSinkError::Request { source }.into())

From 01d011b752652ec750bb2e9a2759cd00c6b18b18 Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Fri, 14 Jul 2023 13:14:33 +0100
Subject: [PATCH 07/12] Fix Vector test

Signed-off-by: Stephen Wakely
---
 lib/vector-common/src/request_metadata.rs | 76 ++++++++++++++++++++++-
 src/sinks/util/metadata.rs                | 12 ++++
 src/sinks/vector/service.rs               |  2 -
 src/sinks/vector/sink.rs                  | 37 ++++++++---
 4 files changed, 114 insertions(+), 13 deletions(-)

diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs
index 9b93a63df7626..bc3e006978f57 100644
--- a/lib/vector-common/src/request_metadata.rs
+++ b/lib/vector-common/src/request_metadata.rs
@@ -1,4 +1,4 @@
-use std::ops::Add;
+use std::ops::{Add, AddAssign};
 use std::{collections::HashMap, sync::Arc};
 
 use crate::{
@@ -145,6 +145,25 @@ impl GroupedCountByteSize {
             }
         }
     }
+
+    /// Returns `true` if we are the `Tagged` variant - keeping track of the byte sizes
+    /// grouped by their relevant tags.
+    #[must_use]
+    pub fn is_tagged(&self) -> bool {
+        match self {
+            GroupedCountByteSize::Tagged { .. } => true,
+            GroupedCountByteSize::Untagged { .. } => false,
+        }
+    }
+
+    /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
+    #[must_use]
+    pub fn is_untagged(&self) -> bool {
+        match self {
+            GroupedCountByteSize::Tagged { .. } => false,
+            GroupedCountByteSize::Untagged { .. } => true,
} => true, + } + } } impl From for GroupedCountByteSize { @@ -153,6 +172,61 @@ impl From for GroupedCountByteSize { } } +impl AddAssign for GroupedCountByteSize { + fn add_assign(&mut self, mut rhs: Self) { + if self.is_untagged() && rhs.is_tagged() { + // First handle the case where we are untagged and assigning to a tagged value. + // We need to change `self` and so need to ensure our match doesn't take ownership of the object. + *self = match (&self, &mut rhs) { + (Self::Untagged { size }, Self::Tagged { sizes }) => { + let mut sizes = std::mem::take(sizes); + match sizes.get_mut(&EventCountTags::new_empty()) { + Some(empty_size) => *empty_size += *size, + None => { + sizes.insert(EventCountTags::new_empty(), *size); + } + } + + Self::Tagged { sizes } + } + _ => { + unreachable!() + } + }; + + return; + } + + // For these cases, we know we won't have to change `self` so the match can take ownership. + match (self, rhs) { + (Self::Tagged { sizes: ref mut us }, Self::Tagged { sizes: them }) => { + for (key, value) in them { + match us.get_mut(&key) { + Some(size) => *size += value, + None => { + us.insert(key.clone(), value); + } + } + } + } + + (Self::Untagged { size: us }, Self::Untagged { size: them }) => { + *us = *us + them; + } + + (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => { + match sizes.get_mut(&EventCountTags::new_empty()) { + Some(empty_size) => *empty_size += size, + None => { + sizes.insert(EventCountTags::new_empty(), size); + } + } + } + (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(), + }; + } +} + impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize { type Output = GroupedCountByteSize; diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs index e6f4e7739e4d2..1d0091a4831f9 100644 --- a/src/sinks/util/metadata.rs +++ b/src/sinks/util/metadata.rs @@ -68,6 +68,18 @@ impl RequestMetadataBuilder { } } + pub const fn new_proper( + event_count: usize, + events_byte_size: usize, + grouped_events_byte_size: GroupedCountByteSize, + ) -> Self { + Self { + event_count, + events_byte_size, + events_estimated_json_encoded_byte_size: grouped_events_byte_size, + } + } + pub fn track_event(&mut self, event: E) where E: ByteSizeOf + GetEventCountTags + EstimatedJsonEncodedSizeOf, diff --git a/src/sinks/vector/service.rs b/src/sinks/vector/service.rs index fdf60393691bb..a89271d4f928d 100644 --- a/src/sinks/vector/service.rs +++ b/src/sinks/vector/service.rs @@ -120,8 +120,6 @@ impl Service for VectorService { endpoint: &service.endpoint, }); - dbg!(&events_byte_size); - VectorResponse { events_byte_size } }) .map_err(|source| VectorSinkError::Request { source }.into()) diff --git a/src/sinks/vector/sink.rs b/src/sinks/vector/sink.rs index 229867194ddfd..56f7608f375d6 100644 --- a/src/sinks/vector/sink.rs +++ b/src/sinks/vector/sink.rs @@ -4,8 +4,9 @@ use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt}; use prost::Message; use tower::Service; -use vector_common::json_size::JsonSize; +use vector_common::request_metadata::GroupedCountByteSize; use vector_core::{ + config::telemetry, stream::{BatcherSettings, DriverResponse}, ByteSizeOf, EstimatedJsonEncodedSizeOf, }; @@ -20,18 +21,29 @@ use crate::{ /// Data for a single event. struct EventData { byte_size: usize, - json_byte_size: JsonSize, + json_byte_size: GroupedCountByteSize, finalizers: EventFinalizers, wrapper: EventWrapper, } /// Temporary struct to collect events during batching. 
-#[derive(Clone, Default)]
+#[derive(Clone)]
 struct EventCollection {
     pub finalizers: EventFinalizers,
     pub events: Vec<EventWrapper>,
     pub events_byte_size: usize,
-    pub events_json_byte_size: JsonSize,
+    pub events_json_byte_size: GroupedCountByteSize,
+}
+
+impl Default for EventCollection {
+    fn default() -> Self {
+        Self {
+            finalizers: Default::default(),
+            events: Default::default(),
+            events_byte_size: Default::default(),
+            events_json_byte_size: telemetry().create_request_count_byte_size(),
+        }
+    }
 }
 
 pub struct VectorSink<S> {
@@ -48,11 +60,16 @@ where
 {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
         input
-            .map(|mut event| EventData {
-                byte_size: event.size_of(),
-                json_byte_size: event.estimated_json_encoded_size_of(),
-                finalizers: event.take_finalizers(),
-                wrapper: EventWrapper::from(event),
+            .map(|mut event| {
+                let mut byte_size = telemetry().create_request_count_byte_size();
+                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
+
+                EventData {
+                    byte_size: event.size_of(),
+                    json_byte_size: byte_size,
+                    finalizers: event.take_finalizers(),
+                    wrapper: EventWrapper::from(event),
+                }
             })
             .batched(self.batch_settings.into_reducer_config(
                 |data: &EventData| data.wrapper.encoded_len(),
@@ -64,7 +81,7 @@ where
                 },
             ))
             .map(|event_collection| {
-                let builder = RequestMetadataBuilder::new(
+                let builder = RequestMetadataBuilder::new_proper(
                     event_collection.events.len(),
                     event_collection.events_byte_size,
                     event_collection.events_json_byte_size,

From d2eb1d2aef85ea7212b6fceda32e26b4a788ba85 Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Fri, 14 Jul 2023 13:35:45 +0100
Subject: [PATCH 08/12] Remove RequestMetadataBuilder::new

Signed-off-by: Stephen Wakely
---
 src/sinks/datadog/traces/request_builder.rs | 12 +++----
 src/sinks/splunk_hec/common/service.rs      |  7 +++-
 src/sinks/util/metadata.rs                  | 40 +++++----------------
 src/sinks/vector/sink.rs                    |  2 +-
 4 files changed, 20 insertions(+), 41 deletions(-)

diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs
index dbe57714995fc..68081627cc76a 100644
--- a/src/sinks/datadog/traces/request_builder.rs
+++ b/src/sinks/datadog/traces/request_builder.rs
@@ -9,10 +9,7 @@ use bytes::Bytes;
 use prost::Message;
 use snafu::Snafu;
 use vector_common::request_metadata::RequestMetadata;
-use vector_core::{
-    event::{EventFinalizers, Finalizable},
-    EstimatedJsonEncodedSizeOf,
-};
+use vector_core::event::{EventFinalizers, Finalizable};
 
 use super::{
     apm_stats::{compute_apm_stats, Aggregator},
@@ -125,7 +122,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
             .for_each(|r| match r {
                 Ok((payload, mut processed)) => {
                     let uncompressed_size = payload.len();
-                    let json_size = processed.estimated_json_encoded_size_of();
                     let metadata = DDTracesMetadata {
                         api_key: key
                             .api_key
@@ -137,14 +133,14 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
                         content_type: "application/x-protobuf".to_string(),
                     };
 
+                    // build RequestMetadata
+                    let builder = RequestMetadataBuilder::from_events(&processed);
+
                     let mut compressor = Compressor::from(self.compression);
                     match compressor.write_all(&payload) {
                         Ok(()) => {
                             let bytes = compressor.into_inner().freeze();
 
-                            // build RequestMetadata
-                            let builder =
-                                RequestMetadataBuilder::new(n, uncompressed_size, json_size);
                             let bytes_len = NonZeroUsize::new(bytes.len())
                                 .expect("payload should never be zero length");
                             let request_metadata = builder.with_request_size(bytes_len);

diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs
index a8abc57e77b09..ce9b324c10fcd 100644
--- a/src/sinks/splunk_hec/common/service.rs
+++ b/src/sinks/splunk_hec/common/service.rs
@@ -282,6 +282,7 @@ mod tests {
     use bytes::Bytes;
     use futures_util::{poll, stream::FuturesUnordered, StreamExt};
     use tower::{util::BoxService, Service, ServiceExt};
+    use vector_common::internal_event::CountByteSize;
     use vector_core::{
         config::proxy::ProxyConfig,
         event::{EventFinalizers, EventStatus},
@@ -339,7 +340,11 @@ mod tests {
         let body = Bytes::from("test-message");
         let events_byte_size = body.len();
-        let builder = RequestMetadataBuilder::new(1, events_byte_size, events_byte_size.into());
+        let builder = RequestMetadataBuilder::new(
+            1,
+            events_byte_size,
+            CountByteSize(1, events_byte_size.into()).into(),
+        );
         let bytes_len =
             NonZeroUsize::new(events_byte_size).expect("payload should never be zero length");
         let metadata = builder.with_request_size(bytes_len);

diff --git a/src/sinks/util/metadata.rs b/src/sinks/util/metadata.rs
index 1d0091a4831f9..90e1f37539872 100644
--- a/src/sinks/util/metadata.rs
+++ b/src/sinks/util/metadata.rs
@@ -1,20 +1,15 @@
 use std::num::NonZeroUsize;
 
+use vector_common::request_metadata::{GetEventCountTags, GroupedCountByteSize, RequestMetadata};
 use vector_core::{config, ByteSizeOf, EstimatedJsonEncodedSizeOf};
 
-use vector_common::{
-    internal_event::CountByteSize,
-    json_size::JsonSize,
-    request_metadata::{GetEventCountTags, GroupedCountByteSize, RequestMetadata},
-};
-
 use super::request_builder::EncodeResult;
 
 #[derive(Clone, Default)]
 pub struct RequestMetadataBuilder {
     event_count: usize,
     events_byte_size: usize,
-    events_estimated_json_encoded_byte_size: GroupedCountByteSize,
+    grouped_events_byte_size: GroupedCountByteSize,
 }
 
 impl RequestMetadataBuilder {
@@ -34,7 +29,7 @@ impl RequestMetadataBuilder {
         Self {
             event_count: events.len(),
             events_byte_size,
-            events_estimated_json_encoded_byte_size: size,
+            grouped_events_byte_size: size,
         }
     }
 
@@ -48,27 +43,11 @@ impl RequestMetadataBuilder {
         Self {
             event_count: 1,
             events_byte_size: event.size_of(),
-            events_estimated_json_encoded_byte_size: size,
+            grouped_events_byte_size: size,
         }
     }
 
-    pub fn new(
-        event_count: usize,
-        events_byte_size: usize,
-        events_estimated_json_encoded_byte_size: JsonSize,
-    ) -> Self {
-        Self {
-            event_count,
-            events_byte_size,
-            events_estimated_json_encoded_byte_size: CountByteSize(
-                event_count,
-                events_estimated_json_encoded_byte_size,
-            )
-            .into(),
-        }
-    }
-
-    pub const fn new_proper(
+    pub const fn new(
         event_count: usize,
         events_byte_size: usize,
         grouped_events_byte_size: GroupedCountByteSize,
@@ -76,7 +55,7 @@ impl RequestMetadataBuilder {
         Self {
             event_count,
             events_byte_size,
-            events_estimated_json_encoded_byte_size: grouped_events_byte_size,
+            grouped_events_byte_size,
         }
     }
 
@@ -87,8 +66,7 @@ impl RequestMetadataBuilder {
         self.event_count += 1;
         self.events_byte_size += event.size_of();
         let json_size = event.estimated_json_encoded_size_of();
-        self.events_estimated_json_encoded_byte_size
-            .add_event(&event, json_size);
+        self.grouped_events_byte_size.add_event(&event, json_size);
     }
 
     pub fn with_request_size(&self, size: NonZeroUsize) -> RequestMetadata {
@@ -99,7 +77,7 @@ impl RequestMetadataBuilder {
             self.events_byte_size,
             size,
             size,
-            self.events_estimated_json_encoded_byte_size.clone(),
+            self.grouped_events_byte_size.clone(),
         )
     }
 
@@ -111,7 +89,7 @@ impl RequestMetadataBuilder {
             result
                .compressed_byte_size
                .unwrap_or(result.uncompressed_byte_size),
-            self.events_estimated_json_encoded_byte_size.clone(),
+            self.grouped_events_byte_size.clone(),
         )
     }
 }

diff --git a/src/sinks/vector/sink.rs b/src/sinks/vector/sink.rs
index 56f7608f375d6..fe77de6837883 100644
--- a/src/sinks/vector/sink.rs
+++ b/src/sinks/vector/sink.rs
@@ -81,7 +81,7 @@ where
                 },
             ))
             .map(|event_collection| {
-                let builder = RequestMetadataBuilder::new_proper(
+                let builder = RequestMetadataBuilder::new(
                     event_collection.events.len(),
                     event_collection.events_byte_size,
                     event_collection.events_json_byte_size,

From 9435f928b92ebe4098804d292d4691591ff86c91 Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Thu, 20 Jul 2023 12:01:52 +0100
Subject: [PATCH 09/12] Feedback from Spencer and Kyle

Signed-off-by: Stephen Wakely
---
 lib/vector-common/src/request_metadata.rs | 31 ++++++++++--------------
 src/sinks/redis.rs                        |  2 +-
 2 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/lib/vector-common/src/request_metadata.rs b/lib/vector-common/src/request_metadata.rs
index bc3e006978f57..87ff25a1cf5dc 100644
--- a/lib/vector-common/src/request_metadata.rs
+++ b/lib/vector-common/src/request_metadata.rs
@@ -159,10 +159,7 @@ impl GroupedCountByteSize {
     /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
     #[must_use]
     pub fn is_untagged(&self) -> bool {
-        match self {
-            GroupedCountByteSize::Tagged { .. } => false,
-            GroupedCountByteSize::Untagged { .. } => true,
-        }
+        !self.is_tagged()
     }
 }
 
@@ -199,19 +196,19 @@ impl AddAssign for GroupedCountByteSize {
 
         // For these cases, we know we won't have to change `self` so the match can take ownership.
         match (self, rhs) {
-            (Self::Tagged { sizes: ref mut us }, Self::Tagged { sizes: them }) => {
-                for (key, value) in them {
-                    match us.get_mut(&key) {
+            (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(&key) {
                         Some(size) => *size += value,
                         None => {
-                            us.insert(key.clone(), value);
+                            lhs.insert(key.clone(), value);
                         }
                     }
                 }
             }
 
-            (Self::Untagged { size: us }, Self::Untagged { size: them }) => {
-                *us = *us + them;
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                *lhs = *lhs + rhs;
             }
 
             (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
@@ -232,21 +229,21 @@ impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
 
     fn add(self, other: &'a Self::Output) -> Self::Output {
         match (self, other) {
-            (Self::Tagged { sizes: mut us }, Self::Tagged { sizes: them }) => {
-                for (key, value) in them {
-                    match us.get_mut(key) {
+            (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(key) {
                         Some(size) => *size += *value,
                         None => {
-                            us.insert(key.clone(), *value);
+                            lhs.insert(key.clone(), *value);
                         }
                     }
                 }
 
-                Self::Tagged { sizes: us }
+                Self::Tagged { sizes: lhs }
             }
 
-            (Self::Untagged { size: us }, Self::Untagged { size: them }) => {
-                Self::Untagged { size: us + *them }
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                Self::Untagged { size: lhs + *rhs }
             }
 
             // The following two scenarios shouldn't really occur in practice, but are provided for completeness.

diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs
index 402ad28aee906..31ef10ed66c5f 100644
--- a/src/sinks/redis.rs
+++ b/src/sinks/redis.rs
@@ -126,7 +126,7 @@ pub struct RedisSinkConfig {
     /// The URL of the Redis endpoint to connect to.
     ///
-    /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be
+    /// The URg _must_ take the form of `protocol://server:port/db` where the protocol can either be
     /// `redis` or `rediss` for connections secured via TLS.
     #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
     #[serde(alias = "url")]

From fa32c242ac7c0c2b62290eeee54c1a0674ad8b44 Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Thu, 20 Jul 2023 12:32:44 +0100
Subject: [PATCH 10/12] Typo crept in

Signed-off-by: Stephen Wakely
---
 src/sinks/redis.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sinks/redis.rs b/src/sinks/redis.rs
index 31ef10ed66c5f..402ad28aee906 100644
--- a/src/sinks/redis.rs
+++ b/src/sinks/redis.rs
@@ -126,7 +126,7 @@ pub struct RedisSinkConfig {
     /// The URL of the Redis endpoint to connect to.
     ///
-    /// The URg _must_ take the form of `protocol://server:port/db` where the protocol can either be
+    /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be
     /// `redis` or `rediss` for connections secured via TLS.
     #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
     #[serde(alias = "url")]

From b718c3733df08977ad8e8356f2f77d3b5470a64c Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Fri, 21 Jul 2023 10:31:36 +0100
Subject: [PATCH 11/12] Move function up a little

Signed-off-by: Stephen Wakely
---
 .../splunk_hec/logs/integration_tests.rs | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/sinks/splunk_hec/logs/integration_tests.rs b/src/sinks/splunk_hec/logs/integration_tests.rs
index 433a87f571e1a..8bfe02dfee305 100644
--- a/src/sinks/splunk_hec/logs/integration_tests.rs
+++ b/src/sinks/splunk_hec/logs/integration_tests.rs
@@ -131,6 +131,18 @@ async fn config(encoding: EncodingConfig, indexed_fields: Vec<String>) -> HecLogsSinkConfig {
     }
 }
 
+fn enable_telemetry() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+}
+
 #[tokio::test]
 async fn splunk_insert_message() {
     let cx = SinkContext::default();
@@ -151,18 +163,6 @@ async fn splunk_insert_message() {
     assert!(entry.get("message").is_none());
 }
 
-fn enable_telemetry() {
-    init_telemetry(
-        Telemetry {
-            tags: Tags {
-                emit_service: true,
-                emit_source: true,
-            },
-        },
-        true,
-    );
-}
-
 #[tokio::test]
 async fn splunk_insert_message_data_volume() {
     enable_telemetry();

From 0fbe38e373a70e10e77146c73f2a8529e03e6f2b Mon Sep 17 00:00:00 2001
From: Stephen Wakely
Date: Thu, 27 Jul 2023 11:59:48 +0100
Subject: [PATCH 12/12] Fixed datadog agent test

Signed-off-by: Stephen Wakely
---
 src/sinks/datadog/metrics/encoder.rs           |  4 +++-
 src/sinks/datadog/metrics/integration_tests.rs | 15 ++++++++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs
index b056d672cf46a..347271d5651ae 100644
--- a/src/sinks/datadog/metrics/encoder.rs
+++ b/src/sinks/datadog/metrics/encoder.rs
@@ -333,6 +333,8 @@ impl DatadogMetricsEncoder {
         self.state.written += n;
         let raw_bytes_written = self.state.written;
 
+        let byte_size = self.state.byte_size.clone();
+
         // Consume the encoder state so we can do our final checks and return the necessary data.
         let state = self.reset_state();
         let payload = state
@@ -357,7 +359,7 @@ impl DatadogMetricsEncoder {
         if recommended_splits == 1 {
             // "One" split means no splits needed: our payload didn't exceed either of the limits.
             Ok((
-                EncodeResult::compressed(payload, raw_bytes_written, self.state.byte_size.clone()),
+                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
                 processed,
             ))
         } else {

diff --git a/src/sinks/datadog/metrics/integration_tests.rs b/src/sinks/datadog/metrics/integration_tests.rs
index bd03e6e39671c..458cc5be987f3 100644
--- a/src/sinks/datadog/metrics/integration_tests.rs
+++ b/src/sinks/datadog/metrics/integration_tests.rs
@@ -4,7 +4,10 @@ use futures::{channel::mpsc::Receiver, stream, StreamExt};
 use hyper::StatusCode;
 use indoc::indoc;
 use rand::{thread_rng, Rng};
-use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue};
+use vector_core::{
+    config::{init_telemetry, Tags, Telemetry},
+    event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue},
+};
 
 use super::DatadogMetricsConfig;
 use crate::{
@@ -207,5 +210,15 @@ async fn real_endpoint() {
 
 #[tokio::test]
 async fn data_volume_tags() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
     assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async { run_sink().await }).await;
 }