Skip to content

Commit

Permalink
Add experimental synchronous gauge (#1410)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb authored Dec 12, 2023
1 parent 1dd269a commit 038b91b
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 22 deletions.
21 changes: 21 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,27 @@ OpenTelemetry supports multiple ways to configure the API, SDK and other compone
- Environment variables
- Compiling time configurations provided in the source code

### Experimental/Unstable features:

Use `otel_unstable` feature flag for implementation of specification with [experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.27.0/specification/document-status.md) status. This approach ensures clear demarcation and safe integration of new or evolving features. Utilize the following structure:

```rust
#[cfg(feature = "otel_unstable")]
{
// Your feature implementation
}
```
It's important to regularly review and remove the `otel_unstable` flag from the code once the feature becomes stable. This cleanup process is crucial to maintain the overall code quality and to ensure that stable features are accurately reflected in the main build.

### Optional features:

The potential features include:

- Stable and non-experimental features that compliant to specification, and have a feature flag to minimize compilation size. Example: feature flags for signals (like `logs`, `traces`, `metrics`) and runtimes (`rt-tokio`, `rt-tokio-current-thread`, `rt-async-std`).
- Stable and non-experimental features, although not part of the specification, are crucial for enhancing the tracing/log crate's functionality or boosting performance. These features are also subject to discussion and approval by the OpenTelemetry Rust Maintainers. An example of such a feature is `logs_level_enabled`.

All such features should adhere to naming convention `<signal>_<feature_name>`

## Style Guide

- Run `cargo clippy --all` - this will catch common mistakes and improve
Expand Down
8 changes: 6 additions & 2 deletions examples/metrics-basic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ license = "Apache-2.0"
publish = false

[dependencies]
opentelemetry = { path = "../../opentelemetry", features = ["metrics"] }
opentelemetry = { path = "../../opentelemetry", features = ["metrics", "otel_unstable"] }
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["metrics", "rt-tokio"] }
opentelemetry-stdout = { path = "../../opentelemetry-stdout", features = ["metrics"]}
tokio = { version = "1.0", features = ["full"] }
serde_json = {version = "1.0"}
serde_json = {version = "1.0"}

[features]
default = ["otel_unstable"]
otel_unstable = ["opentelemetry/otel_unstable"]
32 changes: 25 additions & 7 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,37 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

// Note that there is no ObservableHistogram instrument.

