Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics for prom route query #1123

Merged
merged 3 commits
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,17 @@ pub enum Error {
#[snafu(display("Fail to iter in reverse order, err:{}", source))]
IterReverse { source: GenericError },

#[snafu(display("Timeout when iter memtable.\nBacktrace:\n{}", backtrace))]
IterTimeout { backtrace: Backtrace },
#[snafu(display(
"Timeout when iter memtable, now:{:?}, deadline:{:?}.\nBacktrace:\n{}",
now,
deadline,
backtrace
))]
IterTimeout {
now: Instant,
deadline: Instant,
backtrace: Backtrace,
},
}

define_result!(Error);
Expand Down
6 changes: 3 additions & 3 deletions analytic_engine/src/memtable/skiplist/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use generic_error::BoxError;
use log::trace;
use skiplist::{ArenaSlice, IterRef, Skiplist};
use snafu::ResultExt;
use time_ext::InstantExt;

use crate::memtable::{
key::{self, KeySequence},
Expand Down Expand Up @@ -168,8 +167,9 @@ impl<A: Arena<Stats = BasicStats> + Clone + Sync + Send> ColumnarIterImpl<A> {

if num_rows > 0 {
if let Some(deadline) = self.deadline {
if deadline.check_deadline() {
return IterTimeout {}.fail();
let now = Instant::now();
if now.duration_since(deadline).is_zero() {
return IterTimeout { now, deadline }.fail();
}
}

Expand Down
1 change: 1 addition & 0 deletions proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{

impl<Q: QueryExecutor + 'static> Proxy<Q> {
/// Implement prometheus query in grpc service.
/// Note: not used in prod now.
pub async fn handle_prom_query(
&self,
ctx: Context,
Expand Down
6 changes: 5 additions & 1 deletion proxy/src/grpc/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use ceresdbproto::storage::{RouteRequest, RouteResponse};
use query_engine::executor::Executor as QueryExecutor;

use crate::{error, Context, Proxy};
use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse {
Expand All @@ -12,10 +12,14 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let mut resp = RouteResponse::default();
match routes {
Err(e) => {
GRPC_HANDLER_COUNTER_VEC.route_failed.inc();

error!("Failed to handle route, err:{e}");
resp.header = Some(error::build_err_header(e));
}
Ok(v) => {
GRPC_HANDLER_COUNTER_VEC.route_succeeded.inc();

resp.header = Some(error::build_ok_header());
resp.routes = v;
}
Expand Down
8 changes: 6 additions & 2 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ const STREAM_QUERY_CHANNEL_LEN: usize = 20;

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse {
GRPC_HANDLER_COUNTER_VEC.query.inc();
// Incoming query maybe larger than query_failed + query_succeeded for some
// corner case, like lots of time-consuming queries come in at the same time and
// cause server OOM.
GRPC_HANDLER_COUNTER_VEC.incoming_query.inc();

self.hotspot_recorder.inc_sql_query_reqs(&req).await;
match self.handle_sql_query_internal(ctx, req).await {
Err(e) => {
error!("Failed to handle sql query, err:{e}");
GRPC_HANDLER_COUNTER_VEC.query_failed.inc();
error!("Failed to handle sql query, err:{e}");
SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
Expand Down
85 changes: 52 additions & 33 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_types::{
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::debug;
use log::{debug, error};
use prom_remote_api::types::{
Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries, WriteRequest,
};
Expand All @@ -52,9 +52,8 @@ impl reject::Reject for Error {}

impl<Q: QueryExecutor + 'static> Proxy<Q> {
/// Handle write samples to remote storage with remote storage protocol.
async fn handle_prom_write(&self, ctx: RequestContext, req: WriteRequest) -> Result<()> {
async fn handle_prom_remote_write(&self, ctx: RequestContext, req: WriteRequest) -> Result<()> {
let write_table_requests = convert_write_request(req)?;

let num_rows: usize = write_table_requests
.iter()
.map(|req| {
Expand Down Expand Up @@ -85,6 +84,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(result.failed as u64);

ErrNoCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("fail to write storage, failed rows:{:?}", result.failed),
Expand All @@ -99,13 +99,14 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
HTTP_HANDLER_COUNTER_VEC
.write_failed_row
.inc_by(num_rows as u64);

Err(e)
}
}
}

/// Handle one query with remote storage protocol.
async fn handle_prom_process_query(
async fn handle_prom_remote_query(
&self,
ctx: &RequestContext,
metric: String,
Expand Down Expand Up @@ -187,7 +188,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
msg: "build request context failed",
})?;

self.handle_prom_process_query(&ctx, metric, query)
self.handle_prom_remote_query(&ctx, metric, query)
.await
.map(|v| PrometheusRemoteQueryResponse {
header: Some(build_ok_header()),
Expand All @@ -202,44 +203,62 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for Proxy<Q> {
type Err = Error;

async fn write(&self, ctx: Self::Context, req: WriteRequest) -> StdResult<(), Self::Err> {
self.handle_prom_write(ctx, req).await
self.handle_prom_remote_write(ctx, req).await
}

async fn process_query(
&self,
ctx: &Self::Context,
query: Query,
) -> StdResult<QueryResult, Self::Err> {
let metric = find_metric(&query.matchers)?;
let remote_req = PrometheusRemoteQueryRequest {
context: Some(ceresdbproto::storage::RequestContext {
database: ctx.schema.to_string(),
}),
query: query.encode_to_vec(),
};
if let Some(resp) = self
.maybe_forward_prom_remote_query(metric.clone(), remote_req)
.await
.map_err(|e| {
log::info!("remote_req forward error {:?}", e);
e
})?
{
match resp {
ForwardResult::Forwarded(resp) => {
return resp.and_then(|v| {
QueryResult::decode(v.response.as_ref())
.box_err()
.context(Internal {
msg: "decode QueryResult failed",
})
});
HTTP_HANDLER_COUNTER_VEC.incoming_prom_query.inc();

let do_query = || async {
let metric = find_metric(&query.matchers)?;
let remote_req = PrometheusRemoteQueryRequest {
context: Some(ceresdbproto::storage::RequestContext {
database: ctx.schema.to_string(),
}),
query: query.encode_to_vec(),
};
if let Some(resp) = self
.maybe_forward_prom_remote_query(metric.clone(), remote_req)
.await
.map_err(|e| {
error!("Forward prom remote query failed, err:{e}");
e
})?
{
match resp {
ForwardResult::Forwarded(resp) => {
return resp.and_then(|v| {
QueryResult::decode(v.response.as_ref())
.box_err()
.context(Internal {
msg: "decode QueryResult failed",
})
});
}
ForwardResult::Local => {}
}
ForwardResult::Local => {}
}
}

self.handle_prom_process_query(ctx, metric, query).await
self.handle_prom_remote_query(ctx, metric, query).await
};

match do_query().await {
Ok(v) => {
HTTP_HANDLER_COUNTER_VEC.prom_query_succeeded.inc();

Ok(v)
}
Err(e) => {
HTTP_HANDLER_COUNTER_VEC.prom_query_failed.inc();

error!("Prom remote query failed, err:{e}");
Err(e)
}
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion proxy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ make_auto_flush_static_metric! {
pub label_enum GrpcTypeKind {
write_succeeded,
write_failed,
incoming_query,
query_succeeded,
query_failed,
query,
route_succeeded,
route_failed,
stream_query_succeeded,
stream_query_failed,
stream_query,
Expand All @@ -29,6 +31,9 @@ make_auto_flush_static_metric! {
pub label_enum HttpTypeKind {
write_failed,
write_failed_row,
incoming_prom_query,
prom_query_succeeded,
prom_query_failed,
}

pub struct HttpHandlerCounterVec: LocalIntCounter {
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();

info!("Handle sql query, request_id:{request_id}, schema:{schema}, sql:{sql}");
info!("Handle sql query, request_id:{request_id}, deadline:{deadline:?}, schema:{schema}, sql:{sql}");

let instance = &self.instance;
// TODO(yingwen): Privilege check, cannot access data of other tenant
Expand Down