[ENH] Latency histograms for get/insert/remove/clear of cache. #3018

Merged: 7 commits, Oct 29, 2024
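At a glance: every cache operation (get/insert/remove/clear) is wrapped in a small RAII stopwatch that records elapsed microseconds into an OpenTelemetry histogram when it drops, and the worker installs an OTLP metrics pipeline so those histograms get exported. A minimal, self-contained sketch of that pattern, mirroring the names used in the diff and assuming the opentelemetry 0.26 API added in Cargo.toml (the `main` wiring below is illustrative only, not part of the PR):

```rust
use std::time::Instant;

use opentelemetry::global;
use opentelemetry::metrics::Histogram;

/// Records elapsed microseconds into the histogram when dropped.
struct Stopwatch<'a>(&'a Histogram<u64>, Instant);

impl<'a> Stopwatch<'a> {
    fn new(histogram: &'a Histogram<u64>) -> Self {
        Self(histogram, Instant::now())
    }
}

impl<'a> Drop for Stopwatch<'a> {
    fn drop(&mut self) {
        self.0.record(self.1.elapsed().as_micros() as u64, &[]);
    }
}

fn main() {
    // Until a meter provider is installed (the worker change below does this),
    // the global meter is a no-op, so recording is harmless.
    let meter = global::meter("chroma");
    let get_latency = meter.u64_histogram("get_latency").init();

    {
        let _stopwatch = Stopwatch::new(&get_latency);
        // ... the timed cache operation runs here ...
    } // elapsed time is recorded when _stopwatch drops
}
```

Because the stopwatch records on drop, early returns and `?` paths inside the timed methods are still measured.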
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions rust/cache/Cargo.toml
@@ -10,6 +10,11 @@ path = "src/lib.rs"
clap = { version = "4", features = ["derive"] }
foyer = "0.12"
anyhow = "1.0"
opentelemetry = { version = "0.26.0", default-features = false, features = [
"trace",
"metrics",
] }

# TODO(rescrv): Deprecated. Find a suitable replacement for such things.
serde_yaml = "0.9"

79 changes: 76 additions & 3 deletions rust/cache/src/foyer.rs
@@ -1,3 +1,4 @@
use opentelemetry::global;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
@@ -223,13 +224,35 @@ impl FoyerCacheConfig {
}
}

struct Stopwatch<'a>(
&'a opentelemetry::metrics::Histogram<u64>,
std::time::Instant,
);

impl<'a> Stopwatch<'a> {
fn new(histogram: &'a opentelemetry::metrics::Histogram<u64>) -> Self {
Self(histogram, std::time::Instant::now())
}
}

impl<'a> Drop for Stopwatch<'a> {
fn drop(&mut self) {
let elapsed = self.1.elapsed().as_micros() as u64;
self.0.record(elapsed, &[]);
}
}

#[derive(Clone)]
pub struct FoyerHybridCache<K, V>
where
K: Clone + Send + Sync + StorageKey + Eq + PartialEq + Hash + 'static,
V: Clone + Send + Sync + StorageValue + Weighted + 'static,
{
cache: foyer::HybridCache<K, V>,
get_latency: opentelemetry::metrics::Histogram<u64>,
insert_latency: opentelemetry::metrics::Histogram<u64>,
remove_latency: opentelemetry::metrics::Histogram<u64>,
clear_latency: opentelemetry::metrics::Histogram<u64>,
}

impl<K, V> FoyerHybridCache<K, V>
@@ -304,7 +327,18 @@ where
e
))) as _
})?;
Ok(FoyerHybridCache { cache })
let meter = global::meter("chroma");
let get_latency = meter.u64_histogram("get_latency").init();
let insert_latency = meter.u64_histogram("insert_latency").init();
let remove_latency = meter.u64_histogram("remove_latency").init();
let clear_latency = meter.u64_histogram("clear_latency").init();
Ok(FoyerHybridCache {
cache,
get_latency,
insert_latency,
remove_latency,
clear_latency,
})
}
}

@@ -316,21 +350,25 @@ where
{
#[tracing::instrument(skip(self, key))]
async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
let _stopwatch = Stopwatch::new(&self.get_latency);
Ok(self.cache.get(key).await?.map(|v| v.value().clone()))
}

#[tracing::instrument(skip(self, key, value))]
async fn insert(&self, key: K, value: V) {
let _stopwatch = Stopwatch::new(&self.insert_latency);
Comment on lines 357 to +359

Contributor: curious why this is measured separately instead of looking at the span duration?

self.cache.insert(key, value);
}

#[tracing::instrument(skip(self, key))]
async fn remove(&self, key: &K) {
let _stopwatch = Stopwatch::new(&self.remove_latency);
self.cache.remove(key);
}

#[tracing::instrument(skip(self))]
async fn clear(&self) -> Result<(), CacheError> {
let _stopwatch = Stopwatch::new(&self.clear_latency);
Ok(self.cache.clear().await?)
}
}
@@ -349,6 +387,10 @@ where
V: Clone + Send + Sync + Weighted + 'static,
{
cache: foyer::Cache<K, V>,
insert_latency: opentelemetry::metrics::Histogram<u64>,
get_latency: opentelemetry::metrics::Histogram<u64>,
remove_latency: opentelemetry::metrics::Histogram<u64>,
clear_latency: opentelemetry::metrics::Histogram<u64>,
}

impl<K, V> FoyerPlainCache<K, V>
@@ -363,7 +405,18 @@ where
let cache = CacheBuilder::new(config.capacity)
.with_shards(config.shards)
.build();
Ok(FoyerPlainCache { cache })
let meter = global::meter("chroma");
let insert_latency = meter.u64_histogram("insert_latency").init();
let get_latency = meter.u64_histogram("get_latency").init();
let remove_latency = meter.u64_histogram("remove_latency").init();
let clear_latency = meter.u64_histogram("clear_latency").init();
Ok(FoyerPlainCache {
cache,
insert_latency,
get_latency,
remove_latency,
clear_latency,
})
}

/// Build an in-memory cache that emits keys that get evicted to a channel.
@@ -398,7 +451,23 @@ where
.with_shards(config.shards)
.with_event_listener(Arc::new(evl))
.build();
Ok(FoyerPlainCache { cache })
let get_latency = global::meter("chroma").u64_histogram("get_latency").init();
let insert_latency = global::meter("chroma")
.u64_histogram("insert_latency")
.init();
let remove_latency = global::meter("chroma")
.u64_histogram("remove_latency")
.init();
let clear_latency = global::meter("chroma")
.u64_histogram("clear_latency")
.init();
Ok(FoyerPlainCache {
cache,
insert_latency,
get_latency,
remove_latency,
clear_latency,
})
}
}

@@ -410,21 +479,25 @@ where
{
#[tracing::instrument(skip(self, key))]
async fn get(&self, key: &K) -> Result<Option<V>, CacheError> {
let _stopwatch = Stopwatch::new(&self.get_latency);
Ok(self.cache.get(key).map(|v| v.value().clone()))
}

#[tracing::instrument(skip(self, key, value))]
async fn insert(&self, key: K, value: V) {
let _stopwatch = Stopwatch::new(&self.insert_latency);
self.cache.insert(key, value);
}

#[tracing::instrument(skip(self, key))]
async fn remove(&self, key: &K) {
let _stopwatch = Stopwatch::new(&self.remove_latency);
self.cache.remove(key);
}

#[tracing::instrument(skip(self))]
async fn clear(&self) -> Result<(), CacheError> {
let _stopwatch = Stopwatch::new(&self.clear_latency);
self.cache.clear();
Ok(())
}
76 changes: 75 additions & 1 deletion rust/worker/src/tracing/opentelemetry_config.rs
@@ -5,6 +5,56 @@ use opentelemetry_sdk::propagation::TraceContextPropagator
use tracing_bunyan_formatter::BunyanFormattingLayer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};

#[derive(Clone, Debug, Default)]
struct ChromaShouldSample;

const BUSY_NS: opentelemetry::Key = opentelemetry::Key::from_static_str("busy_ns");
const IDLE_NS: opentelemetry::Key = opentelemetry::Key::from_static_str("idle_ns");

fn is_slow(attributes: &[opentelemetry::KeyValue]) -> bool {
let mut nanos = 0i64;
for attr in attributes {
if attr.key == BUSY_NS || attr.key == IDLE_NS {
if let opentelemetry::Value::I64(ns) = attr.value {
nanos += ns;
}
}
}
nanos > 1_000_000
}

impl opentelemetry_sdk::trace::ShouldSample for ChromaShouldSample {
fn should_sample(
&self,
_: Option<&opentelemetry::Context>,
_: opentelemetry::trace::TraceId,
name: &str,
_: &opentelemetry::trace::SpanKind,
attributes: &[opentelemetry::KeyValue],
_: &[opentelemetry::trace::Link],
) -> opentelemetry::trace::SamplingResult {
// NOTE(rescrv): THIS IS A HACK! If you find yourself seriously extending it, it's time
// to investigate honeycomb's sampling capabilities.

// If the name is not get and not insert, or the request is slow, sample it.
// Otherwise, drop.
// This filters foyer calls in-process so they don't overwhelm the tracing output.
if (name != "get" && name != "insert") || is_slow(attributes) {
opentelemetry::trace::SamplingResult {
decision: opentelemetry::trace::SamplingDecision::RecordAndSample,
attributes: vec![],
trace_state: opentelemetry::trace::TraceState::default(),
}
} else {
opentelemetry::trace::SamplingResult {
decision: opentelemetry::trace::SamplingDecision::Drop,
attributes: vec![],
trace_state: opentelemetry::trace::TraceState::default(),
}
}
}
}

pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
println!(
"Registering jaeger subscriber for {} at endpoint {}",
@@ -16,7 +66,7 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
)]);
// Prepare trace config.
let trace_config = opentelemetry_sdk::trace::Config::default()
.with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn)
.with_sampler(ChromaShouldSample)
.with_resource(resource);
// Prepare exporter.
let exporter = opentelemetry_otlp::new_exporter()
@@ -36,14 +86,29 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
// Layer for printing spans to stdout. Only print INFO logs by default.
let stdout_layer =
BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout)
.with_filter(tracing_subscriber::filter::FilterFn::new(|metadata| {
// NOTE(rescrv): This is a hack, too. Not an uppercase hack, just a hack. This
// one's localized to the cache module. There's not much to do to unify it with
// the otel filter because these are different output layers from the tracing.

// This filter ensures that we don't log cache calls for get/insert to stdout, but will
// still see the clear call.
!(metadata
.module_path()
.unwrap_or("")
.starts_with("chroma_cache")
&& metadata.name() != "clear")
}))
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
// Global filter layer. Don't filter anything above trace level at the global layer for chroma crates,
// and enable only errors for every other library.
let global_layer = EnvFilter::new(std::env::var("RUST_LOG").unwrap_or_else(|_| {
"error,".to_string()
+ &vec![
"chroma",
"chroma-blockstore",
"chroma-config",
"chroma-cache",
"chroma-distance",
"chroma-error",
"chroma-index",
@@ -96,4 +161,13 @@ pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {

prev_hook(panic_info);
}));
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otel_endpoint);

Collaborator: for my learning what is this new exporter and provider for?

Contributor Author: Metrics. They get installed as the exporter and provider for the global metrics. Do you see a way to reuse the other exporter? I'd like that, too.

let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(exporter)
.build()
.expect("Failed to build metrics provider");
global::set_meter_provider(provider);
}
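
For intuition, the sampler's slow-span check can be exercised on its own. A small sketch, assuming the `busy_ns`/`idle_ns` timing attributes that the tracing-opentelemetry layer attaches to spans (the literal values below are made up for illustration):

```rust
use opentelemetry::{Key, KeyValue, Value};

const BUSY_NS: Key = Key::from_static_str("busy_ns");
const IDLE_NS: Key = Key::from_static_str("idle_ns");

/// Mirrors the PR's is_slow: a span counts as slow when busy + idle time exceeds 1ms.
fn is_slow(attributes: &[KeyValue]) -> bool {
    let mut nanos = 0i64;
    for attr in attributes {
        if attr.key == BUSY_NS || attr.key == IDLE_NS {
            if let Value::I64(ns) = attr.value {
                nanos += ns;
            }
        }
    }
    nanos > 1_000_000
}

fn main() {
    let fast = [
        KeyValue::new("busy_ns", 200_000i64),
        KeyValue::new("idle_ns", 300_000i64),
    ];
    let slow = [
        KeyValue::new("busy_ns", 900_000i64),
        KeyValue::new("idle_ns", 400_000i64),
    ];
    // 0.5 ms total: a span named "get" or "insert" would be dropped.
    assert!(!is_slow(&fast));
    // 1.3 ms total: sampled even for "get"/"insert".
    assert!(is_slow(&slow));
}
```

Only get/insert spans are subject to the 1 ms cutoff; everything else is always sampled, which lines up with the stdout filter above that keeps the clear call visible.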