diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java index 0e5a88344d40..3c95a5af98c6 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/internal/QueryStateMetricsReportingListener.java @@ -120,17 +120,21 @@ private static class PerQueryListener { this.metrics = Objects.requireNonNull(metrics, "metrics cannot be null."); this.ticker = Objects.requireNonNull(ticker, "ticker"); + final String type = queryId.toLowerCase().contains("transient") ? "transient_" : "query_" ; + + final String tag = "_confluent-ksql-" + groupPrefix + type + queryId; + this.stateMetricName = metrics.metricName( "query-status", groupPrefix + "ksql-queries", "The current status of the given query.", - Collections.singletonMap("status", queryId)); + Collections.singletonMap("status", tag)); errorMetricName = metrics.metricName( "error-status", groupPrefix + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", - Collections.singletonMap("status", queryId) + Collections.singletonMap("status", tag) ); this.metrics.addMetric(stateMetricName, (Gauge) (config, now) -> state); this.metrics.addMetric(errorMetricName, (Gauge) (config, now) -> error); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java index 4004ff948cae..ae68efdb5043 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/internal/QueryStateMetricsReportingListenerTest.java @@ -53,6 +53,7 @@ public class QueryStateMetricsReportingListenerTest { private static final MetricName METRIC_NAME_2 = new MetricName("dylan", "g1", "d1", ImmutableMap.of()); private static final QueryId QUERY_ID = new QueryId("foo"); + private static final String TAG = "_confluent-ksql-" + "some-prefix-" + "query_" + QUERY_ID.toString(); @Mock private Metrics metrics; @@ -73,7 +74,7 @@ public void setUp() { .thenReturn(METRIC_NAME_2); when(query.getQueryId()).thenReturn(QUERY_ID); - listener = new QueryStateMetricsReportingListener(metrics, ""); + listener = new QueryStateMetricsReportingListener(metrics, "some-prefix-"); } @Test @@ -87,12 +88,12 @@ public void shouldAddMetricOnCreation() { listener.onCreate(serviceContext, metaStore, query); // Then: - verify(metrics).metricName("query-status", "ksql-queries", + verify(metrics).metricName("query-status", "some-prefix-ksql-queries", "The current status of the given query.", - ImmutableMap.of("status", QUERY_ID.toString())); - verify(metrics).metricName("error-status", "ksql-queries", + ImmutableMap.of("status", TAG)); + verify(metrics).metricName("error-status", "some-prefix-ksql-queries", "The current error status of the given query, if the state is in ERROR state", - ImmutableMap.of("status", QUERY_ID.toString())); + ImmutableMap.of("status", TAG)); verify(metrics).addMetric(eq(METRIC_NAME_1), isA(Gauge.class)); verify(metrics).addMetric(eq(METRIC_NAME_2), isA(Gauge.class)); @@ -132,10 +133,10 @@ public void shouldAddMetricWithSuppliedPrefix() { // Then: verify(metrics).metricName("query-status", groupPrefix + "ksql-queries", "The current status of the given query.", - ImmutableMap.of("status", QUERY_ID.toString())); + ImmutableMap.of("status", TAG)); verify(metrics).metricName("error-status", groupPrefix + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", - ImmutableMap.of("status", QUERY_ID.toString())); + ImmutableMap.of("status", TAG)); } @Test