diff --git a/Cargo.lock b/Cargo.lock index 1fd7bd2dee6..fbb58c2e1be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,6 +1223,7 @@ dependencies = [ "chroma-types", "clap", "foyer", + "opentelemetry", "parking_lot", "serde", "serde_yaml", diff --git a/rust/cache/Cargo.toml b/rust/cache/Cargo.toml index d39b5df72b8..c31e928c804 100644 --- a/rust/cache/Cargo.toml +++ b/rust/cache/Cargo.toml @@ -10,6 +10,11 @@ path = "src/lib.rs" clap = { version = "4", features = ["derive"] } foyer = "0.12" anyhow = "1.0" +opentelemetry = { version = "0.26.0", default-features = false, features = [ + "trace", + "metrics", +] } + # TODO(rescrv): Deprecated. Find a suitable replacement for such things. serde_yaml = "0.9" diff --git a/rust/cache/src/foyer.rs b/rust/cache/src/foyer.rs index 184818f270d..2f78311f12f 100644 --- a/rust/cache/src/foyer.rs +++ b/rust/cache/src/foyer.rs @@ -1,3 +1,4 @@ +use opentelemetry::global; use std::hash::Hash; use std::sync::Arc; use std::time::Duration; @@ -223,6 +224,24 @@ impl FoyerCacheConfig { } } +struct Stopwatch<'a>( + &'a opentelemetry::metrics::Histogram, + std::time::Instant, +); + +impl<'a> Stopwatch<'a> { + fn new(histogram: &'a opentelemetry::metrics::Histogram) -> Self { + Self(histogram, std::time::Instant::now()) + } +} + +impl<'a> Drop for Stopwatch<'a> { + fn drop(&mut self) { + let elapsed = self.1.elapsed().as_micros() as u64; + self.0.record(elapsed, &[]); + } +} + #[derive(Clone)] pub struct FoyerHybridCache where @@ -230,6 +249,10 @@ where V: Clone + Send + Sync + StorageValue + Weighted + 'static, { cache: foyer::HybridCache, + get_latency: opentelemetry::metrics::Histogram, + insert_latency: opentelemetry::metrics::Histogram, + remove_latency: opentelemetry::metrics::Histogram, + clear_latency: opentelemetry::metrics::Histogram, } impl FoyerHybridCache @@ -304,7 +327,18 @@ where e ))) as _ })?; - Ok(FoyerHybridCache { cache }) + let meter = global::meter("chroma"); + let get_latency = meter.u64_histogram("get_latency").init(); + let insert_latency = meter.u64_histogram("insert_latency").init(); + let remove_latency = meter.u64_histogram("remove_latency").init(); + let clear_latency = meter.u64_histogram("clear_latency").init(); + Ok(FoyerHybridCache { + cache, + get_latency, + insert_latency, + remove_latency, + clear_latency, + }) } } @@ -316,21 +350,25 @@ where { #[tracing::instrument(skip(self, key))] async fn get(&self, key: &K) -> Result, CacheError> { + let _stopwatch = Stopwatch::new(&self.get_latency); Ok(self.cache.get(key).await?.map(|v| v.value().clone())) } #[tracing::instrument(skip(self, key, value))] async fn insert(&self, key: K, value: V) { + let _stopwatch = Stopwatch::new(&self.insert_latency); self.cache.insert(key, value); } #[tracing::instrument(skip(self, key))] async fn remove(&self, key: &K) { + let _stopwatch = Stopwatch::new(&self.remove_latency); self.cache.remove(key); } #[tracing::instrument(skip(self))] async fn clear(&self) -> Result<(), CacheError> { + let _stopwatch = Stopwatch::new(&self.clear_latency); Ok(self.cache.clear().await?) } } @@ -349,6 +387,10 @@ where V: Clone + Send + Sync + Weighted + 'static, { cache: foyer::Cache, + insert_latency: opentelemetry::metrics::Histogram, + get_latency: opentelemetry::metrics::Histogram, + remove_latency: opentelemetry::metrics::Histogram, + clear_latency: opentelemetry::metrics::Histogram, } impl FoyerPlainCache @@ -363,7 +405,18 @@ where let cache = CacheBuilder::new(config.capacity) .with_shards(config.shards) .build(); - Ok(FoyerPlainCache { cache }) + let meter = global::meter("chroma"); + let insert_latency = meter.u64_histogram("insert_latency").init(); + let get_latency = meter.u64_histogram("get_latency").init(); + let remove_latency = meter.u64_histogram("remove_latency").init(); + let clear_latency = meter.u64_histogram("clear_latency").init(); + Ok(FoyerPlainCache { + cache, + insert_latency, + get_latency, + remove_latency, + clear_latency, + }) } /// Build an in-memory cache that emits keys that get evicted to a channel. @@ -398,7 +451,23 @@ where .with_shards(config.shards) .with_event_listener(Arc::new(evl)) .build(); - Ok(FoyerPlainCache { cache }) + let get_latency = global::meter("chroma").u64_histogram("get_latency").init(); + let insert_latency = global::meter("chroma") + .u64_histogram("insert_latency") + .init(); + let remove_latency = global::meter("chroma") + .u64_histogram("remove_latency") + .init(); + let clear_latency = global::meter("chroma") + .u64_histogram("clear_latency") + .init(); + Ok(FoyerPlainCache { + cache, + insert_latency, + get_latency, + remove_latency, + clear_latency, + }) } } @@ -410,21 +479,25 @@ where { #[tracing::instrument(skip(self, key))] async fn get(&self, key: &K) -> Result, CacheError> { + let _stopwatch = Stopwatch::new(&self.get_latency); Ok(self.cache.get(key).map(|v| v.value().clone())) } #[tracing::instrument(skip(self, key, value))] async fn insert(&self, key: K, value: V) { + let _stopwatch = Stopwatch::new(&self.insert_latency); self.cache.insert(key, value); } #[tracing::instrument(skip(self, key))] async fn remove(&self, key: &K) { + let _stopwatch = Stopwatch::new(&self.remove_latency); self.cache.remove(key); } #[tracing::instrument(skip(self))] async fn clear(&self) -> Result<(), CacheError> { + let _stopwatch = Stopwatch::new(&self.clear_latency); self.cache.clear(); Ok(()) } diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs index d08d33618bd..dac3e089000 100644 --- a/rust/worker/src/tracing/opentelemetry_config.rs +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -161,4 +161,13 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { prev_hook(panic_info); })); + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(otel_endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .metrics(opentelemetry_sdk::runtime::Tokio) + .with_exporter(exporter) + .build() + .expect("Failed to build metrics provider"); + global::set_meter_provider(provider); }