From 3485f2c53617270403317f4c21bb076a8f53eeee Mon Sep 17 00:00:00 2001 From: neuronull Date: Thu, 12 Oct 2023 12:20:41 -0600 Subject: [PATCH] chore(datadog_metrics sink): support and migrate to the `v2` series API endpoint (#18761) * add scaffolding for the differentiation of series API versions * fix(datadog_metrics sink): fix the integration tests which weren't actually validating anything * fix workflows * clippy * fix filter for traces * add first pass * add testing coverage * cargo.lock * reduce duplicated code * cleanup * clippy * feedback ds: remove check for sort by name * feedback ds: extend unit tests for v2 * feedback ds: extend the int test coverage * Revert "feedback ds: remove check for sort by name" This reverts commit c95a326c30a471e660742b6e354ab5d73e1201de. * add explicit sort check * add env var for v1 support * check events * add note in deprecations * remove dead code allow --- docs/DEPRECATION.md | 4 +- docs/DEPRECATIONS.md | 1 + lib/vector-core/src/event/mod.rs | 7 + src/sinks/datadog/metrics/config.rs | 58 +- src/sinks/datadog/metrics/encoder.rs | 506 +++++++++++++----- .../datadog/metrics/integration_tests.rs | 258 ++++++++- src/sinks/datadog/metrics/request_builder.rs | 4 +- src/sinks/datadog/metrics/sink.rs | 9 +- 8 files changed, 681 insertions(+), 166 deletions(-) diff --git a/docs/DEPRECATION.md b/docs/DEPRECATION.md index ff1edddde596b..4d8c82faaa256 100644 --- a/docs/DEPRECATION.md +++ b/docs/DEPRECATION.md @@ -78,7 +78,7 @@ When introducing a deprecation into Vector, the pull request introducing the dep the new name will be appended with the text `(formerly OldName)`. - Add a log message to Vector that is logged at the `WARN` level starting with the word `DEPRECATION` if Vector detects the deprecated configuration or feature being used (when possible). -- Add the deprecation to [DEPRECATIONS.md](docs/DEPRECATIONS.md) to track migration (if applicable) and removal +- Add the deprecation to [DEPRECATIONS.md](DEPRECATIONS.md) to track migration (if applicable) and removal When removing a deprecation in a subsequent release, the pull request should: @@ -86,4 +86,4 @@ When removing a deprecation in a subsequent release, the pull request should: - Remove the deprecation from the documentation - Add a note to the Breaking Changes section of the upgrade guide for the next release with a description and directions for transitioning if applicable. -- Remove the deprecation from [DEPRECATIONS.md](docs/DEPRECATIONS.md) +- Remove the deprecation from [DEPRECATIONS.md](DEPRECATIONS.md) diff --git a/docs/DEPRECATIONS.md b/docs/DEPRECATIONS.md index 061e19f36e9ed..2dc2553ecd94d 100644 --- a/docs/DEPRECATIONS.md +++ b/docs/DEPRECATIONS.md @@ -6,6 +6,7 @@ See [DEPRECATION.md](docs/DEPRECATION.md#process) for the process for updating t ## To be removed +* Support for `v1` series endpoint in the `datadog_metrics` sink should be removed. * legacy_openssl_provider v0.34.0 OpenSSL legacy provider flag should be removed * armv7_rpm v0.34.0 The armv7 RPM packages should be removed (replaced by armv7hl) * yaml_migration v0.34.0 Prefer loading `/etc/vector/vector.yaml` first diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index c96198f27d1bb..5bb7d144f91b2 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -319,6 +319,13 @@ impl Event { self } + /// Sets the `source_type` in the event metadata to the provided value. 
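+    ///
+    /// The `datadog_metrics` sink reads this value back via `EventMetadata::source_type()`
+    /// when generating origin metadata for its series and sketch payloads (see
+    /// `generate_proto_metadata` in that sink's encoder).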
+ #[must_use] + pub fn with_source_type(mut self, source_type: &'static str) -> Self { + self.metadata_mut().set_source_type(source_type); + self + } + /// Sets the `upstream_id` in the event metadata to the provided value. #[must_use] pub fn with_upstream_id(mut self, upstream_id: Arc) -> Self { diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs index 9fb8c4cd48137..bfddadd64d6d9 100644 --- a/src/sinks/datadog/metrics/config.rs +++ b/src/sinks/datadog/metrics/config.rs @@ -1,3 +1,5 @@ +use std::sync::OnceLock; + use http::Uri; use snafu::ResultExt; use tower::ServiceBuilder; @@ -42,12 +44,43 @@ impl SinkBatchSettings for DatadogMetricsDefaultBatchSettings { const TIMEOUT_SECS: f64 = 2.0; } +pub(super) const SERIES_V1_PATH: &str = "/api/v1/series"; +pub(super) const SERIES_V2_PATH: &str = "/api/v2/series"; +pub(super) const SKETCHES_PATH: &str = "/api/beta/sketches"; + +// TODO: the series V1 endpoint support is considered deprecated and should be removed in a future release. +// At that time when the V1 support is removed, the SeriesApiVersion stops being useful and can be removed. + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SeriesApiVersion { + V1, + V2, +} + +impl SeriesApiVersion { + pub const fn get_path(self) -> &'static str { + match self { + Self::V1 => SERIES_V1_PATH, + Self::V2 => SERIES_V2_PATH, + } + } + fn get_api_version_backwards_compatible() -> Self { + static API_VERSION: OnceLock = OnceLock::new(); + *API_VERSION.get_or_init( + || match option_env!("VECTOR_TEMP_USE_DD_METRICS_SERIES_V1_API") { + Some(_) => Self::V1, + None => Self::V2, + }, + ) + } +} + /// Various metric type-specific API types. /// /// Each of these corresponds to a specific request path when making a request to the agent API. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum DatadogMetricsEndpoint { - Series, + Series(SeriesApiVersion), Sketches, } @@ -55,14 +88,19 @@ impl DatadogMetricsEndpoint { /// Gets the content type associated with the specific encoder for a given metric endpoint. pub const fn content_type(self) -> &'static str { match self { - DatadogMetricsEndpoint::Series => "application/json", - DatadogMetricsEndpoint::Sketches => "application/x-protobuf", + Self::Series(SeriesApiVersion::V1) => "application/json", + Self::Sketches | Self::Series(SeriesApiVersion::V2) => "application/x-protobuf", } } // Gets whether or not this is a series endpoint. pub const fn is_series(self) -> bool { - matches!(self, Self::Series) + matches!(self, Self::Series { .. }) + } + + // Creates an instance of the `Series` variant with the default API version. + pub fn series() -> Self { + Self::Series(SeriesApiVersion::get_api_version_backwards_compatible()) } } @@ -84,7 +122,7 @@ impl DatadogMetricsEndpointConfiguration { /// Gets the URI for the given Datadog metrics endpoint. pub fn get_uri_for_endpoint(&self, endpoint: DatadogMetricsEndpoint) -> Uri { match endpoint { - DatadogMetricsEndpoint::Series => self.series_endpoint.clone(), + DatadogMetricsEndpoint::Series { .. } => self.series_endpoint.clone(), DatadogMetricsEndpoint::Sketches => self.sketches_endpoint.clone(), } } @@ -169,8 +207,14 @@ impl DatadogMetricsConfig { &self, ) -> crate::Result { let base_uri = self.get_base_agent_endpoint(); - let series_endpoint = build_uri(&base_uri, "/api/v1/series")?; - let sketches_endpoint = build_uri(&base_uri, "/api/beta/sketches")?; + + // TODO: the V1 endpoint support is considered deprecated and should be removed in a future release. 
+ // At that time, the get_api_version_backwards_compatible() should be replaced with statically using the v2. + let series_endpoint = build_uri( + &base_uri, + SeriesApiVersion::get_api_version_backwards_compatible().get_path(), + )?; + let sketches_endpoint = build_uri(&base_uri, SKETCHES_PATH)?; Ok(DatadogMetricsEndpointConfiguration::new( series_endpoint, diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 60b1772986836..015ed7d2464e5 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -8,7 +8,6 @@ use std::{ use bytes::{BufMut, Bytes}; use chrono::{DateTime, Utc}; use once_cell::sync::Lazy; -use prost::Message; use snafu::{ResultExt, Snafu}; use vector_common::request_metadata::GroupedCountByteSize; use vector_core::{ @@ -19,7 +18,7 @@ use vector_core::{ }; use super::config::{ - DatadogMetricsEndpoint, MAXIMUM_PAYLOAD_COMPRESSED_SIZE, MAXIMUM_PAYLOAD_SIZE, + DatadogMetricsEndpoint, SeriesApiVersion, MAXIMUM_PAYLOAD_COMPRESSED_SIZE, MAXIMUM_PAYLOAD_SIZE, }; use crate::{ common::datadog::{ @@ -33,11 +32,11 @@ const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":["; const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}"; const SERIES_PAYLOAD_DELIMITER: &[u8] = b","; -const ORIGIN_CATEGORY_VALUE: u32 = 11; +pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11; const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14; -static ORIGIN_PRODUCT_VALUE: Lazy = Lazy::new(|| { +pub(super) static ORIGIN_PRODUCT_VALUE: Lazy = Lazy::new(|| { option_env!("DD_ORIGIN_PRODUCT") .map(|p| { p.parse::() @@ -228,9 +227,24 @@ impl DatadogMetricsEncoder { .byte_size .add_event(&metric, metric.estimated_json_encoded_size_of()); + // For V2 Series metrics, and Sketches: We encode a single Series or Sketch metric incrementally, + // which means that we specifically write it as if we were writing a single field entry in the + // overall `SketchPayload` message or `MetricPayload` type. + // + // By doing so, we can encode multiple metrics and concatenate all the buffers, and have the + // resulting buffer appear as if it's a normal `<>Payload` message with a bunch of repeats + // of the `sketches` / `series` field. + // + // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches -- + // and we never actually set the metadata field... so the resulting message generated overall + // for `SketchPayload` with a single sketch looks just like as if we literally wrote out a + // single value for the given field. + // + // Similary, `MetricPayload` has a single repeated `series` field. + match self.endpoint { - // Series metrics are encoded via JSON, in an incremental fashion. - DatadogMetricsEndpoint::Series => { + // V1 Series metrics are encoded via JSON, in an incremental fashion. + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => { // A single `Metric` might generate multiple Datadog series metrics. let all_series = generate_series_metrics( &metric, @@ -252,19 +266,51 @@ impl DatadogMetricsEncoder { serde_json::to_writer(&mut self.state.buf, series)?; } } + // V2 Series metrics are encoded via ProtoBuf, in an incremental fashion. + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() { + MetricValue::Counter { .. } + | MetricValue::Gauge { .. } + | MetricValue::Set { .. } + | MetricValue::AggregatedSummary { .. 
} => { + let series_proto = series_to_proto_message( + &metric, + &self.default_namespace, + self.log_schema, + self.origin_product_value, + )?; + + encode_proto_key_and_message( + series_proto, + get_series_payload_series_field_number(), + &mut self.state.buf, + )?; + } + value => { + return Err(EncoderError::InvalidMetric { + expected: "series", + metric_value: value.as_name(), + }) + } + }, // Sketches are encoded via ProtoBuf, also in an incremental fashion. DatadogMetricsEndpoint::Sketches => match metric.value() { MetricValue::Sketch { sketch } => match sketch { MetricSketch::AgentDDSketch(ddsketch) => { - encode_sketch_incremental( + if let Some(sketch_proto) = sketch_to_proto_message( &metric, ddsketch, &self.default_namespace, self.log_schema, - &mut self.state.buf, self.origin_product_value, - ) - .map_err(|_| EncoderError::ProtoEncodingFailed)?; + ) { + encode_proto_key_and_message( + sketch_proto, + get_sketch_payload_sketches_field_number(), + &mut self.state.buf, + )?; + } else { + // If the sketch was empty, that's fine too + } } }, value => { @@ -394,22 +440,7 @@ impl DatadogMetricsEncoder { } } -fn get_sketch_payload_sketches_field_number() -> u32 { - static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock = OnceLock::new(); - *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| { - let descriptors = protobuf_descriptors(); - let descriptor = descriptors - .get_message_by_name("datadog.agentpayload.SketchPayload") - .expect("should not fail to find `SketchPayload` message in descriptor pool"); - - descriptor - .get_field_by_name("sketches") - .map(|field| field.number()) - .expect("`sketches` field must exist in `SketchPayload` message") - }) -} - -fn generate_sketch_metadata( +fn generate_proto_metadata( maybe_pass_through: Option<&DatadogMetricOriginMetadata>, maybe_source_type: Option<&'static str>, origin_product_value: u32, @@ -438,6 +469,36 @@ fn generate_sketch_metadata( ) } +fn get_sketch_payload_sketches_field_number() -> u32 { + static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock = OnceLock::new(); + *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| { + let descriptors = protobuf_descriptors(); + let descriptor = descriptors + .get_message_by_name("datadog.agentpayload.SketchPayload") + .expect("should not fail to find `SketchPayload` message in descriptor pool"); + + descriptor + .get_field_by_name("sketches") + .map(|field| field.number()) + .expect("`sketches` field must exist in `SketchPayload` message") + }) +} + +fn get_series_payload_series_field_number() -> u32 { + static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock = OnceLock::new(); + *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| { + let descriptors = protobuf_descriptors(); + let descriptor = descriptors + .get_message_by_name("datadog.agentpayload.MetricPayload") + .expect("should not fail to find `MetricPayload` message in descriptor pool"); + + descriptor + .get_field_by_name("series") + .map(|field| field.number()) + .expect("`series` field must exist in `MetricPayload` message") + }) +} + fn sketch_to_proto_message( metric: &Metric, ddsketch: &AgentDDSketch, @@ -477,7 +538,7 @@ fn sketch_to_proto_message( let n = counts.into_iter().map(Into::into).collect(); let event_metadata = metric.metadata(); - let metadata = generate_sketch_metadata( + let metadata = generate_proto_metadata( event_metadata.datadog_origin_metadata(), event_metadata.source_type(), origin_product_value, @@ -504,49 +565,118 @@ fn sketch_to_proto_message( }) } -fn encode_sketch_incremental( +fn series_to_proto_message( metric: &Metric, - 
ddsketch: &AgentDDSketch, default_namespace: &Option>, log_schema: &'static LogSchema, - buf: &mut B, origin_product_value: u32, -) -> Result<(), prost::EncodeError> +) -> Result { + let metric_name = get_namespaced_name(metric, default_namespace); + let mut tags = metric.tags().cloned().unwrap_or_default(); + + let mut resources = vec![]; + + if let Some(host) = log_schema + .host_key() + .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default()) + { + resources.push(ddmetric_proto::metric_payload::Resource { + r#type: "host".to_string(), + name: host, + }); + } + + // In the `datadog_agent` source, the tag is added as `device` for the V1 endpoint + // and `resource.device` for the V2 endpoint. + if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) { + resources.push(ddmetric_proto::metric_payload::Resource { + r#type: "device".to_string(), + name: device, + }); + } + + let source_type_name = tags.remove("source_type_name").unwrap_or_default(); + + let tags = encode_tags(&tags); + + let event_metadata = metric.metadata(); + let metadata = generate_proto_metadata( + event_metadata.datadog_origin_metadata(), + event_metadata.source_type(), + origin_product_value, + ); + trace!(?metadata, "Generated MetricSeries metadata."); + + let timestamp = encode_timestamp(metric.timestamp()); + + let (points, metric_type, interval) = match (metric.value(), metric.interval_ms()) { + (MetricValue::Counter { value }, maybe_interval_ms) => { + let (value, interval, metric_type) = match maybe_interval_ms { + None => (*value, 0, ddmetric_proto::metric_payload::MetricType::Count), + // When an interval is defined, it implies the value should be in a per-second form, + // so we need to get back to seconds from our milliseconds-based interval, and then + // divide our value by that amount as well. + Some(interval_ms) => ( + (*value) * 1000.0 / (interval_ms.get() as f64), + interval_ms.get() as i64 / 1000, + ddmetric_proto::metric_payload::MetricType::Rate, + ), + }; + let points = vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }]; + (points, metric_type, interval) + } + (MetricValue::Set { values }, _) => { + let points = vec![ddmetric_proto::metric_payload::MetricPoint { + value: values.len() as f64, + timestamp, + }]; + let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge; + let interval = 0; + (points, metric_type, interval) + } + (MetricValue::Gauge { value }, _) => { + let points = vec![ddmetric_proto::metric_payload::MetricPoint { + value: *value, + timestamp, + }]; + let metric_type = ddmetric_proto::metric_payload::MetricType::Gauge; + let interval = 0; + (points, metric_type, interval) + } + // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization + (value, _) => { + // this case should have already been surfaced by encode_single_metric() so this should never be reached + return Err(EncoderError::InvalidMetric { + expected: "series", + metric_value: value.as_name(), + }); + } + }; + + Ok(ddmetric_proto::metric_payload::MetricSeries { + resources, + metric: metric_name, + tags, + points, + r#type: metric_type.into(), + // unit is omitted + unit: "".to_string(), + source_type_name, + interval, + metadata, + }) +} + +// Manually write the field tag and then encode the Message payload directly as a length-delimited message. 
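+//
+// As an illustration (using a hypothetical field number of 1): `encode_key` emits the single
+// tag byte 0x0A ((1 << 3) | 2, i.e. field 1 with the length-delimited wire type), and
+// `encode_length_delimited` then emits a varint length followed by the message bytes.
+// Concatenating several such entries therefore decodes as repeated occurrences of that field
+// in the parent payload message, which is what lets `MetricPayload` / `SketchPayload` bodies
+// be built up incrementally.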
+fn encode_proto_key_and_message(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError> where + T: prost::Message, B: BufMut, { - // This encodes a single sketch metric incrementally, which means that we specifically write it - // as if we were writing a single field entry in the overall `SketchPayload` message - // type. - // - // By doing so, we can encode multiple sketches and concatenate all the buffers, and have the - // resulting buffer appear as if it's a normal `SketchPayload` message with a bunch of repeats - // of the `sketches` field. - // - // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches -- - // and we never actually set the metadata field... so the resulting message generated overall - // for `SketchPayload` with a single sketch looks just like as if we literally wrote out a - // single value for the given field. - - if let Some(sketch_proto) = sketch_to_proto_message( - metric, - ddsketch, - default_namespace, - log_schema, - origin_product_value, - ) { - // Manually write the field tag for `sketches` and then encode the sketch payload directly as a - // length-delimited message. - prost::encoding::encode_key( - get_sketch_payload_sketches_field_number(), - prost::encoding::WireType::LengthDelimited, - buf, - ); - sketch_proto.encode_length_delimited(buf) - } else { - // If the sketch was empty, that's fine too - Ok(()) - } + prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf); + + msg.encode_length_delimited(buf) + .map_err(|_| EncoderError::ProtoEncodingFailed) } fn get_namespaced_name(metric: &Metric, default_namespace: &Option>) -> String { @@ -749,6 +879,7 @@ fn generate_series_metrics( device, metadata, }], + // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization (value, _) => { return Err(EncoderError::InvalidMetric { expected: "series", @@ -836,7 +967,7 @@ fn write_payload_header( writer: &mut dyn io::Write, ) -> io::Result { match endpoint { - DatadogMetricsEndpoint::Series => writer + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer .write_all(SERIES_PAYLOAD_HEADER) .map(|_| SERIES_PAYLOAD_HEADER.len()), _ => Ok(0), @@ -848,7 +979,7 @@ fn write_payload_delimiter( writer: &mut dyn io::Write, ) -> io::Result { match endpoint { - DatadogMetricsEndpoint::Series => writer + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer .write_all(SERIES_PAYLOAD_DELIMITER) .map(|_| SERIES_PAYLOAD_DELIMITER.len()), _ => Ok(0), @@ -860,7 +991,7 @@ fn write_payload_footer( writer: &mut dyn io::Write, ) -> io::Result { match endpoint { - DatadogMetricsEndpoint::Series => writer + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer .write_all(SERIES_PAYLOAD_FOOTER) .map(|_| SERIES_PAYLOAD_FOOTER.len()), _ => Ok(0), @@ -895,15 +1026,16 @@ mod tests { }; use super::{ - ddmetric_proto, encode_sketch_incremental, encode_tags, encode_timestamp, - generate_series_metrics, get_compressor, max_compression_overhead_len, - max_uncompressed_header_len, sketch_to_proto_message, validate_payload_size_limits, - write_payload_footer, write_payload_header, DatadogMetricsEncoder, EncoderError, + ddmetric_proto, encode_proto_key_and_message, encode_tags, encode_timestamp, + generate_series_metrics, get_compressor, get_sketch_payload_sketches_field_number, + max_compression_overhead_len, max_uncompressed_header_len, series_to_proto_message, + sketch_to_proto_message, validate_payload_size_limits, write_payload_footer, + write_payload_header, 
DatadogMetricsEncoder, EncoderError, }; use crate::{ common::datadog::DatadogMetricType, sinks::datadog::metrics::{ - config::DatadogMetricsEndpoint, + config::{DatadogMetricsEndpoint, SeriesApiVersion}, encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE}, }, }; @@ -936,10 +1068,16 @@ mod tests { fn get_compressed_empty_series_payload() -> Bytes { let mut compressor = get_compressor(); - _ = write_payload_header(DatadogMetricsEndpoint::Series, &mut compressor) - .expect("should not fail"); - _ = write_payload_footer(DatadogMetricsEndpoint::Series, &mut compressor) - .expect("should not fail"); + _ = write_payload_header( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), + &mut compressor, + ) + .expect("should not fail"); + _ = write_payload_footer( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), + &mut compressor, + ) + .expect("should not fail"); compressor.finish().expect("should not fail").freeze() } @@ -1037,10 +1175,19 @@ mod tests { )); // And sketches can't go to the series endpoint. - // Series metrics can't go to the sketches endpoint. - let mut series_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series, None) - .expect("default payload size limits should be valid"); - let sketch_result = series_encoder.try_encode(get_simple_sketch()); + let mut series_v1_encoder = + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None) + .expect("default payload size limits should be valid"); + let sketch_result = series_v1_encoder.try_encode(get_simple_sketch()); + assert!(matches!( + sketch_result.err(), + Some(EncoderError::InvalidMetric { .. }) + )); + + let mut series_v2_encoder = + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None) + .expect("default payload size limits should be valid"); + let sketch_result = series_v2_encoder.try_encode(get_simple_sketch()); assert!(matches!( sketch_result.err(), Some(EncoderError::InvalidMetric { .. }) @@ -1060,23 +1207,41 @@ mod tests { let expected_value = value / (interval_ms / 1000) as f64; let expected_interval = interval_ms / 1000; - // Encode the metric and make sure we did the rate conversion correctly. - let result = generate_series_metrics( - &rate_counter, - &None, - log_schema(), - DEFAULT_DD_ORIGIN_PRODUCT_VALUE, - ); - assert!(result.is_ok()); + // series v1 + { + // Encode the metric and make sure we did the rate conversion correctly. 
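+                // (A counter carrying an interval is emitted as a `Rate`: the value is divided
+                // by the interval expressed in seconds, and the interval itself is reported in
+                // seconds.)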
+ let result = generate_series_metrics( + &rate_counter, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ); + assert!(result.is_ok()); - let metrics = result.unwrap(); - assert_eq!(metrics.len(), 1); + let metrics = result.unwrap(); + assert_eq!(metrics.len(), 1); - let actual = &metrics[0]; - assert_eq!(actual.r#type, DatadogMetricType::Rate); - assert_eq!(actual.interval, Some(expected_interval)); - assert_eq!(actual.points.len(), 1); - assert_eq!(actual.points[0].1, expected_value); + let actual = &metrics[0]; + assert_eq!(actual.r#type, DatadogMetricType::Rate); + assert_eq!(actual.interval, Some(expected_interval)); + assert_eq!(actual.points.len(), 1); + assert_eq!(actual.points[0].1, expected_value); + } + + // series v2 + { + let series_proto = series_to_proto_message( + &rate_counter, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ) + .unwrap(); + assert_eq!(series_proto.r#type, 2); + assert_eq!(series_proto.interval, expected_interval as i64); + assert_eq!(series_proto.points.len(), 1); + assert_eq!(series_proto.points[0].value, expected_value); + } } #[test] @@ -1090,23 +1255,41 @@ mod tests { ); let counter = get_simple_counter_with_metadata(event_metadata); - let result = generate_series_metrics( - &counter, - &None, - log_schema(), - DEFAULT_DD_ORIGIN_PRODUCT_VALUE, - ); - assert!(result.is_ok()); + // series v1 + { + let result = generate_series_metrics( + &counter, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ); + assert!(result.is_ok()); - let metrics = result.unwrap(); - assert_eq!(metrics.len(), 1); + let metrics = result.unwrap(); + assert_eq!(metrics.len(), 1); - let actual = &metrics[0]; - let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap(); + let actual = &metrics[0]; + let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap(); - assert_eq!(generated_origin.product().unwrap(), product); - assert_eq!(generated_origin.category().unwrap(), category); - assert_eq!(generated_origin.service().unwrap(), service); + assert_eq!(generated_origin.product().unwrap(), product); + assert_eq!(generated_origin.category().unwrap(), category); + assert_eq!(generated_origin.service().unwrap(), service); + } + // series v2 + { + let series_proto = series_to_proto_message( + &counter, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ) + .unwrap(); + + let generated_origin = series_proto.metadata.unwrap().origin.unwrap(); + assert_eq!(generated_origin.origin_product, product); + assert_eq!(generated_origin.origin_category, category); + assert_eq!(generated_origin.origin_service, service); + } } #[test] @@ -1120,26 +1303,69 @@ mod tests { counter.metadata_mut().set_source_type("statsd"); - let result = generate_series_metrics(&counter, &None, log_schema(), product); - assert!(result.is_ok()); + // series v1 + { + let result = generate_series_metrics(&counter, &None, log_schema(), product); + assert!(result.is_ok()); - let metrics = result.unwrap(); - assert_eq!(metrics.len(), 1); + let metrics = result.unwrap(); + assert_eq!(metrics.len(), 1); - let actual = &metrics[0]; - let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap(); + let actual = &metrics[0]; + let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap(); - assert_eq!(generated_origin.product().unwrap(), product); - assert_eq!(generated_origin.category().unwrap(), category); - assert_eq!(generated_origin.service().unwrap(), service); + 
assert_eq!(generated_origin.product().unwrap(), product); + assert_eq!(generated_origin.category().unwrap(), category); + assert_eq!(generated_origin.service().unwrap(), service); + } + // series v2 + { + let series_proto = series_to_proto_message( + &counter, + &None, + log_schema(), + DEFAULT_DD_ORIGIN_PRODUCT_VALUE, + ) + .unwrap(); + + let generated_origin = series_proto.metadata.unwrap().origin.unwrap(); + assert_eq!(generated_origin.origin_product, product); + assert_eq!(generated_origin.origin_category, category); + assert_eq!(generated_origin.origin_service, service); + } } #[test] - fn encode_single_series_metric_with_default_limits() { + fn encode_single_series_v1_metric_with_default_limits() { // This is a simple test where we ensure that a single metric, with the default limits, can // be encoded without hitting any errors. - let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series, None) - .expect("default payload size limits should be valid"); + let mut encoder = + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None) + .expect("default payload size limits should be valid"); + let counter = get_simple_counter(); + let expected = counter.clone(); + + // Encode the counter. + let result = encoder.try_encode(counter); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), None); + + // Finish the payload, make sure we got what we came for. + let result = encoder.finish(); + assert!(result.is_ok()); + + let (_payload, mut processed) = result.unwrap(); + assert_eq!(processed.len(), 1); + assert_eq!(expected, processed.pop().unwrap()); + } + + #[test] + fn encode_single_series_v2_metric_with_default_limits() { + // This is a simple test where we ensure that a single metric, with the default limits, can + // be encoded without hitting any errors. + let mut encoder = + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None) + .expect("default payload size limits should be valid"); let counter = get_simple_counter(); let expected = counter.clone(); @@ -1225,15 +1451,18 @@ mod tests { for metric in &metrics { match metric.value() { MetricValue::Sketch { sketch } => match sketch { - MetricSketch::AgentDDSketch(ddsketch) => encode_sketch_incremental( - metric, - ddsketch, - &None, - log_schema(), - &mut incremental_buf, - 14, - ) - .unwrap(), + MetricSketch::AgentDDSketch(ddsketch) => { + if let Some(sketch_proto) = + sketch_to_proto_message(metric, ddsketch, &None, log_schema(), 14) + { + encode_proto_key_and_message( + sketch_proto, + get_sketch_payload_sketches_field_number(), + &mut incremental_buf, + ) + .unwrap(); + } + } }, _ => panic!("should be a sketch"), } @@ -1248,13 +1477,16 @@ mod tests { let header_len = max_uncompressed_header_len(); // This is too small. - let result = - validate_payload_size_limits(DatadogMetricsEndpoint::Series, header_len, usize::MAX); + let result = validate_payload_size_limits( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), + header_len, + usize::MAX, + ); assert_eq!(result, None); // This is just right. let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), header_len + 1, usize::MAX, ); @@ -1267,7 +1499,7 @@ mod tests { // This is too small. let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), usize::MAX, compression_overhead_len, ); @@ -1275,7 +1507,7 @@ mod tests { // This is just right. 
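+        // ("Just right" means the compressed limit exceeds the maximum compression overhead
+        // by at least one byte, leaving room for actual payload data.)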
let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), usize::MAX, compression_overhead_len + 1, ); @@ -1317,7 +1549,7 @@ mod tests { // uncompressed payload would exceed the limit. let header_len = max_uncompressed_header_len(); let mut encoder = DatadogMetricsEncoder::with_payload_limits( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None, header_len + 1, usize::MAX, @@ -1393,7 +1625,7 @@ mod tests { let uncompressed_limit = 128; let compressed_limit = 32; let mut encoder = DatadogMetricsEncoder::with_payload_limits( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None, uncompressed_limit, compressed_limit, @@ -1496,7 +1728,7 @@ mod tests { // We check this with targeted unit tests as well but this is some cheap insurance to // show that we're hopefully not missing any particular corner cases. let result = DatadogMetricsEncoder::with_payload_limits( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None, uncompressed_limit, compressed_limit, diff --git a/src/sinks/datadog/metrics/integration_tests.rs b/src/sinks/datadog/metrics/integration_tests.rs index 99645780b65ca..8c993b22bca04 100644 --- a/src/sinks/datadog/metrics/integration_tests.rs +++ b/src/sinks/datadog/metrics/integration_tests.rs @@ -1,16 +1,21 @@ +use std::num::NonZeroU32; + use bytes::Bytes; use chrono::{SubsecRound, Utc}; use flate2::read::ZlibDecoder; use futures::{channel::mpsc::Receiver, stream, StreamExt}; +use http::request::Parts; use hyper::StatusCode; use indoc::indoc; +use prost::Message; use rand::{thread_rng, Rng}; + use vector_core::{ config::{init_telemetry, Tags, Telemetry}, event::{BatchNotifier, BatchStatus, Event, Metric, MetricKind, MetricValue}, + metric_tags, }; -use super::DatadogMetricsConfig; use crate::{ config::SinkConfig, sinks::util::test::{build_test_server_status, load_sink}, @@ -23,7 +28,18 @@ use crate::{ }, }; -fn generate_metric_events() -> Vec { +use super::{ + config::{SERIES_V1_PATH, SERIES_V2_PATH}, + encoder::{ORIGIN_CATEGORY_VALUE, ORIGIN_PRODUCT_VALUE}, + DatadogMetricsConfig, +}; + +#[allow(warnings, clippy::pedantic, clippy::nursery)] +mod ddmetric_proto { + include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs")); +} + +fn generate_counters() -> Vec { let timestamp = Utc::now().trunc_subsecs(3); let events: Vec<_> = (0..10) .map(|index| { @@ -36,14 +52,54 @@ fn generate_metric_events() -> Vec { value: index as f64, }, ) - .with_timestamp(Some(ts)), + .with_timestamp(Some(ts)) + .with_tags(Some(metric_tags!( + "resource.device" => "a_device", + "host" => "a_host", + "source_type_name" => "a_name", + "cool_tag_name" => "i_know_right", + ))), ) + // this ensures we get Origin Metadata, with an undefined service but that's ok. 
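+                // (With a source type set, the encoder emits origin metadata carrying the
+                // configured product value, the Vector category constant, and a zero
+                // (undefined) service; see `validate_protobuf_counters` below.)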
+ .with_source_type("a_source_like_none_other") }) .collect(); events } +fn generate_counter_gauge_set() -> Vec { + let ts = Utc::now().trunc_subsecs(3); + let events = vec![ + // gauge + Event::Metric(Metric::new( + "gauge", + MetricKind::Incremental, + MetricValue::Gauge { value: 5678.0 }, + )), + // counter with interval + Event::Metric( + Metric::new( + "counter_with_interval", + MetricKind::Incremental, + MetricValue::Counter { value: 1234.0 }, + ) + .with_interval_ms(NonZeroU32::new(2000)) + .with_timestamp(Some(ts)), + ), + // set + Event::Metric(Metric::new( + "set", + MetricKind::Incremental, + MetricValue::Set { + values: vec!["zorp".into(), "zork".into()].into_iter().collect(), + }, + )), + ]; + + events +} + /// Starts a test sink with random metrics running into it /// /// This function starts a Datadog Metrics sink with a simplistic configuration and @@ -53,7 +109,7 @@ fn generate_metric_events() -> Vec { /// Testers may set `http_status` and `batch_status`. The first controls what /// status code faked HTTP responses will have, the second acts as a check on /// the `Receiver`'s status before being returned to the caller. -async fn start_test() -> (Vec, Receiver<(http::request::Parts, Bytes)>) { +async fn start_test(events: Vec) -> (Vec, Receiver<(http::request::Parts, Bytes)>) { let config = indoc! {r#" default_api_key = "atoken" default_namespace = "foo" @@ -73,8 +129,6 @@ async fn start_test() -> (Vec, Receiver<(http::request::Parts, Bytes)>) { let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let events = generate_metric_events(); - let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); sink.run(stream).await.unwrap(); @@ -92,28 +146,204 @@ fn decompress_payload(payload: Vec) -> std::io::Result> { } #[tokio::test] -/// Assert the basic functionality of the sink in good conditions +/// Assert proper handling of different metric types +async fn all_series_metric_types() { + let metrics = generate_counter_gauge_set(); + let (expected, rx) = start_test(metrics).await; + + let output = rx.take(expected.len()).collect::>().await; + + assert!(output.len() == 1, "Should have received a response"); + + let request = output.first().unwrap(); + + match request.0.uri.path() { + SERIES_V1_PATH => warn!("Deprecated endpoint used."), + SERIES_V2_PATH => validate_protobuf_set_gauge_rate(request), + _ => panic!("Unexpected request type received!"), + } +} + +#[tokio::test] +/// Assert the basic functionality of the sink in good conditions with +/// a small batch of counters. /// /// This test rigs the sink to return OK to responses, checks that all batches /// were delivered and then asserts that every message is able to be /// deserialized. +/// +/// In addition to validating the counter values, we also validate the various +/// fields such as the Resources, handling of tags, and the Metadata. 
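+///
+/// The validation applied depends on the request path: `SERIES_V1_PATH` payloads are
+/// decompressed and checked as JSON, while `SERIES_V2_PATH` payloads are decoded and
+/// checked as protobuf.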
async fn smoke() { - let (expected, rx) = start_test().await; + let counters = generate_counters(); + let (expected, rx) = start_test(counters).await; let output = rx.take(expected.len()).collect::>().await; assert!(output.len() == 1, "Should have received a response"); - let val = output.first().unwrap(); + let request = output.first().unwrap(); + match request.0.uri.path() { + SERIES_V1_PATH => validate_json_counters(request), + SERIES_V2_PATH => validate_protobuf_counters(request), + _ => panic!("Unexpected request type received!"), + } +} + +fn validate_common(request: &(Parts, Bytes)) { + assert_eq!(request.0.headers.get("DD-API-KEY").unwrap(), "atoken"); + assert!(request.0.headers.contains_key("DD-Agent-Payload")); +} + +fn validate_protobuf_counters(request: &(Parts, Bytes)) { assert_eq!( - val.0.headers.get("Content-Type").unwrap(), + request.0.headers.get("Content-Type").unwrap(), + "application/x-protobuf" + ); + + validate_common(request); + + let compressed_payload = request.1.to_vec(); + let payload = decompress_payload(compressed_payload).expect("Could not decompress payload"); + let frame = Bytes::copy_from_slice(&payload); + + let payload = + ddmetric_proto::MetricPayload::decode(frame).expect("Could not decode protobuf frame"); + + let series = payload.series; + + assert!(!series.is_empty()); + + // check metrics are sorted by name, which helps HTTP compression + let metric_names: Vec = series.iter().map(|serie| serie.metric.clone()).collect(); + let mut sorted_names = metric_names.clone(); + sorted_names.sort(); + assert_eq!(metric_names, sorted_names); + + series.iter().for_each(|serie| { + // name + assert!(serie.metric.starts_with("foo.counter_")); + + // type + assert_eq!( + serie.r#type(), + ddmetric_proto::metric_payload::MetricType::Count + ); + + // resources + serie + .resources + .iter() + .for_each(|resource| match resource.r#type.as_str() { + "host" => assert_eq!(resource.name.as_str(), "a_host"), + "device" => assert_eq!(resource.name.as_str(), "a_device"), + _ => panic!("Unexpected resource found!"), + }); + + // source_type_name + assert_eq!(serie.source_type_name, "a_name"); + + // tags + assert_eq!(serie.tags.len(), 1); + assert_eq!(serie.tags.first().unwrap(), "cool_tag_name:i_know_right"); + + // unit + assert!(serie.unit.is_empty()); + + // interval + assert_eq!(serie.interval, 0); + + // metadata + let origin_metadata = serie.metadata.as_ref().unwrap().origin.as_ref().unwrap(); + assert_eq!(origin_metadata.origin_product, *ORIGIN_PRODUCT_VALUE); + assert_eq!(origin_metadata.origin_category, ORIGIN_CATEGORY_VALUE); + assert_eq!(origin_metadata.origin_service, 0); + }); + + // points + // the input values are [0..10) + assert_eq!( + series + .iter() + .map(|serie| serie.points.iter().map(|point| point.value).sum::()) + .sum::(), + 45.0 + ); +} + +fn validate_protobuf_set_gauge_rate(request: &(Parts, Bytes)) { + assert_eq!( + request.0.headers.get("Content-Type").unwrap(), + "application/x-protobuf" + ); + + validate_common(request); + + let compressed_payload = request.1.to_vec(); + let payload = decompress_payload(compressed_payload).expect("Could not decompress payload"); + let frame = Bytes::copy_from_slice(&payload); + + let payload = + ddmetric_proto::MetricPayload::decode(frame).expect("Could not decode protobuf frame"); + + let mut series = payload.series; + + assert_eq!(series.len(), 3); + + // The below evaluation of each metric type implies validation of sorting the metrics + // by name to improve HTTP compression due to the order they are 
defined vs processed. + // However just to be safe we will also validate explicitly. + let metric_names: Vec = series.iter().map(|serie| serie.metric.clone()).collect(); + let mut sorted_names = metric_names.clone(); + sorted_names.sort(); + assert_eq!(metric_names, sorted_names); + + // validate set (gauge) + { + let gauge = series.pop().unwrap(); + assert_eq!( + gauge.r#type(), + ddmetric_proto::metric_payload::MetricType::Gauge + ); + assert_eq!(gauge.interval, 0); + assert_eq!(gauge.points[0].value, 2_f64); + } + + // validate gauge + { + let gauge = series.pop().unwrap(); + assert_eq!( + gauge.r#type(), + ddmetric_proto::metric_payload::MetricType::Gauge + ); + assert_eq!(gauge.points[0].value, 5678.0); + assert_eq!(gauge.interval, 0); + } + + // validate counter w interval = rate + { + let count = series.pop().unwrap(); + assert_eq!( + count.r#type(), + ddmetric_proto::metric_payload::MetricType::Rate + ); + assert_eq!(count.interval, 2); + + assert_eq!(count.points.len(), 1); + assert_eq!(count.points[0].value, 1234.0 / count.interval as f64); + } +} + +fn validate_json_counters(request: &(Parts, Bytes)) { + assert_eq!( + request.0.headers.get("Content-Type").unwrap(), "application/json" ); - assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken"); - assert!(val.0.headers.contains_key("DD-Agent-Payload")); - let compressed_payload = val.1.to_vec(); + validate_common(request); + + let compressed_payload = request.1.to_vec(); let payload = decompress_payload(compressed_payload).unwrap(); let payload = std::str::from_utf8(&payload).unwrap(); let payload: serde_json::Value = serde_json::from_str(payload).unwrap(); @@ -203,7 +433,7 @@ async fn run_sink() { let (sink, _) = config.build(cx).await.unwrap(); let (batch, receiver) = BatchNotifier::new_with_receiver(); - let events = generate_metric_events(); + let events = generate_counters(); let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch)); diff --git a/src/sinks/datadog/metrics/request_builder.rs b/src/sinks/datadog/metrics/request_builder.rs index d217986d6f520..f84c95d482669 100644 --- a/src/sinks/datadog/metrics/request_builder.rs +++ b/src/sinks/datadog/metrics/request_builder.rs @@ -81,7 +81,7 @@ impl DatadogMetricsRequestBuilder { Ok(Self { endpoint_configuration, series_encoder: DatadogMetricsEncoder::new( - DatadogMetricsEndpoint::Series, + DatadogMetricsEndpoint::series(), default_namespace.clone(), )?, sketches_encoder: DatadogMetricsEncoder::new( @@ -93,7 +93,7 @@ impl DatadogMetricsRequestBuilder { fn get_encoder(&mut self, endpoint: DatadogMetricsEndpoint) -> &mut DatadogMetricsEncoder { match endpoint { - DatadogMetricsEndpoint::Series => &mut self.series_encoder, + DatadogMetricsEndpoint::Series { .. } => &mut self.series_encoder, DatadogMetricsEndpoint::Sketches => &mut self.sketches_encoder, } } diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index fccfe040fdd95..d04ba43d70bed 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -41,12 +41,13 @@ impl Partitioner for DatadogMetricsTypePartitioner { fn partition(&self, item: &Self::Item) -> Self::Key { let endpoint = match item.data().value() { - MetricValue::Counter { .. } => DatadogMetricsEndpoint::Series, - MetricValue::Gauge { .. } => DatadogMetricsEndpoint::Series, - MetricValue::Set { .. } => DatadogMetricsEndpoint::Series, + MetricValue::Counter { .. } => DatadogMetricsEndpoint::series(), + MetricValue::Gauge { .. 
} => DatadogMetricsEndpoint::series(),
+            MetricValue::Set { .. } => DatadogMetricsEndpoint::series(),
             MetricValue::Distribution { .. } => DatadogMetricsEndpoint::Sketches,
             MetricValue::AggregatedHistogram { .. } => DatadogMetricsEndpoint::Sketches,
-            MetricValue::AggregatedSummary { .. } => DatadogMetricsEndpoint::Series,
+            // NOTE: AggregatedSummary will be split into counters and gauges during normalization
+            MetricValue::AggregatedSummary { .. } => DatadogMetricsEndpoint::series(),
             MetricValue::Sketch { .. } => DatadogMetricsEndpoint::Sketches,
         };
         (item.metadata().datadog_api_key(), endpoint)