From 32874a6795bdacdc3d0770207554acab7fd69ba1 Mon Sep 17 00:00:00 2001 From: Robert Escriva Date: Mon, 28 Oct 2024 12:26:14 -0700 Subject: [PATCH 1/7] [ENH] In-progress filtering of foyer traces under 1ms. --- .../src/tracing/opentelemetry_config.rs | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs index eb769a810d8..55202cfa33c 100644 --- a/rust/worker/src/tracing/opentelemetry_config.rs +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -5,6 +5,50 @@ use opentelemetry_sdk::propagation::TraceContextPropagator; use tracing_bunyan_formatter::BunyanFormattingLayer; use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer}; +#[derive(Clone, Debug, Default)] +struct ChromaShouldSample; + +const BUSY_NS: opentelemetry::Key = opentelemetry::Key::from_static_str("busy_ns"); +const IDLE_NS: opentelemetry::Key = opentelemetry::Key::from_static_str("idle_ns"); + +fn is_slow(attributes: &[opentelemetry::KeyValue]) -> bool { + let mut nanos = 0i64; + for attr in attributes { + if attr.key == BUSY_NS || attr.key == IDLE_NS { + if let opentelemetry::Value::I64(ns) = attr.value { + nanos += ns; + } + } + } + nanos > 1_000_000 +} + +impl opentelemetry_sdk::trace::ShouldSample for ChromaShouldSample { + fn should_sample( + &self, + parent_context: Option<&opentelemetry::Context>, + trace_id: opentelemetry::trace::TraceId, + name: &str, + span_kind: &opentelemetry::trace::SpanKind, + attributes: &[opentelemetry::KeyValue], + links: &[opentelemetry::trace::Link], + ) -> opentelemetry::trace::SamplingResult { + if (name != "get" && name != "insert") || is_slow(attributes) { + opentelemetry::trace::SamplingResult { + decision: opentelemetry::trace::SamplingDecision::RecordAndSample, + attributes: attributes.to_vec(), + trace_state: opentelemetry::trace::TraceState::default(), + } + } else { + opentelemetry::trace::SamplingResult { + decision: opentelemetry::trace::SamplingDecision::Drop, + attributes: vec![], + trace_state: opentelemetry::trace::TraceState::default(), + } + } + } +} + pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { println!( "Registering jaeger subscriber for {} at endpoint {}", @@ -16,7 +60,7 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { )]); // Prepare trace config. let trace_config = opentelemetry_sdk::trace::Config::default() - .with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn) + .with_sampler(ChromaShouldSample) .with_resource(resource); // Prepare exporter. let exporter = opentelemetry_otlp::new_exporter() @@ -36,14 +80,23 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { // Layer for printing spans to stdout. Only print INFO logs by default. let stdout_layer = BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout) + .with_filter(tracing_subscriber::filter::FilterFn::new(|metadata| { + !(metadata + .module_path() + .unwrap_or("") + .starts_with("chroma_cache") + && metadata.name() != "clear") + })) .with_filter(tracing_subscriber::filter::LevelFilter::INFO); // global filter layer. Don't filter anything at above trace at the global layer for chroma. // And enable errors for every other library. 
     let global_layer = EnvFilter::new(std::env::var("RUST_LOG").unwrap_or_else(|_| {
         "error,".to_string()
             + &vec![
+                "chroma",
                 "chroma-blockstore",
                 "chroma-config",
+                "chroma-cache",
                 "chroma-distance",
                 "chroma-error",
                 "chroma-index",

From 7372fe59b67f93ab22dacfabbacbb7f1974e0f95 Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 28 Oct 2024 12:31:42 -0700
Subject: [PATCH 2/7] empty commit to bump pr title


From c11a1ae8197a7c6c9b02c277ed366de5bc66bfc6 Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 28 Oct 2024 12:35:32 -0700
Subject: [PATCH 3/7] satisfy clippy

---
 rust/worker/src/tracing/opentelemetry_config.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs
index 55202cfa33c..f6c159a7970 100644
--- a/rust/worker/src/tracing/opentelemetry_config.rs
+++ b/rust/worker/src/tracing/opentelemetry_config.rs
@@ -26,12 +26,12 @@ fn is_slow(attributes: &[opentelemetry::KeyValue]) -> bool {
 impl opentelemetry_sdk::trace::ShouldSample for ChromaShouldSample {
     fn should_sample(
         &self,
-        parent_context: Option<&opentelemetry::Context>,
-        trace_id: opentelemetry::trace::TraceId,
+        _: Option<&opentelemetry::Context>,
+        _: opentelemetry::trace::TraceId,
         name: &str,
-        span_kind: &opentelemetry::trace::SpanKind,
+        _: &opentelemetry::trace::SpanKind,
         attributes: &[opentelemetry::KeyValue],
-        links: &[opentelemetry::trace::Link],
+        _: &[opentelemetry::trace::Link],
     ) -> opentelemetry::trace::SamplingResult {
         if (name != "get" && name != "insert") || is_slow(attributes) {
             opentelemetry::trace::SamplingResult {

From d60804cf2d84dcbe93fed9cc518b93c5e5e1c4ec Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 28 Oct 2024 12:52:37 -0700
Subject: [PATCH 4/7] Add comments for reviewer feedback.

---
 rust/worker/src/tracing/opentelemetry_config.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs
index f6c159a7970..dcb1efdf3ad 100644
--- a/rust/worker/src/tracing/opentelemetry_config.rs
+++ b/rust/worker/src/tracing/opentelemetry_config.rs
@@ -33,6 +33,9 @@ impl opentelemetry_sdk::trace::ShouldSample for ChromaShouldSample {
         attributes: &[opentelemetry::KeyValue],
         _: &[opentelemetry::trace::Link],
     ) -> opentelemetry::trace::SamplingResult {
+        // If the name is not get and not insert, or the request is slow, sample it.
+        // Otherwise, drop.
+        // This filters foyer calls in-process so they won't overwhelm the tracing.
         if (name != "get" && name != "insert") || is_slow(attributes) {
             opentelemetry::trace::SamplingResult {
                 decision: opentelemetry::trace::SamplingDecision::RecordAndSample,
@@ -81,6 +84,8 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
     let stdout_layer =
         BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout)
             .with_filter(tracing_subscriber::filter::FilterFn::new(|metadata| {
+                // This filter ensures that we don't log cache get/insert calls to stdout, but will
+                // still see the clear call.
                !(metadata
                    .module_path()
                    .unwrap_or("")
                    .starts_with("chroma_cache")

From b851fc773528642c0ad0e7a6c35e8bb877eb8c3f Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 28 Oct 2024 13:59:06 -0700
Subject: [PATCH 5/7] notes about hacks

---
 rust/worker/src/tracing/opentelemetry_config.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs
index dcb1efdf3ad..2c166954ab4 100644
--- a/rust/worker/src/tracing/opentelemetry_config.rs
+++ b/rust/worker/src/tracing/opentelemetry_config.rs
@@ -33,6 +33,9 @@ impl opentelemetry_sdk::trace::ShouldSample for ChromaShouldSample {
         attributes: &[opentelemetry::KeyValue],
         _: &[opentelemetry::trace::Link],
     ) -> opentelemetry::trace::SamplingResult {
+        // NOTE(rescrv): THIS IS A HACK! If you find yourself seriously extending it, it's time
+        // to investigate honeycomb's sampling capabilities.
+
         // If the name is not get and not insert, or the request is slow, sample it.
         // Otherwise, drop.
         // This filters foyer calls in-process so they won't overwhelm the tracing.
         if (name != "get" && name != "insert") || is_slow(attributes) {
@@ -84,6 +87,10 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
     let stdout_layer =
         BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout)
             .with_filter(tracing_subscriber::filter::FilterFn::new(|metadata| {
+                // NOTE(rescrv): This is a hack, too. Not an uppercase hack, just a hack. This
+                // one's localized to the cache module. There's not much to do to unify it with
+                // the otel filter because these are different output layers of the tracing stack.
+
                 // This filter ensures that we don't log cache get/insert calls to stdout, but will
                 // still see the clear call.
                 !(metadata

From 681299a2c3f9726104d3f3deaeb8be4d11e5d38d Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 28 Oct 2024 14:21:47 -0700
Subject: [PATCH 6/7] do not duplicate tags in opentelemetry config

---
 rust/worker/src/tracing/opentelemetry_config.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs
index 2c166954ab4..d08d33618bd 100644
--- a/rust/worker/src/tracing/opentelemetry_config.rs
+++ b/rust/worker/src/tracing/opentelemetry_config.rs
@@ -42,7 +42,7 @@ impl opentelemetry_sdk::trace::ShouldSample for ChromaShouldSample {
         if (name != "get" && name != "insert") || is_slow(attributes) {
             opentelemetry::trace::SamplingResult {
                 decision: opentelemetry::trace::SamplingDecision::RecordAndSample,
-                attributes: attributes.to_vec(),
+                attributes: vec![],
                 trace_state: opentelemetry::trace::TraceState::default(),
             }
         } else {

From e08a03fcb87746d294a03396eb5bc164d7e2d68b Mon Sep 17 00:00:00 2001
From: Robert Escriva
Date: Mon, 28 Oct 2024 16:32:39 -0700
Subject: [PATCH 7/7] [ENH] Latency histograms for get/insert/remove/clear of cache.
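
Each cache operation (get, insert, remove, clear) is wrapped in a small RAII
stopwatch that records the elapsed time in microseconds into a per-operation
OpenTelemetry histogram, and an OTLP metrics pipeline is registered alongside
the existing trace pipeline so the histograms are exported to the same
endpoint. Rough sketch of the pattern introduced in the diff below (names are
taken from the diff; shown here only as illustration, not additional code):

    let meter = opentelemetry::global::meter("chroma");
    let get_latency = meter.u64_histogram("get_latency").init();
    // In each cache method: the guard records the elapsed micros when dropped.
    let _stopwatch = Stopwatch::new(&get_latency);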
--- Cargo.lock | 1 + rust/cache/Cargo.toml | 5 ++ rust/cache/src/foyer.rs | 79 ++++++++++++++++++- .../src/tracing/opentelemetry_config.rs | 9 +++ 4 files changed, 91 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1fd7bd2dee6..fbb58c2e1be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,6 +1223,7 @@ dependencies = [ "chroma-types", "clap", "foyer", + "opentelemetry", "parking_lot", "serde", "serde_yaml", diff --git a/rust/cache/Cargo.toml b/rust/cache/Cargo.toml index d39b5df72b8..c31e928c804 100644 --- a/rust/cache/Cargo.toml +++ b/rust/cache/Cargo.toml @@ -10,6 +10,11 @@ path = "src/lib.rs" clap = { version = "4", features = ["derive"] } foyer = "0.12" anyhow = "1.0" +opentelemetry = { version = "0.26.0", default-features = false, features = [ + "trace", + "metrics", +] } + # TODO(rescrv): Deprecated. Find a suitable replacement for such things. serde_yaml = "0.9" diff --git a/rust/cache/src/foyer.rs b/rust/cache/src/foyer.rs index 184818f270d..2f78311f12f 100644 --- a/rust/cache/src/foyer.rs +++ b/rust/cache/src/foyer.rs @@ -1,3 +1,4 @@ +use opentelemetry::global; use std::hash::Hash; use std::sync::Arc; use std::time::Duration; @@ -223,6 +224,24 @@ impl FoyerCacheConfig { } } +struct Stopwatch<'a>( + &'a opentelemetry::metrics::Histogram, + std::time::Instant, +); + +impl<'a> Stopwatch<'a> { + fn new(histogram: &'a opentelemetry::metrics::Histogram) -> Self { + Self(histogram, std::time::Instant::now()) + } +} + +impl<'a> Drop for Stopwatch<'a> { + fn drop(&mut self) { + let elapsed = self.1.elapsed().as_micros() as u64; + self.0.record(elapsed, &[]); + } +} + #[derive(Clone)] pub struct FoyerHybridCache where @@ -230,6 +249,10 @@ where V: Clone + Send + Sync + StorageValue + Weighted + 'static, { cache: foyer::HybridCache, + get_latency: opentelemetry::metrics::Histogram, + insert_latency: opentelemetry::metrics::Histogram, + remove_latency: opentelemetry::metrics::Histogram, + clear_latency: opentelemetry::metrics::Histogram, } impl FoyerHybridCache @@ -304,7 +327,18 @@ where e ))) as _ })?; - Ok(FoyerHybridCache { cache }) + let meter = global::meter("chroma"); + let get_latency = meter.u64_histogram("get_latency").init(); + let insert_latency = meter.u64_histogram("insert_latency").init(); + let remove_latency = meter.u64_histogram("remove_latency").init(); + let clear_latency = meter.u64_histogram("clear_latency").init(); + Ok(FoyerHybridCache { + cache, + get_latency, + insert_latency, + remove_latency, + clear_latency, + }) } } @@ -316,21 +350,25 @@ where { #[tracing::instrument(skip(self, key))] async fn get(&self, key: &K) -> Result, CacheError> { + let _stopwatch = Stopwatch::new(&self.get_latency); Ok(self.cache.get(key).await?.map(|v| v.value().clone())) } #[tracing::instrument(skip(self, key, value))] async fn insert(&self, key: K, value: V) { + let _stopwatch = Stopwatch::new(&self.insert_latency); self.cache.insert(key, value); } #[tracing::instrument(skip(self, key))] async fn remove(&self, key: &K) { + let _stopwatch = Stopwatch::new(&self.remove_latency); self.cache.remove(key); } #[tracing::instrument(skip(self))] async fn clear(&self) -> Result<(), CacheError> { + let _stopwatch = Stopwatch::new(&self.clear_latency); Ok(self.cache.clear().await?) 
} } @@ -349,6 +387,10 @@ where V: Clone + Send + Sync + Weighted + 'static, { cache: foyer::Cache, + insert_latency: opentelemetry::metrics::Histogram, + get_latency: opentelemetry::metrics::Histogram, + remove_latency: opentelemetry::metrics::Histogram, + clear_latency: opentelemetry::metrics::Histogram, } impl FoyerPlainCache @@ -363,7 +405,18 @@ where let cache = CacheBuilder::new(config.capacity) .with_shards(config.shards) .build(); - Ok(FoyerPlainCache { cache }) + let meter = global::meter("chroma"); + let insert_latency = meter.u64_histogram("insert_latency").init(); + let get_latency = meter.u64_histogram("get_latency").init(); + let remove_latency = meter.u64_histogram("remove_latency").init(); + let clear_latency = meter.u64_histogram("clear_latency").init(); + Ok(FoyerPlainCache { + cache, + insert_latency, + get_latency, + remove_latency, + clear_latency, + }) } /// Build an in-memory cache that emits keys that get evicted to a channel. @@ -398,7 +451,23 @@ where .with_shards(config.shards) .with_event_listener(Arc::new(evl)) .build(); - Ok(FoyerPlainCache { cache }) + let get_latency = global::meter("chroma").u64_histogram("get_latency").init(); + let insert_latency = global::meter("chroma") + .u64_histogram("insert_latency") + .init(); + let remove_latency = global::meter("chroma") + .u64_histogram("remove_latency") + .init(); + let clear_latency = global::meter("chroma") + .u64_histogram("clear_latency") + .init(); + Ok(FoyerPlainCache { + cache, + insert_latency, + get_latency, + remove_latency, + clear_latency, + }) } } @@ -410,21 +479,25 @@ where { #[tracing::instrument(skip(self, key))] async fn get(&self, key: &K) -> Result, CacheError> { + let _stopwatch = Stopwatch::new(&self.get_latency); Ok(self.cache.get(key).map(|v| v.value().clone())) } #[tracing::instrument(skip(self, key, value))] async fn insert(&self, key: K, value: V) { + let _stopwatch = Stopwatch::new(&self.insert_latency); self.cache.insert(key, value); } #[tracing::instrument(skip(self, key))] async fn remove(&self, key: &K) { + let _stopwatch = Stopwatch::new(&self.remove_latency); self.cache.remove(key); } #[tracing::instrument(skip(self))] async fn clear(&self) -> Result<(), CacheError> { + let _stopwatch = Stopwatch::new(&self.clear_latency); self.cache.clear(); Ok(()) } diff --git a/rust/worker/src/tracing/opentelemetry_config.rs b/rust/worker/src/tracing/opentelemetry_config.rs index d08d33618bd..dac3e089000 100644 --- a/rust/worker/src/tracing/opentelemetry_config.rs +++ b/rust/worker/src/tracing/opentelemetry_config.rs @@ -161,4 +161,13 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) { prev_hook(panic_info); })); + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(otel_endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .metrics(opentelemetry_sdk::runtime::Tokio) + .with_exporter(exporter) + .build() + .expect("Failed to build metrics provider"); + global::set_meter_provider(provider); }
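
Note on the 1ms sampling threshold in PATCH 1/7: is_slow sums the span's
busy_ns and idle_ns attributes and the sampler keeps foyer get/insert spans
only when that sum exceeds 1_000_000 ns (1ms). A minimal sketch of a unit test
for that behavior, assuming it were added to the same module as is_slow (the
test module name and placement are illustrative, not part of the patches
above):

    #[cfg(test)]
    mod chroma_should_sample_tests {
        use opentelemetry::KeyValue;

        #[test]
        fn busy_plus_idle_over_one_millisecond_is_slow() {
            // 0.6ms busy + 0.5ms idle = 1.1ms > 1ms, so the span would be kept.
            let slow = vec![
                KeyValue::new("busy_ns", 600_000i64),
                KeyValue::new("idle_ns", 500_000i64),
            ];
            assert!(super::is_slow(&slow));

            // 0.4ms busy + 0.5ms idle = 0.9ms, so a get/insert span would be dropped.
            let fast = vec![
                KeyValue::new("busy_ns", 400_000i64),
                KeyValue::new("idle_ns", 500_000i64),
            ];
            assert!(!super::is_slow(&fast));
        }
    }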