
KAFKA-18232: Add share group state topic prune metrics. (#18174)
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
smjn authored Jan 20, 2025
1 parent 71495a2 commit 06a5e25
Showing 4 changed files with 157 additions and 9 deletions.
ShareCoordinatorService.java
@@ -319,6 +319,10 @@ private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
fut.completeExceptionally(exp);
return;
}
shareCoordinatorMetrics.recordPrune(
off,
tp
);
fut.complete(null);
// Best effort prevention of issuing duplicate delete calls.
lastPrunedOffsets.put(tp, off);
ShareCoordinatorMetrics.java
@@ -23,6 +23,7 @@
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
@@ -46,6 +47,8 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoCloseable {

public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite";
public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency";
public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName";
private Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new ConcurrentHashMap<>();

/**
* Global sensors. These are shared across all metrics shards.
@@ -92,6 +95,7 @@ public void close() throws Exception {
SHARE_COORDINATOR_WRITE_SENSOR_NAME,
SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME
).forEach(metrics::removeSensor);
pruneMetrics.values().forEach(v -> metrics.removeSensor(v.pruneSensor.name()));
}

@Override
@@ -153,4 +157,31 @@ public void record(String sensorName) {
globalSensors.get(sensorName).record();
}
}

public void recordPrune(double value, TopicPartition tp) {
pruneMetrics.computeIfAbsent(tp, k -> new ShareGroupPruneMetrics(tp))
.pruneSensor.record(value);
}

private class ShareGroupPruneMetrics {
private final Sensor pruneSensor;

ShareGroupPruneMetrics(TopicPartition tp) {
String sensorNameSuffix = tp.toString();
Map<String, String> tags = Map.of(
"topic", tp.topic(),
"partition", Integer.toString(tp.partition())
);

pruneSensor = metrics.sensor(SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + sensorNameSuffix);

pruneSensor.add(
metrics.metricName("last-pruned-offset",
METRICS_GROUP,
"The offset at which the share-group state topic was last pruned.",
tags),
new Value()
);
}
}
}
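
For context, a minimal sketch (not part of the commit) of how the new recordPrune API and the per-partition "last-pruned-offset" metric can be exercised; it mirrors the usage in the tests below, and the example class name and topic/partition values are illustrative only:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.internals.Topic;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;

    import java.util.Map;

    public class PruneMetricExample {
        public static void main(String[] args) throws Exception {
            Metrics metrics = new Metrics();
            ShareCoordinatorMetrics coordMetrics = new ShareCoordinatorMetrics(metrics);

            // Recording a prune lazily creates a per-partition sensor holding a
            // "last-pruned-offset" Value metric tagged with topic and partition.
            coordMetrics.recordPrune(10.0, new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0));

            // The metric can then be looked up by name, group, and tags.
            Object lastPruned = metrics.metric(
                metrics.metricName(
                    "last-pruned-offset",
                    ShareCoordinatorMetrics.METRICS_GROUP,
                    "The offset at which the share-group state topic was last pruned.",
                    Map.of("topic", Topic.SHARE_GROUP_STATE_TOPIC_NAME, "partition", "0")
                )
            ).metricValue();

            System.out.println(lastPruned); // expected: 10.0

            coordMetrics.close();
        }
    }
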
ShareCoordinatorServiceTest.java
@@ -52,6 +52,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -971,11 +972,13 @@ public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {
CompletableFuture.completedFuture(Optional.of(11L))
);

Metrics metrics = new Metrics();

ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1007,6 +1010,10 @@ public void testRecordPruningTaskPeriodicityWithAllSuccess() throws Exception {

verify(writer, times(2))
.deleteRecords(any(), anyLong());

checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);

service.shutdown();
}

@@ -1058,11 +1065,13 @@ public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception
CompletableFuture.completedFuture(Optional.of(21L))
);

Metrics metrics = new Metrics();

ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1094,6 +1103,11 @@ public void testRecordPruningTaskPeriodicityWithSomeFailures() throws Exception

verify(writer, times(4))
.deleteRecords(any(), anyLong());

checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1, false);

service.shutdown();
}

@@ -1111,11 +1125,13 @@ public void testRecordPruningTaskException() throws Exception {
any()
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));

Metrics metrics = new Metrics();

ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1139,6 +1155,10 @@ public void testRecordPruningTaskException() throws Exception {

verify(writer, times(0))
.deleteRecords(any(), anyLong());

checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false);

service.shutdown();
}

@@ -1156,11 +1176,13 @@ public void testRecordPruningTaskSuccess() throws Exception {
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));

Metrics metrics = new Metrics();

ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1184,6 +1206,9 @@ public void testRecordPruningTaskSuccess() throws Exception {

verify(writer, times(1))
.deleteRecords(any(), eq(20L));

checkMetrics(metrics);

service.shutdown();
}

@@ -1201,11 +1226,12 @@ public void testRecordPruningTaskEmptyOffsetReturned() throws Exception {
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.empty()));

Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1229,6 +1255,10 @@ public void testRecordPruningTaskEmptyOffsetReturned() throws Exception {

verify(writer, times(0))
.deleteRecords(any(), anyLong());

checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false);

service.shutdown();
}

@@ -1257,11 +1287,12 @@ public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception {
CompletableFuture.completedFuture(Optional.of(10L))
);

Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1293,6 +1324,10 @@ public void testRecordPruningTaskRepeatedSameOffsetForTopic() throws Exception {

verify(writer, times(1))
.deleteRecords(any(), anyLong());

checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);

service.shutdown();
}

@@ -1325,11 +1360,13 @@ public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exception {
CompletableFuture.completedFuture(Optional.of(10L))
);

Metrics metrics = new Metrics();

ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1361,6 +1398,36 @@ public void testRecordPruningTaskRetriesRepeatedSameOffsetForTopic() throws Exception {

verify(writer, times(2))
.deleteRecords(any(), anyLong());

checkMetrics(metrics);
checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);

service.shutdown();
}

private void checkMetrics(Metrics metrics) {
Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP),
metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP)
));

usualMetrics.forEach(metric -> assertTrue(metrics.metrics().containsKey(metric)));
}

private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) {
boolean isPresent = metrics.metrics().containsKey(
metrics.metricName(
"last-pruned-offset",
ShareCoordinatorMetrics.METRICS_GROUP,
"The offset at which the share-group state topic was last pruned.",
Map.of(
"topic", topic,
"partition", Integer.toString(partition)
)
)
);
assertEquals(checkPresence, isPresent);
}
}
ShareCoordinatorMetricsTest.java
@@ -18,6 +18,7 @@

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -27,10 +28,12 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;

import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME;
import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShareCoordinatorMetricsTest {
@@ -46,14 +49,21 @@ public void testMetricNames() {
metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP)
));

- ShareCoordinatorMetrics ignored = new ShareCoordinatorMetrics(metrics);
+ ShareCoordinatorMetrics coordMetrics = new ShareCoordinatorMetrics(metrics);
for (MetricName metricName : expectedMetrics) {
assertTrue(metrics.metrics().containsKey(metricName));
}

assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
coordMetrics.recordPrune(
10.0,
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)
);
assertTrue(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
}

@Test
- public void testGlobalSensors() {
+ public void testShardGlobalSensors() {
MockTime time = new MockTime();
Metrics metrics = new Metrics(time);
ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
@@ -71,7 +81,43 @@ public void testGlobalSensors() {
assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0);
}

@Test
public void testCoordinatorGlobalSensors() {
MockTime time = new MockTime();
Metrics metrics = new Metrics(time);
ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);

coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_SENSOR_NAME);
assertMetricValue(metrics, metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), 1.0 / 30); //sampled stats
assertMetricValue(metrics, metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP), 1.0);

coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 20);
coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 30);
assertMetricValue(metrics, metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), 50.0 / 2);
assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0);


assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
coordinatorMetrics.recordPrune(
10.0,
new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)
);
assertMetricValue(metrics, pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1), 10.0);
}

private void assertMetricValue(Metrics metrics, MetricName metricName, double val) {
assertEquals(val, metrics.metric(metricName).metricValue());
}

private MetricName pruneMetricName(Metrics metrics, String topic, Integer partition) {
return metrics.metricName(
"last-pruned-offset",
ShareCoordinatorMetrics.METRICS_GROUP,
"The offset at which the share-group state topic was last pruned.",
Map.of(
"topic", topic,
"partition", Integer.toString(partition)
)
);
}
}
