
metrics: add running metrics for grpc #639

Open

wants to merge 7 commits into base: master

Changes from 5 commits
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -27,6 +27,8 @@ prost = { version = "0.11", optional = true }
bytes = { version = "1.0", optional = true }
log = "0.4"
parking_lot = "0.12"
prometheus = { version = "0.13", default-features = false }
Contributor

Suggested change
prometheus = { version = "0.13", default-features = false }
prometheus = { version = "0.13", default-features = false, optional = true }

lazy_static = "1"

[workspace]
members = [
@@ -42,8 +44,9 @@ members = [
exclude = ["xtask"]

[features]
default = ["protobuf-codec", "boringssl"]
default = ["protobuf-codec", "boringssl","prometheus"]
Contributor

Maybe we should not enable the "prometheus" feature by default.

Author

It may bring extra resource usage.

_secure = []
prometheus = []
Contributor

Suggested change
prometheus = []
prometheus = ["dep:prometheus"]

protobuf-codec = ["protobuf"]
protobufv3-codec = ["protobufv3"]
prost-codec = ["prost", "bytes"]
117 changes: 109 additions & 8 deletions src/env.rs
@@ -5,17 +5,113 @@ use std::sync::mpsc;
use std::sync::Arc;
use std::thread::{Builder as ThreadBuilder, JoinHandle};

use crate::grpc_sys;

use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
use crate::grpc_sys;
use crate::task::CallTag;

// event loop
#[cfg(feature = "prometheus")]
use {
crate::metrics::{
GRPC_POOL_CQ_NEXT_DURATION, GRPC_POOL_EVENT_COUNT_VEC, GRPC_POOL_EXECUTE_DURATION,
GRPC_TASK_WAIT_DURATION,
},
crate::task::resolve,
prometheus::{
core::{AtomicU64, GenericCounter},
Histogram,
},
std::time::Instant,
};

#[cfg(feature = "prometheus")]
pub struct GRPCRunner {
cq_next_duration_his: Histogram,
Contributor

Better to use LocalHistogram and flush periodically (e.g. every 1k calls) to reduce the overhead.
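
For illustration, a minimal sketch of that pattern using the prometheus crate's LocalHistogram (not part of this PR; the struct name and flush interval are hypothetical):

use prometheus::{local::LocalHistogram, Histogram};

/// Thread-local buffer for one histogram; flushes to the shared one periodically.
struct LocalDurationHis {
    local: LocalHistogram,
    observed: usize,
}

impl LocalDurationHis {
    // Hypothetical flush interval (e.g. every 1k observations).
    const FLUSH_EVERY: usize = 1024;

    fn new(his: &Histogram) -> LocalDurationHis {
        LocalDurationHis { local: his.local(), observed: 0 }
    }

    fn observe(&mut self, secs: f64) {
        self.local.observe(secs);
        self.observed += 1;
        if self.observed % Self::FLUSH_EVERY == 0 {
            // Merge the buffered samples into the registered histogram.
            self.local.flush();
        }
    }
}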

execute_duration_his: Histogram,
wait_duration_his: Histogram,
event_counter: [GenericCounter<AtomicU64>; 6],
Contributor

Suggested change
event_counter: [GenericCounter<AtomicU64>; 6],
event_counter: [Counter; 6],

}

#[cfg(feature = "prometheus")]
impl GRPCRunner {
pub fn new(name: &String) -> GRPCRunner {
let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION.with_label_values(&[name]);
let execute_duration_his = GRPC_POOL_EXECUTE_DURATION.with_label_values(&[name]);
let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]);
let event_counter = ["batch", "request", "unary", "abort", "action", "spawn"]
.map(|event| GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[name, event]));
GRPCRunner {
cq_next_duration_his,
execute_duration_his,
wait_duration_his,
event_counter,
}
}

// event loop
pub fn run(&self, tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
tx.send(cq.clone()).expect("send back completion queue");
loop {
let now = Instant::now();
let e = cq.next();
self.cq_next_duration_his
.observe(now.elapsed().as_secs_f64());
let now = Instant::now();
match e.type_ {
EventType::GRPC_QUEUE_SHUTDOWN => break,
// timeout should not happen in theory.
EventType::GRPC_QUEUE_TIMEOUT => continue,
EventType::GRPC_OP_COMPLETE => {}
}

let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
self.resolve(tag, &cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
}
self.execute_duration_his
.observe(now.elapsed().as_secs_f64());
}
}

fn resolve(&self, tag: Box<CallTag>, cq: &CompletionQueue, success: bool) {
match *tag {
CallTag::Batch(prom) => {
self.event_counter[0].inc();
prom.resolve(success)
}
CallTag::Request(cb) => {
self.event_counter[1].inc();
cb.resolve(cq, success)
}
CallTag::UnaryRequest(cb) => {
self.event_counter[2].inc();
cb.resolve(cq, success)
}
CallTag::Abort(_) => self.event_counter[3].inc(),
CallTag::Action(prom) => {
self.event_counter[4].inc();
prom.resolve(success)
}
CallTag::Spawn(task) => {
self.event_counter[5].inc();
self.wait_duration_his
.observe(task.reset_push_time().elapsed().as_secs_f64());
resolve(task, success)
}
}
}
}

#[cfg(not(feature = "prometheus"))]
fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
tx.send(cq.clone()).expect("send back completion queue");

loop {
let e = cq.next();
match e.type_ {
@@ -24,9 +120,7 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
EventType::GRPC_QUEUE_TIMEOUT => continue,
EventType::GRPC_OP_COMPLETE => {}
}

let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };

tag.resolve(&cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
@@ -94,16 +188,23 @@ impl EnvBuilder {
for i in 0..self.cq_count {
let tx_i = tx.clone();
let mut builder = ThreadBuilder::new();
if let Some(ref prefix) = self.name_prefix {
builder = builder.name(format!("{prefix}-{i}"));
}
let name = self
.name_prefix
.as_ref()
.map_or(format!("grpc-pool-{i}"), |prefix| format!("{prefix}-{i}"));
#[cfg(feature = "prometheus")]
let runner = GRPCRunner::new(&name);
builder = builder.name(name);
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let handle = builder
.spawn(move || {
if let Some(f) = after_start {
f();
}
#[cfg(feature = "prometheus")]
runner.run(tx_i);
#[cfg(not(feature = "prometheus"))]
poll_queue(tx_i);
if let Some(f) = before_stop {
f();
1 change: 1 addition & 0 deletions src/lib.rs
@@ -40,6 +40,7 @@ mod env;
mod error;
mod log_util;
mod metadata;
mod metrics;
mod quota;
mod security;
mod server;
43 changes: 43 additions & 0 deletions src/metrics.rs
@@ -0,0 +1,43 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

//! Metrics of the grpc pool.

use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
/// Grpc wait duration of one task.
pub static ref GRPC_TASK_WAIT_DURATION: HistogramVec = register_histogram_vec!(
"grpc_task_wait_duration",
"Bucketed histogram of grpc wait time only for Spawn task",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
)
.unwrap();

// Grpc pool io handle duration.
pub static ref GRPC_POOL_CQ_NEXT_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_cp_next_duration",
"Bucketed histogram of grpc pool wait duration from the completion queue",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
)
.unwrap();

// Grpc handle execute duration
pub static ref GRPC_POOL_EXECUTE_DURATION: HistogramVec = register_histogram_vec!(
"grpc_pool_execute_duration",
"Bucketed histogram of grpc pool execute duration for every time",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
Contributor

The 100ms upper bound is too small here; maybe a range of 10us~10s would be better.
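
For reference, a bucket layout covering roughly that range could be the following (a sketch, not a suggested change from the PR; 1e-5 * 2^20 is about 10.5s):

// Roughly 10us .. ~10s with 21 exponential buckets.
exponential_buckets(1e-5, 2.0, 21).unwrap()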

)
.unwrap();

// Grpc pool event task count.
pub static ref GRPC_POOL_EVENT_COUNT_VEC: IntCounterVec = register_int_counter_vec!(
"grpc_pool_event_task_count",
"Total event task count in grpc pool",
&["name","event"]
)
.unwrap();
}
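
Since these collectors are registered in the default registry by the register_* macros, downstream code can expose them through the usual Prometheus text format. A minimal sketch (assuming the "prometheus" feature is enabled; the function name is hypothetical):

use prometheus::{Encoder, TextEncoder};

// Render all default-registry metrics (including the grpc pool ones) as text.
fn dump_metrics() -> String {
    let families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new().encode(&families, &mut buf).unwrap();
    String::from_utf8(buf).unwrap()
}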
10 changes: 9 additions & 1 deletion src/task/executor.rs
@@ -7,7 +7,7 @@
//! Apparently, to minimize context switch, it's better to bind the future to the
//! same completion queue as its inner call. Hence method `Executor::spawn` is provided.

use std::cell::UnsafeCell;
use std::cell::{RefCell, UnsafeCell};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
@@ -21,6 +21,7 @@ use crate::call::Call;
use crate::cq::{CompletionQueue, WorkQueue};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, grpc_call_error};
use std::time::Instant;

/// A handle to a `Spawn`.
/// Inner future is expected to be polled in the same thread as cq.
@@ -88,6 +89,7 @@ pub struct SpawnTask {
state: AtomicU8,
kicker: Kicker,
queue: Arc<WorkQueue>,
push_time: RefCell<Instant>,
}

/// `SpawnTask` access is guarded by `state` field, which guarantees Sync.
@@ -102,9 +104,14 @@ impl SpawnTask {
state: AtomicU8::new(IDLE),
kicker,
queue,
push_time: RefCell::new(Instant::now()),
}
}

pub fn reset_push_time(&self) -> Instant {
self.push_time.replace(Instant::now())
}

/// Marks the state of this task to NOTIFIED.
///
/// Returns true means the task was IDLE, needs to be scheduled.
@@ -154,6 +161,7 @@ impl ArcWake for SpawnTask {

// It can lead to deadlock if poll the future immediately. So we need to
// defer the work instead.
task.reset_push_time();
if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
// If the queue is shutdown, then the tag will be notified
7 changes: 5 additions & 2 deletions src/task/mod.rs
@@ -10,18 +10,21 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use crate::cq::CompletionQueue;
use parking_lot::Mutex;

use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryRequestCallback};
use self::executor::SpawnTask;
use self::promise::{Action as ActionPromise, Batch as BatchPromise};
use crate::call::server::RequestContext;
use crate::call::{BatchContext, Call};
use crate::cq::CompletionQueue;
use crate::error::{Error, Result};
use crate::server::RequestCallContext;

pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};

#[cfg(feature = "prometheus")]
pub(crate) use self::executor::resolve;
pub(crate) use self::promise::BatchResult;
pub use self::promise::BatchType;

@@ -170,7 +173,7 @@ impl CallTag {
}
}

/// Resolve the CallTag with given status.
#[allow(dead_code)]
pub fn resolve(self, cq: &CompletionQueue, success: bool) {
match self {
CallTag::Batch(prom) => prom.resolve(success),