Skip to content

Commit

Permalink
review notes
Browse files Browse the repository at this point in the history
  • Loading branch information
lct45 committed Aug 13, 2021
1 parent 9cbd755 commit c1e929a
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@

package io.confluent.ksql.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.metrics.MetricCollectors;
import java.io.File;
import java.math.BigInteger;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import io.confluent.ksql.util.KsqlException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Gauge;
Expand All @@ -37,18 +43,18 @@
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;

public class StorageUtilizationMetrics implements MetricsReporter {
private static final String METRIC_GROUP = "ksqldb-utilization";
public class StorageUtilizationMetricsReporter implements MetricsReporter {
private static final String METRIC_GROUP = "ksqldb_utilization";

private final Map<String, Map<String, TaskStorageMetric>> metricsSeen;
private final Metrics metricRegistry;
static AtomicBoolean registeredNodeMetrics = new AtomicBoolean(false);

public StorageUtilizationMetrics() {
public StorageUtilizationMetricsReporter() {
this(MetricCollectors.getMetrics());
}

public StorageUtilizationMetrics(final Metrics metricRegistry) {
public StorageUtilizationMetricsReporter(final Metrics metricRegistry) {
this.metricsSeen = new HashMap<>();
this.metricRegistry = metricRegistry;
}
Expand Down Expand Up @@ -77,11 +83,11 @@ private static void configureShared(final File baseDir, final Metrics metricRegi
return;
}
final MetricName nodeAvailable =
metricRegistry.metricName("node-storage-available", METRIC_GROUP);
metricRegistry.metricName("node_storage_available_bytes", METRIC_GROUP);
final MetricName nodeTotal =
metricRegistry.metricName("node-storage-total", METRIC_GROUP);
metricRegistry.metricName("node_storage_total_bytes", METRIC_GROUP);
final MetricName nodeUsed =
metricRegistry.metricName("node-storage-used", METRIC_GROUP);
metricRegistry.metricName("node_storage_used_bytes", METRIC_GROUP);

metricRegistry.addMetric(
nodeAvailable,
Expand All @@ -102,44 +108,12 @@ public void metricChange(final KafkaMetric metric) {
if (!metric.metricName().name().equals("total-sst-files-size")) {
return;
}
final String taskId = metric.metricName().tags().getOrDefault("task-id", "");
final String queryId = getQueryId(metric);

// if we haven't seen a task for this query yet
synchronized (metricsSeen) {
if (!metricsSeen.containsKey(queryId)) {
metricRegistry.addMetric(
metricRegistry.metricName(
"query-storage-usage",
METRIC_GROUP,
ImmutableMap.of("query-id", queryId)),
(Gauge<BigInteger>) (config, now) -> computeQueryMetric(queryId)
);
metricsSeen.put(queryId, new HashMap<>());
}
final TaskStorageMetric newMetric;
// We haven't seen a metric for this query's task before
if (!metricsSeen.get(queryId).containsKey(taskId)) {
// create a new task level metric to track state store storage usage
newMetric = new TaskStorageMetric(
metricRegistry.metricName(
"task-storage-usage",
METRIC_GROUP,
ImmutableMap.of("task-id", taskId, "query-id", queryId)
));
// add to list of seen task metrics for this query
metricsSeen.get(queryId).put(taskId, newMetric);
// create gauge for task level storage usage
metricRegistry.addMetric(
newMetric.metricName,
(Gauge<BigInteger>) (config, now) -> newMetric.getValue()
);
} else {
// We have this metric already
newMetric = metricsSeen.get(queryId).get(taskId);
}
newMetric.add(metric);
}
handleNewSstFilesSizeMetric(
metric,
metric.metricName().tags().getOrDefault("task-id", ""),
getQueryId(metric)
);
}

@Override
Expand All @@ -153,21 +127,7 @@ public void metricRemoval(final KafkaMetric metric) {
final String taskId = metric.metricName().tags().getOrDefault("task-id", "");
final TaskStorageMetric taskMetric = metricsSeen.get(queryId).get(taskId);

// remove storage metric for this task
taskMetric.remove(metric);
if (taskMetric.metrics.size() == 0) {
// no more storage metrics for this task, can remove task gauge
metricRegistry.removeMetric(taskMetric.metricName);
metricsSeen.get(queryId).remove(taskId);
if (metricsSeen.get(queryId).size() == 0) {
// we've removed all the task metrics for this query, don't need the query metrics anymore
metricRegistry.removeMetric(metricRegistry.metricName(
"query-storage-usage",
METRIC_GROUP,
ImmutableMap.of("query-id", queryId))
);
}
}
handleRemovedSstFileSizeMetric(taskMetric, metric, queryId, taskId);
}

@Override
Expand All @@ -194,26 +154,100 @@ public void contextChange(final MetricsContext metricsContext) {

}

private synchronized void handleNewSstFilesSizeMetric(
final KafkaMetric metric,
final String taskId,
final String queryId
) {
// if we haven't seen a task for this query yet
if (!metricsSeen.containsKey(queryId)) {
metricRegistry.addMetric(
metricRegistry.metricName(
"query_storage_usage",
METRIC_GROUP,
ImmutableMap.of("query-id", queryId)),
(Gauge<BigInteger>) (config, now) -> computeQueryMetric(queryId)
);
metricsSeen.put(queryId, new HashMap<>());
}
final TaskStorageMetric newMetric;
// We haven't seen a metric for this query's task before
if (!metricsSeen.get(queryId).containsKey(taskId)) {
// create a new task level metric to track state store storage usage
newMetric = new TaskStorageMetric(
metricRegistry.metricName(
"task_storage_usage",
METRIC_GROUP,
ImmutableMap.of("task-id", taskId, "query-id", queryId)
));
// add to list of seen task metrics for this query
metricsSeen.get(queryId).put(taskId, newMetric);
// create gauge for task level storage usage
metricRegistry.addMetric(
newMetric.metricName,
(Gauge<BigInteger>) (config, now) -> newMetric.getValue()
);
} else {
// We have this metric already
newMetric = metricsSeen.get(queryId).get(taskId);
}
newMetric.add(metric);
}

private synchronized void handleRemovedSstFileSizeMetric(
final TaskStorageMetric taskMetric,
final KafkaMetric metric,
final String queryId,
final String taskId
) {
// remove storage metric for this task
taskMetric.remove(metric);
if (taskMetric.metrics.size() == 0) {
// no more storage metrics for this task, can remove task gauge
metricRegistry.removeMetric(taskMetric.metricName);
metricsSeen.get(queryId).remove(taskId);
if (metricsSeen.get(queryId).size() == 0) {
// we've removed all the task metrics for this query, don't need the query metrics anymore
metricRegistry.removeMetric(metricRegistry.metricName(
"query-storage-usage",
METRIC_GROUP,
ImmutableMap.of("query-id", queryId))
);
}
}
}

private BigInteger computeQueryMetric(final String queryId) {
BigInteger queryMetricSum = BigInteger.ZERO;
for (Map.Entry<String, TaskStorageMetric> entry : metricsSeen.get(queryId).entrySet()) {
queryMetricSum = queryMetricSum.add(entry.getValue().getValue());
for (final Supplier<BigInteger> gauge : getGaugesForQuery(queryId)) {
queryMetricSum = queryMetricSum.add(gauge.get());
}
return queryMetricSum;
}

private synchronized Collection<Supplier<BigInteger>> getGaugesForQuery(final String queryId) {
return metricsSeen.get(queryId).values().stream()
.map(v -> (Supplier<BigInteger>) v::getValue)
.collect(Collectors.toList());
}

private String getQueryId(final KafkaMetric metric) {
final String queryIdTag = metric.metricName().tags().getOrDefault("thread-id", "");
final Pattern pattern = Pattern.compile("(?<=query_|transient_)(.*?)(?=-)");
final Matcher matcher = pattern.matcher(queryIdTag);
return matcher.find() ? matcher.group(1) : "";
try {
return matcher.group(1);
} catch (IllegalStateException e) {
throw new KsqlException("Missing query ID when reporting utilization metrics");
}
}

@VisibleForTesting
static void reset() {
registeredNodeMetrics.getAndSet(false);
}

public static class TaskStorageMetric {
private static class TaskStorageMetric {
final MetricName metricName;
private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.confluent.ksql.execution.streams.metrics.RocksDBMetricsCollector;
import io.confluent.ksql.execution.util.KeyUtil;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.StorageUtilizationMetrics;
import io.confluent.ksql.internal.StorageUtilizationMetricsReporter;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
Expand Down Expand Up @@ -385,7 +385,7 @@ private Map<String, Object> buildStreamsProperties(
updateListProperty(
newStreamsProperties,
StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
StorageUtilizationMetrics.class.getName()
StorageUtilizationMetricsReporter.class.getName()
);
return newStreamsProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.math.BigInteger;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThrows;
Expand All @@ -28,14 +28,14 @@
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class StorageUtilizationMetricsTest {
public class StorageUtilizationMetricsReporterTest {

private static final String METRIC_NAME = "total-sst-files-size";
private static final String METRIC_GROUP = "streams-metric";
private static final String THREAD_ID = "_confluent_blahblah_query_CTAS_TEST_1-blahblah";
private static final String TRANSIENT_THREAD_ID = "_confluent_blahblah_transient_blahblah_4-blahblah";

private StorageUtilizationMetrics listener;
private StorageUtilizationMetricsReporter listener;

@Mock
private Metrics metrics;
Expand All @@ -44,7 +44,7 @@ public class StorageUtilizationMetricsTest {

@Before
public void setUp() {
listener = new StorageUtilizationMetrics(metrics);
listener = new StorageUtilizationMetricsReporter(metrics);
when(metrics.metricName(any(), any(), (Map<String, String>) any())).thenAnswer(
a -> new MetricName(a.getArgument(0), a.getArgument(1), "", a.getArgument(2)));
when(metrics.metricName(any(), any())).thenAnswer(
Expand All @@ -54,7 +54,7 @@ public void setUp() {

@After
public void cleanup() {
StorageUtilizationMetrics.reset();
StorageUtilizationMetricsReporter.reset();
}

@Test
Expand Down Expand Up @@ -93,7 +93,7 @@ public void shouldAddNewGauges() {
METRIC_GROUP,
METRIC_NAME,
BigInteger.valueOf(2),
ImmutableMap.of("task-id", "t1", "thread-id", THREAD_ID))
ImmutableMap.of("task-id", "t1", "thread-id", "blhablah"))
);

// When:
Expand Down

0 comments on commit c1e929a

Please sign in to comment.