Skip to content

Commit

Permalink
[ENH] Add otel support for query and compaction service (#2122)
Browse files Browse the repository at this point in the history
## Description of changes
Export traces and spans in OpenTelemetry format so that they can be
consumed by backends such as Jaeger and Honeycomb. Verified locally that
this works with Jaeger; Honeycomb will be tested once this is merged.

## Test plan
All existing tests pass. Verified locally (via `tilt up` / `tilt down`) that
traces are exported to Jaeger.

## Documentation Changes
NA

---------
  • Loading branch information
sanketkedia authored May 4, 2024
1 parent 3eb26d5 commit b34f90c
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 19 deletions.
250 changes: 235 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ arrow = "50.0.0"
roaring = "0.10.3"
tantivy = "0.21.1"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
opentelemetry-otlp = "0.12.0"

[dev-dependencies]
proptest = "1.4.0"
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# for now we nest it in the worker directory

query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "10.244.0.9"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -32,6 +34,8 @@ query_service:
worker_queue_size: 100

compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "10.244.0.9"
my_port: 50051
assignment_policy:
Expand Down
3 changes: 0 additions & 3 deletions rust/worker/src/bin/query_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,5 @@ use worker::query_service_entrypoint;

/// Binary entry point for the query service.
///
/// No tracing subscriber is installed here on purpose:
/// `query_service_entrypoint` calls `init_otel_tracing`, which registers the
/// process-wide subscriber with `set_global_default(..).expect(..)`. Installing
/// another global subscriber in `main` first would make that call fail and
/// panic at startup.
#[tokio::main]
async fn main() {
    query_service_entrypoint().await;
}
20 changes: 20 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ impl RootConfig {
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
/// have its own field in this struct for its Config struct.
pub(crate) struct QueryServiceConfig {
pub(crate) service_name: String,
pub(crate) otel_endpoint: String,
pub(crate) my_ip: String,
pub(crate) my_port: u16,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
Expand All @@ -115,6 +117,8 @@ pub(crate) struct QueryServiceConfig {
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
/// have its own field in this struct for its Config struct.
pub(crate) struct CompactionServiceConfig {
pub(crate) service_name: String,
pub(crate) otel_endpoint: String,
pub(crate) my_ip: String,
pub(crate) my_port: u16,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
Expand Down Expand Up @@ -150,6 +154,8 @@ mod tests {
"chroma_config.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -178,6 +184,8 @@ mod tests {
worker_queue_size: 100
compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -227,6 +235,8 @@ mod tests {
"random_path.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -255,6 +265,8 @@ mod tests {
worker_queue_size: 100
compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -322,6 +334,8 @@ mod tests {
"chroma_config.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -350,6 +364,8 @@ mod tests {
worker_queue_size: 100
compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
my_ip: "192.0.0.1"
my_port: 50051
assignment_policy:
Expand Down Expand Up @@ -401,6 +417,8 @@ mod tests {
"chroma_config.yaml",
r#"
query_service:
service_name: "query-service"
otel_endpoint: "http://jaeger:4317"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
Expand All @@ -427,6 +445,8 @@ mod tests {
worker_queue_size: 100
compaction_service:
service_name: "compaction-service"
otel_endpoint: "http://jaeger:4317"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
Expand Down
13 changes: 13 additions & 0 deletions rust/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod server;
mod storage;
mod sysdb;
mod system;
mod tracing;
mod types;

use config::Configurable;
Expand All @@ -35,6 +36,12 @@ pub async fn query_service_entrypoint() {
};

let config = config.query_service;

crate::tracing::opentelemetry_config::init_otel_tracing(
&config.service_name,
&config.otel_endpoint,
);

let system: system::System = system::System::new();
let dispatcher =
match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await {
Expand Down Expand Up @@ -94,6 +101,12 @@ pub async fn compaction_service_entrypoint() {
};

let config = config.compaction_service;

crate::tracing::opentelemetry_config::init_otel_tracing(
&config.service_name,
&config.otel_endpoint,
);

let system: system::System = system::System::new();

let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config(
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/tracing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/// OpenTelemetry tracing setup (`init_otel_tracing`) used by the service entrypoints.
pub(crate) mod opentelemetry_config;
51 changes: 51 additions & 0 deletions rust/worker/src/tracing/opentelemetry_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace;
use opentelemetry_otlp::WithExportConfig;
use tracing_bunyan_formatter::BunyanFormattingLayer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};

/// Installs the process-wide `tracing` subscriber for a service.
///
/// The subscriber is a registry composed of three layers:
/// * a global `EnvFilter` set to `TRACE`, so nothing is dropped up front;
/// * a Bunyan-formatted stdout layer, filtered to `INFO` and above;
/// * an OpenTelemetry layer, filtered at `TRACE`, whose tracer exports spans
///   in batches over OTLP/gRPC (tonic) to `otel_endpoint`.
///
/// It also registers the W3C `TraceContextPropagator` as the global text-map
/// propagator.
///
/// # Arguments
/// * `service_name` - reported as the `service.name` resource attribute and
///   used as the name of the Bunyan stdout layer.
/// * `otel_endpoint` - endpoint handed to the OTLP exporter.
///
/// # Panics
/// Panics if the OTLP pipeline cannot be installed, or if a global default
/// subscriber has already been set for this process.
///
/// NOTE(review): the batch exporter is installed with
/// `opentelemetry::runtime::Tokio`, so this presumably has to be called from
/// within a running Tokio runtime — confirm against the callers.
pub(crate) fn init_otel_tracing(service_name: &str, otel_endpoint: &str) {
    println!(
        "Registering jaeger subscriber for {} at endpoint {}",
        service_name, otel_endpoint
    );
    let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new(
        "service.name",
        service_name.to_owned(),
    )]);
    // Prepare trace config: sample every span and tag it with the service resource.
    let trace_config = trace::config()
        .with_sampler(opentelemetry::sdk::trace::Sampler::AlwaysOn)
        .with_resource(resource);
    // Prepare exporter.
    let exporter = opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(otel_endpoint);
    let otlp_tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(exporter)
        .with_trace_config(trace_config)
        .install_batch(opentelemetry::runtime::Tokio)
        .expect("Error - Failed to create tracer.");
    // Layer for adding our configured tracer.
    // Export everything at this layer. The backend, i.e. Honeycomb or Jaeger, will filter at its end.
    let exporter_layer = tracing_opentelemetry::layer()
        .with_tracer(otlp_tracer)
        .with_filter(tracing_subscriber::filter::LevelFilter::TRACE);
    // Layer for printing spans to stdout. Only print INFO logs by default.
    let stdout_layer = BunyanFormattingLayer::new(service_name.to_owned(), std::io::stdout)
        .with_filter(tracing_subscriber::filter::LevelFilter::INFO);
    // Global filter layer. Don't filter anything at the global layer; each
    // layer above applies its own level filter.
    let global_layer = EnvFilter::new("TRACE");
    // Create subscriber.
    let subscriber = tracing_subscriber::registry()
        .with(global_layer)
        .with(stdout_layer)
        .with(exporter_layer);
    global::set_text_map_propagator(TraceContextPropagator::new());
    tracing::subscriber::set_global_default(subscriber)
        .expect("Set global default subscriber failed");
    println!("Set global subscriber for {}", service_name);
}

0 comments on commit b34f90c

Please sign in to comment.