Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC metrics layer to clients and servers #4591

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ members = [
"quickwit-janitor",
"quickwit-lambda",
"quickwit-macros",
"quickwit-macros/impl",
"quickwit-metastore",

# Disabling metastore-utils from the quickwit projects to ease build/deps.
Expand Down Expand Up @@ -49,17 +48,16 @@ default-members = [
"quickwit-common",
"quickwit-config",
"quickwit-control-plane",
"quickwit-index-management",
"quickwit-datetime",
"quickwit-directories",
"quickwit-doc-mapper",
"quickwit-index-management",
"quickwit-indexing",
"quickwit-ingest",
"quickwit-integration-tests",
"quickwit-jaeger",
"quickwit-janitor",
"quickwit-macros",
"quickwit-macros/impl",
"quickwit-metastore",
"quickwit-opentelemetry",
"quickwit-proto",
Expand Down Expand Up @@ -314,7 +312,6 @@ quickwit-integration-tests = { path = "quickwit-integration-tests" }
quickwit-jaeger = { path = "quickwit-jaeger" }
quickwit-janitor = { path = "quickwit-janitor" }
quickwit-macros = { path = "quickwit-macros" }
quickwit-macros-impl = { path = "quickwit-macros/impl" }
quickwit-metastore = { path = "quickwit-metastore" }
quickwit-opentelemetry = { path = "quickwit-opentelemetry" }
quickwit-proto = { path = "quickwit-proto" }
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ impl Default for CliMetrics {
thread_unpark_duration_microseconds: new_histogram_vec(
"thread_unpark_duration_microseconds",
"Duration for which a thread of the main tokio runtime is unparked.",
"quickwit_cli",
"cli",
&[],
[],
),
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-codegen/example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {
.with_result_type_path("crate::HelloResult")
.with_error_type_path("crate::HelloError")
.generate_extra_service_methods()
.generate_prom_labels_for_requests()
.generate_rpc_name_impls()
.run()
.unwrap();
}
19 changes: 12 additions & 7 deletions quickwit/quickwit-codegen/example/src/codegen/hello.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 21 additions & 37 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;

use anyhow::ensure;
use heck::{ToSnakeCase, ToUpperCamelCase};
use proc_macro2::TokenStream;
Expand Down Expand Up @@ -112,7 +110,8 @@ impl CodegenBuilder {
self
}

pub fn generate_prom_labels_for_requests(mut self) -> Self {
/// Generates `RpcName` trait implentations for request types.
pub fn generate_rpc_name_impls(mut self) -> Self {
self.generate_prom_labels_for_requests = true;
self
}
Expand Down Expand Up @@ -340,17 +339,6 @@ struct SynMethod {
}

impl SynMethod {
fn request_prom_label(&self) -> String {
self.request_type
.segments
.last()
.unwrap()
.ident
.to_string()
.trim_end_matches("Request")
.to_snake_case()
}

fn request_type(&self, mock: bool) -> TokenStream {
let request_type = if mock {
let request_type = &self.request_type;
Expand Down Expand Up @@ -406,32 +394,28 @@ impl SynMethod {
}

fn generate_prom_labels_impl_for_requests(context: &CodegenContext) -> TokenStream {
let mut stream = TokenStream::new();
stream.extend(quote! {
use quickwit_common::metrics::{PrometheusLabels, OwnedPrometheusLabels};
});
let mut implemented_request_types: HashSet<String> = HashSet::new();
let mut rpc_name_impls = Vec::new();

for syn_method in &context.methods {
if syn_method.client_streaming {
continue;
}
let request_type = syn_method.request_type(false);
let request_type_snake_case = syn_method.request_prom_label();
if implemented_request_types.contains(&request_type_snake_case) {
continue;
} else {
implemented_request_types.insert(request_type_snake_case.clone());
let method = quote! {
impl PrometheusLabels<1> for #request_type {
fn labels(&self) -> OwnedPrometheusLabels<1usize> {
OwnedPrometheusLabels::new([std::borrow::Cow::Borrowed(#request_type_snake_case),])
}
let request_type = syn_method.request_type.to_token_stream();
let rpc_name = &syn_method.name.to_string();
let rpc_name_impl = quote! {
impl RpcName for #request_type {
fn rpc_name() -> &'static str {
#rpc_name
}
};
stream.extend(method);
}
}
};
rpc_name_impls.extend(rpc_name_impl);
}
if rpc_name_impls.is_empty() {
return TokenStream::new();
}
quote! {
use quickwit_common::tower::RpcName;

#(#rpc_name_impls)*
}
stream
}

fn generate_comment_attributes(comments: &Comments) -> Vec<syn::Attribute> {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-common/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl Default for IoMetrics {
"write_bytes",
"Number of bytes written by a given component in [indexer, merger, deleter, \
split_downloader_{merge,delete}]",
"quickwit",
"",
&[],
["component"],
);
Self { write_bytes }
Expand Down
118 changes: 67 additions & 51 deletions quickwit/quickwit-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::{Borrow, Cow};
use std::collections::HashMap;

use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder};
pub use prometheus::{
Histogram, HistogramTimer, HistogramVec as PrometheusHistogramVec, IntCounter,
IntCounterVec as PrometheusIntCounterVec, IntGauge, IntGaugeVec as PrometheusIntGaugeVec,
};

pub struct OwnedPrometheusLabels<const N: usize> {
labels: [Cow<'static, str>; N],
}

impl<const N: usize> OwnedPrometheusLabels<N> {
pub fn new(labels: [Cow<'static, str>; N]) -> Self {
Self { labels }
}

pub fn borrow_labels(&self) -> [&str; N] {
let mut labels = [""; N];

for (i, label) in self.labels.iter().enumerate() {
labels[i] = label.borrow();
}
labels
}
}

pub trait PrometheusLabels<const N: usize> {
fn labels(&self) -> OwnedPrometheusLabels<N>;
}

#[derive(Clone)]
pub struct HistogramVec<const N: usize> {
underlying: PrometheusHistogramVec,
Expand Down Expand Up @@ -81,63 +58,102 @@ impl<const N: usize> IntGaugeVec<N> {
}
}

pub fn new_counter(name: &str, description: &str, namespace: &str) -> IntCounter {
let counter_opts = Opts::new(name, description).namespace(namespace);
let counter = IntCounter::with_opts(counter_opts).expect("Failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("Failed to register counter");
pub fn new_counter(name: &str, help: &str, subsystem: &str) -> IntCounter {
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
counter
}

pub fn new_counter_vec<const N: usize>(
name: &str,
description: &str,
namespace: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
label_names: [&str; N],
) -> IntCounterVec<N> {
let counter_opts = Opts::new(name, description).namespace(namespace);
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let counter_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names)
.expect("Failed to create counter vec");
prometheus::register(Box::new(underlying.clone())).expect("Failed to register counter vec");
.expect("failed to create counter vec");

let collector = Box::new(underlying.clone());
prometheus::register(collector).expect("failed to register counter vec");

IntCounterVec { underlying }
}

pub fn new_gauge(name: &str, description: &str, namespace: &str) -> IntGauge {
let gauge_opts = Opts::new(name, description).namespace(namespace);
let gauge = IntGauge::with_opts(gauge_opts).expect("Failed to create gauge");
prometheus::register(Box::new(gauge.clone())).expect("Failed to register gauge");
pub fn new_gauge(name: &str, help: &str, subsystem: &str) -> IntGauge {
let gauge_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
let gauge = IntGauge::with_opts(gauge_opts).expect("failed to create gauge");
prometheus::register(Box::new(gauge.clone())).expect("failed to register gauge");
gauge
}

pub fn new_gauge_vec<const N: usize>(
name: &str,
description: &str,
namespace: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
label_names: [&str; N],
) -> IntGaugeVec<N> {
let gauge_opts = Opts::new(name, description).namespace(namespace);
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let gauge_opts = Opts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let underlying =
PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("Failed to create gauge vec");
prometheus::register(Box::new(underlying.clone())).expect("Failed to register gauge vec");
PrometheusIntGaugeVec::new(gauge_opts, &label_names).expect("failed to create gauge vec");

let collector = Box::new(underlying.clone());
prometheus::register(collector).expect("failed to register counter vec");

IntGaugeVec { underlying }
}

pub fn new_histogram(name: &str, description: &str, namespace: &str) -> Histogram {
let histogram_opts = HistogramOpts::new(name, description).namespace(namespace);
let histogram = Histogram::with_opts(histogram_opts).expect("Failed to create histogram");
prometheus::register(Box::new(histogram.clone())).expect("Failed to register counter");
pub fn new_histogram(name: &str, help: &str, subsystem: &str) -> Histogram {
let histogram_opts = HistogramOpts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem);
let histogram = Histogram::with_opts(histogram_opts).expect("failed to create histogram");
prometheus::register(Box::new(histogram.clone())).expect("failed to register histogram");
histogram
}

pub fn new_histogram_vec<const N: usize>(
name: &str,
description: &str,
namespace: &str,
help: &str,
subsystem: &str,
const_labels: &[(&str, &str)],
label_names: [&str; N],
) -> HistogramVec<N> {
let histogram_opts = HistogramOpts::new(name, description).namespace(namespace);
let owned_const_labels: HashMap<String, String> = const_labels
.iter()
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
.collect();
let histogram_opts = HistogramOpts::new(name, help)
.namespace("quickwit")
.subsystem(subsystem)
.const_labels(owned_const_labels);
let underlying = PrometheusHistogramVec::new(histogram_opts, &label_names)
.expect("Failed to create histogram vec");
prometheus::register(Box::new(underlying.clone())).expect("Failed to register histogram vec");
.expect("failed to create histogram vec");

let collector = Box::new(underlying.clone());
prometheus::register(collector).expect("failed to register histogram vec");

HistogramVec { underlying }
}

Expand Down
10 changes: 10 additions & 0 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream, WatchStream};
use tracing::warn;

use crate::tower::RpcName;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;

/// A stream impl for code-generated services with streaming endpoints.
Expand Down Expand Up @@ -195,6 +197,14 @@ where T: Send + 'static
}
}

impl<T> RpcName for ServiceStream<T>
where T: RpcName
{
fn rpc_name() -> &'static str {
T::rpc_name()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading