chore(observability): add tests to sinks for Data Volume tags #17853

Merged: 14 commits, Jul 28, 2023
91 changes: 82 additions & 9 deletions lib/vector-common/src/request_metadata.rs
@@ -1,5 +1,7 @@
-use std::collections::HashMap;
-use std::ops::Add;
+use std::{
+    collections::HashMap,
+    ops::{Add, AddAssign},
+};

use crate::{
    internal_event::{
@@ -119,6 +121,22 @@ impl GroupedCountByteSize {
            }
        }
    }

+    /// Returns `true` if we are the `Tagged` variant - keeping track of the byte sizes
+    /// grouped by their relevant tags.
+    #[must_use]
+    pub fn is_tagged(&self) -> bool {
+        match self {
+            GroupedCountByteSize::Tagged { .. } => true,
+            GroupedCountByteSize::Untagged { .. } => false,
+        }
+    }
+
+    /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
+    #[must_use]
+    pub fn is_untagged(&self) -> bool {
+        !self.is_tagged()
+    }
}
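A quick sketch (not part of the diff) of how the two new helpers behave, assuming `CountByteSize` is the `(count, JsonSize)` pair used elsewhere in this file and that the `From<CountByteSize>` impl below yields the `Untagged` variant:

    // Hypothetical usage, for illustration only.
    let size: GroupedCountByteSize = CountByteSize(1, JsonSize::new(42)).into();
    assert!(size.is_untagged());
    assert!(!size.is_tagged());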

impl From<CountByteSize> for GroupedCountByteSize {
@@ -127,26 +145,81 @@ impl From<CountByteSize> for GroupedCountByteSize {
}
}

+impl AddAssign for GroupedCountByteSize {
+    fn add_assign(&mut self, mut rhs: Self) {
+        if self.is_untagged() && rhs.is_tagged() {
+            // First handle the case where we are untagged and assigning to a tagged value.
+            // We need to change `self` and so need to ensure our match doesn't take ownership of the object.
+            *self = match (&self, &mut rhs) {
+                (Self::Untagged { size }, Self::Tagged { sizes }) => {
+                    let mut sizes = std::mem::take(sizes);
+                    match sizes.get_mut(&TaggedEventsSent::new_empty()) {
+                        Some(empty_size) => *empty_size += *size,
+                        None => {
+                            sizes.insert(TaggedEventsSent::new_empty(), *size);
+                        }
+                    }
+
+                    Self::Tagged { sizes }
+                }
+                _ => {
+                    unreachable!()
+                }
+            };
+
+            return;
+        }
+
+        // For these cases, we know we won't have to change `self` so the match can take ownership.
+        match (self, rhs) {
+            (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(&key) {
+                        Some(size) => *size += value,
+                        None => {
+                            lhs.insert(key.clone(), value);
+                        }
+                    }
+                }
+            }
+
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                *lhs = *lhs + rhs;
+            }
+
+            (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
+                match sizes.get_mut(&TaggedEventsSent::new_empty()) {
+                    Some(empty_size) => *empty_size += size,
+                    None => {
+                        sizes.insert(TaggedEventsSent::new_empty(), size);
+                    }
+                }
+            }
+            (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
+        };
+    }
+}
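A usage sketch (not part of the diff) of the `AddAssign` semantics above: merging a `Tagged` value into an `Untagged` accumulator promotes the accumulator to `Tagged`, with the previously untagged bytes filed under the empty tag key:

    // Hypothetical: direct variant construction shown only for illustration.
    let mut acc: GroupedCountByteSize = CountByteSize(2, JsonSize::new(10)).into(); // Untagged
    let tagged = GroupedCountByteSize::Tagged { sizes: HashMap::new() };
    acc += tagged;
    assert!(acc.is_tagged()); // the original bytes now sit under TaggedEventsSent::new_empty()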

impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
    type Output = GroupedCountByteSize;

    fn add(self, other: &'a Self::Output) -> Self::Output {
        match (self, other) {
-            (Self::Tagged { sizes: mut us }, Self::Tagged { sizes: them }) => {
-                for (key, value) in them {
-                    match us.get_mut(key) {
+            (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(key) {
                        Some(size) => *size += *value,
                        None => {
-                            us.insert(key.clone(), *value);
+                            lhs.insert(key.clone(), *value);
                        }
                    }
                }

-                Self::Tagged { sizes: us }
+                Self::Tagged { sizes: lhs }
            }

-            (Self::Untagged { size: us }, Self::Untagged { size: them }) => {
-                Self::Untagged { size: us + *them }
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                Self::Untagged { size: lhs + *rhs }
            }

            // The following two scenarios shouldn't really occur in practice, but are provided for completeness.
4 changes: 3 additions & 1 deletion src/sinks/datadog/metrics/encoder.rs
@@ -333,6 +333,8 @@ impl DatadogMetricsEncoder {
        self.state.written += n;

        let raw_bytes_written = self.state.written;
+        let byte_size = self.state.byte_size.clone();

        // Consume the encoder state so we can do our final checks and return the necessary data.
        let state = self.reset_state();
        let payload = state
@@ -357,7 +359,7 @@ impl DatadogMetricsEncoder {
        if recommended_splits == 1 {
            // "One" split means no splits needed: our payload didn't exceed either of the limits.
            Ok((
-                EncodeResult::compressed(payload, raw_bytes_written, self.state.byte_size.clone()),
+                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
                processed,
            ))
        } else {
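The encoder change above fixes an ordering subtlety: `reset_state()` replaces `self.state` with a fresh one, so the old placement of `self.state.byte_size.clone()` inside `EncodeResult::compressed(...)` would read the byte size of the new, empty state rather than the payload that was just encoded. Cloning `byte_size` before consuming the state captures the value that belongs to the returned payload.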
80 changes: 51 additions & 29 deletions src/sinks/datadog/metrics/integration_tests.rs
@@ -4,14 +4,20 @@ use futures::{channel::mpsc::Receiver, stream, StreamExt};
use hyper::StatusCode;
use indoc::indoc;
use rand::{thread_rng, Rng};
-use vector_core::event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue};
+use vector_core::{
+    config::{init_telemetry, Tags, Telemetry},
+    event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue},
+};

use super::DatadogMetricsConfig;
use crate::{
    config::SinkConfig,
    sinks::util::test::{build_test_server_status, load_sink},
    test_util::{
-        components::{assert_sink_compliance, SINK_TAGS},
+        components::{
+            assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
+            SINK_TAGS,
+        },
        map_event_batch_stream, next_addr,
    },
};
@@ -168,35 +174,51 @@ async fn smoke() {
    }
}

-#[tokio::test]
-async fn real_endpoint() {
-    assert_sink_compliance(&SINK_TAGS, async {
-        let config = indoc! {r#"
+async fn run_sink() {
+    let config = indoc! {r#"
        default_api_key = "${TEST_DATADOG_API_KEY}"
        default_namespace = "fake.test.integration"
    "#};
-        let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
-        assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
-        let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
-        let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
-
-        let (sink, _) = config.build(cx).await.unwrap();
-        let (batch, receiver) = BatchNotifier::new_with_receiver();
-        let events: Vec<_> = (0..10)
-            .map(|index| {
-                Event::Metric(Metric::new(
-                    "counter",
-                    MetricKind::Absolute,
-                    MetricValue::Counter {
-                        value: index as f64,
-                    },
-                ))
-            })
-            .collect();
-        let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+    let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
+    assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
+    let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
+    let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
+
+    let (sink, _) = config.build(cx).await.unwrap();
+    let (batch, receiver) = BatchNotifier::new_with_receiver();
+    let events: Vec<_> = (0..10)
+        .map(|index| {
+            Event::Metric(Metric::new(
+                "counter",
+                MetricKind::Absolute,
+                MetricValue::Counter {
+                    value: index as f64,
+                },
+            ))
+        })
+        .collect();
+    let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+
+    sink.run(stream).await.unwrap();
+    assert_eq!(receiver.await, BatchStatus::Delivered);
+}

-        sink.run(stream).await.unwrap();
-        assert_eq!(receiver.await, BatchStatus::Delivered);
-    })
-    .await;
+
+#[tokio::test]
+async fn real_endpoint() {
+    assert_sink_compliance(&SINK_TAGS, async { run_sink().await }).await;
+}

+#[tokio::test]
+async fn data_volume_tags() {
+    init_telemetry(
+        Telemetry {
+            tags: Tags {
+                emit_service: true,
+                emit_source: true,
+            },
+        },
+        true,
+    );
+
+    assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async { run_sink().await }).await;
+}
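Because `init_telemetry` flips the process-global telemetry settings into tagged mode (`emit_service`/`emit_source`), the sink's byte-size accounting presumably switches to the `Tagged` variant of `GroupedCountByteSize`, and `assert_data_volume_sink_compliance` with `DATA_VOLUME_SINK_TAGS` can then check for the per-service and per-source data volume metrics this PR is testing.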
12 changes: 4 additions & 8 deletions src/sinks/datadog/traces/request_builder.rs
@@ -9,10 +9,7 @@ use bytes::Bytes;
use prost::Message;
use snafu::Snafu;
use vector_common::request_metadata::RequestMetadata;
-use vector_core::{
-    event::{EventFinalizers, Finalizable},
-    EstimatedJsonEncodedSizeOf,
-};
+use vector_core::event::{EventFinalizers, Finalizable};

use super::{
    apm_stats::{compute_apm_stats, Aggregator},
@@ -125,7 +122,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
            .for_each(|r| match r {
                Ok((payload, mut processed)) => {
                    let uncompressed_size = payload.len();
-                    let json_size = processed.estimated_json_encoded_size_of();
                    let metadata = DDTracesMetadata {
                        api_key: key
                            .api_key
@@ -137,14 +133,14 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
                        content_type: "application/x-protobuf".to_string(),
                    };

+                    // build RequestMetadata
+                    let builder = RequestMetadataBuilder::from_events(&processed);
+
                    let mut compressor = Compressor::from(self.compression);
                    match compressor.write_all(&payload) {
                        Ok(()) => {
                            let bytes = compressor.into_inner().freeze();

-                            // build RequestMetadata
-                            let builder =
-                                RequestMetadataBuilder::new(n, uncompressed_size, json_size);
                            let bytes_len = NonZeroUsize::new(bytes.len())
                                .expect("payload should never be zero length");
                            let request_metadata = builder.with_request_size(bytes_len);
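The net effect in the traces request builder: `RequestMetadataBuilder::from_events(&processed)` is now constructed before compression and derives the event count and byte sizes directly from the processed events, replacing the manual `RequestMetadataBuilder::new(n, uncompressed_size, json_size)` call. The `estimated_json_encoded_size_of` computation and its import are presumably no longer needed because `from_events` computes the grouped, taggable sizes itself.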