diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 071011f7f03..a96b476948e 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6060,17 +6060,6 @@ dependencies = [ name = "quickwit-macros" version = "0.7.1" dependencies = [ - "proc-macro2", - "quickwit-macros-impl", - "quote", - "syn 2.0.48", -] - -[[package]] -name = "quickwit-macros-impl" -version = "0.7.1" -dependencies = [ - "heck", "proc-macro2", "quote", "syn 2.0.48", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 57fe290c6d4..d4294f09146 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -21,7 +21,6 @@ members = [ "quickwit-janitor", "quickwit-lambda", "quickwit-macros", - "quickwit-macros/impl", "quickwit-metastore", # Disabling metastore-utils from the quickwit projects to ease build/deps. @@ -49,17 +48,16 @@ default-members = [ "quickwit-common", "quickwit-config", "quickwit-control-plane", - "quickwit-index-management", "quickwit-datetime", "quickwit-directories", "quickwit-doc-mapper", + "quickwit-index-management", "quickwit-indexing", "quickwit-ingest", "quickwit-integration-tests", "quickwit-jaeger", "quickwit-janitor", "quickwit-macros", - "quickwit-macros/impl", "quickwit-metastore", "quickwit-opentelemetry", "quickwit-proto", @@ -314,7 +312,6 @@ quickwit-integration-tests = { path = "quickwit-integration-tests" } quickwit-jaeger = { path = "quickwit-jaeger" } quickwit-janitor = { path = "quickwit-janitor" } quickwit-macros = { path = "quickwit-macros" } -quickwit-macros-impl = { path = "quickwit-macros/impl" } quickwit-metastore = { path = "quickwit-metastore" } quickwit-opentelemetry = { path = "quickwit-opentelemetry" } quickwit-proto = { path = "quickwit-proto" } diff --git a/quickwit/quickwit-cli/src/metrics.rs b/quickwit/quickwit-cli/src/metrics.rs index 7ff71fa120b..5fbd5b6e647 100644 --- a/quickwit/quickwit-cli/src/metrics.rs +++ b/quickwit/quickwit-cli/src/metrics.rs @@ -30,7 +30,8 @@ impl Default for CliMetrics { 
thread_unpark_duration_microseconds: new_histogram_vec( "thread_unpark_duration_microseconds", "Duration for which a thread of the main tokio runtime is unparked.", - "quickwit_cli", + "cli", + &[], [], ), } diff --git a/quickwit/quickwit-codegen/example/build.rs b/quickwit/quickwit-codegen/example/build.rs index ae388ee3cd6..3a6594691b4 100644 --- a/quickwit/quickwit-codegen/example/build.rs +++ b/quickwit/quickwit-codegen/example/build.rs @@ -26,7 +26,7 @@ fn main() { .with_result_type_path("crate::HelloResult") .with_error_type_path("crate::HelloError") .generate_extra_service_methods() - .generate_prom_labels_for_requests() + .generate_rpc_name_impls() .run() .unwrap(); } diff --git a/quickwit/quickwit-codegen/example/src/codegen/hello.rs b/quickwit/quickwit-codegen/example/src/codegen/hello.rs index e484ebf21f8..cd799b68fdb 100644 --- a/quickwit/quickwit-codegen/example/src/codegen/hello.rs +++ b/quickwit/quickwit-codegen/example/src/codegen/hello.rs @@ -44,15 +44,20 @@ pub struct PingResponse { #[allow(unused_imports)] use std::str::FromStr; use tower::{Layer, Service, ServiceExt}; -use quickwit_common::metrics::{PrometheusLabels, OwnedPrometheusLabels}; -impl PrometheusLabels<1> for HelloRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("hello")]) +use quickwit_common::tower::RpcName; +impl RpcName for HelloRequest { + fn rpc_name() -> &'static str { + "hello" } } -impl PrometheusLabels<1> for GoodbyeRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("goodbye")]) +impl RpcName for GoodbyeRequest { + fn rpc_name() -> &'static str { + "goodbye" + } +} +impl RpcName for PingRequest { + fn rpc_name() -> &'static str { + "ping" } } pub type HelloStream = quickwit_common::ServiceStream>; diff --git a/quickwit/quickwit-codegen/src/codegen.rs b/quickwit/quickwit-codegen/src/codegen.rs index 34319992b44..0425a798e7f 100644 --- 
a/quickwit/quickwit-codegen/src/codegen.rs +++ b/quickwit/quickwit-codegen/src/codegen.rs @@ -17,8 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashSet; - use anyhow::ensure; use heck::{ToSnakeCase, ToUpperCamelCase}; use proc_macro2::TokenStream; @@ -112,7 +110,8 @@ impl CodegenBuilder { self } - pub fn generate_prom_labels_for_requests(mut self) -> Self { + /// Generates `RpcName` trait implementations for request types. + pub fn generate_rpc_name_impls(mut self) -> Self { self.generate_prom_labels_for_requests = true; self } @@ -340,17 +339,6 @@ struct SynMethod { } impl SynMethod { - fn request_prom_label(&self) -> String { - self.request_type - .segments - .last() - .unwrap() - .ident - .to_string() - .trim_end_matches("Request") - .to_snake_case() - } - fn request_type(&self, mock: bool) -> TokenStream { let request_type = if mock { let request_type = &self.request_type; @@ -406,32 +394,28 @@ impl SynMethod { } fn generate_prom_labels_impl_for_requests(context: &CodegenContext) -> TokenStream { - let mut stream = TokenStream::new(); - stream.extend(quote! { - use quickwit_common::metrics::{PrometheusLabels, OwnedPrometheusLabels}; - }); - let mut implemented_request_types: HashSet = HashSet::new(); + let mut rpc_name_impls = Vec::new(); + for syn_method in &context.methods { - if syn_method.client_streaming { - continue; - } - let request_type = syn_method.request_type(false); - let request_type_snake_case = syn_method.request_prom_label(); - if implemented_request_types.contains(&request_type_snake_case) { - continue; - } else { - implemented_request_types.insert(request_type_snake_case.clone()); - let method = quote! 
{ - impl PrometheusLabels<1> for #request_type { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed(#request_type_snake_case),]) - } + let request_type = syn_method.request_type.to_token_stream(); + let rpc_name = &syn_method.name.to_string(); + let rpc_name_impl = quote! { + impl RpcName for #request_type { + fn rpc_name() -> &'static str { + #rpc_name } - }; - stream.extend(method); - } + } + }; + rpc_name_impls.extend(rpc_name_impl); + } + if rpc_name_impls.is_empty() { + return TokenStream::new(); + } + quote! { + use quickwit_common::tower::RpcName; + + #(#rpc_name_impls)* } - stream } fn generate_comment_attributes(comments: &Comments) -> Vec { diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs index afaec023e7c..f25245e0ece 100644 --- a/quickwit/quickwit-common/src/io.rs +++ b/quickwit/quickwit-common/src/io.rs @@ -63,7 +63,8 @@ impl Default for IoMetrics { "write_bytes", "Number of bytes written by a given component in [indexer, merger, deleter, \ split_downloader_{merge,delete}]", - "quickwit", + "", + &[], ["component"], ); Self { write_bytes } diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 11617199f9a..247b01746c1 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -17,7 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-use std::borrow::{Borrow, Cow}; +use std::collections::HashMap; use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder}; pub use prometheus::{ @@ -25,29 +25,6 @@ pub use prometheus::{ IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec, }; -pub struct OwnedPrometheusLabels { - labels: [Cow<'static, str>; N], -} - -impl OwnedPrometheusLabels { - pub fn new(labels: [Cow<'static, str>; N]) -> Self { - Self { labels } - } - - pub fn borrow_labels(&self) -> [&str; N] { - let mut labels = [""; N]; - - for (i, label) in self.labels.iter().enumerate() { - labels[i] = label.borrow(); - } - labels - } -} - -pub trait PrometheusLabels { - fn labels(&self) -> OwnedPrometheusLabels; -} - #[derive(Clone)] pub struct HistogramVec { underlying: PrometheusHistogramVec, @@ -81,63 +58,102 @@ impl IntGaugeVec { } } -pub fn new_counter(name: &str, description: &str, namespace: &str) -> IntCounter { - let counter_opts = Opts::new(name, description).namespace(namespace); - let counter = IntCounter::with_opts(counter_opts).expect("Failed to create counter"); - prometheus::register(Box::new(counter.clone())).expect("Failed to register counter"); +pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter { + let counter_opts = Opts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem); + let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter"); + prometheus::register(Box::new(counter.clone())).expect("failed to register counter"); counter } pub fn new_counter_vec( name: &str, - description: &str, - namespace: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let counter_opts = Opts::new(name, description).namespace(namespace); + let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); + let counter_opts = Opts::new(name, help) + 
.namespace("quickwit") + .subsystem(subsystem) + .const_labels(owned_const_labels); let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names) - .expect("Failed to create counter vec"); - prometheus::register(Box::new(underlying.clone())).expect("Failed to register counter vec"); + .expect("failed to create counter vec"); + + let collector = Box::new(underlying.clone()); + prometheus::register(collector).expect("failed to register counter vec"); + IntCounterVec { underlying } } -pub fn new_gauge(name: &str, description: &str, namespace: &str) -> IntGauge { - let gauge_opts = Opts::new(name, description).namespace(namespace); - let gauge = IntGauge::with_opts(gauge_opts).expect("Failed to create gauge"); - prometheus::register(Box::new(gauge.clone())).expect("Failed to register gauge"); +pub fn new_gauge(name: &str, help: &str, subsystem: &str) -> IntGauge { + let gauge_opts = Opts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem); + let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge"); + prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge"); gauge } pub fn new_gauge_vec( name: &str, - description: &str, - namespace: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntGaugeVec { - let gauge_opts = Opts::new(name, description).namespace(namespace); + let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); + let gauge_opts = Opts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem) + .const_labels(owned_const_labels); let underlying = - PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("Failed to create gauge vec"); - prometheus::register(Box::new(underlying.clone())).expect("Failed to register gauge vec"); + PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec"); + + let 
collector = Box::new(underlying.clone()); + prometheus::register(collector).expect("failed to register counter vec"); + IntGaugeVec { underlying } } -pub fn new_histogram(name: &str, description: &str, namespace: &str) -> Histogram { - let histogram_opts = HistogramOpts::new(name, description).namespace(namespace); - let histogram = Histogram::with_opts(histogram_opts).expect("Failed to create histogram"); - prometheus::register(Box::new(histogram.clone())).expect("Failed to register counter"); +pub fn new_histogram(name: &str, help: &str, subsystem: &str) -> Histogram { + let histogram_opts = HistogramOpts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem); + let histogram = Histogram::with_opts(histogram_opts).expect("failed to create histogram"); + prometheus::register(Box::new(histogram.clone())).expect("failed to register histogram"); histogram } pub fn new_histogram_vec( name: &str, - description: &str, - namespace: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], label_names: [&str; N], ) -> HistogramVec { - let histogram_opts = HistogramOpts::new(name, description).namespace(namespace); + let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); + let histogram_opts = HistogramOpts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem) + .const_labels(owned_const_labels); let underlying = PrometheusHistogramVec::new(histogram_opts, &label_names) - .expect("Failed to create histogram vec"); - prometheus::register(Box::new(underlying.clone())).expect("Failed to register histogram vec"); + .expect("failed to create histogram vec"); + + let collector = Box::new(underlying.clone()); + prometheus::register(collector).expect("failed to register histogram vec"); + HistogramVec { underlying } } diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index 
20a1c55d7a8..61b204a8372 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -26,6 +26,8 @@ use tokio::sync::{mpsc, watch}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream}; use tracing::warn; +use crate::tower::RpcName; + pub type BoxStream = Pin + Send + Unpin + 'static>>; /// A stream impl for code-generated services with streaming endpoints. @@ -195,6 +197,14 @@ where T: Send + 'static } } +impl RpcName for ServiceStream +where T: RpcName +{ + fn rpc_name() -> &'static str { + T::rpc_name() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quickwit/quickwit-common/src/tower/metrics.rs b/quickwit/quickwit-common/src/tower/metrics.rs index f082a9c50c2..d3daa484ed4 100644 --- a/quickwit/quickwit-common/src/tower/metrics.rs +++ b/quickwit/quickwit-common/src/tower/metrics.rs @@ -26,88 +26,95 @@ use pin_project::pin_project; use tower::{Layer, Service}; use crate::metrics::{ - new_counter_vec, new_histogram_vec, HistogramVec, IntCounterVec, OwnedPrometheusLabels, - PrometheusLabels, + new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec, }; +pub trait RpcName { + fn rpc_name() -> &'static str; +} + #[derive(Clone)] -pub struct PrometheusMetrics { +pub struct GrpcMetrics { inner: S, - requests_total: IntCounterVec, - request_errors_total: IntCounterVec, - request_duration_seconds: HistogramVec, + requests_total: IntCounterVec<2>, + requests_in_flight: IntGaugeVec<1>, + request_duration_seconds: HistogramVec<2>, } -impl Service for PrometheusMetrics +impl Service for GrpcMetrics where S: Service, - R: PrometheusLabels, + R: RpcName, { type Response = S::Response; type Error = S::Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) } fn call(&mut self, request: R) -> Self::Future { - let labels = request.labels(); - 
self.requests_total - .with_label_values(labels.borrow_labels()) - .inc(); let start = Instant::now(); + let rpc_name = R::rpc_name(); let inner = self.inner.call(request); + + self.requests_in_flight.with_label_values([rpc_name]).inc(); + ResponseFuture { inner, start, - labels, - request_errors_total: self.request_errors_total.clone(), + rpc_name, + requests_total: self.requests_total.clone(), + requests_in_flight: self.requests_in_flight.clone(), request_duration_seconds: self.request_duration_seconds.clone(), } } } #[derive(Clone)] -pub struct PrometheusMetricsLayer { - requests_total: IntCounterVec, - request_errors_total: IntCounterVec, - request_duration_seconds: HistogramVec, +pub struct GrpcMetricsLayer { + requests_total: IntCounterVec<2>, + requests_in_flight: IntGaugeVec<1>, + request_duration_seconds: HistogramVec<2>, } -impl PrometheusMetricsLayer { - pub fn new(namespace: &'static str, label_names: [&'static str; N]) -> Self { +impl GrpcMetricsLayer { + pub fn new(subsystem: &'static str, kind: &'static str) -> Self { Self { requests_total: new_counter_vec( - "requests_total", - "Total number of requests", - namespace, - label_names, + "grpc_requests_total", + "Total number of gRPC requests processed.", + subsystem, + &[("kind", kind)], + ["rpc", "status"], ), - request_errors_total: new_counter_vec( - "request_errors_total", - "Total number of failed requests", - namespace, - label_names, + requests_in_flight: new_gauge_vec( + "grpc_requests_in_flight", + "Number of gRPC requests in flight.", + subsystem, + &[("kind", kind)], + ["rpc"], ), request_duration_seconds: new_histogram_vec( - "request_duration_seconds", - "Duration of request in seconds", - namespace, - label_names, + "grpc_request_duration_seconds", + "Duration of request in seconds.", + subsystem, + &[("kind", kind)], + ["rpc", "status"], ), } } } -impl Layer for PrometheusMetricsLayer { - type Service = PrometheusMetrics; +impl Layer for GrpcMetricsLayer { + type Service = GrpcMetrics; 
fn layer(&self, inner: S) -> Self::Service { - PrometheusMetrics { + GrpcMetrics { inner, requests_total: self.requests_total.clone(), - request_errors_total: self.request_errors_total.clone(), + requests_in_flight: self.requests_in_flight.clone(), request_duration_seconds: self.request_duration_seconds.clone(), } } @@ -115,16 +122,17 @@ impl Layer for PrometheusMetricsLayer { /// Response future for [`PrometheusMetrics`]. #[pin_project] -pub struct ResponseFuture { +pub struct ResponseFuture { #[pin] inner: F, start: Instant, - labels: OwnedPrometheusLabels, - request_errors_total: IntCounterVec, - request_duration_seconds: HistogramVec, + rpc_name: &'static str, + requests_total: IntCounterVec<2>, + requests_in_flight: IntGaugeVec<1>, + request_duration_seconds: HistogramVec<2>, } -impl Future for ResponseFuture +impl Future for ResponseFuture where F: Future> { type Output = Result; @@ -132,16 +140,17 @@ where F: Future> fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let response = ready!(this.inner.poll(cx)); - let elapsed = this.start.elapsed(); + let elapsed = this.start.elapsed().as_secs_f64(); - if response.is_err() { - this.request_errors_total - .with_label_values(this.labels.borrow_labels()) - .inc(); - } + let rpc_name = this.rpc_name; + let status = if response.is_ok() { "success" } else { "error" }; + let label_values = [rpc_name, status]; + + this.requests_total.with_label_values(label_values).inc(); + this.requests_in_flight.with_label_values([rpc_name]).dec(); this.request_duration_seconds - .with_label_values(this.labels.borrow_labels()) - .observe(elapsed.as_secs_f64()); + .with_label_values(label_values) + .observe(elapsed); Poll::Ready(Ok(response?)) } @@ -149,25 +158,27 @@ where F: Future> #[cfg(test)] mod tests { - use quickwit_macros::PrometheusLabels; - use super::*; - #[derive(PrometheusLabels)] - struct HelloRequest { - #[prometheus_label] - name: String, + struct HelloRequest; + + impl RpcName 
for HelloRequest { + fn rpc_name() -> &'static str { + "hello" + } } - #[derive(PrometheusLabels)] - struct GoodbyeRequest { - #[prometheus_label] - name: String, + struct GoodbyeRequest; + + impl RpcName for GoodbyeRequest { + fn rpc_name() -> &'static str { + "goodbye" + } } #[tokio::test] - async fn test_prometheus_metrics() { - let layer = PrometheusMetricsLayer::new("test", ["request", "name"]); + async fn test_grpc_metrics() { + let layer = GrpcMetricsLayer::new("quickwit_test", "server"); let mut hello_service = layer @@ -181,36 +192,30 @@ mod tests { .layer(tower::service_fn(|request: GoodbyeRequest| async move { Ok::<_, ()>(request) })); - let request = HelloRequest { - name: "world".to_string(), - }; - hello_service.call(request).await.unwrap(); + hello_service.call(HelloRequest).await.unwrap(); assert_eq!( layer .requests_total - .with_label_values(["hello", "world"]) + .with_label_values(["hello", "success"]) .get(), 1 ); assert_eq!( layer .requests_total - .with_label_values(["goodbye", "world"]) + .with_label_values(["goodbye", "success"]) .get(), 0 ); - let request = GoodbyeRequest { - name: "world".to_string(), - }; - goodbye_service.call(request).await.unwrap(); + goodbye_service.call(GoodbyeRequest).await.unwrap(); assert_eq!( layer .requests_total - .with_label_values(["goodbye", "world"]) + .with_label_values(["goodbye", "success"]) .get(), 1 ); diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs index 13ba0f93bad..dbd9b75d5f4 100644 --- a/quickwit/quickwit-common/src/tower/mod.rs +++ b/quickwit/quickwit-common/src/tower/mod.rs @@ -41,7 +41,7 @@ pub use change::Change; pub use estimate_rate::{EstimateRate, EstimateRateLayer}; pub use event_listener::{EventListener, EventListenerLayer}; use futures::Future; -pub use metrics::{PrometheusMetrics, PrometheusMetricsLayer}; +pub use metrics::{GrpcMetrics, GrpcMetricsLayer, RpcName}; pub use pool::Pool; pub use rate::{ConstantRate, Rate}; pub use 
rate_estimator::{RateEstimator, SmaRateEstimator}; diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs index f6e20cb9ba4..6b2167ccaec 100644 --- a/quickwit/quickwit-control-plane/src/metrics.rs +++ b/quickwit/quickwit-control-plane/src/metrics.rs @@ -34,29 +34,30 @@ impl Default for ControlPlaneMetrics { restart_total: new_counter( "restart_total", "Number of control plane restart.", - "quickwit_control_plane", + "control_plane", ), schedule_total: new_counter( "schedule_total", "Number of control plane `schedule` operation.", - "quickwit_control_plane", + "control_plane", ), metastore_error_aborted: new_counter( "metastore_error_aborted", "Number of aborted metastore transaction (= do not trigger a control plane \ restart)", - "quickwit_control_plane", + "control_plane", ), metastore_error_maybe_executed: new_counter( "metastore_error_maybe_executed", "Number of metastore transaction with an uncertain outcome (= do trigger a \ control plane restart)", - "quickwit_control_plane", + "control_plane", ), open_shards_total: new_gauge_vec( "open_shards_total", "Number of open shards per source.", - "quickwit_control_plane", + "control_plane", + &[], ["index_id", "source_id"], ), } diff --git a/quickwit/quickwit-indexing/src/metrics.rs b/quickwit/quickwit-indexing/src/metrics.rs index e24cc3872d0..9dc6393609e 100644 --- a/quickwit/quickwit-indexing/src/metrics.rs +++ b/quickwit/quickwit-indexing/src/metrics.rs @@ -39,43 +39,47 @@ impl Default for IndexerMetrics { "processed_docs_total", "Number of processed docs by index, source and processed status in [valid, \ schema_error, parse_error, transform_error]", - "quickwit_indexing", + "indexing", + &[], ["index", "docs_processed_status"], ), processed_bytes: new_counter_vec( "processed_bytes", "Number of bytes of processed documents by index, source and processed status in \ [valid, schema_error, parse_error, transform_error]", - "quickwit_indexing", + "indexing", + 
&[], ["index", "docs_processed_status"], ), backpressure_micros: new_counter_vec( "backpressure_micros", "Amount of time spent in backpressure (in micros). This time only includes the \ amount of time spent waiting for a place in the queue of another actor.", - "quickwit_indexing", + "indexing", + &[], ["actor_name"], ), available_concurrent_upload_permits: new_gauge_vec( "concurrent_upload_available_permits_num", "Number of available concurrent upload permits by component in [merger, indexer]", - "quickwit_indexing", + "indexing", + &[], ["component"], ), ongoing_merge_operations: new_gauge( "ongoing_merge_operations", "Number of ongoing merge operations", - "quickwit_indexing", + "indexing", ), pending_merge_operations: new_gauge( "pending_merge_operations", "Number of pending merge operations", - "quickwit_indexing", + "indexing", ), pending_merge_bytes: new_gauge( "pending_merge_bytes", "Number of pending merge bytes", - "quickwit_indexing", + "indexing", ), } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 05ef6f5f301..1245dae3316 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -77,7 +77,7 @@ use super::replication::{ use super::state::{IngesterState, InnerIngesterState, WeakIngesterState}; use super::IngesterPool; use crate::metrics::INGEST_METRICS; -use crate::{estimate_size, with_lock_metrics, with_request_metrics, FollowerId}; +use crate::{estimate_size, with_lock_metrics, FollowerId}; /// Minimum interval between two reset shards operations. 
const MIN_RESET_SHARDS_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) { @@ -371,14 +371,9 @@ impl Ingester { .ok_or(IngestV2Error::IngesterUnavailable { ingester_id: follower_id.clone(), })?; - let mut ack_replication_stream = with_request_metrics!( - ingester - .open_replication_stream(syn_replication_stream) - .await, - "ingester", - "client", - "open_replication_stream" - )?; + let mut ack_replication_stream = ingester + .open_replication_stream(syn_replication_stream) + .await?; ack_replication_stream .next() .await @@ -976,12 +971,7 @@ impl Ingester { ingester_id: follower_id, } })?; - with_request_metrics!( - ingester.ping(ping_request).await, - "ingester", - "client", - "ping" - )?; + ingester.ping(ping_request).await?; let ping_response = PingResponse {}; Ok(ping_response) } @@ -1010,63 +1000,38 @@ impl IngesterService for Ingester { &mut self, persist_request: PersistRequest, ) -> IngestV2Result { - with_request_metrics!( - self.persist_inner(persist_request).await, - "ingester", - "server", - "persist" - ) + self.persist_inner(persist_request).await } async fn open_replication_stream( &mut self, syn_replication_stream: quickwit_common::ServiceStream, ) -> IngestV2Result> { - with_request_metrics!( - self.open_replication_stream_inner(syn_replication_stream) - .await, - "ingester", - "server", - "open_replication_stream" - ) + self.open_replication_stream_inner(syn_replication_stream) + .await } async fn open_fetch_stream( &mut self, open_fetch_stream_request: OpenFetchStreamRequest, ) -> IngestV2Result>> { - with_request_metrics!( - self.open_fetch_stream_inner(open_fetch_stream_request) - .await, - "ingester", - "server", - "open_fetch_stream" - ) + self.open_fetch_stream_inner(open_fetch_stream_request) + .await } async fn open_observation_stream( &mut self, open_observation_stream_request: OpenObservationStreamRequest, ) -> IngestV2Result> { - with_request_metrics!( - self.open_observation_stream_inner(open_observation_stream_request) - 
.await, - "ingester", - "server", - "open_observation_stream" - ) + self.open_observation_stream_inner(open_observation_stream_request) + .await } async fn init_shards( &mut self, init_shards_request: InitShardsRequest, ) -> IngestV2Result { - with_request_metrics!( - self.init_shards_inner(init_shards_request).await, - "ingester", - "server", - "init_shards" - ) + self.init_shards_inner(init_shards_request).await } async fn retain_shards( @@ -1106,45 +1071,25 @@ impl IngesterService for Ingester { &mut self, truncate_shards_request: TruncateShardsRequest, ) -> IngestV2Result { - with_request_metrics!( - self.truncate_shards_inner(truncate_shards_request).await, - "ingester", - "server", - "truncate_shards" - ) + self.truncate_shards_inner(truncate_shards_request).await } async fn close_shards( &mut self, close_shards_request: CloseShardsRequest, ) -> IngestV2Result { - with_request_metrics!( - self.close_shards_inner(close_shards_request).await, - "ingester", - "server", - "close_shards" - ) + self.close_shards_inner(close_shards_request).await } async fn ping(&mut self, ping_request: PingRequest) -> IngestV2Result { - with_request_metrics!( - self.ping_inner(ping_request).await, - "ingester", - "server", - "ping" - ) + self.ping_inner(ping_request).await } async fn decommission( &mut self, decommission_request: DecommissionRequest, ) -> IngestV2Result { - with_request_metrics!( - self.decommission_inner(decommission_request).await, - "ingester", - "server", - "decommission" - ) + self.decommission_inner(decommission_request).await } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs index b41546ba9d5..8cec728a99f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/metrics.rs @@ -24,9 +24,6 @@ use quickwit_common::metrics::{ }; pub(super) struct IngestV2Metrics { - pub grpc_requests_total: IntCounterVec<4>, - pub grpc_requests_in_flight: 
IntGaugeVec<3>, - pub grpc_request_duration_secs: HistogramVec<4>, pub reset_shards_operations_total: IntCounterVec<1>, pub shards: IntGaugeVec<2>, pub wal_acquire_lock_requests_in_flight: IntGaugeVec<2>, @@ -38,57 +35,43 @@ pub(super) struct IngestV2Metrics { impl Default for IngestV2Metrics { fn default() -> Self { Self { - grpc_requests_total: new_counter_vec( - "grpc_requests_total", - "Total number of gRPC requests processed.", - "quickwit_ingest", - ["component", "kind", "operation", "status"], - ), - grpc_requests_in_flight: new_gauge_vec( - "grpc_requests_in_flight", - "Number of gRPC requests in flight.", - "quickwit_ingest", - ["component", "kind", "operation"], - ), - grpc_request_duration_secs: new_histogram_vec( - "grpc_request_duration_secs", - "Duration of gRPC requests in seconds.", - "quickwit_ingest", - ["component", "kind", "operation", "status"], - ), reset_shards_operations_total: new_counter_vec( "reset_shards_operations_total", "Total number of reset shards operations performed.", - "quickwit_ingest", + "ingest", + &[], ["status"], ), shards: new_gauge_vec( "shards", "Number of shards.", - "quickwit_ingest", + "ingest", + &[], ["state", "index_id"], ), wal_acquire_lock_requests_in_flight: new_gauge_vec( "wal_acquire_lock_requests_in_flight", "Number of acquire lock requests in flight.", - "quickwit_ingest", + "ingest", + &[], ["operation", "type"], ), wal_acquire_lock_request_duration_secs: new_histogram_vec( "wal_acquire_lock_request_duration_secs", "Duration of acquire lock requests in seconds.", - "quickwit_ingest", + "ingest", + &[], ["operation", "type"], ), wal_disk_usage_bytes: new_gauge( "wal_disk_usage_bytes", "WAL disk usage in bytes.", - "quickwit_ingest", + "ingest", ), wal_memory_usage_bytes: new_gauge( "wal_memory_usage_bytes", "WAL memory usage in bytes.", - "quickwit_ingest", + "ingest", ), } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 
58eb6901cc8..2cf48225c45 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -42,7 +42,7 @@ use super::mrecordlog_utils::check_enough_capacity; use super::state::IngesterState; use crate::ingest_v2::metrics::INGEST_V2_METRICS; use crate::metrics::INGEST_METRICS; -use crate::{estimate_size, with_lock_metrics, with_request_metrics}; +use crate::{estimate_size, with_lock_metrics}; pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5; @@ -301,15 +301,6 @@ pub(super) enum ReplicationError { Timeout, } -impl ReplicationError { - pub(super) fn label_value(&self) -> &'static str { - match self { - Self::Timeout { .. } => "timeout", - _ => "error", - } - } -} - // DO NOT derive or implement `Clone` for this object. #[derive(Debug)] pub(super) struct ReplicationClient { @@ -334,19 +325,15 @@ impl ReplicationClient { let replication_request = ReplicationRequest::Init(init_replica_request); async { - with_request_metrics!( - self.submit(replication_request).await, - "ingester", - "client", - "init_replica" - ) - .map(|replication_response| { - if let ReplicationResponse::Init(init_replica_response) = replication_response { - init_replica_response - } else { - panic!("response should be an init replica response") - } - }) + self.submit(replication_request) + .await + .map(|replication_response| { + if let ReplicationResponse::Init(init_replica_response) = replication_response { + init_replica_response + } else { + panic!("response should be an init replica response") + } + }) } } @@ -369,19 +356,16 @@ impl ReplicationClient { let replication_request = ReplicationRequest::Replicate(replicate_request); async { - with_request_metrics!( - self.submit(replication_request).await, - "ingester", - "client", - "replicate" - ) - .map(|replication_response| { - if let ReplicationResponse::Replicate(replicate_response) = replication_response { - replicate_response - } else { - panic!("response should be a 
replicate response") - } - }) + self.submit(replication_request) + .await + .map(|replication_response| { + if let ReplicationResponse::Replicate(replicate_response) = replication_response + { + replicate_response + } else { + panic!("response should be a replicate response") + } + }) } } @@ -691,24 +675,14 @@ impl ReplicationTask { Some(syn_replication_message::Message::OpenRequest(_)) => { panic!("TODO: this should not happen, internal error"); } - Some(syn_replication_message::Message::InitRequest(init_replica_request)) => { - with_request_metrics!( - self.init_replica(init_replica_request).await, - "ingester", - "server", - "init_replica" - ) - .map(AckReplicationMessage::new_init_replica_response) - } - Some(syn_replication_message::Message::ReplicateRequest(replicate_request)) => { - with_request_metrics!( - self.replicate(replicate_request).await, - "ingester", - "server", - "replicate" - ) - .map(AckReplicationMessage::new_replicate_response) - } + Some(syn_replication_message::Message::InitRequest(init_replica_request)) => self + .init_replica(init_replica_request) + .await + .map(AckReplicationMessage::new_init_replica_response), + Some(syn_replication_message::Message::ReplicateRequest(replicate_request)) => self + .replicate(replicate_request) + .await + .map(AckReplicationMessage::new_replicate_response), None => { warn!("received empty SYN replication message"); continue; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 40fa7bcd0ea..f9f85013321 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -46,7 +46,7 @@ use super::routing_table::RoutingTable; use super::workbench::IngestWorkbench; use super::IngesterPool; use crate::semaphore_with_waiter::SemaphoreWithMaxWaiters; -use crate::{with_request_metrics, LeaderId}; +use crate::LeaderId; /// Duration after which ingest requests time out with 
[`IngestV2Error::Timeout`]. pub(super) const INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { @@ -181,12 +181,7 @@ impl IngestRouter { if request.subrequests.is_empty() { return; } - let response_result = with_request_metrics!( - self.control_plane.get_or_create_open_shards(request).await, - "router", - "client", - "get_or_create_open_shards" - ); + let response_result = self.control_plane.get_or_create_open_shards(request).await; let response = match response_result { Ok(response) => response, Err(control_plane_error) => { @@ -366,17 +361,12 @@ impl IngestRouter { commit_type: commit_type as i32, }; let persist_future = async move { - let persist_result = with_request_metrics!( - tokio::time::timeout( - PERSIST_REQUEST_TIMEOUT, - ingester.persist(persist_request), - ) - .await - .unwrap_or_else(|_| Err(IngestV2Error::Timeout)), - "router", - "client", - "persist" - ); + let persist_result = tokio::time::timeout( + PERSIST_REQUEST_TIMEOUT, + ingester.persist(persist_request), + ) + .await + .unwrap_or_else(|_| Err(IngestV2Error::Timeout)); (persist_summary, persist_result) }; persist_futures.push(persist_future); @@ -429,13 +419,8 @@ impl IngestRouterService for IngestRouter { .acquire() .await .map_err(|()| IngestV2Error::TooManyRequests)?; - with_request_metrics!( - self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT) - .await, - "router", - "server", - "ingest" - ) + self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT) + .await } } diff --git a/quickwit/quickwit-ingest/src/lib.rs b/quickwit/quickwit-ingest/src/lib.rs index 12312d45b13..8766717c3b0 100644 --- a/quickwit/quickwit-ingest/src/lib.rs +++ b/quickwit/quickwit-ingest/src/lib.rs @@ -125,39 +125,6 @@ impl CommitType { } } -#[macro_export] -macro_rules! 
with_request_metrics { - ($future:expr, $($label:tt),*) => { - { - let now = std::time::Instant::now(); - - $crate::ingest_v2::metrics::INGEST_V2_METRICS - .grpc_requests_in_flight - .with_label_values([$($label),*]) - .inc(); - - let result = $future; - - $crate::ingest_v2::metrics::INGEST_V2_METRICS - .grpc_requests_in_flight - .with_label_values([$($label),*]) - .dec(); - let status_label = match &result { - Ok(_) => "success", - Err(error) => error.label_value(), - }; - $crate::ingest_v2::metrics::INGEST_V2_METRICS.grpc_requests_total - .with_label_values([$($label),*, status_label]) - .inc(); - $crate::ingest_v2::metrics::INGEST_V2_METRICS.grpc_request_duration_secs - .with_label_values([$($label),*, status_label]) - .observe(now.elapsed().as_secs_f64()); - - result - } - } -} - #[macro_export] macro_rules! with_lock_metrics { ($future:expr, $($label:tt),*) => { diff --git a/quickwit/quickwit-ingest/src/metrics.rs b/quickwit/quickwit-ingest/src/metrics.rs index 212431b74de..3f088ae2e79 100644 --- a/quickwit/quickwit-ingest/src/metrics.rs +++ b/quickwit/quickwit-ingest/src/metrics.rs @@ -34,28 +34,24 @@ impl Default for IngestMetrics { ingested_num_bytes: new_counter( "ingested_num_bytes", "Total size of the docs ingested in bytes", - "quickwit_ingest", + "ingest", ), ingested_num_docs: new_counter( "ingested_num_docs", "Number of docs received to be ingested", - "quickwit_ingest", + "ingest", ), replicated_num_bytes_total: new_counter( "replicated_num_bytes_total", "Total size in bytes of the replicated docs.", - "quickwit_ingest", + "ingest", ), replicated_num_docs_total: new_counter( "replicated_num_docs_total", "Total number of docs replicated.", - "quickwit_ingest", - ), - queue_count: new_gauge( - "queue_count", - "Number of queues currently active", - "quickwit_ingest", + "ingest", ), + queue_count: new_gauge("queue_count", "Number of queues currently active", "ingest"), } } } diff --git a/quickwit/quickwit-jaeger/src/metrics.rs 
b/quickwit/quickwit-jaeger/src/metrics.rs index 5b4a3ebb93d..3c8dde0579e 100644 --- a/quickwit/quickwit-jaeger/src/metrics.rs +++ b/quickwit/quickwit-jaeger/src/metrics.rs @@ -35,37 +35,43 @@ impl Default for JaegerServiceMetrics { requests_total: new_counter_vec( "requests_total", "Number of requests", - "quickwit_jaeger", + "jaeger", + &[], ["operation", "index"], ), request_errors_total: new_counter_vec( "request_errors_total", "Number of failed requests", - "quickwit_jaeger", + "jaeger", + &[], ["operation", "index"], ), request_duration_seconds: new_histogram_vec( "request_duration_seconds", "Duration of requests", - "quickwit_jaeger", + "jaeger", + &[], ["operation", "index", "error"], ), fetched_traces_total: new_counter_vec( "fetched_traces_total", "Number of traces retrieved from storage", - "quickwit_jaeger", + "jaeger", + &[], ["operation", "index"], ), fetched_spans_total: new_counter_vec( "fetched_spans_total", "Number of spans retrieved from storage", - "quickwit_jaeger", + "jaeger", + &[], ["operation", "index"], ), transferred_bytes_total: new_counter_vec( "transferred_bytes_total", "Number of bytes transferred", - "quickwit_jaeger", + "jaeger", + &[], ["operation", "index"], ), } diff --git a/quickwit/quickwit-janitor/src/metrics.rs b/quickwit/quickwit-janitor/src/metrics.rs index c46ae23cd93..d3392af7b3f 100644 --- a/quickwit/quickwit-janitor/src/metrics.rs +++ b/quickwit/quickwit-janitor/src/metrics.rs @@ -31,6 +31,7 @@ impl Default for JanitorMetrics { "ongoing_num_delete_operations_total", "Num of ongoing delete operations (per index).", "quickwit_janitor", + &[], ["index"], ), } diff --git a/quickwit/quickwit-macros/Cargo.toml b/quickwit/quickwit-macros/Cargo.toml index f15fec646d5..32ae36c57a8 100644 --- a/quickwit/quickwit-macros/Cargo.toml +++ b/quickwit/quickwit-macros/Cargo.toml @@ -17,5 +17,3 @@ proc-macro = true proc-macro2 = { workspace = true } quote = { workspace = true } syn = { workspace = true } - -quickwit-macros-impl = { 
workspace = true } diff --git a/quickwit/quickwit-macros/impl/Cargo.toml b/quickwit/quickwit-macros/impl/Cargo.toml deleted file mode 100644 index 75e2cd9eac3..00000000000 --- a/quickwit/quickwit-macros/impl/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "quickwit-macros-impl" -description = "Proc macro implementations" - -version.workspace = true -edition.workspace = true -homepage.workspace = true -documentation.workspace = true -repository.workspace = true -authors.workspace = true -license.workspace = true - -[dependencies] -heck = { workspace = true } -proc-macro2 = { workspace = true } -quote = { workspace = true } -syn = { workspace = true } diff --git a/quickwit/quickwit-macros/impl/src/lib.rs b/quickwit/quickwit-macros/impl/src/lib.rs deleted file mode 100644 index 8fdb20fe68c..00000000000 --- a/quickwit/quickwit-macros/impl/src/lib.rs +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright (C) 2024 Quickwit, Inc. -// -// Quickwit is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at hello@quickwit.io. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use heck::ToSnakeCase; -use proc_macro2::TokenStream; -use quote::quote; -use syn::{parse2, Data, DeriveInput, Index}; - -pub fn derive_prometheus_labels_impl(input: TokenStream) -> TokenStream { - let DeriveInput { ident, data, .. 
} = parse2(input).unwrap(); - - let snake_case_ident = ident - .to_string() - .trim_end_matches("Request") - .to_snake_case(); - - let struct_label = quote! { - std::borrow::Cow::Borrowed(#snake_case_ident) - }; - - let Data::Struct(data_struct) = data else { - panic!("`PrometheusLabels` can only be derived for structs.") - }; - let mut field_labels = Vec::new(); - - for (i, field) in data_struct.fields.iter().enumerate() { - if field - .attrs - .iter() - .any(|attr| attr.path().is_ident("prometheus_label")) - { - let field_label = if let Some(field_ident) = &field.ident { - quote! { - std::borrow::Cow::Owned(self.#field_ident.to_string()) - } - } else { - let field_index = Index::from(i); - quote! { - std::borrow::Cow::Owned(self.#field_index.to_string()) - } - }; - field_labels.push(field_label); - } - } - - let num_labels = field_labels.len() + 1; - - let codegen = quote! { - impl PrometheusLabels<#num_labels> for #ident { - fn labels(&self) -> OwnedPrometheusLabels<#num_labels> { - OwnedPrometheusLabels::new([#struct_label, #( #field_labels ),*]) - } - } - }; - codegen -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_unit_struct() { - let unit_struct = quote! { - struct MyUnitRequest; - }; - let codegen = derive_prometheus_labels_impl(unit_struct); - - let expected_codegen = quote! { - impl PrometheusLabels<1usize> for MyUnitRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("my_unit"),]) - } - } - }; - assert_eq!(codegen.to_string(), expected_codegen.to_string()); - } - - #[test] - fn test_unamed_fields_struct() { - let unamed_fields_struct = quote! { - struct MyUnamedFieldsRequest(#[prometheus_label] String, String); - }; - let codegen = derive_prometheus_labels_impl(unamed_fields_struct); - - let expected_codegen = quote! 
{ - impl PrometheusLabels<2usize> for MyUnamedFieldsRequest { - fn labels(&self) -> OwnedPrometheusLabels<2usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("my_unamed_fields"), std::borrow::Cow::Owned(self.0.to_string())]) - } - } - }; - assert_eq!(codegen.to_string(), expected_codegen.to_string()); - } - - #[test] - fn test_named_fields_struct() { - let named_fields_struct = quote! { - struct MyNamedFieldsRequest { - #[prometheus_label] - foo: String, - bar: String, - } - }; - let codegen = derive_prometheus_labels_impl(named_fields_struct); - - let expected_codegen = quote! { - impl PrometheusLabels<2usize> for MyNamedFieldsRequest { - fn labels(&self) -> OwnedPrometheusLabels<2usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("my_named_fields"), std::borrow::Cow::Owned(self.foo.to_string())]) - } - } - }; - assert_eq!(codegen.to_string(), expected_codegen.to_string()); - } -} diff --git a/quickwit/quickwit-macros/src/lib.rs b/quickwit/quickwit-macros/src/lib.rs index 9bf816fd4bb..3556017eb69 100644 --- a/quickwit/quickwit-macros/src/lib.rs +++ b/quickwit/quickwit-macros/src/lib.rs @@ -21,7 +21,6 @@ use std::mem; use proc_macro::TokenStream; use proc_macro2::{Span, TokenStream as TokenStream2}; -use quickwit_macros_impl::derive_prometheus_labels_impl; use quote::quote; use syn::parse::{Parse, ParseStream, Parser}; use syn::punctuated::Punctuated; @@ -30,11 +29,6 @@ use syn::{ Token, Visibility, }; -#[proc_macro_derive(PrometheusLabels, attributes(prometheus_label))] -pub fn derive_prometheus_labels(input: TokenStream) -> TokenStream { - derive_prometheus_labels_impl(input.into()).into() -} - #[proc_macro_attribute] pub fn serde_multikey(attr: TokenStream, item: TokenStream) -> TokenStream { match serde_multikey_inner(attr, item) { diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs index 
7781822513e..cbc883595ae 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_metastore_factory.rs @@ -32,7 +32,6 @@ use regex::Regex; use tokio::sync::Mutex; use tracing::debug; -use crate::metastore::instrument_metastore; use crate::{FileBackedMetastore, MetastoreFactory, MetastoreResolverError}; /// A file-backed metastore factory. @@ -143,9 +142,9 @@ impl MetastoreFactory for FileBackedMetastoreFactory { })?; let file_backed_metastore = FileBackedMetastore::try_new(storage, polling_interval_opt) .await + .map(MetastoreServiceClient::new) .map_err(MetastoreResolverError::Initialization)?; - let instrumented_metastore = instrument_metastore(file_backed_metastore); - let unique_metastore_for_uri = self.cache_metastore(uri, instrumented_metastore).await; + let unique_metastore_for_uri = self.cache_metastore(uri, file_backed_metastore).await; Ok(unique_metastore_for_uri) } } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index b55b2052b64..1ab698fd6d7 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -30,8 +30,6 @@ use async_trait::async_trait; use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; -use once_cell::sync::Lazy; -use quickwit_common::tower::PrometheusMetricsLayer; use quickwit_config::{IndexConfig, SourceConfig}; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_proto::metastore::{ @@ -49,17 +47,6 @@ use crate::{Split, SplitMetadata, SplitState}; /// Splits batch size returned by the stream splits API pub(crate) const STREAM_SPLITS_CHUNK_SIZE: usize = 100; -static METASTORE_METRICS_LAYER: Lazy> = - Lazy::new(|| PrometheusMetricsLayer::new("quickwit_metastore", ["request"])); - -pub(crate) fn instrument_metastore( - metastore_impl: impl 
MetastoreService, -) -> MetastoreServiceClient { - MetastoreServiceClient::tower() - .stack_layer(METASTORE_METRICS_LAYER.clone()) - .build(metastore_impl) -} - /// An extended trait for [`MetastoreService`]. #[async_trait] pub trait MetastoreServiceExt: MetastoreService { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs index 32f3fcb5da2..483eef84a6c 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/factory.rs @@ -27,7 +27,6 @@ use quickwit_proto::metastore::MetastoreServiceClient; use tokio::sync::Mutex; use tracing::debug; -use crate::metastore::instrument_metastore; use crate::{MetastoreFactory, MetastoreResolverError, PostgresqlMetastore}; #[derive(Clone, Default)] @@ -90,10 +89,10 @@ impl MetastoreFactory for PostgresqlMetastoreFactory { })?; let postgresql_metastore = PostgresqlMetastore::new(postgresql_metastore_config, uri) .await + .map(MetastoreServiceClient::new) .map_err(MetastoreResolverError::Initialization)?; - let instrumented_metastore = instrument_metastore(postgresql_metastore); let unique_metastore_for_uri = self - .cache_metastore(uri.clone(), instrumented_metastore) + .cache_metastore(uri.clone(), postgresql_metastore) .await; Ok(unique_metastore_for_uri) } diff --git a/quickwit/quickwit-opentelemetry/src/otlp/metrics.rs b/quickwit/quickwit-opentelemetry/src/otlp/metrics.rs index 8beb7932110..fa39789300e 100644 --- a/quickwit/quickwit-opentelemetry/src/otlp/metrics.rs +++ b/quickwit/quickwit-opentelemetry/src/otlp/metrics.rs @@ -35,37 +35,43 @@ impl Default for OtlpServiceMetrics { requests_total: new_counter_vec( "requests_total", "Number of requests", - "quickwit_otlp", + "otlp", + &[], ["service", "index", "transport", "format"], ), request_errors_total: new_counter_vec( "request_errors_total", "Number of failed requests", - "quickwit_otlp", + "otlp", + &[], ["service", 
"index", "transport", "format"], ), request_duration_seconds: new_histogram_vec( "request_duration_seconds", "Duration of requests", - "quickwit_otlp", + "otlp", + &[], ["service", "index", "transport", "format", "error"], ), ingested_log_records_total: new_counter_vec( "ingested_log_records_total", "Number of log records ingested", - "quickwit_otlp", + "otlp", + &[], ["service", "index", "transport", "format"], ), ingested_spans_total: new_counter_vec( "ingested_spans_total", "Number of spans ingested", - "quickwit_otlp", + "otlp", + &[], ["service", "index", "transport", "format"], ), ingested_bytes_total: new_counter_vec( "ingested_bytes_total", "Number of bytes ingested", - "quickwit_otlp", + "otlp", + &[], ["service", "index", "transport", "format"], ), } diff --git a/quickwit/quickwit-proto/build.rs b/quickwit/quickwit-proto/build.rs index 651a93dcb54..bfddf365eab 100644 --- a/quickwit/quickwit-proto/build.rs +++ b/quickwit/quickwit-proto/build.rs @@ -85,7 +85,7 @@ fn main() -> Result<(), Box> { .with_result_type_path("crate::metastore::MetastoreResult") .with_error_type_path("crate::metastore::MetastoreError") .generate_extra_service_methods() - .generate_prom_labels_for_requests() + .generate_rpc_name_impls() .run() .unwrap(); @@ -128,6 +128,7 @@ fn main() -> Result<(), Box> { .with_output_dir("src/codegen/quickwit") .with_result_type_path("crate::ingest::IngestV2Result") .with_error_type_path("crate::ingest::IngestV2Error") + .generate_rpc_name_impls() .run() .unwrap(); diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs index f8f4dfa966a..cca100efe84 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.ingester.rs @@ -530,6 +530,57 @@ impl IngesterStatus { #[allow(unused_imports)] use std::str::FromStr; use tower::{Layer, Service, ServiceExt}; +use 
quickwit_common::tower::RpcName; +impl RpcName for PersistRequest { + fn rpc_name() -> &'static str { + "persist" + } +} +impl RpcName for SynReplicationMessage { + fn rpc_name() -> &'static str { + "open_replication_stream" + } +} +impl RpcName for OpenFetchStreamRequest { + fn rpc_name() -> &'static str { + "open_fetch_stream" + } +} +impl RpcName for OpenObservationStreamRequest { + fn rpc_name() -> &'static str { + "open_observation_stream" + } +} +impl RpcName for InitShardsRequest { + fn rpc_name() -> &'static str { + "init_shards" + } +} +impl RpcName for RetainShardsRequest { + fn rpc_name() -> &'static str { + "retain_shards" + } +} +impl RpcName for TruncateShardsRequest { + fn rpc_name() -> &'static str { + "truncate_shards" + } +} +impl RpcName for CloseShardsRequest { + fn rpc_name() -> &'static str { + "close_shards" + } +} +impl RpcName for PingRequest { + fn rpc_name() -> &'static str { + "ping" + } +} +impl RpcName for DecommissionRequest { + fn rpc_name() -> &'static str { + "decommission" + } +} pub type IngesterServiceStream = quickwit_common::ServiceStream< crate::ingest::IngestV2Result, >; diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs index 0a2dbddc04e..422ab658274 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.ingest.router.rs @@ -114,6 +114,12 @@ impl IngestFailureReason { #[allow(unused_imports)] use std::str::FromStr; use tower::{Layer, Service, ServiceExt}; +use quickwit_common::tower::RpcName; +impl RpcName for IngestRequestV2 { + fn rpc_name() -> &'static str { + "ingest" + } +} #[cfg_attr(any(test, feature = "testsuite"), mockall::automock)] #[async_trait::async_trait] pub trait IngestRouterService: std::fmt::Debug + dyn_clone::DynClone + Send + Sync + 'static { diff --git 
a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index 67e19b81f68..5d86f3f2c96 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -525,150 +525,140 @@ impl SourceType { #[allow(unused_imports)] use std::str::FromStr; use tower::{Layer, Service, ServiceExt}; -use quickwit_common::metrics::{PrometheusLabels, OwnedPrometheusLabels}; -impl PrometheusLabels<1> for CreateIndexRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("create_index")]) +use quickwit_common::tower::RpcName; +impl RpcName for CreateIndexRequest { + fn rpc_name() -> &'static str { + "create_index" } } -impl PrometheusLabels<1> for IndexMetadataRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("index_metadata")]) +impl RpcName for IndexMetadataRequest { + fn rpc_name() -> &'static str { + "index_metadata" } } -impl PrometheusLabels<1> for ListIndexesMetadataRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_indexes_metadata")]) +impl RpcName for ListIndexesMetadataRequest { + fn rpc_name() -> &'static str { + "list_indexes_metadata" } } -impl PrometheusLabels<1> for DeleteIndexRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("delete_index")]) +impl RpcName for DeleteIndexRequest { + fn rpc_name() -> &'static str { + "delete_index" } } -impl PrometheusLabels<1> for ListSplitsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_splits")]) +impl RpcName for ListSplitsRequest { + fn rpc_name() -> &'static str { + "list_splits" } } -impl PrometheusLabels<1> 
for StageSplitsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("stage_splits")]) +impl RpcName for StageSplitsRequest { + fn rpc_name() -> &'static str { + "stage_splits" } } -impl PrometheusLabels<1> for PublishSplitsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("publish_splits")]) +impl RpcName for PublishSplitsRequest { + fn rpc_name() -> &'static str { + "publish_splits" } } -impl PrometheusLabels<1> for MarkSplitsForDeletionRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([ - std::borrow::Cow::Borrowed("mark_splits_for_deletion"), - ]) +impl RpcName for MarkSplitsForDeletionRequest { + fn rpc_name() -> &'static str { + "mark_splits_for_deletion" } } -impl PrometheusLabels<1> for DeleteSplitsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("delete_splits")]) +impl RpcName for DeleteSplitsRequest { + fn rpc_name() -> &'static str { + "delete_splits" } } -impl PrometheusLabels<1> for AddSourceRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("add_source")]) +impl RpcName for AddSourceRequest { + fn rpc_name() -> &'static str { + "add_source" } } -impl PrometheusLabels<1> for ToggleSourceRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("toggle_source")]) +impl RpcName for ToggleSourceRequest { + fn rpc_name() -> &'static str { + "toggle_source" } } -impl PrometheusLabels<1> for DeleteSourceRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("delete_source")]) +impl RpcName for DeleteSourceRequest { + fn rpc_name() -> &'static str { + "delete_source" } } -impl PrometheusLabels<1> for 
ResetSourceCheckpointRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([ - std::borrow::Cow::Borrowed("reset_source_checkpoint"), - ]) +impl RpcName for ResetSourceCheckpointRequest { + fn rpc_name() -> &'static str { + "reset_source_checkpoint" } } -impl PrometheusLabels<1> for LastDeleteOpstampRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("last_delete_opstamp")]) +impl RpcName for LastDeleteOpstampRequest { + fn rpc_name() -> &'static str { + "last_delete_opstamp" } } -impl PrometheusLabels<1> for DeleteQuery { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("delete_query")]) +impl RpcName for DeleteQuery { + fn rpc_name() -> &'static str { + "create_delete_task" } } -impl PrometheusLabels<1> for UpdateSplitsDeleteOpstampRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([ - std::borrow::Cow::Borrowed("update_splits_delete_opstamp"), - ]) +impl RpcName for UpdateSplitsDeleteOpstampRequest { + fn rpc_name() -> &'static str { + "update_splits_delete_opstamp" } } -impl PrometheusLabels<1> for ListDeleteTasksRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_delete_tasks")]) +impl RpcName for ListDeleteTasksRequest { + fn rpc_name() -> &'static str { + "list_delete_tasks" } } -impl PrometheusLabels<1> for ListStaleSplitsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_stale_splits")]) +impl RpcName for ListStaleSplitsRequest { + fn rpc_name() -> &'static str { + "list_stale_splits" } } -impl PrometheusLabels<1> for OpenShardsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("open_shards")]) +impl RpcName for OpenShardsRequest { 
+ fn rpc_name() -> &'static str { + "open_shards" } } -impl PrometheusLabels<1> for AcquireShardsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("acquire_shards")]) +impl RpcName for AcquireShardsRequest { + fn rpc_name() -> &'static str { + "acquire_shards" } } -impl PrometheusLabels<1> for DeleteShardsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("delete_shards")]) +impl RpcName for DeleteShardsRequest { + fn rpc_name() -> &'static str { + "delete_shards" } } -impl PrometheusLabels<1> for ListShardsRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_shards")]) +impl RpcName for ListShardsRequest { + fn rpc_name() -> &'static str { + "list_shards" } } -impl PrometheusLabels<1> for CreateIndexTemplateRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("create_index_template")]) +impl RpcName for CreateIndexTemplateRequest { + fn rpc_name() -> &'static str { + "create_index_template" } } -impl PrometheusLabels<1> for GetIndexTemplateRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("get_index_template")]) +impl RpcName for GetIndexTemplateRequest { + fn rpc_name() -> &'static str { + "get_index_template" } } -impl PrometheusLabels<1> for FindIndexTemplateMatchesRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([ - std::borrow::Cow::Borrowed("find_index_template_matches"), - ]) +impl RpcName for FindIndexTemplateMatchesRequest { + fn rpc_name() -> &'static str { + "find_index_template_matches" } } -impl PrometheusLabels<1> for ListIndexTemplatesRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - 
OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed("list_index_templates")]) +impl RpcName for ListIndexTemplatesRequest { + fn rpc_name() -> &'static str { + "list_index_templates" } } -impl PrometheusLabels<1> for DeleteIndexTemplatesRequest { - fn labels(&self) -> OwnedPrometheusLabels<1usize> { - OwnedPrometheusLabels::new([ - std::borrow::Cow::Borrowed("delete_index_templates"), - ]) +impl RpcName for DeleteIndexTemplatesRequest { + fn rpc_name() -> &'static str { + "delete_index_templates" } } pub type MetastoreServiceStream = quickwit_common::ServiceStream< diff --git a/quickwit/quickwit-proto/src/control_plane/mod.rs b/quickwit/quickwit-proto/src/control_plane/mod.rs index 35426a73788..b77ecaf851e 100644 --- a/quickwit/quickwit-proto/src/control_plane/mod.rs +++ b/quickwit/quickwit-proto/src/control_plane/mod.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use quickwit_actors::AskError; +use quickwit_common::tower::RpcName; use thiserror; use crate::metastore::MetastoreError; @@ -99,3 +100,21 @@ impl ServiceError for ControlPlaneError { } } } + +impl RpcName for GetOrCreateOpenShardsRequest { + fn rpc_name() -> &'static str { + "get_or_create_open_shards" + } +} + +impl RpcName for AdviseResetShardsRequest { + fn rpc_name() -> &'static str { + "advise_reset_shards" + } +} + +impl RpcName for GetDebugStateRequest { + fn rpc_name() -> &'static str { + "get_debug_state" + } +} diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs index 2465b5686f1..4af856eb3ad 100644 --- a/quickwit/quickwit-proto/src/indexing/mod.rs +++ b/quickwit/quickwit-proto/src/indexing/mod.rs @@ -25,6 +25,7 @@ use std::{fmt, io}; use anyhow::anyhow; use quickwit_actors::AskError; use quickwit_common::pubsub::Event; +use quickwit_common::tower::RpcName; use serde::{Deserialize, Serialize}; use thiserror; @@ -344,6 +345,12 @@ impl IndexingTask { } } +impl RpcName for ApplyIndexingPlanRequest { + fn rpc_name() -> &'static 
str { + "apply_indexing_plan" + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index b6d46f992b2..7380f6531c3 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -34,7 +34,7 @@ use quickwit_proto::tonic::transport::Server; use tracing::*; use crate::search_api::GrpcSearchAdapter; -use crate::QuickwitServices; +use crate::{QuickwitServices, INDEXING_GRPC_SERVER_METRICS_LAYER}; /// Starts and binds gRPC services to `grpc_listen_addr`. pub(crate) async fn start_grpc_server( @@ -61,7 +61,9 @@ pub(crate) async fn start_grpc_server( { if let Some(indexing_service) = services.indexing_service_opt.clone() { enabled_grpc_services.insert("indexing"); - let indexing_service = IndexingServiceClient::from_mailbox(indexing_service); + let indexing_service = IndexingServiceClient::tower() + .stack_layer(INDEXING_GRPC_SERVER_METRICS_LAYER.clone()) + .build_from_mailbox(indexing_service); Some(indexing_service.as_grpc_service(max_message_size)) } else { None diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 0750d2be424..33fd11c503e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -57,6 +57,7 @@ use bytesize::ByteSize; pub use format::BodyFormat; use futures::{Stream, StreamExt}; use itertools::Itertools; +use once_cell::sync::Lazy; use quickwit_actors::{ActorExitStatus, Mailbox, Universe}; use quickwit_cluster::{ start_cluster_service, Cluster, ClusterChange, ClusterMember, ListenerHandle, @@ -67,7 +68,8 @@ use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::spawn_named_task; use quickwit_common::tower::{ BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, ConstantRate, EstimateRateLayer, - EventListenerLayer, RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, + EventListenerLayer, GrpcMetricsLayer, RateLimitLayer, RetryLayer, RetryPolicy, + 
SmaRateEstimator, }; use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; @@ -142,6 +144,26 @@ fn get_metastore_client_max_concurrency() -> usize { .unwrap_or(DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY) } +static CP_GRPC_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("control_plane", "client")); +static CP_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("control_plane", "server")); + +static INDEXING_GRPC_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("indexing", "client")); +pub(crate) static INDEXING_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("indexing", "server")); + +static INGEST_GRPC_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("ingest", "client")); +static INGEST_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("ingest", "server")); + +static METASTORE_GRPC_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("metastore", "client")); +static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("metastore", "server")); + struct QuickwitServices { pub node_config: Arc, pub cluster: Cluster, @@ -278,16 +300,17 @@ async fn start_control_plane_if_needed( replication_factor, ) .await?; - Ok(ControlPlaneServiceClient::from_mailbox( - control_plane_mailbox, - )) + let server = ControlPlaneServiceClient::tower() + .stack_layer(CP_GRPC_SERVER_METRICS_LAYER.clone()) + .build_from_mailbox(control_plane_mailbox); + Ok(server) } else { let balance_channel = balance_channel_for_service(cluster, QuickwitService::ControlPlane).await; - Ok(ControlPlaneServiceClient::from_balance_channel( - balance_channel, - node_config.grpc_config.max_message_size, - )) + let client = ControlPlaneServiceClient::tower() + .stack_layer(CP_GRPC_CLIENT_METRICS_LAYER.clone()) + .build_from_balance_channel(balance_channel, node_config.grpc_config.max_message_size); + Ok(client) } } @@ -319,6 +342,7 @@ pub 
async fn serve_quickwit( .stack_add_source_layer(broker_layer.clone()) .stack_delete_source_layer(broker_layer.clone()) .stack_toggle_source_layer(broker_layer) + .stack_layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) .build(metastore); Some(metastore) } else { @@ -342,22 +366,19 @@ pub async fn serve_quickwit( run --service metastore`" ) } - let balance_channel = balance_channel_for_service(&cluster, QuickwitService::Metastore).await; - let metastore_client = MetastoreServiceClient::from_balance_channel( - balance_channel, - grpc_config.max_message_size, - ); - let retry_layer = RetryLayer::new(RetryPolicy::default()); - MetastoreServiceClient::tower() - .stack_layer(retry_layer) - .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( + let layers = ServiceBuilder::new() + .layer(RetryLayer::new(RetryPolicy::default())) + .layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) + .layer(tower::limit::GlobalConcurrencyLimitLayer::new( get_metastore_client_max_concurrency(), )) - .build(metastore_client) + .into_inner(); + MetastoreServiceClient::tower() + .stack_layer(layers) + .build_from_balance_channel(balance_channel, grpc_config.max_message_size) }; - // Instantiate a control plane server if the `control-plane` role is enabled on the node. // Otherwise, instantiate a control plane client. let control_plane_service: ControlPlaneServiceClient = start_control_plane_if_needed( @@ -682,7 +703,12 @@ async fn setup_ingest_v2( replication_factor, ); ingest_router.subscribe(event_broker); - let ingest_router_service = IngestRouterServiceClient::new(ingest_router); + + // Any node can serve ingest requests, so we always instantiate an ingest router. + // TODO: I'm not sure that's such a good idea. 
+ let ingest_router_service = IngestRouterServiceClient::tower() + .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) + .build(ingest_router); // We compute the burst limit as something a bit larger than the content length limit, because // we actually rewrite the `\n-delimited format into a tiny bit larger buffer, where the @@ -694,7 +720,7 @@ async fn setup_ingest_v2( ..Default::default() }; // Instantiate ingester. - let ingester_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { + let ingester_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { let wal_dir_path = node_config.data_dir_path.join("wal"); fs::create_dir_all(&wal_dir_path)?; let ingester = Ingester::try_new( @@ -709,17 +735,16 @@ async fn setup_ingest_v2( ) .await?; ingester.subscribe(event_broker); - let ingester_service = IngesterServiceClient::new(ingester); - Some(ingester_service) + Some(ingester) } else { None }; // Setup ingester pool change stream. - let ingester_service_opt_clone = ingester_service_opt.clone(); + let ingester_opt_clone = ingester_opt.clone(); let cluster_change_stream = cluster.ready_nodes_change_stream().await; let max_message_size = node_config.grpc_config.max_message_size; let ingester_change_stream = cluster_change_stream.filter_map(move |cluster_change| { - let ingester_service_opt = ingester_service_opt_clone.clone(); + let ingester_opt_clone_clone = ingester_opt_clone.clone(); Box::pin(async move { match cluster_change { ClusterChange::Add(node) @@ -728,15 +753,27 @@ async fn setup_ingest_v2( let node_id: NodeId = node.node_id().into(); if node.is_self_node() { - let ingester_service = - ingester_service_opt.expect("ingester service should be initialized"); + // Here, since the service is available locally, we bypass the network stack + // and use the instance directly. However, we still want client-side + // metrics, so we use both metrics layers. 
+ let ingester = ingester_opt_clone_clone + .expect("ingester service should be initialized"); + let layers = ServiceBuilder::new() + .layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone()) + .layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) + .into_inner(); + let ingester_service = IngesterServiceClient::tower() + .stack_layer(layers) + .build(ingester); Some(Change::Insert(node_id, ingester_service)) } else { - let ingester_service = IngesterServiceClient::from_channel( - node.grpc_advertise_addr(), - node.channel(), - max_message_size, - ); + let ingester_service = IngesterServiceClient::tower() + .stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone()) + .build_from_channel( + node.grpc_advertise_addr(), + node.channel(), + max_message_size, + ); Some(Change::Insert(node_id, ingester_service)) } } @@ -746,6 +783,12 @@ async fn setup_ingest_v2( }) }); ingester_pool.listen_for_changes(ingester_change_stream); + + let ingester_service_opt = ingester_opt.map(|ingester| { + IngesterServiceClient::tower() + .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) + .build(ingester) + }); Ok((ingest_router_service, ingester_service_opt)) } @@ -853,37 +896,46 @@ fn setup_indexer_pool( let node_id = node.node_id().to_string(); let indexing_tasks = node.indexing_tasks().to_vec(); let indexing_capacity = node.indexing_capacity(); + if node.is_self_node() { - if let Some(indexing_service_clone) = indexing_service_clone_opt { - let client = - IndexingServiceClient::from_mailbox(indexing_service_clone); - Some(Change::Insert( - node_id, - IndexerNodeInfo { - client, - indexing_tasks, - indexing_capacity, - }, - )) - } else { - // That means that cluster thinks we are supposed to have an indexer, - // but we actually don't. - None - } - } else { - let client = IndexingServiceClient::from_channel( - node.grpc_advertise_addr(), - node.channel(), - max_message_size, + // Here, since the service is available locally, we bypass the network stack + // and use the mailbox directly. 
However, we still want client-side metrics, + // so we use both metrics layers. + let indexing_service_mailbox = indexing_service_clone_opt + .expect("indexing service should be initialized"); + let layers = ServiceBuilder::new() + .layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone()) + .layer(INDEXING_GRPC_SERVER_METRICS_LAYER.clone()) + .into_inner(); + let client = IndexingServiceClient::tower() + .stack_layer(layers) + .build_from_mailbox(indexing_service_mailbox); + let change = Change::Insert( + node_id, + IndexerNodeInfo { + client, + indexing_tasks, + indexing_capacity, + }, ); - Some(Change::Insert( + Some(change) + } else { + let client = IndexingServiceClient::tower() + .stack_layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone()) + .build_from_channel( + node.grpc_advertise_addr(), + node.channel(), + max_message_size, + ); + let change = Change::Insert( node_id, IndexerNodeInfo { client, indexing_tasks, indexing_capacity, }, - )) + ); + Some(change) } } ClusterChange::Remove(node) => Some(Change::Remove(node.node_id().to_string())),