// Create a Gauge Instrument.
// Note that the Gauge instrument is experimental, and can be changed/removed in the future releases.
#[cfg(feature = "otel_unstable")]
{
let gauge = meter
.f64_gauge("my_gauge")
.with_description("A gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

gauge.record(
1.0,
[
KeyValue::new("mykey1", "myvalue1"),
KeyValue::new("mykey2", "myvalue2"),
]
.as_ref(),
);
}

// Create a ObservableGauge instrument and register a callback that reports the measurement.
let gauge = meter
.f64_observable_gauge("my_gauge")
.with_description("A gauge set to 1.0")
let observable_gauge = meter
.f64_observable_gauge("my_observable_gauge")
.with_description("An observable gauge set to 1.0")
.with_unit(Unit::new("myunit"))
.init();

// Register a callback that reports the measurement.
meter.register_callback(&[gauge.as_any()], move |observer| {
meter.register_callback(&[observable_gauge.as_any()], move |observer| {
observer.observe_f64(
&gauge,
&observable_gauge,
1.0,
[
KeyValue::new("mykey1", "myvalue1"),
Expand All @@ -130,8 +150,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
)
})?;

// Note that Gauge only has a Observable version.

// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

### Added

- [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Add experimental synchronous gauge

### Changed

- **Breaking**
Expand Down
16 changes: 15 additions & 1 deletion opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::{any::Any, borrow::Cow, collections::HashSet, hash::Hash, marker, sync:

use opentelemetry::{
metrics::{
AsyncInstrument, MetricsError, Result, SyncCounter, SyncHistogram, SyncUpDownCounter, Unit,
AsyncInstrument, MetricsError, Result, SyncCounter, SyncGauge, SyncHistogram,
SyncUpDownCounter, Unit,
},
Key, KeyValue,
};
Expand Down Expand Up @@ -33,6 +34,11 @@ pub enum InstrumentKind {
/// A group of instruments that record increasing and decreasing values in an
/// asynchronous callback.
ObservableUpDownCounter,

/// a group of instruments that record current value synchronously with
/// the code path they are measuring.
Gauge,
///
/// a group of instruments that record current values in an asynchronous callback.
ObservableGauge,
}
Expand Down Expand Up @@ -268,6 +274,14 @@ impl<T: Copy + 'static> SyncUpDownCounter<T> for ResolvedMeasures<T> {
}
}

impl<T: Copy + 'static> SyncGauge<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
measure.call(val, AttributeSet::from(attrs))
}
}
}

impl<T: Copy + 'static> SyncHistogram<T> for ResolvedMeasures<T> {
fn record(&self, val: T, attrs: &[KeyValue]) {
for measure in &self.measures {
Expand Down
60 changes: 57 additions & 3 deletions opentelemetry-sdk/src/metrics/meter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use opentelemetry::{
global,
metrics::{
noop::{NoopAsyncInstrument, NoopRegistration},
AsyncInstrument, Callback, CallbackRegistration, Counter, Histogram, InstrumentProvider,
MetricsError, ObservableCounter, ObservableGauge, ObservableUpDownCounter,
Observer as ApiObserver, Result, Unit, UpDownCounter,
AsyncInstrument, Callback, CallbackRegistration, Counter, Gauge, Histogram,
InstrumentProvider, MetricsError, ObservableCounter, ObservableGauge,
ObservableUpDownCounter, Observer as ApiObserver, Result, Unit, UpDownCounter,
},
KeyValue,
};
Expand Down Expand Up @@ -299,6 +299,57 @@ impl InstrumentProvider for SdkMeter {
Ok(ObservableUpDownCounter::new(observable))
}

fn u64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<u64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.u64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn f64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<f64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.f64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn i64_gauge(
&self,
name: Cow<'static, str>,
description: Option<Cow<'static, str>>,
unit: Option<Unit>,
) -> Result<Gauge<i64>> {
validate_instrument_config(name.as_ref(), unit.as_ref(), self.validation_policy)?;
let p = InstrumentResolver::new(self, &self.i64_resolver);
p.lookup(
InstrumentKind::Gauge,
name,
description,
unit.unwrap_or_default(),
)
.map(|i| Gauge::new(Arc::new(i)))
}

fn u64_observable_gauge(
&self,
name: Cow<'static, str>,
Expand Down Expand Up @@ -784,6 +835,9 @@ mod tests {
.f64_observable_up_down_counter(name.into(), None, None, Vec::new())
.map(|_| ()),
);
assert(meter.u64_gauge(name.into(), None, None).map(|_| ()));
assert(meter.f64_gauge(name.into(), None, None).map(|_| ()));
assert(meter.i64_gauge(name.into(), None, None).map(|_| ()));
assert(
meter
.u64_observable_gauge(name.into(), None, None, Vec::new())
Expand Down
14 changes: 9 additions & 5 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ fn aggregate_fn<T: Number<T>>(
/// | Histogram | ✓ | | ✓ | ✓ | ✓ |
/// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
/// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
/// | Gauge | ✓ | ✓ | | ✓ | ✓ |
/// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |
fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregation) -> Result<()> {
use aggregation::Aggregation;
Expand All @@ -547,6 +548,7 @@ fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregatio
kind,
InstrumentKind::Counter
| InstrumentKind::UpDownCounter
| InstrumentKind::Gauge
| InstrumentKind::Histogram
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter
Expand All @@ -571,12 +573,14 @@ fn is_aggregator_compatible(kind: &InstrumentKind, agg: &aggregation::Aggregatio
}
}
Aggregation::LastValue => {
if kind == &InstrumentKind::ObservableGauge {
return Ok(());
match kind {
InstrumentKind::Gauge | InstrumentKind::ObservableGauge => Ok(()),
_ => {
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
Err(MetricsError::Other("incompatible aggregation".into()))
}
}
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
Err(MetricsError::Other("incompatible aggregation".into()))
}
Aggregation::Drop => Ok(()),
}
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/src/metrics/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ where
/// * Observable Counter ⇨ Sum
/// * UpDownCounter ⇨ Sum
/// * Observable UpDownCounter ⇨ Sum
/// * Gauge ⇨ LastValue
/// * Observable Gauge ⇨ LastValue
/// * Histogram ⇨ ExplicitBucketHistogram
///
Expand All @@ -144,6 +145,7 @@ impl AggregationSelector for DefaultAggregationSelector {
| InstrumentKind::UpDownCounter
| InstrumentKind::ObservableCounter
| InstrumentKind::ObservableUpDownCounter => Aggregation::Sum,
InstrumentKind::Gauge => Aggregation::LastValue,
InstrumentKind::ObservableGauge => Aggregation::LastValue,
InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram {
boundaries: vec![
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## vNext

### Added

- [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Add experimental synchronous gauge. This is behind the feature flag, and can be enabled by enabling the feature `otel_unstable` for opentelemetry crate.

- [#1410](https://github.com/open-telemetry/opentelemetry-rust/pull/1410) Guidelines to add new unstable/experimental features.

### Changed

- Modified `AnyValue.Map` to be backed by `HashMap` instead of custom `OrderMap`,
Expand Down
1 change: 1 addition & 0 deletions opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ metrics = []
testing = ["trace", "metrics"]
logs = []
logs_level_enabled = ["logs"]
otel_unstable = []

[dev-dependencies]
opentelemetry_sdk = { path = "../opentelemetry-sdk" } # for documentation tests
68 changes: 66 additions & 2 deletions opentelemetry/src/metrics/instruments/gauge.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,76 @@
use crate::{
metrics::{AsyncInstrument, AsyncInstrumentBuilder, MetricsError},
metrics::{AsyncInstrument, AsyncInstrumentBuilder, InstrumentBuilder, MetricsError},
KeyValue,
};
use core::fmt;
use std::sync::Arc;
use std::{any::Any, convert::TryFrom};

/// An instrument that records independent readings.
/// An SDK implemented instrument that records independent values
pub trait SyncGauge<T> {
/// Records an independent value.
fn record(&self, value: T, attributes: &[KeyValue]);
}

/// An instrument that records independent values
#[derive(Clone)]
pub struct Gauge<T>(Arc<dyn SyncGauge<T> + Send + Sync>);

impl<T> fmt::Debug for Gauge<T>
where
T: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Gauge<{}>", std::any::type_name::<T>()))
}
}

impl<T> Gauge<T> {
/// Create a new gauge.
pub fn new(inner: Arc<dyn SyncGauge<T> + Send + Sync>) -> Self {
Gauge(inner)
}

/// Records an independent value.
pub fn record(&self, value: T, attributes: &[KeyValue]) {
self.0.record(value, attributes)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<u64>>> for Gauge<u64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<u64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.u64_gauge(builder.name, builder.description, builder.unit)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<f64>>> for Gauge<f64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<f64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.f64_gauge(builder.name, builder.description, builder.unit)
}
}

impl TryFrom<InstrumentBuilder<'_, Gauge<i64>>> for Gauge<i64> {
type Error = MetricsError;

fn try_from(builder: InstrumentBuilder<'_, Gauge<i64>>) -> Result<Self, Self::Error> {
builder
.meter
.instrument_provider
.i64_gauge(builder.name, builder.description, builder.unit)
}
}

/// An async instrument that records independent readings.
#[derive(Clone)]
pub struct ObservableGauge<T>(Arc<dyn AsyncInstrument<T>>);

Expand Down
Loading

0 comments on commit 038b91b

Please sign in to comment.