From 1bc82ffa532960483f807436a3193355d4a28054 Mon Sep 17 00:00:00 2001 From: MingjiHan Date: Wed, 8 Jun 2022 22:14:30 -0400 Subject: [PATCH] Fix executor metrics in release mode --- src/stream/src/executor/debug.rs | 2 +- src/stream/src/executor/debug/trace.rs | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/debug.rs b/src/stream/src/executor/debug.rs index d2fca04a6b870..820dcdb73e680 100644 --- a/src/stream/src/executor/debug.rs +++ b/src/stream/src/executor/debug.rs @@ -94,7 +94,7 @@ impl DebugExecutor { stream: impl MessageStream + 'static, ) -> impl MessageStream + 'static { // Metrics - let stream = trace::metrics(extra.actor_id, extra.metrics, stream); + let stream = trace::metrics(extra.actor_id, extra.executor_id, extra.metrics, stream); // Epoch check let stream = epoch_check::epoch_check(info, stream); diff --git a/src/stream/src/executor/debug/trace.rs b/src/stream/src/executor/debug/trace.rs index 576f56093f5a0..906cec06372d2 100644 --- a/src/stream/src/executor/debug/trace.rs +++ b/src/stream/src/executor/debug/trace.rs @@ -66,9 +66,14 @@ pub async fn trace( /// Streams wrapped by `metrics` will update actor metrics. #[try_stream(ok = Message, error = StreamExecutorError)] -pub async fn metrics(actor_id: ActorId, metrics: Arc, input: impl MessageStream) { +pub async fn metrics( + actor_id: ActorId, + executor_id: u64, + metrics: Arc, + input: impl MessageStream, +) { let actor_id_string = actor_id.to_string(); - + let executor_id_string = executor_id.to_string(); pin_mut!(input); while let Some(message) = input.next().await.transpose()? { @@ -76,7 +81,7 @@ pub async fn metrics(actor_id: ActorId, metrics: Arc, input: i if chunk.cardinality() > 0 { metrics .executor_row_count - .with_label_values(&[&actor_id_string]) + .with_label_values(&[&actor_id_string, &executor_id_string]) .inc_by(chunk.cardinality() as u64); } }