fix: add logical cluster id to observability metrics (#8141)
lct45 authored Sep 17, 2021
1 parent b6e1262 commit d76a4fc
Showing 5 changed files with 100 additions and 53 deletions.
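In short, the change threads a map of custom tags (here, the logical cluster id) through both observability reporters so every metric they emit carries it. A minimal sketch of how a server might wire this up at startup — the call site, state directory, and cluster id are illustrative assumptions, not shown in this diff:

    import com.google.common.collect.ImmutableMap;
    import java.io.File;
    import java.util.Map;

    // Sketch only: StorageUtilizationMetricsReporter and MetricCollectors are the
    // ksqlDB classes touched below; the values here are illustrative placeholders.
    public final class ObservabilityTagsExample {
      public static void main(final String[] args) {
        final Map<String, String> metricsTags =
            ImmutableMap.of("logical_cluster_id", "lksqlc-12345"); // assumed id format

        // Registers the node-level storage gauges once, tagged with the map:
        StorageUtilizationMetricsReporter.configureShared(
            new File("/var/lib/ksql/state"), // assumed state directory
            MetricCollectors.getMetrics(),
            metricsTags
        );
      }
    }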
@@ -52,6 +52,7 @@ public class StorageUtilizationMetricsReporter implements MetricsReporter {
   private final Map<String, Map<String, TaskStorageMetric>> metricsSeen;
   private final Metrics metricRegistry;
   static AtomicBoolean registeredNodeMetrics = new AtomicBoolean(false);
+  private static Map<String, String> customTags = new HashMap<>();
 
   public StorageUtilizationMetricsReporter() {
     this(MetricCollectors.getMetrics());
@@ -72,19 +73,24 @@ public void init(final List<KafkaMetric> list) {
   public void configure(final Map<String, ?> map) {
   }
 
-  public static void configureShared(final File baseDir, final Metrics metricRegistry) {
+  public static void configureShared(
+      final File baseDir,
+      final Metrics metricRegistry,
+      final Map<String, String> configTags
+  ) {
     if (registeredNodeMetrics.getAndSet(true)) {
       return;
     }
+    customTags = ImmutableMap.copyOf(configTags);
     LOGGER.info("Adding node level storage usage gauges");
     final MetricName nodeAvailable =
-        metricRegistry.metricName("node_storage_free_bytes", METRIC_GROUP);
+        metricRegistry.metricName("node_storage_free_bytes", METRIC_GROUP, customTags);
     final MetricName nodeTotal =
-        metricRegistry.metricName("node_storage_total_bytes", METRIC_GROUP);
+        metricRegistry.metricName("node_storage_total_bytes", METRIC_GROUP, customTags);
     final MetricName nodeUsed =
-        metricRegistry.metricName("node_storage_used_bytes", METRIC_GROUP);
+        metricRegistry.metricName("node_storage_used_bytes", METRIC_GROUP, customTags);
     final MetricName nodePct =
-        metricRegistry.metricName("storage_utilization", METRIC_GROUP);
+        metricRegistry.metricName("storage_utilization", METRIC_GROUP, customTags);
 
     metricRegistry.addMetric(
         nodeAvailable,
@@ -162,14 +168,16 @@ private synchronized void handleNewSstFilesSizeMetric(
       final String taskId,
       final String queryId
   ) {
+    final Map<String, String> queryMetricTags = getQueryMetricTags(queryId);
+    final Map<String, String> taskMetricTags = getTaskMetricTags(queryMetricTags, taskId);
     LOGGER.debug("Updating disk usage metrics");
     // if we haven't seen a task for this query yet
     if (!metricsSeen.containsKey(queryId)) {
       metricRegistry.addMetric(
           metricRegistry.metricName(
               "query_storage_used_bytes",
               METRIC_GROUP,
-              ImmutableMap.of("query-id", queryId)),
+              queryMetricTags),
           (Gauge<BigInteger>) (config, now) -> computeQueryMetric(queryId)
       );
       metricsSeen.put(queryId, new HashMap<>());
@@ -182,7 +190,7 @@ private synchronized void handleNewSstFilesSizeMetric(
           metricRegistry.metricName(
               "task_storage_used_bytes",
               METRIC_GROUP,
-              ImmutableMap.of("task-id", taskId, "query-id", queryId)
+              taskMetricTags
           ));
       // add to list of seen task metrics for this query
       metricsSeen.get(queryId).put(taskId, newMetric);
@@ -215,7 +223,7 @@ private synchronized void handleRemovedSstFileSizeMetric(
       metricRegistry.removeMetric(metricRegistry.metricName(
           "query_storage_used_bytes",
           METRIC_GROUP,
-          ImmutableMap.of("query-id", queryId))
+          getQueryMetricTags(queryId))
       );
     }
   }
@@ -245,6 +253,26 @@ private String getQueryId(final KafkaMetric metric) {
       throw new KsqlException("Missing query ID when reporting utilization metrics");
     }
   }
+
+  private Map<String, String> getQueryMetricTags(final String queryId) {
+    final Map<String, String> queryMetricTags = new HashMap<>(customTags);
+    queryMetricTags.put("query-id", queryId);
+    return ImmutableMap.copyOf(queryMetricTags);
+  }
+
+  private Map<String, String> getTaskMetricTags(
+      final Map<String, String> queryTags,
+      final String taskId
+  ) {
+    final Map<String, String> taskMetricTags = new HashMap<>(queryTags);
+    taskMetricTags.put("task-id", taskId);
+    return ImmutableMap.copyOf(taskMetricTags);
+  }
+
+  @VisibleForTesting
+  static void setTags(final Map<String, String> tags) {
+    customTags = tags;
+  }
 
   @VisibleForTesting
   static void reset() {
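For context on the helpers just added: query tags start from the shared customTags base and layer the query id on top; task tags extend the query tags with the task id, and each result is frozen into an immutable copy. A self-contained sketch of that merge pattern (plain Java plus Guava; names are local to this example):

    import com.google.common.collect.ImmutableMap;
    import java.util.HashMap;
    import java.util.Map;

    public final class TagMergeExample {
      public static void main(final String[] args) {
        // Base tags shared by every metric this reporter emits.
        final Map<String, String> customTags =
            ImmutableMap.of("logical_cluster_id", "logical-id");

        // Mirrors getQueryMetricTags: copy the base, add the query id, freeze.
        final Map<String, String> queryTags = new HashMap<>(customTags);
        queryTags.put("query-id", "CTAS_TEST_1");

        // Mirrors getTaskMetricTags: extend the query tags with the task id.
        final Map<String, String> taskTags = new HashMap<>(queryTags);
        taskTags.put("task-id", "t1");

        // Prints all three entries (iteration order is unspecified for HashMap).
        System.out.println(ImmutableMap.copyOf(taskTags));
      }
    }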
@@ -27,7 +27,6 @@
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -55,6 +54,7 @@ public class PersistentQuerySaturationMetrics implements Runnable {
   private static final String STREAMS_THREAD_METRICS_GROUP = "stream-thread-metrics";
   private static final String THREAD_ID = "thread-id";
   private static final String QUERY_ID = "query-id";
+  private static Map<String, String> customTags = new HashMap<>();
 
   private final Map<String, KafkaStreamsSaturation> perKafkaStreamsStats = new HashMap<>();
   private final KsqlEngine engine;
@@ -67,9 +67,10 @@ public PersistentQuerySaturationMetrics(
       final KsqlEngine engine,
       final MetricsReporter reporter,
       final Duration window,
-      final Duration sampleMargin
+      final Duration sampleMargin,
+      final Map<String, String> customTags
   ) {
-    this(Instant::now, engine, reporter, window, sampleMargin);
+    this(Instant::now, engine, reporter, window, sampleMargin, customTags);
   }
 
   @VisibleForTesting
@@ -78,13 +79,15 @@ public PersistentQuerySaturationMetrics(
       final KsqlEngine engine,
       final MetricsReporter reporter,
       final Duration window,
-      final Duration sampleMargin
+      final Duration sampleMargin,
+      final Map<String, String> customTags
   ) {
     this.time = Objects.requireNonNull(time, "time");
     this.engine = Objects.requireNonNull(engine, "engine");
     this.reporter = Objects.requireNonNull(reporter, "reporter");
     this.window = Objects.requireNonNull(window, "window");
     this.sampleMargin = Objects.requireNonNull(sampleMargin, "sampleMargin");
+    this.customTags = Objects.requireNonNull(customTags, "customTags");
   }
 
   @Override
@@ -158,11 +161,17 @@ private void report(final Instant now, final double saturation) {
             now,
             NODE_QUERY_SATURATION,
             saturation,
-            Collections.emptyMap()
+            customTags
         )
       )
     );
   }
 
+  private static Map<String, String> getTags(final String key, final String value) {
+    final Map<String, String> newTags = new HashMap<>(customTags);
+    newTags.put(key, value);
+    return newTags;
+  }
+
   private static final class KafkaStreamsSaturation {
     private final Set<QueryId> queryIds = new HashSet<>();
@@ -190,7 +199,7 @@ private void reportThreadSaturation(
             now,
             QUERY_THREAD_SATURATION,
             saturation,
-            ImmutableMap.of(THREAD_ID, name)
+            getTags(THREAD_ID, name)
         )
       ));
     }
@@ -207,7 +216,7 @@ private void reportQuerySaturation(
             now,
             QUERY_SATURATION,
             saturation,
-            ImmutableMap.of(QUERY_ID, queryId.toString())
+            getTags(QUERY_ID, queryId.toString())
         )
       ));
     }
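The saturation reporter works the same way: the base tags arrive through the constructor, and getTags layers the thread or query id on top per data point. A sketch of how a server might construct and schedule it — the window, margin, interval, and cluster id are illustrative, and the real wiring is outside this diff:

    import com.google.common.collect.ImmutableMap;
    import java.time.Duration;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Sketch only; engine and reporter are assumed to exist in the caller's scope.
    final class SaturationWiringExample {
      static void start(final KsqlEngine engine, final MetricsReporter reporter) {
        final PersistentQuerySaturationMetrics saturation =
            new PersistentQuerySaturationMetrics(
                engine,
                reporter,
                Duration.ofMinutes(5),   // saturation window (illustrative)
                Duration.ofSeconds(15),  // sample margin (illustrative)
                ImmutableMap.of("logical_cluster_id", "lksqlc-12345"));

        // PersistentQuerySaturationMetrics implements Runnable, so it can be
        // sampled on a fixed schedule; the interval here is an assumption.
        Executors.newSingleThreadScheduledExecutor()
            .scheduleAtFixedRate(saturation, 0, 60, TimeUnit.SECONDS);
      }
    }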
@@ -19,7 +19,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.Collections;
 import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -41,6 +40,10 @@ public class StorageUtilizationMetricsReporterTest {
   private static final String TRANSIENT_THREAD_ID = "_confluent_blahblah_transient_blahblah_4-blahblah";
   private static final String TASK_STORAGE_METRIC = "task_storage_used_bytes";
   private static final String QUERY_STORAGE_METRIC = "query_storage_used_bytes";
+  private static final Map<String, String> BASE_TAGS = ImmutableMap.of("logical_cluster_id", "logical-id");
+  private static final Map<String, String> QUERY_TAGS = ImmutableMap.of("logical_cluster_id", "logical-id", "query-id", "CTAS_TEST_1");
+  private static final Map<String, String> TASK_ONE_TAGS = ImmutableMap.of("logical_cluster_id", "logical-id", "query-id", "CTAS_TEST_1", "task-id", "t1");
+  private static final Map<String, String> TASK_TWO_TAGS = ImmutableMap.of("logical_cluster_id", "logical-id", "query-id", "CTAS_TEST_1", "task-id", "t2");
 
   private StorageUtilizationMetricsReporter listener;
 
@@ -50,17 +53,11 @@ public class StorageUtilizationMetricsReporterTest {
   private ArgumentCaptor<MetricValueProvider<?>> metricValueProvider;
 
   @Before
-  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
-  public void setUp() throws IOException {
+  public void setUp() {
     listener = new StorageUtilizationMetricsReporter(metrics);
     when(metrics.metricName(any(), any(), (Map<String, String>) any())).thenAnswer(
         a -> new MetricName(a.getArgument(0), a.getArgument(1), "", a.getArgument(2)));
-    when(metrics.metricName(any(), any())).thenAnswer(
-        a -> new MetricName(a.getArgument(0), a.getArgument(1), "", Collections.emptyMap()));
-    final File f = new File("/tmp/storage-test/");
-    f.getParentFile().mkdirs();
-    f.createNewFile();
-    listener.configureShared(f, metrics);
+    StorageUtilizationMetricsReporter.setTags(BASE_TAGS);
   }
 
   @After
@@ -69,18 +66,25 @@ public void cleanup() {
   }
 
   @Test
-  @SuppressFBWarnings("BX_UNBOXING_IMMEDIATELY_REBOXED")
-  public void shouldAddNodeMetricsOnConfigure() {
+  @SuppressFBWarnings({
+      "BX_UNBOXING_IMMEDIATELY_REBOXED",
+      "RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"
+  })
+  public void shouldAddNodeMetricsOnConfigure() throws IOException {
     // Given:
+    final File f = new File("/tmp/storage-test/");
+    f.getParentFile().mkdirs();
+    f.createNewFile();
+    listener.configureShared(f, metrics, BASE_TAGS);
 
     // When:
-    final Gauge<?> storageFreeGauge = verifyAndGetRegisteredMetric("node_storage_free_bytes", Collections.emptyMap());
+    final Gauge<?> storageFreeGauge = verifyAndGetRegisteredMetric("node_storage_free_bytes", BASE_TAGS);
     final Object storageFreeValue = storageFreeGauge.value(null, 0);
-    final Gauge<?> storageTotalGauge = verifyAndGetRegisteredMetric("node_storage_total_bytes", Collections.emptyMap());
+    final Gauge<?> storageTotalGauge = verifyAndGetRegisteredMetric("node_storage_total_bytes", BASE_TAGS);
     final Object storageTotalValue = storageTotalGauge.value(null, 0);
-    final Gauge<?> storageUsedGauge = verifyAndGetRegisteredMetric("node_storage_used_bytes", Collections.emptyMap());
+    final Gauge<?> storageUsedGauge = verifyAndGetRegisteredMetric("node_storage_used_bytes", BASE_TAGS);
     final Object storageUsedValue = storageUsedGauge.value(null, 0);
-    final Gauge<?> pctUsedGauge = verifyAndGetRegisteredMetric("storage_utilization", Collections.emptyMap());
+    final Gauge<?> pctUsedGauge = verifyAndGetRegisteredMetric("storage_utilization", BASE_TAGS);
     final Object pctUsedValue = pctUsedGauge.value(null, 0);
 
     // Then:
@@ -112,10 +116,11 @@ public void shouldAddNewGauges() {
     );
 
     // When:
-    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "CTAS_TEST_1"));
-    final Object taskValue = taskGauge.value(null, 0);
-    final Gauge<?> queryGauge = verifyAndGetRegisteredMetric(QUERY_STORAGE_METRIC, ImmutableMap.of("query-id", "CTAS_TEST_1"));
+    final Gauge<?> queryGauge = verifyAndGetRegisteredMetric(QUERY_STORAGE_METRIC, QUERY_TAGS);
+    final Object queryValue = queryGauge.value(null, 0);
+    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS);
+    final Object taskValue = taskGauge.value(null, 0);
 
     // Then:
     assertThat(taskValue, equalTo(BigInteger.valueOf(2)));
@@ -130,21 +135,21 @@ public void shouldUpdateExistingGauges() {
         KAFKA_METRIC_GROUP,
         KAFKA_METRIC_NAME,
         BigInteger.valueOf(2),
-        ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID))
+        ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID, "logical_cluster_id", "logical-id"))
     );
 
     // When:
     listener.metricChange(mockMetric(
         KAFKA_METRIC_GROUP,
         KAFKA_METRIC_NAME,
         BigInteger.valueOf(15),
-        ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID))
+        ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID, "logical_cluster_id", "logical-id"))
     );
 
     // Then:
-    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "CTAS_TEST_1"));
+    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS);
     final Object taskValue = taskGauge.value(null, 0);
-    final Gauge<?> queryGauge = verifyAndGetRegisteredMetric(QUERY_STORAGE_METRIC, ImmutableMap.of("query-id", "CTAS_TEST_1"));
+    final Gauge<?> queryGauge = verifyAndGetRegisteredMetric(QUERY_STORAGE_METRIC, QUERY_TAGS);
     final Object queryValue = queryGauge.value(null, 0);
 
     assertThat(taskValue, equalTo(BigInteger.valueOf(15)));
@@ -158,22 +163,23 @@ public void shouldCombineTaskMetricsToQueryMetric() {
         KAFKA_METRIC_GROUP,
         KAFKA_METRIC_NAME,
         BigInteger.valueOf(2),
-        ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID))
+        ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID, "logical_cluster_id", "logical-id"))
     );
     listener.metricChange(mockMetric(
         KAFKA_METRIC_GROUP,
         KAFKA_METRIC_NAME,
         BigInteger.valueOf(5),
-        ImmutableMap.of("task-id", "t2", "thread-id", THREAD_ID))
+        ImmutableMap.of("task-id", "t2", "thread-id", THREAD_ID, "logical_cluster_id", "logical-id"))
     );
 
     // Then:
-    final Gauge<?> taskGaugeOne = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "CTAS_TEST_1"));
-    final Object taskValueOne = taskGaugeOne.value(null, 0);
-    final Gauge<?> taskGaugeTwo = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t2", "query-id", "CTAS_TEST_1"));
-    final Object taskValueTwo = taskGaugeTwo.value(null, 0);
-    final Gauge<?> queryGauge = verifyAndGetRegisteredMetric(QUERY_STORAGE_METRIC, ImmutableMap.of("query-id", "CTAS_TEST_1"));
-    final Object queryValue = queryGauge.value(null, 0);
+    final Gauge<?> queryGauge = verifyAndGetRegisteredMetric(QUERY_STORAGE_METRIC, QUERY_TAGS);
+    final Object queryValue = queryGauge.value(null, 0);
+    final Gauge<?> taskGaugeOne = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS);
+    final Object taskValueOne = taskGaugeOne.value(null, 0);
+    final Gauge<?> taskGaugeTwo = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_TWO_TAGS);
+    final Object taskValueTwo = taskGaugeTwo.value(null, 0);
 
     assertThat(taskValueOne, equalTo(BigInteger.valueOf(2)));
     assertThat(taskValueTwo, equalTo(BigInteger.valueOf(5)));
@@ -197,7 +203,7 @@ public void shouldCombineStorageMetricsToTaskMetric() {
     );
 
     // Then:
-    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "blahblah_4"));
+    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "blahblah_4", "logical_cluster_id", "logical-id"));
     final Object taskValue = taskGauge.value(null, 0);
     assertThat(taskValue, equalTo(BigInteger.valueOf(7)));
   }
@@ -216,8 +222,8 @@ public void shouldRemoveTaskAndQueryGauges() {
     listener.metricRemoval(metric);
 
     // Then:
-    verifyRemovedMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "CTAS_TEST_1"));
-    verifyRemovedMetric(QUERY_STORAGE_METRIC, ImmutableMap.of("query-id", "CTAS_TEST_1"));
+    verifyRemovedMetric(QUERY_STORAGE_METRIC, QUERY_TAGS);
+    verifyRemovedMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS);
 
   }
 
@@ -235,7 +241,7 @@ public void shouldRemoveObsoleteStateStoreMetrics() {
         BigInteger.valueOf(6),
         ImmutableMap.of("store-id", "s2", "task-id", "t1", "thread-id", THREAD_ID));
     listener.metricChange(metric);
-    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "CTAS_TEST_1"));
+    final Gauge<?> taskGauge = verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS);
     Object taskValue = taskGauge.value(null, 0);
     assertThat(taskValue, equalTo(BigInteger.valueOf(8)));
 
@@ -257,7 +263,7 @@ public void shouldIgnoreNonSSTMetrics() {
         ImmutableMap.of("store-id", "s1", "task-id", "t1", "thread-id", THREAD_ID)));
 
     // Then:
-    assertThrows(AssertionError.class, () -> verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, ImmutableMap.of("task-id", "t1", "query-id", "CTAS_TEST_1")));
+    assertThrows(AssertionError.class, () -> verifyAndGetRegisteredMetric(TASK_STORAGE_METRIC, TASK_ONE_TAGS));
   }
 
   private KafkaMetric mockMetric(
@@ -272,7 +278,7 @@ private KafkaMetric mockMetric(
   private Gauge<?> verifyAndGetRegisteredMetric(final String name, final Map<String, String> tags) {
     verify(metrics).addMetric(
         argThat(
-            n -> n.name().equals(name) && n.tags().equals(tags)
+            n -> n.name().equals(name) && n.tags().entrySet().equals(tags.entrySet())
         ),
         metricValueProvider.capture()
     );
@@ -61,6 +61,7 @@ public class PersistentQuerySaturationMetricsTest {
   private static final QueryId QUERY_ID1 = new QueryId("hootie");
   private static final QueryId QUERY_ID2 = new QueryId("hoo");
   private static final QueryId QUERY_ID3 = new QueryId("boom");
+  private static final Map<String, String> CUSTOM_TAGS = ImmutableMap.of("logical_cluster_id", "logical-id");
 
   @Mock
   private MetricsReporter reporter;
@@ -103,7 +104,8 @@ public void setup() {
         engine,
         reporter,
         WINDOW,
-        SAMPLE_MARGIN
+        SAMPLE_MARGIN,
+        CUSTOM_TAGS
     );
   }
 