Skip to content

Commit

Permalink
Add gRPC metrics layer to clients and servers
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 15, 2024
1 parent 882058d commit 9a959a9
Show file tree
Hide file tree
Showing 37 changed files with 585 additions and 758 deletions.
11 changes: 0 additions & 11 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ members = [
"quickwit-janitor",
"quickwit-lambda",
"quickwit-macros",
"quickwit-macros/impl",
"quickwit-metastore",

# Disabling metastore-utils from the quickwit projects to ease build/deps.
Expand Down Expand Up @@ -49,17 +48,16 @@ default-members = [
"quickwit-common",
"quickwit-config",
"quickwit-control-plane",
"quickwit-index-management",
"quickwit-datetime",
"quickwit-directories",
"quickwit-doc-mapper",
"quickwit-index-management",
"quickwit-indexing",
"quickwit-ingest",
"quickwit-integration-tests",
"quickwit-jaeger",
"quickwit-janitor",
"quickwit-macros",
"quickwit-macros/impl",
"quickwit-metastore",
"quickwit-opentelemetry",
"quickwit-proto",
Expand Down Expand Up @@ -314,7 +312,6 @@ quickwit-integration-tests = { path = "quickwit-integration-tests" }
quickwit-jaeger = { path = "quickwit-jaeger" }
quickwit-janitor = { path = "quickwit-janitor" }
quickwit-macros = { path = "quickwit-macros" }
quickwit-macros-impl = { path = "quickwit-macros/impl" }
quickwit-metastore = { path = "quickwit-metastore" }
quickwit-opentelemetry = { path = "quickwit-opentelemetry" }
quickwit-proto = { path = "quickwit-proto" }
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ impl Default for CliMetrics {
thread_unpark_duration_microseconds: new_histogram_vec(
"thread_unpark_duration_microseconds",
"Duration for which a thread of the main tokio runtime is unparked.",
"quickwit_cli",
"cli",
&[],
[],
),
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {
.with_result_type_path("crate::HelloResult")
.with_error_type_path("crate::HelloError")
.generate_extra_service_methods()
.generate_prom_labels_for_requests()
.generate_rpc_name_impls()
.run()
.unwrap();
}
19 changes: 12 additions & 7 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 21 additions & 37 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;

use anyhow::ensure;
use heck::{ToSnakeCase, ToUpperCamelCase};
use proc_macro2::TokenStream;
Expand Down Expand Up @@ -112,7 +110,8 @@ impl CodegenBuilder {
self
}

pub fn generate_prom_labels_for_requests(mut self) -> Self {
/// Generates `RpcName` trait implentations for request types.
pub fn generate_rpc_name_impls(mut self) -> Self {
self.generate_prom_labels_for_requests = true;
self
}
Expand Down Expand Up @@ -340,17 +339,6 @@ struct SynMethod {
}

impl SynMethod {
fn request_prom_label(&self) -> String {
self.request_type
.segments
.last()
.unwrap()
.ident
.to_string()
.trim_end_matches("Request")
.to_snake_case()
}

fn request_type(&self, mock: bool) -> TokenStream {
let request_type = if mock {
let request_type = &self.request_type;
Expand Down Expand Up @@ -406,32 +394,28 @@ impl SynMethod {
}

fn generate_prom_labels_impl_for_requests(context: &CodegenContext) -> TokenStream {
let mut stream = TokenStream::new();
stream.extend(quote! {
use quickwit_common::metrics::{PrometheusLabels, OwnedPrometheusLabels};
});
let mut implemented_request_types: HashSet<String> = HashSet::new();
let mut rpc_name_impls = Vec::new();

for syn_method in &context.methods {
if syn_method.client_streaming {
continue;
}
let request_type = syn_method.request_type(false);
let request_type_snake_case = syn_method.request_prom_label();
if implemented_request_types.contains(&request_type_snake_case) {
continue;
} else {
implemented_request_types.insert(request_type_snake_case.clone());
let method = quote! {
impl PrometheusLabels<1> for #request_type {
fn labels(&self) -> OwnedPrometheusLabels<1usize> {
OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed(#request_type_snake_case),])
}
let request_type = syn_method.request_type.to_token_stream();
let rpc_name = &syn_method.name.to_string();
let rpc_name_impl = quote! {
impl RpcName for #request_type {
fn rpc_name() -> &'static str {
#rpc_name
}
};
stream.extend(method);
}
}
};
rpc_name_impls.extend(rpc_name_impl);
}
if rpc_name_impls.is_empty() {
return TokenStream::new();
}
quote! {
use quickwit_common::tower::RpcName;

#(#rpc_name_impls)*
}
stream
}

fn generate_comment_attributes(comments: &Comments) -> Vec<syn::Attribute> {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-common/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl Default for IoMetrics {
"write_bytes",
"Number of bytes written by a given component in [indexer, merger, deleter, \
split_downloader_{merge,delete}]",
"quickwit",
"",
&[],
["component"],
);
Self { write_bytes }
Expand Down
118 changes: 67 additions & 51 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::{Borrow, Cow};
use std::collections::HashMap;

use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
pub use prometheus::{
Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter,
IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec,
};

pub struct OwnedPrometheusLabels<const N: usize> {
labels: [Cow<'static, str>; N],
}

impl<const N: usize> OwnedPrometheusLabels<N> {
pub fn new(labels: [Cow<'static, str>; N]) -> Self {
Self { labels }
}

pub fn borrow_labels(&self) -> [&str; N] {
let mut labels = [""; N];

for (i, label) in self.labels.iter().enumerate() {
labels[i] = label.borrow();
}
labels
}
}

pub trait PrometheusLabels<const N: usize> {
fn labels(&self) -> OwnedPrometheusLabels<N>;
}

#[derive(Clone)]
pub struct HistogramVec<const N: usize> {
underlying: PrometheusHistogramVec,
Expand Down Expand Up @@ -81,63 +58,102 @@ impl<const N: usize> IntGaugeVec<N> {
}
}

pub fn new_counter(name: &str, description: &str, namespace: &str) -> IntCounter {
let counter_opts = Opts::new(name, description).namespace(namespace);
let counter = IntCounter::with_opts(counter_opts).expect("Failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("Failed to register counter");
pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter {
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
counter
}

pub fn new_counter_vec<const N: usize>(
name: &str,
description: &str,
namespace: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
label_names: [&str; N],
) -> IntCounterVec<N> {
let counter_opts = Opts::new(name, description).namespace(namespace);
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names)
.expect("Failed to create counter vec");
prometheus::register(Box::new(underlying.clone())).expect("Failed to register counter vec");
.expect("failed to create counter vec");

let collector = Box::new(underlying.clone());
prometheus::register(collector).expect("failed to register counter vec");

IntCounterVec { underlying }
}

pub fn new_gauge(name: &str, description: &str, namespace: &str) -> IntGauge {
let gauge_opts = Opts::new(name, description).namespace(namespace);
let gauge = IntGauge::with_opts(gauge_opts).expect("Failed to create gauge");
prometheus::register(Box::new(gauge.clone())).expect("Failed to register gauge");
pub fn new_gauge(name: &str, help: &str, subsystem: &str) -> IntGauge {
let gauge_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge");
prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge");
gauge
}

pub fn new_gauge_vec<const N: usize>(
name: &str,
description: &str,
namespace: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
label_names: [&str; N],
) -> IntGaugeVec<N> {
let gauge_opts = Opts::new(name, description).namespace(namespace);
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let gauge_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let underlying =
PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("Failed to create gauge vec");
prometheus::register(Box::new(underlying.clone())).expect("Failed to register gauge vec");
PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec");

let collector = Box::new(underlying.clone());
prometheus::register(collector).expect("failed to register counter vec");

IntGaugeVec { underlying }
}

pub fn new_histogram(name: &str, description: &str, namespace: &str) -> Histogram {
let histogram_opts = HistogramOpts::new(name, description).namespace(namespace);
let histogram = Histogram::with_opts(histogram_opts).expect("Failed to create histogram");
prometheus::register(Box::new(histogram.clone())).expect("Failed to register counter");
pub fn new_histogram(name: &str, help: &str, subsystem: &str) -> Histogram {
let histogram_opts = HistogramOpts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
let histogram = Histogram::with_opts(histogram_opts).expect("failed to create histogram");
prometheus::register(Box::new(histogram.clone())).expect("failed to register histogram");
histogram
}

pub fn new_histogram_vec<const N: usize>(
name: &str,
description: &str,
namespace: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
label_names: [&str; N],
) -> HistogramVec<N> {
let histogram_opts = HistogramOpts::new(name, description).namespace(namespace);
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let histogram_opts = HistogramOpts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let underlying = PrometheusHistogramVec::new(histogram_opts, &label_names)
.expect("Failed to create histogram vec");
prometheus::register(Box::new(underlying.clone())).expect("Failed to register histogram vec");
.expect("failed to create histogram vec");

let collector = Box::new(underlying.clone());
prometheus::register(collector).expect("failed to register histogram vec");

HistogramVec { underlying }
}

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream};
use tracing::warn;

use crate::tower::RpcName;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;

/// A stream impl for code-generated services with streaming endpoints.
Expand Down Expand Up @@ -195,6 +197,14 @@ where T: Send + 'static
}
}

impl<T> RpcName for ServiceStream<T>
where T: RpcName
{
fn rpc_name() -> &'static str {
T::rpc_name()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit 9a959a9

Please sign in to comment.