Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests to Metric SDK aggregation #1600

Merged
183 changes: 183 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub use view::*;

#[cfg(all(test, feature = "testing"))]
mod tests {
use self::data::ScopeMetrics;
use super::*;
use crate::metrics::data::{ResourceMetrics, Temporality};
use crate::metrics::reader::TemporalitySelector;
Expand All @@ -70,6 +71,7 @@ mod tests {
metrics::{MeterProvider as _, Unit},
KeyValue,
};
use std::borrow::Cow;

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
Expand Down Expand Up @@ -214,6 +216,178 @@ mod tests {
assert_eq!(datapoint.value, 15);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
Comment on lines +219 to +220
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: don't think we need this for every test but not a strong opinion

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_duplicate_instrument_different_meter_no_merge() {
// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter1 = meter_provider.meter("test.meter1");
let meter2 = meter_provider.meter("test.meter2");
let counter1 = meter1
.u64_counter("my_counter")
.with_unit(Unit::new("my_unit"))
.with_description("my_description")
.init();

let counter2 = meter2
.u64_counter("my_counter")
.with_unit(Unit::new("my_unit"))
.with_description("my_description")
.init();

let attribute = vec![KeyValue::new("key1", "value1")];
counter1.add(10, &attribute);
counter2.add(5, &attribute);
Comment on lines +229 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just loop over test.meter1 and test.meter2 here?


meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(
resource_metrics[0].scope_metrics.len() == 2,
"There should be 2 separate scope"
);
assert!(
resource_metrics[0].scope_metrics[0].metrics.len() == 1,
"There should be single metric for the scope"
);
assert!(
resource_metrics[0].scope_metrics[1].metrics.len() == 1,
"There should be single metric for the scope"
);

let scope1 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter1");
let scope2 = find_scope_metric(&resource_metrics[0].scope_metrics, "test.meter2");

if let Some(scope1) = scope1 {
let metric1 = &scope1.metrics[0];
assert_eq!(metric1.name, "my_counter");
assert_eq!(metric1.unit.as_str(), "my_unit");
assert_eq!(metric1.description, "my_description");
let sum1 = metric1
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for Counter instruments by default");

// Expecting 1 time-series.
assert_eq!(sum1.data_points.len(), 1);

let datapoint1 = &sum1.data_points[0];
assert_eq!(datapoint1.value, 10);
} else {
panic!("No MetricScope found for 'test.meter1'");
}

if let Some(scope2) = scope2 {
let metric2 = &scope2.metrics[0];
assert_eq!(metric2.name, "my_counter");
assert_eq!(metric2.unit.as_str(), "my_unit");
assert_eq!(metric2.description, "my_description");
let sum2 = metric2
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for Counter instruments by default");

// Expecting 1 time-series.
assert_eq!(sum2.data_points.len(), 1);

let datapoint2 = &sum2.data_points[0];
assert_eq!(datapoint2.value, 5);
} else {
panic!("No MetricScope found for 'test.meter2'");
}
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn instrumentation_scope_identity_test() {
// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
// Meters are identical except for scope attributes, but scope attributes are not an identifying property.
// Hence there should be a single metric stream output for this test.
let meter1 = meter_provider.versioned_meter(
"test.meter1",
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
Some("v0.1.0"),
Some("schema_url"),
Some(vec![KeyValue::new("key", "value1")]),
);
let meter2 = meter_provider.versioned_meter(
"test.meter1",
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
Some("v0.1.0"),
Some("schema_url"),
Some(vec![KeyValue::new("key", "value2")]),
);
let counter1 = meter1
.u64_counter("my_counter")
.with_unit(Unit::new("my_unit"))
.with_description("my_description")
.init();

let counter2 = meter2
.u64_counter("my_counter")
.with_unit(Unit::new("my_unit"))
.with_description("my_description")
.init();

let attribute = vec![KeyValue::new("key1", "value1")];
counter1.add(10, &attribute);
counter2.add(5, &attribute);

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
println!("resource_metrics: {:?}", resource_metrics);
assert!(
resource_metrics[0].scope_metrics.len() == 1,
"There should be a single scope as the meters are identical"
);
assert!(
resource_metrics[0].scope_metrics[0].metrics.len() == 1,
"There should be single metric for the scope as instruments are identical"
);

let scope = &resource_metrics[0].scope_metrics[0].scope;
assert_eq!(scope.name, "test.meter1");
cijothomas marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(scope.version, Some(Cow::Borrowed("v0.1.0")));
assert_eq!(scope.schema_url, Some(Cow::Borrowed("schema_url")));

// Should we validate this, as this is a user error
assert_eq!(scope.attributes, vec![KeyValue::new("key", "value1")]);
cijothomas marked this conversation as resolved.
Show resolved Hide resolved

let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_counter");
assert_eq!(metric.unit.as_str(), "my_unit");
assert_eq!(metric.description, "my_description");
let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for Counter instruments by default");

// Expecting 1 time-series.
assert_eq!(sum.data_points.len(), 1);

let datapoint = &sum.data_points[0];
assert_eq!(datapoint.value, 15);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down Expand Up @@ -692,6 +866,15 @@ mod tests {
assert!(resource_metrics.is_empty(), "No metrics should be exported as no new measurements were recorded since last collect.");
}

fn find_scope_metric<'a>(
metrics: &'a [ScopeMetrics],
name: &'a str,
) -> Option<&'a ScopeMetrics> {
metrics
.iter()
.find(|&scope_metric| scope_metric.scope.name == name)
}

struct DeltaTemporalitySelector();
impl TemporalitySelector for DeltaTemporalitySelector {
fn temporality(&self, _kind: InstrumentKind) -> Temporality {
Expand Down
Loading