From c0357cb7726548f4bb822ce2b433e7c96c75e5e2 Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sat, 5 Aug 2023 01:45:18 +0900 Subject: [PATCH 1/8] feat: produce measurement with attributes --- src/metrics.rs | 195 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 133 insertions(+), 62 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index da8d1ca..b6b76f7 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,8 +1,15 @@ use std::{collections::HashMap, fmt, sync::RwLock}; -use tracing::{field::Visit, Subscriber}; -use tracing_core::Field; +use tracing::{ + callsite::Identifier, + field::{FieldSet, Visit}, + Subscriber, +}; +use tracing_core::{Field, Interest}; -use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}; +use opentelemetry::{ + metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, + KeyValue, Value, +}; use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -43,6 +50,7 @@ impl Instruments { meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, + attributes: &[KeyValue], ) { fn update_or_insert( map: &MetricsMap, @@ -73,7 +81,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(value, attributes), ); } InstrumentType::CounterF64(value) => { @@ -81,7 +89,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(value, attributes), ); } InstrumentType::UpDownCounterI64(value) => { @@ -89,7 +97,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(value, attributes), ); } InstrumentType::UpDownCounterF64(value) => { @@ -97,7 +105,7 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(value, attributes), ); } InstrumentType::HistogramU64(value) => { @@ -105,7 +113,7 @@ impl Instruments { &self.u64_histogram, metric_name, || meter.u64_histogram(metric_name).init(), - |rec| rec.record(value, &[]), + |rec| rec.record(value, attributes), ); } InstrumentType::HistogramI64(value) => { @@ -113,7 +121,7 @@ impl Instruments { &self.i64_histogram, metric_name, || meter.i64_histogram(metric_name).init(), - |rec| rec.record(value, &[]), + |rec| rec.record(value, attributes), ); } InstrumentType::HistogramF64(value) => { @@ -121,7 +129,7 @@ impl Instruments { &self.f64_histogram, metric_name, || meter.f64_histogram(metric_name).init(), - |rec| rec.record(value, &[]), + |rec| rec.record(value, attributes), ); } }; @@ -129,8 +137,8 @@ impl Instruments { } pub(crate) struct MetricVisitor<'a> { - pub(crate) instruments: &'a Instruments, - pub(crate) meter: &'a Meter, + attributes: &'a mut Vec, + visited_metrics: &'a mut Vec<(&'static str, InstrumentType)>, } impl<'a> Visit for MetricVisitor<'a> { @@ -140,18 +148,12 @@ impl<'a> Visit for MetricVisitor<'a> { fn record_u64(&mut self, field: &Field, value: u64) { if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { - self.instruments.update_metric( - self.meter, - InstrumentType::CounterU64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::CounterU64(value))); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { - self.instruments.update_metric( - self.meter, - InstrumentType::UpDownCounterI64(value as i64), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::UpDownCounterI64(value as i64))); } else { eprintln!( "[tracing-opentelemetry]: Received Counter metric, but \ @@ -161,57 +163,83 @@ impl<'a> Visit for MetricVisitor<'a> { ); } } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { - self.instruments.update_metric( - self.meter, - InstrumentType::HistogramU64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::HistogramU64(value))); + } else if value <= I64_MAX { + self.attributes + .push(KeyValue::new(field.name(), Value::I64(value as i64))); } } fn record_f64(&mut self, field: &Field, value: f64) { if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { - self.instruments.update_metric( - self.meter, - InstrumentType::CounterF64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::CounterF64(value))); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { - self.instruments.update_metric( - self.meter, - InstrumentType::UpDownCounterF64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::UpDownCounterF64(value))); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { - self.instruments.update_metric( - self.meter, - InstrumentType::HistogramF64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::HistogramF64(value))); + } else { + self.attributes + .push(KeyValue::new(field.name(), Value::F64(value))); } } fn record_i64(&mut self, field: &Field, value: i64) { if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { - self.instruments.update_metric( - self.meter, - InstrumentType::CounterU64(value as u64), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::CounterU64(value as u64))); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { - self.instruments.update_metric( - self.meter, - InstrumentType::UpDownCounterI64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::UpDownCounterI64(value))); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { - self.instruments.update_metric( - self.meter, - InstrumentType::HistogramI64(value), - metric_name, - ); + self.visited_metrics + .push((metric_name, InstrumentType::HistogramI64(value))); + } else { + self.attributes.push(KeyValue::new(field.name(), value)); } } + + fn record_str(&mut self, field: &Field, value: &str) { + self.attributes + .push(KeyValue::new(field.name(), value.to_owned())); + } + + fn record_bool(&mut self, field: &Field, value: bool) { + self.attributes.push(KeyValue::new(field.name(), value)); + } +} + +struct Callsites { + callsites: RwLock>, +} + +#[derive(Default)] +struct CallsiteInfo { + has_metrics_fields: bool, +} + +impl Callsites { + fn new() -> Self { + Callsites { + callsites: Default::default(), + } + } + + fn register_metrics_callsite(&self, callsite: Identifier, info: CallsiteInfo) { + self.callsites.write().unwrap().insert(callsite, info); + } + + fn has_metrics_field(&self, callsite: &Identifier) -> bool { + self.callsites + .read() + .unwrap() + .get(callsite) + .map(|info| info.has_metrics_fields) + .unwrap_or(false) + } } /// A layer that publishes metrics via the OpenTelemetry SDK. @@ -322,6 +350,7 @@ impl<'a> Visit for MetricVisitor<'a> { pub struct MetricsLayer { meter: Meter, instruments: Instruments, + callsites: Callsites, } impl MetricsLayer { @@ -339,8 +368,18 @@ impl MetricsLayer { MetricsLayer { meter, instruments: Default::default(), + callsites: Callsites::new(), } } + + fn has_metrics_fields(&self, field_set: &FieldSet) -> bool { + field_set.iter().any(|field| { + let name = field.name(); + name.starts_with(METRIC_PREFIX_COUNTER) + || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER) + || name.starts_with(METRIC_PREFIX_HISTOGRAM) + }) + } } impl Layer for MetricsLayer @@ -348,10 +387,42 @@ where S: Subscriber + for<'span> LookupSpan<'span>, { fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) { - let mut metric_visitor = MetricVisitor { - instruments: &self.instruments, - meter: &self.meter, - }; - event.record(&mut metric_visitor); + if self + .callsites + .has_metrics_field(&event.metadata().callsite()) + { + let mut attributes = Vec::new(); + let mut visited_metrics = Vec::new(); + let mut metric_visitor = MetricVisitor { + attributes: &mut attributes, + visited_metrics: &mut visited_metrics, + }; + event.record(&mut metric_visitor); + + // associate attrivutes with visited metrics + visited_metrics + .into_iter() + .for_each(|(metric_name, value)| { + self.instruments.update_metric( + &self.meter, + value, + metric_name, + attributes.as_slice(), + ); + }) + } + } + + fn register_callsite(&self, metadata: &'static tracing::Metadata<'static>) -> Interest { + if metadata.is_event() && self.has_metrics_fields(metadata.fields()) { + self.callsites.register_metrics_callsite( + metadata.callsite(), + CallsiteInfo { + has_metrics_fields: true, + }, + ); + } + + Interest::always() } } From 194d7836f07e53a4abe1fdb3f0ef8c5bd17729af Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sat, 5 Aug 2023 01:45:53 +0900 Subject: [PATCH 2/8] test: add attributes test case to metrics --- tests/metrics_publishing.rs | 285 ++++++++++++++++++++++++++++++++++-- 1 file changed, 273 insertions(+), 12 deletions(-) diff --git a/tests/metrics_publishing.rs b/tests/metrics_publishing.rs index faad19e..04b04de 100644 --- a/tests/metrics_publishing.rs +++ b/tests/metrics_publishing.rs @@ -9,9 +9,9 @@ use opentelemetry::{ }, InstrumentKind, ManualReader, MeterProvider, }, - Resource, + AttributeSet, Resource, }, - Context, + Context, KeyValue, }; use std::{fmt::Debug, sync::Arc}; use tracing::Subscriber; @@ -23,8 +23,12 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let (subscriber, exporter) = - init_subscriber("hello_world".to_string(), InstrumentKind::Counter, 1_u64); + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::Counter, + 1_u64, + None, + ); tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); @@ -35,8 +39,12 @@ async fn u64_counter_is_exported() { #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let (subscriber, exporter) = - init_subscriber("hello_world2".to_string(), InstrumentKind::Counter, 1_u64); + let (subscriber, exporter) = init_subscriber( + "hello_world2".to_string(), + InstrumentKind::Counter, + 1_u64, + None, + ); tracing::subscriber::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); @@ -51,6 +59,7 @@ async fn f64_counter_is_exported() { "float_hello_world".to_string(), InstrumentKind::Counter, 1.000000123_f64, + None, ); tracing::subscriber::with_default(subscriber, || { @@ -62,8 +71,12 @@ async fn f64_counter_is_exported() { #[tokio::test] async fn i64_up_down_counter_is_exported() { - let (subscriber, exporter) = - init_subscriber("pebcak".to_string(), InstrumentKind::UpDownCounter, -5_i64); + let (subscriber, exporter) = init_subscriber( + "pebcak".to_string(), + InstrumentKind::UpDownCounter, + -5_i64, + None, + ); tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); @@ -74,8 +87,12 @@ async fn i64_up_down_counter_is_exported() { #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let (subscriber, exporter) = - init_subscriber("pebcak2".to_string(), InstrumentKind::UpDownCounter, 5_i64); + let (subscriber, exporter) = init_subscriber( + "pebcak2".to_string(), + InstrumentKind::UpDownCounter, + 5_i64, + None, + ); tracing::subscriber::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); @@ -90,6 +107,7 @@ async fn f64_up_down_counter_is_exported() { "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, 99.123_f64, + None, ); tracing::subscriber::with_default(subscriber, || { @@ -101,8 +119,12 @@ async fn f64_up_down_counter_is_exported() { #[tokio::test] async fn u64_histogram_is_exported() { - let (subscriber, exporter) = - init_subscriber("abcdefg".to_string(), InstrumentKind::Histogram, 9_u64); + let (subscriber, exporter) = init_subscriber( + "abcdefg".to_string(), + InstrumentKind::Histogram, + 9_u64, + None, + ); tracing::subscriber::with_default(subscriber, || { tracing::info!(histogram.abcdefg = 9_u64); @@ -117,6 +139,7 @@ async fn i64_histogram_is_exported() { "abcdefg_auenatsou".to_string(), InstrumentKind::Histogram, -19_i64, + None, ); tracing::subscriber::with_default(subscriber, || { @@ -132,6 +155,7 @@ async fn f64_histogram_is_exported() { "abcdefg_racecar".to_string(), InstrumentKind::Histogram, 777.0012_f64, + None, ); tracing::subscriber::with_default(subscriber, || { @@ -141,10 +165,235 @@ async fn f64_histogram_is_exported() { exporter.export().unwrap(); } +#[tokio::test] +async fn u64_counter_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::Counter, + 1_u64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + monotonic_counter.hello_world = 1_u64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + +#[tokio::test] +async fn f64_counter_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::Counter, + 1_f64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + monotonic_counter.hello_world = 1_f64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + +#[tokio::test] +async fn i64_up_down_counter_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::UpDownCounter, + -1_i64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + counter.hello_world = -1_i64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + +#[tokio::test] +async fn f64_up_down_counter_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::UpDownCounter, + -1_f64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + counter.hello_world = -1_f64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + +#[tokio::test] +async fn u64_histogram_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::Histogram, + 1_u64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + histogram.hello_world = 1_u64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + +#[tokio::test] +async fn i64_histogram_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::Histogram, + -1_i64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + histogram.hello_world = -1_i64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + +#[tokio::test] +async fn f64_histogram_with_attributes_is_exported() { + let (subscriber, exporter) = init_subscriber( + "hello_world".to_string(), + InstrumentKind::Histogram, + 1_f64, + Some(AttributeSet::from( + [ + KeyValue::new("u64_key_1", 1_i64), + KeyValue::new("i64_key_1", 2_i64), + KeyValue::new("f64_key_1", 3_f64), + KeyValue::new("str_key_1", "foo"), + KeyValue::new("bool_key_1", true), + ] + .as_slice(), + )), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::info!( + histogram.hello_world = 1_f64, + u64_key_1 = 1_u64, + i64_key_1 = 2_i64, + f64_key_1 = 3_f64, + str_key_1 = "foo", + bool_key_1 = true, + ); + }); + + exporter.export().unwrap(); +} + fn init_subscriber( expected_metric_name: String, expected_instrument_kind: InstrumentKind, expected_value: T, + expected_attributes: Option, ) -> (impl Subscriber + 'static, TestExporter) { let reader = ManualReader::builder() .with_aggregation_selector(Box::new(DefaultAggregationSelector::new())) @@ -159,6 +408,7 @@ fn init_subscriber( expected_metric_name, expected_instrument_kind, expected_value, + expected_attributes, reader, _meter_provider: provider.clone(), }; @@ -218,6 +468,7 @@ struct TestExporter { expected_metric_name: String, expected_instrument_kind: InstrumentKind, expected_value: T, + expected_attributes: Option, reader: TestReader, _meter_provider: MeterProvider, } @@ -255,6 +506,12 @@ where .map(|data_point| data_point.value) .sum() ); + + if let Some(expected_attributes) = self.expected_attributes.as_ref() { + sum.data_points.iter().for_each(|data_point| { + assert_eq!(expected_attributes, &data_point.attributes,) + }); + } } InstrumentKind::Histogram => { let histogram = @@ -262,6 +519,10 @@ where let histogram_data = histogram.data_points.first().unwrap(); assert!(histogram_data.count > 0); assert_eq!(histogram_data.sum, self.expected_value); + + if let Some(expected_attributes) = self.expected_attributes.as_ref() { + assert_eq!(expected_attributes, &histogram_data.attributes); + } } unexpected => { panic!("InstrumentKind {:?} not currently supported!", unexpected) From c3650585a87228b969481e9371fb28f6c5aa670f Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sat, 5 Aug 2023 17:44:17 +0900 Subject: [PATCH 3/8] feat: use Filter trait --- Cargo.toml | 1 + src/metrics.rs | 140 ++++++++++++++++++++----------------------------- 2 files changed, 58 insertions(+), 83 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d74ece3..4bd1685 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ once_cell = "1.13.0" # Fix minimal-versions async-trait = { version = "0.1.56", optional = true } thiserror = { version = "1.0.31", optional = true } +smallvec = "1.11.0" [dev-dependencies] async-trait = "0.1.56" diff --git a/src/metrics.rs b/src/metrics.rs index b6b76f7..cfb8c57 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,16 +1,18 @@ use std::{collections::HashMap, fmt, sync::RwLock}; -use tracing::{ - callsite::Identifier, - field::{FieldSet, Visit}, - Subscriber, -}; -use tracing_core::{Field, Interest}; +use tracing::{field::Visit, Subscriber}; +use tracing_core::{Field, Metadata}; use opentelemetry::{ metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, KeyValue, Value, }; -use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; +use tracing_subscriber::{ + layer::{Context, Filter}, + registry::LookupSpan, + Layer, +}; + +use smallvec::SmallVec; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; @@ -137,8 +139,8 @@ impl Instruments { } pub(crate) struct MetricVisitor<'a> { - attributes: &'a mut Vec, - visited_metrics: &'a mut Vec<(&'static str, InstrumentType)>, + attributes: &'a mut SmallVec<[KeyValue; 8]>, + visited_metrics: &'a mut SmallVec<[(&'static str, InstrumentType); 2]>, } impl<'a> Visit for MetricVisitor<'a> { @@ -212,36 +214,6 @@ impl<'a> Visit for MetricVisitor<'a> { } } -struct Callsites { - callsites: RwLock>, -} - -#[derive(Default)] -struct CallsiteInfo { - has_metrics_fields: bool, -} - -impl Callsites { - fn new() -> Self { - Callsites { - callsites: Default::default(), - } - } - - fn register_metrics_callsite(&self, callsite: Identifier, info: CallsiteInfo) { - self.callsites.write().unwrap().insert(callsite, info); - } - - fn has_metrics_field(&self, callsite: &Identifier) -> bool { - self.callsites - .read() - .unwrap() - .get(callsite) - .map(|info| info.has_metrics_fields) - .unwrap_or(false) - } -} - /// A layer that publishes metrics via the OpenTelemetry SDK. /// /// # Usage @@ -350,7 +322,6 @@ impl Callsites { pub struct MetricsLayer { meter: Meter, instruments: Instruments, - callsites: Callsites, } impl MetricsLayer { @@ -365,21 +336,12 @@ impl MetricsLayer { None::<&'static str>, None, ); + MetricsLayer { meter, instruments: Default::default(), - callsites: Callsites::new(), } } - - fn has_metrics_fields(&self, field_set: &FieldSet) -> bool { - field_set.iter().any(|field| { - let name = field.name(); - name.starts_with(METRIC_PREFIX_COUNTER) - || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER) - || name.starts_with(METRIC_PREFIX_HISTOGRAM) - }) - } } impl Layer for MetricsLayer @@ -387,42 +349,54 @@ where S: Subscriber + for<'span> LookupSpan<'span>, { fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) { - if self - .callsites - .has_metrics_field(&event.metadata().callsite()) - { - let mut attributes = Vec::new(); - let mut visited_metrics = Vec::new(); - let mut metric_visitor = MetricVisitor { - attributes: &mut attributes, - visited_metrics: &mut visited_metrics, - }; - event.record(&mut metric_visitor); + let mut attributes = SmallVec::new(); + let mut visited_metrics = SmallVec::new(); + let mut metric_visitor = MetricVisitor { + attributes: &mut attributes, + visited_metrics: &mut visited_metrics, + }; + event.record(&mut metric_visitor); - // associate attrivutes with visited metrics - visited_metrics - .into_iter() - .for_each(|(metric_name, value)| { - self.instruments.update_metric( - &self.meter, - value, - metric_name, - attributes.as_slice(), - ); - }) - } + // associate attrivutes with visited metrics + visited_metrics + .into_iter() + .for_each(|(metric_name, value)| { + self.instruments.update_metric( + &self.meter, + value, + metric_name, + attributes.as_slice(), + ); + }) } +} - fn register_callsite(&self, metadata: &'static tracing::Metadata<'static>) -> Interest { - if metadata.is_event() && self.has_metrics_fields(metadata.fields()) { - self.callsites.register_metrics_callsite( - metadata.callsite(), - CallsiteInfo { - has_metrics_fields: true, - }, - ); - } +impl Filter for MetricsLayer { + fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool { + meta.is_event() + && meta.fields().iter().any(|field| { + let name = field.name(); + name.starts_with(METRIC_PREFIX_COUNTER) + || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER) + || name.starts_with(METRIC_PREFIX_HISTOGRAM) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::metrics::noop::NoopMeterProvider; + use tracing_subscriber::layer::SubscriberExt; + + #[test] + fn layer_should_not_panic_on_non_metrics_event() { + let noop = NoopMeterProvider::new(); + let layer = MetricsLayer::new(noop); + let subscriber = tracing_subscriber::registry().with(layer); - Interest::always() + tracing::subscriber::with_default(subscriber, || { + tracing::info!(key = "val", "foo"); + }); } } From 802809cb44b83e2314359db79b6de3f6a848cb72 Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sat, 5 Aug 2023 21:00:56 +0900 Subject: [PATCH 4/8] chore: add metrics bench --- Cargo.toml | 4 ++ benches/metrics.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 benches/metrics.rs diff --git a/Cargo.toml b/Cargo.toml index 4bd1685..d1716fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,10 @@ bench = false name = "trace" harness = false +[[bench]] +name = "metrics" +harness = false + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/benches/metrics.rs b/benches/metrics.rs new file mode 100644 index 0000000..a955553 --- /dev/null +++ b/benches/metrics.rs @@ -0,0 +1,114 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use opentelemetry::metrics::noop::NoopMeterProvider; +use tracing_opentelemetry::MetricsLayer; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +fn metrics_events(c: &mut Criterion) { + let mut group = c.benchmark_group("otel_metrics_events"); + { + let _subscriber = tracing_subscriber::registry().set_default(); + group.bench_function("no_metrics_layer", |b| { + b.iter(|| { + tracing::info!(key_1 = "va", "msg"); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("no_metrics_events", |b| { + b.iter(|| { + tracing::info!(key_1 = "va", "msg"); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("1_metrics_event", |b| { + b.iter(|| { + tracing::info!(monotonic_counter.foo = 1, "msg"); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("2_metrics_events", |b| { + b.iter(|| { + tracing::info!(monotonic_counter.foo = 1, monotonic_counter.bar = 1, "msg"); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("1_metrics_event_with_attributes", |b| { + b.iter(|| { + tracing::info!(monotonic_counter.foo = 1, attr_1 = 10, "msg"); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("1_metrics_event_with_attributes_8", |b| { + b.iter(|| { + tracing::info!( + monotonic_counter.foo = 1, + attr_1 = 10, + attr_2 = 20, + attr_3 = 30, + attr_4 = 40, + attr_5 = 50, + attr_6 = 60, + attr_7 = 70, + attr_8 = 80, + "msg" + ); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("no_metrics_event_with_attributes_12", |b| { + b.iter(|| { + tracing::info!( + attr_1 = 10, + attr_2 = 20, + attr_3 = 30, + attr_4 = 40, + attr_5 = 50, + attr_6 = 60, + attr_7 = 70, + attr_8 = 80, + attr_9 = 90, + attr_10 = 100, + attr_11 = 110, + attr_12 = 120, + "msg" + ); + }) + }); + } +} + +criterion_group! { + name = benches; + config = Criterion::default(); + targets = metrics_events +} +criterion_main!(benches); From 3cfc96fe61791ed1363e5f2f77f2bf249e07c5ee Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sat, 5 Aug 2023 21:03:14 +0900 Subject: [PATCH 5/8] fix: wrap with filter layer --- src/metrics.rs | 159 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 138 insertions(+), 21 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index cfb8c57..5d976ac 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,12 +1,13 @@ use std::{collections::HashMap, fmt, sync::RwLock}; use tracing::{field::Visit, Subscriber}; -use tracing_core::{Field, Metadata}; +use tracing_core::{Field, Interest, Metadata}; use opentelemetry::{ metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, KeyValue, Value, }; use tracing_subscriber::{ + filter::Filtered, layer::{Context, Filter}, registry::LookupSpan, Layer, @@ -319,14 +320,16 @@ impl<'a> Visit for MetricVisitor<'a> { /// its callsite, eliminating the need for any maps. /// #[cfg_attr(docsrs, doc(cfg(feature = "metrics")))] -pub struct MetricsLayer { - meter: Meter, - instruments: Instruments, +pub struct MetricsLayer { + inner: Filtered, } -impl MetricsLayer { +impl MetricsLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ /// Create a new instance of MetricsLayer. - pub fn new(meter_provider: M) -> Self + pub fn new(meter_provider: M) -> MetricsLayer where M: MeterProvider, { @@ -337,14 +340,51 @@ impl MetricsLayer { None, ); - MetricsLayer { + let layer = InstrumentLayer { meter, instruments: Default::default(), + }; + + MetricsLayer { + inner: layer.with_filter(MetricsFilter), } } } -impl Layer for MetricsLayer +struct MetricsFilter; + +impl MetricsFilter { + fn has_metrics_fields(&self, meta: &Metadata<'_>) -> bool { + meta.is_event() + && meta.fields().iter().any(|field| { + let name = field.name(); + name.starts_with(METRIC_PREFIX_COUNTER) + || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER) + || name.starts_with(METRIC_PREFIX_HISTOGRAM) + }) + } +} + +impl Filter for MetricsFilter { + fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool { + self.has_metrics_fields(meta) + } + + fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest { + if self.has_metrics_fields(meta) { + Interest::always() + } else { + Interest::never() + } + } +} + +struct InstrumentLayer { + meter: Meter, + instruments: Instruments, +} + +impl Layer for InstrumentLayer where S: Subscriber + for<'span> LookupSpan<'span>, { @@ -371,28 +411,105 @@ where } } -impl Filter for MetricsLayer { - fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool { - meta.is_event() - && meta.fields().iter().any(|field| { - let name = field.name(); - name.starts_with(METRIC_PREFIX_COUNTER) - || name.starts_with(METRIC_PREFIX_MONOTONIC_COUNTER) - || name.starts_with(METRIC_PREFIX_HISTOGRAM) - }) +impl Layer for MetricsLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + fn on_register_dispatch(&self, collector: &tracing_core::Dispatch) { + self.inner.on_register_dispatch(collector) + } + + fn on_layer(&mut self, subscriber: &mut S) { + self.inner.on_layer(subscriber) + } + + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest { + self.inner.register_callsite(metadata) + } + + fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool { + self.inner.enabled(metadata, ctx) + } + + fn on_new_span( + &self, + attrs: &tracing_core::span::Attributes<'_>, + id: &tracing_core::span::Id, + ctx: Context<'_, S>, + ) { + self.inner.on_new_span(attrs, id, ctx) + } + + fn max_level_hint(&self) -> Option { + self.inner.max_level_hint() + } + + fn on_record( + &self, + span: &tracing_core::span::Id, + values: &tracing_core::span::Record<'_>, + ctx: Context<'_, S>, + ) { + self.inner.on_record(span, values, ctx) + } + + fn on_follows_from( + &self, + span: &tracing_core::span::Id, + follows: &tracing_core::span::Id, + ctx: Context<'_, S>, + ) { + self.inner.on_follows_from(span, follows, ctx) + } + + fn event_enabled(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) -> bool { + self.inner.event_enabled(event, ctx) + } + + fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) { + self.inner.on_event(event, ctx) + } + + fn on_enter(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) { + self.inner.on_enter(id, ctx) + } + + fn on_exit(&self, id: &tracing_core::span::Id, ctx: Context<'_, S>) { + self.inner.on_exit(id, ctx) + } + + fn on_close(&self, id: tracing_core::span::Id, ctx: Context<'_, S>) { + self.inner.on_close(id, ctx) + } + + fn on_id_change( + &self, + old: &tracing_core::span::Id, + new: &tracing_core::span::Id, + ctx: Context<'_, S>, + ) { + self.inner.on_id_change(old, new, ctx) } } #[cfg(test)] mod tests { use super::*; - use opentelemetry::metrics::noop::NoopMeterProvider; use tracing_subscriber::layer::SubscriberExt; + struct PanicLayer; + impl Layer for PanicLayer + where + S: Subscriber + for<'span> LookupSpan<'span>, + { + fn on_event(&self, _event: &tracing_core::Event<'_>, _ctx: Context<'_, S>) { + panic!("panic"); + } + } + #[test] - fn layer_should_not_panic_on_non_metrics_event() { - let noop = NoopMeterProvider::new(); - let layer = MetricsLayer::new(noop); + fn filter_layer_should_filter_non_metrics_event() { + let layer = PanicLayer.with_filter(MetricsFilter); let subscriber = tracing_subscriber::registry().with(layer); tracing::subscriber::with_default(subscriber, || { From 2c0dc3a5d49634100d0f65820278acf8ac688795 Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sat, 5 Aug 2023 21:47:39 +0900 Subject: [PATCH 6/8] fix: remove methods that are incompatible with the minimal version --- Cargo.toml | 2 +- src/metrics.rs | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fc36ece..c853a82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ once_cell = "1.13.0" # Fix minimal-versions async-trait = { version = "0.1.56", optional = true } thiserror = { version = "1.0.31", optional = true } -smallvec = "1.11.0" +smallvec = "1.0" [dev-dependencies] async-trait = "0.1.56" diff --git a/src/metrics.rs b/src/metrics.rs index 5d976ac..792662d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -354,7 +354,7 @@ where struct MetricsFilter; impl MetricsFilter { - fn has_metrics_fields(&self, meta: &Metadata<'_>) -> bool { + fn is_metrics_event(&self, meta: &Metadata<'_>) -> bool { meta.is_event() && meta.fields().iter().any(|field| { let name = field.name(); @@ -367,11 +367,11 @@ impl MetricsFilter { impl Filter for MetricsFilter { fn enabled(&self, meta: &Metadata<'_>, _cx: &Context<'_, S>) -> bool { - self.has_metrics_fields(meta) + self.is_metrics_event(meta) } fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest { - if self.has_metrics_fields(meta) { + if self.is_metrics_event(meta) { Interest::always() } else { Interest::never() @@ -415,10 +415,6 @@ impl Layer for MetricsLayer where S: Subscriber + for<'span> LookupSpan<'span>, { - fn on_register_dispatch(&self, collector: &tracing_core::Dispatch) { - self.inner.on_register_dispatch(collector) - } - fn on_layer(&mut self, subscriber: &mut S) { self.inner.on_layer(subscriber) } @@ -462,10 +458,6 @@ where self.inner.on_follows_from(span, follows, ctx) } - fn event_enabled(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) -> bool { - self.inner.event_enabled(event, ctx) - } - fn on_event(&self, event: &tracing_core::Event<'_>, ctx: Context<'_, S>) { self.inner.on_event(event, ctx) } From c6b6a56da834c05b2bf8454881b55bc009ac6d46 Mon Sep 17 00:00:00 2001 From: ymgyt Date: Sun, 6 Aug 2023 16:02:09 +0900 Subject: [PATCH 7/8] test: rename metrics benchmarks --- benches/metrics.rs | 108 ++++++++++++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 30 deletions(-) diff --git a/benches/metrics.rs b/benches/metrics.rs index a955553..edb8e8a 100644 --- a/benches/metrics.rs +++ b/benches/metrics.rs @@ -18,7 +18,7 @@ fn metrics_events(c: &mut Criterion) { let _subscriber = tracing_subscriber::registry() .with(MetricsLayer::new(NoopMeterProvider::new())) .set_default(); - group.bench_function("no_metrics_events", |b| { + group.bench_function("metrics_events_0_attr_0", |b| { b.iter(|| { tracing::info!(key_1 = "va", "msg"); }) @@ -29,9 +29,9 @@ fn metrics_events(c: &mut Criterion) { let _subscriber = tracing_subscriber::registry() .with(MetricsLayer::new(NoopMeterProvider::new())) .set_default(); - group.bench_function("1_metrics_event", |b| { + group.bench_function("metrics_events_1_attr_0", |b| { b.iter(|| { - tracing::info!(monotonic_counter.foo = 1, "msg"); + tracing::info!(monotonic_counter.c1 = 1, "msg"); }) }); } @@ -40,9 +40,9 @@ fn metrics_events(c: &mut Criterion) { let _subscriber = tracing_subscriber::registry() .with(MetricsLayer::new(NoopMeterProvider::new())) .set_default(); - group.bench_function("2_metrics_events", |b| { + group.bench_function("metrics_events_2_attr_0", |b| { b.iter(|| { - tracing::info!(monotonic_counter.foo = 1, monotonic_counter.bar = 1, "msg"); + tracing::info!(monotonic_counter.c1 = 1, monotonic_counter.c2 = 1, "msg"); }) }); } @@ -51,9 +51,63 @@ fn metrics_events(c: &mut Criterion) { let _subscriber = tracing_subscriber::registry() .with(MetricsLayer::new(NoopMeterProvider::new())) .set_default(); - group.bench_function("1_metrics_event_with_attributes", |b| { + group.bench_function("metrics_events_4_attr_0", |b| { b.iter(|| { - tracing::info!(monotonic_counter.foo = 1, attr_1 = 10, "msg"); + tracing::info!( + monotonic_counter.c1 = 1, + monotonic_counter.c2 = 1, + monotonic_counter.c3 = 1, + monotonic_counter.c4 = 1, + "msg" + ); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("metrics_events_8_attr_0", |b| { + b.iter(|| { + tracing::info!( + monotonic_counter.c1 = 1, + monotonic_counter.c2 = 1, + monotonic_counter.c3 = 1, + monotonic_counter.c4 = 1, + monotonic_counter.c5 = 1, + monotonic_counter.c6 = 1, + monotonic_counter.c7 = 1, + monotonic_counter.c8 = 1, + "msg" + ); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("metrics_events_1_attr_1", |b| { + b.iter(|| { + tracing::info!(monotonic_counter.c1 = 1, key_1 = 1_i64, "msg"); + }) + }); + } + + { + let _subscriber = tracing_subscriber::registry() + .with(MetricsLayer::new(NoopMeterProvider::new())) + .set_default(); + group.bench_function("metrics_events_1_attr_2", |b| { + b.iter(|| { + tracing::info!( + monotonic_counter.c1 = 1, + key_1 = 1_i64, + key_2 = 1_i64, + "msg" + ); }) }); } @@ -62,18 +116,14 @@ fn metrics_events(c: &mut Criterion) { let _subscriber = tracing_subscriber::registry() .with(MetricsLayer::new(NoopMeterProvider::new())) .set_default(); - group.bench_function("1_metrics_event_with_attributes_8", |b| { + group.bench_function("metrics_events_1_attr_4", |b| { b.iter(|| { tracing::info!( - monotonic_counter.foo = 1, - attr_1 = 10, - attr_2 = 20, - attr_3 = 30, - attr_4 = 40, - attr_5 = 50, - attr_6 = 60, - attr_7 = 70, - attr_8 = 80, + monotonic_counter.c1 = 1, + key_1 = 1_i64, + key_2 = 1_i64, + key_3 = 1_i64, + key_4 = 1_i64, "msg" ); }) @@ -84,26 +134,24 @@ fn metrics_events(c: &mut Criterion) { let _subscriber = tracing_subscriber::registry() .with(MetricsLayer::new(NoopMeterProvider::new())) .set_default(); - group.bench_function("no_metrics_event_with_attributes_12", |b| { + group.bench_function("metrics_events_1_attr_8", |b| { b.iter(|| { tracing::info!( - attr_1 = 10, - attr_2 = 20, - attr_3 = 30, - attr_4 = 40, - attr_5 = 50, - attr_6 = 60, - attr_7 = 70, - attr_8 = 80, - attr_9 = 90, - attr_10 = 100, - attr_11 = 110, - attr_12 = 120, + monotonic_counter.c1 = 1, + key_1 = 1_i64, + key_2 = 1_i64, + key_3 = 1_i64, + key_4 = 1_i64, + key_5 = 1_i64, + key_6 = 1_i64, + key_7 = 1_i64, + key_8 = 1_i64, "msg" ); }) }); } + group.finish(); } criterion_group! { From 9b49e543f0b9963ee44fce59c7cacf06b7bdba45 Mon Sep 17 00:00:00 2001 From: ymgyt Date: Tue, 8 Aug 2023 13:14:27 +0900 Subject: [PATCH 8/8] chore: make smallvec optional --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c853a82..9c0aeeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ rust-version = "1.60.0" [features] default = ["tracing-log", "metrics"] # Enables support for exporting OpenTelemetry metrics -metrics = ["opentelemetry/metrics"] +metrics = ["opentelemetry/metrics", "smallvec"] [dependencies] opentelemetry = { version = "0.20.0", default-features = false, features = ["trace"] } @@ -35,7 +35,7 @@ once_cell = "1.13.0" # Fix minimal-versions async-trait = { version = "0.1.56", optional = true } thiserror = { version = "1.0.31", optional = true } -smallvec = "1.0" +smallvec = { version = "1.0", optional = true } [dev-dependencies] async-trait = "0.1.56"