Skip to content

Commit

Permalink
[ENH] Latency histograms for get/insert/remove/clear of cache. (#3018)
Browse files Browse the repository at this point in the history
## Description of changes

* This wires up the cache to give us latency metrics for all operations.

## Test plan

I manually verified that the traces are exported if I change the
otel_endpoint to otel-collector:4317 instead of jaeger:4317

- [ X] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust
  • Loading branch information
rescrv authored Oct 29, 2024
1 parent 43f13bc commit a64633a
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rust/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ path = "src/lib.rs"
clap = { version = "4", features = ["derive"] }
foyer = "0.12"
anyhow = "1.0"
opentelemetry = { version = "0.26.0", default-features = false, features = [
"trace",
"metrics",
] }

# TODO(rescrv): Deprecated. Find a suitable replacement for such things.
serde_yaml = "0.9"

Expand Down
79 changes: 76 additions & 3 deletions rust/cache/src/foyer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use opentelemetry::global;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -223,13 +224,35 @@ impl FoyerCacheConfig {
}
}

struct Stopwatch<'a>(
&'a opentelemetry::metrics::Histogram<u64>,
std::time::Instant,
);

impl<'a> Stopwatch<'a> {
fn new(histogram: &'a opentelemetry::metrics::Histogram<u64>) -> Self {
Self(histogram, std::time::Instant::now())
}
}

impl<'a> Drop for Stopwatch<'a> {
fn drop(&mut self) {
let elapsed = self.1.elapsed().as_micros() as u64;
self.0.record(elapsed, &[]);
}
}

#[derive(Clone)]
pub struct FoyerHybridCache<K, V>
where
K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + StorageValue + Weighted + 'static,
{
cache: foyer::HybridCache<K, V>,
get_latency: opentelemetry::metrics::Histogram<u64>,
insert_latency: opentelemetry::metrics::Histogram<u64>,
remove_latency: opentelemetry::metrics::Histogram<u64>,
clear_latency: opentelemetry::metrics::Histogram<u64>,
}

impl<K, V> FoyerHybridCache<K, V>
Expand Down Expand Up @@ -304,7 +327,18 @@ where
e
))) as _
})?;
Ok(FoyerHybridCache { cache })
let meter = global::meter("chroma");
let get_latency = meter.u64_histogram("get_latency").init();
let insert_latency = meter.u64_histogram("insert_latency").init();
let remove_latency = meter.u64_histogram("remove_latency").init();
let clear_latency = meter.u64_histogram("clear_latency").init();
Ok(FoyerHybridCache {
cache,
get_latency,
insert_latency,
remove_latency,
clear_latency,
})
}
}

Expand All @@ -316,21 +350,25 @@ where
{
#[tracing::instrument(skip(self, key))]
async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
let _stopwatch = Stopwatch::new(&self.get_latency);
Ok(self.cache.get(key).await?.map(|v| v.value().clone()))
}

#[tracing::instrument(skip(self, key, value))]
async fn insert(&self, key: K, value: V) {
let _stopwatch = Stopwatch::new(&self.insert_latency);
self.cache.insert(key, value);
}

#[tracing::instrument(skip(self, key))]
async fn remove(&self, key: &K) {
let _stopwatch = Stopwatch::new(&self.remove_latency);
self.cache.remove(key);
}

#[tracing::instrument(skip(self))]
async fn clear(&self) -> Result<(), CacheError> {
let _stopwatch = Stopwatch::new(&self.clear_latency);
Ok(self.cache.clear().await?)
}
}
Expand All @@ -349,6 +387,10 @@ where
V: Clone + Send + Sync + Weighted + 'static,
{
cache: foyer::Cache<K, V>,
insert_latency: opentelemetry::metrics::Histogram<u64>,
get_latency: opentelemetry::metrics::Histogram<u64>,
remove_latency: opentelemetry::metrics::Histogram<u64>,
clear_latency: opentelemetry::metrics::Histogram<u64>,
}

impl<K, V> FoyerPlainCache<K, V>
Expand All @@ -363,7 +405,18 @@ where
let cache = CacheBuilder::new(config.capacity)
.with_shards(config.shards)
.build();
Ok(FoyerPlainCache { cache })
let meter = global::meter("chroma");
let insert_latency = meter.u64_histogram("insert_latency").init();
let get_latency = meter.u64_histogram("get_latency").init();
let remove_latency = meter.u64_histogram("remove_latency").init();
let clear_latency = meter.u64_histogram("clear_latency").init();
Ok(FoyerPlainCache {
cache,
insert_latency,
get_latency,
remove_latency,
clear_latency,
})
}

/// Build an in-memory cache that emits keys that get evicted to a channel.
Expand Down Expand Up @@ -398,7 +451,23 @@ where
.with_shards(config.shards)
.with_event_listener(Arc::new(evl))
.build();
Ok(FoyerPlainCache { cache })
let get_latency = global::meter("chroma").u64_histogram("get_latency").init();
let insert_latency = global::meter("chroma")
.u64_histogram("insert_latency")
.init();
let remove_latency = global::meter("chroma")
.u64_histogram("remove_latency")
.init();
let clear_latency = global::meter("chroma")
.u64_histogram("clear_latency")
.init();
Ok(FoyerPlainCache {
cache,
insert_latency,
get_latency,
remove_latency,
clear_latency,
})
}
}

Expand All @@ -410,21 +479,25 @@ where
{
#[tracing::instrument(skip(self, key))]
async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
let _stopwatch = Stopwatch::new(&self.get_latency);
Ok(self.cache.get(key).map(|v| v.value().clone()))
}

#[tracing::instrument(skip(self, key, value))]
async fn insert(&self, key: K, value: V) {
let _stopwatch = Stopwatch::new(&self.insert_latency);
self.cache.insert(key, value);
}

#[tracing::instrument(skip(self, key))]
async fn remove(&self, key: &K) {
let _stopwatch = Stopwatch::new(&self.remove_latency);
self.cache.remove(key);
}

#[tracing::instrument(skip(self))]
async fn clear(&self) -> Result<(), CacheError> {
let _stopwatch = Stopwatch::new(&self.clear_latency);
self.cache.clear();
Ok(())
}
Expand Down
9 changes: 9 additions & 0 deletions rust/worker/src/tracing/opentelemetry_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,13 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {

prev_hook(panic_info);
}));
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otel_endpoint);
let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(exporter)
.build()
.expect("Failed to build metrics provider");
global::set_meter_provider(provider);
}

0 comments on commit a64633a

Please sign in to comment.