Skip to content

Commit

Permalink
Setup GC tool and service
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Jan 9, 2025
1 parent 74146be commit 1f645a4
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 1 deletion.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
resolver = "2"

members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/index", "rust/load", "rust/storage", "rust/types", "rust/worker"]
members = ["rust/benchmark", "rust/blockstore", "rust/cache", "rust/chroma", "rust/config", "rust/distance", "rust/error", "rust/garbage_collector", "rust/index", "rust/load", "rust/storage", "rust/types", "rust/worker"]

[workspace.dependencies]
arrow = "52.2.0"
Expand Down
35 changes: 35 additions & 0 deletions rust/garbage_collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "garbage_collector"
version = "0.1.0"
edition = "2021"

[lib]
name = "garbage_collector_library"
path = "src/lib.rs"

[[bin]]
name = "garbage_collector_service"
path = "src/bin/garbage_collector_service.rs"

[[bin]]
name = "garbage_collector_tool"
path = "src/bin/garbage_collector_tool.rs"

[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
figment = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }

tracing = { workspace = true }
tracing-bunyan-formatter = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry-http = { workspace = true }
opentelemetry_sdk = { workspace = true }
10 changes: 10 additions & 0 deletions rust/garbage_collector/garbage_collector_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
service_name: "garbage-collector"
otel_endpoint: "http://otel-collector:4317"
cutoff_time_hours: 12 # GC all versions created at time < now() - cutoff_time_hours
max_collections_to_gc: 1000 # Maximum number of collections to GC in one run
gc_interval_mins: 120 # Run GC every x mins
sysdb_connection:
host: "sysdb.chroma"
port: 50051
connect_timeout_ms: 60000
request_timeout_ms: 60000
6 changes: 6 additions & 0 deletions rust/garbage_collector/src/bin/garbage_collector_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use garbage_collector_library::garbage_collector_service_entrypoint;

#[tokio::main]
async fn main() {
garbage_collector_service_entrypoint().await
}
6 changes: 6 additions & 0 deletions rust/garbage_collector/src/bin/garbage_collector_tool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use garbage_collector_library::garbage_collector_service_entrypoint;

#[tokio::main]
async fn main() {
garbage_collector_service_entrypoint().await
}
43 changes: 43 additions & 0 deletions rust/garbage_collector/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use figment::providers::{Env, Format, Yaml};

const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml";

#[derive(Debug, serde::Deserialize)]
pub(super) struct GarbageCollectorConfig {
pub(super) service_name: String,
pub(super) otel_endpoint: String,
cutoff_time_hours: u32,
max_collections_to_gc: u32,
gc_interval_mins: u32,
sysdb_connection: SysdbConnectionConfig,
}

#[derive(Debug, serde::Deserialize)]
pub(super) struct SysdbConnectionConfig {
host: String,
port: u32,
connect_timeout_ms: u32,
request_timeout_ms: u32,
}

impl GarbageCollectorConfig {
pub(super) fn load() -> Self {
Self::load_from_path(DEFAULT_CONFIG_PATH)
}

pub(super) fn load_from_path(path: &str) -> Self {
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them.
// Excluding our own environment variables, which are prefixed with CHROMA_.
let mut f = figment::Figment::from(
Env::prefixed("CHROMA_GC_").map(|k| k.as_str().replace("__", ".").into()),
);
if std::path::Path::new(path).exists() {
f = figment::Figment::from(Yaml::file(path)).merge(f);
}
let res = f.extract();
match res {
Ok(config) => config,
Err(e) => panic!("Error loading config: {}", e),
}
}
}
16 changes: 16 additions & 0 deletions rust/garbage_collector/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use config::GarbageCollectorConfig;
use opentelemetry_config::init_otel_tracing;

mod config;
mod opentelemetry_config;

pub async fn garbage_collector_service_entrypoint() {
// Parse configuration. Configuration includes sysdb connection details, and
// otel details.
let config = GarbageCollectorConfig::load();
// Enable OTEL tracing.
init_otel_tracing(&config.service_name, &config.otel_endpoint);

// Start a background task to periodically check for garbage.
todo!()
}
101 changes: 101 additions & 0 deletions rust/garbage_collector/src/opentelemetry_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// NOTE: This is a copy of the file of the same name in the
// worker/src/tracing/opentelemetry_config.rs file.
//
// Keep them in-sync manually.

use opentelemetry::global;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_bunyan_formatter::BunyanFormattingLayer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};

pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
println!(
"Registering jaeger subscriber for {} at endpoint {}",
service_name, otel_endpoint
);
let resource = opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new(
"service.name",
service_name.clone(),
)]);
// Prepare trace config.
let trace_config = opentelemetry_sdk::trace::Config::default()
.with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn)
.with_resource(resource.clone());

// Prepare tracer.
let tracing_span_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(otel_endpoint)
.build()
.expect("could not build span exporter for tracing");
let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(tracing_span_exporter, opentelemetry_sdk::runtime::Tokio)
.with_config(trace_config)
.build();
let tracer = tracer_provider.tracer(service_name.clone());

// Prepare meter.
let metric_exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(otel_endpoint)
.build()
.expect("could not build metric exporter");

let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(
metric_exporter,
opentelemetry_sdk::runtime::Tokio,
)
.build();
let meter_provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(resource.clone())
.build();
global::set_meter_provider(meter_provider);

// Layer for adding our configured tracer.
// Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end.
let exporter_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer)
.with_filter(tracing_subscriber::filter::LevelFilter::TRACE);
// Layer for printing spans to stdout. Only print INFO logs by default.
let stdout_layer =
BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout)
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
// global filter layer. Don't filter anything at above trace at the global layer for gc.
let global_layer = EnvFilter::new("none,garbage_collector=trace");

// Create subscriber.
let subscriber = tracing_subscriber::registry()
.with(global_layer)
.with(stdout_layer)
.with(exporter_layer);
global::set_text_map_propagator(TraceContextPropagator::new());
tracing::subscriber::set_global_default(subscriber)
.expect("Set global default subscriber failed");
println!("Set global subscriber for {}", service_name);

// Add panics to tracing
let prev_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
let payload = panic_info.payload();

#[allow(clippy::manual_map)]
let payload = if let Some(s) = payload.downcast_ref::<&str>() {
Some(&**s)
} else if let Some(s) = payload.downcast_ref::<String>() {
Some(s.as_str())
} else {
None
};

tracing::error!(
panic.payload = payload,
panic.location = panic_info.location().map(|l| l.to_string()),
panic.backtrace = tracing::field::display(std::backtrace::Backtrace::capture()),
"A panic occurred"
);

prev_hook(panic_info);
}));
}

0 comments on commit 1f645a4

Please sign in to comment.