Skip to content

Commit

Permalink
fix: Fixes pull query latency distribution metrics (#7992)
Browse files Browse the repository at this point in the history
* fix: Fixes pull query latency distribution metrics
  • Loading branch information
AlanConfluent authored Aug 12, 2021
1 parent 58ae6a2 commit c798cd7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class PullQueryExecutorMetrics implements Closeable {

private static final String PULL_QUERY_METRIC_GROUP = "pull-query";
private static final String PULL_REQUESTS = "pull-query-requests";
private static final long MAX_LATENCY_BUCKET_VALUE_MICROS = TimeUnit.SECONDS.toMicros(10);
private static final int NUM_LATENCY_BUCKETS = 1000;

private final List<Sensor> sensors;
private final Sensor localRequestsSensor;
Expand Down Expand Up @@ -523,10 +525,9 @@ private void addRequestMetricsToSensor(
);

sensor.add(new Percentiles(
100,
0,
1000,
BucketSizing.CONSTANT,
4 * NUM_LATENCY_BUCKETS,
MAX_LATENCY_BUCKET_VALUE_MICROS,
BucketSizing.LINEAR,
new Percentile(metrics.metricName(
metricNamePrefix + "-distribution-50",
servicePrefix + PULL_QUERY_METRIC_GROUP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
Expand Down Expand Up @@ -215,6 +216,38 @@ public void shouldRecordLatency() {
assertThat(detailedTotal, is(1.0));
}

@Test
public void shouldRecordLatencyPercentiles() {
// Given:
when(time.nanoseconds()).thenReturn(600000000L);
pullMetrics.recordLatency(100000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP,
RoutingNodeType.SOURCE_NODE);
pullMetrics.recordLatency(200000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP,
RoutingNodeType.SOURCE_NODE);
pullMetrics.recordLatency(300000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP,
RoutingNodeType.SOURCE_NODE);
pullMetrics.recordLatency(400000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP,
RoutingNodeType.SOURCE_NODE);
pullMetrics.recordLatency(500000000L, PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP,
RoutingNodeType.SOURCE_NODE);

// When:
final double detailed50 = getMetricValue("-detailed-distribution-50",
PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE);
final double detailed75 = getMetricValue("-detailed-distribution-75",
PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE);
final double detailed90 = getMetricValue("-detailed-distribution-90",
PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE);
final double detailed99 = getMetricValue("-detailed-distribution-99",
PullSourceType.NON_WINDOWED, PullPhysicalPlanType.KEY_LOOKUP, RoutingNodeType.SOURCE_NODE);

// Then:
assertThat(detailed50, closeTo(297857.85, 0.1));
assertThat(detailed75, closeTo(398398.39, 0.1));
assertThat(detailed90, closeTo(495555.55, 0.1));
assertThat(detailed99, closeTo(495555.55, 0.1));
}

@Test
public void shouldRecordStatus() {
// Given:
Expand Down

0 comments on commit c798cd7

Please sign in to comment.