
feat: add storage utilization metrics #7868

Merged · 16 commits into confluentinc:master · Aug 25, 2021

Conversation

@lct45 (Contributor) commented Jul 23, 2021

Description

As part of the observability metrics we're adding in Q3, we'd like to see storage metrics. This adds storage metrics by node, task, and query.

Testing done

Tested locally that the metrics are picked up, added unit tests, and did benchmarking.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@lct45 requested a review from a team as a code owner on July 23, 2021 16:27
@lct45 requested review from cadonna and rodesai on July 23, 2021 16:27
final long totalSpace = f.getTotalSpace();
final double percFree = percentage(freeSpace, (double) totalSpace);
dataPoints.add(new MetricsReporter.DataPoint(sampleTime,"storage-usage", freeSpace));
dataPoints.add(new MetricsReporter.DataPoint(sampleTime,"storage-usage-perc", percFree));
Contributor Author (@lct45):

I found that looking at the raw free space number didn't give me a sense of how full my disk was, but maybe other people automatically know how much space there is in total? For the task level we don't have total file size, so maybe we don't want it here either, for consistency.

final long totalSpace = f.getTotalSpace();
final long usedSpace = totalSpace - f.getFreeSpace();
final double percFree = percentage((double) usedSpace, (double) totalSpace);
dataPoints.add(new MetricsReporter.DataPoint(sampleTime,"storage-usage", (double) usedSpace));
Contributor:

nit: space after comma.

Contributor:

Also: it seems we only keep three data points here, since the next file's usage would simply overwrite the previous ones. Is that intentional?

Contributor Author (@lct45):

Hmm, yeah, good point @guozhangwang. Looking back at this, I'm really reporting storage usage by query, which I do later. I switched it to aggregate all of the files and return the final result as the node-level reporting. I saw your comment back on the one-pager about whether getting the query storage usage here with f.getFreeSpace would be more accurate than aggregating sst-file-size; I'll leave log lines in and see if I can get a read on that during benchmarks.

Contributor:

Makes sense, thanks.

@guozhangwang (Contributor) left a comment:

Made a second pass, but it seems this PR is not complete and ready for review yet? @lct45 please feel free to ping me when it's done.

);
metrics.put(queryId, new ArrayList<>());
}
// We've seen metric for this query before
Contributor:

nit: for this query's task

private final Collection<TaskStorageMetric> registeredMetrics;

public UtilizationMetricsListener(final KsqlConfig config, final Metrics metricRegistry) {
this.queryStorageMetrics = new ConcurrentHashMap<>();
Contributor:

Could you elaborate a bit more on the differences between these three collections: the metrics map, the queryStorageMetrics map, and the registeredMetrics set? From what I can see it seems:

  • queryStorageMetrics is used for the per-query metrics, but it is not populated or updated anywhere.
  • registeredMetrics is used for per-query-task metrics.
  • metrics is also used for storing per-query metrics?

The relationships between these and how they are leveraged / updated are not very clear to me.

Contributor:

I think we can simplify things here by doing a few things:

First, repurpose TaskStorageMetric as a general-purpose gauge that computes its value by adding up other metrics. It already is this; you just need to drop the notion of a task ID from it. You can rename it to AggregatedMetric (or maybe even refactor RocksDBMetricCollector to pull out that inner class and use it).

Then, maintain 2 maps:
  • Map<String, AggregatedMetric> queryStorageUsage - this maps from query ID to an aggregated metric that measures usage for the query. Its aggregated metric will contain all the state store metrics for that query.
  • Map<TaskID, AggregatedMetric> taskStorageUsage - this maps from task ID to an aggregated metric that measures usage for the task. Its aggregated metric will contain all the state store metrics for that task. You'll probably have to add a TaskID type here that holds both the task ID and the query ID.

Then, whenever you get an event for a new metric, first compute the query ID and task ID. If the task is new, add a new task-level metric to the metric registry (metricRegistry) and a new AggregatedMetric to taskStorageUsage. If the query is new, add a new query-level metric to the metric registry and a new AggregatedMetric to queryStorageUsage. Then add the metric you're being notified of to the AggregatedMetric for the task and for the query.

Similarly, on a notification that a state store metric is removed, remove the state store metric from the corresponding AggregatedMetric instances. If an AggregatedMetric instance is now empty, unregister it from the metrics registry.
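
For reference, a rough sketch of the shape being described above (class, field, and method names are illustrative, not the PR's final code):

import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;

// Illustrative only: a gauge whose value is the sum of whatever state-store
// metrics are currently registered with it.
final class AggregatedMetric {
  private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();

  void add(final KafkaMetric metric) {
    metrics.put(metric.metricName(), metric);
  }

  void remove(final KafkaMetric metric) {
    metrics.remove(metric.metricName());
  }

  boolean isEmpty() {
    return metrics.isEmpty();
  }

  BigInteger getValue() {
    BigInteger total = BigInteger.ZERO;
    for (final KafkaMetric metric : metrics.values()) {
      // total-sst-files-size is reported as a numeric gauge per state store
      total = total.add(BigInteger.valueOf(((Number) metric.metricValue()).longValue()));
    }
    return total;
  }
}

// The listener would then keep two lookup maps (hypothetical field names):
//   Map<String, AggregatedMetric> queryStorageUsage  - keyed by query ID
//   Map<TaskId, AggregatedMetric> taskStorageUsage   - keyed by a TaskId value type
//     that holds both the query ID and the task ID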

if (metrics.get(queryId).contains(taskId)) {
// we have this task metric already, just need to update it
resetMetric(metric);
for (TaskStorageMetric storageMetric : registeredMetrics) {
Contributor:

Why not put this for loop into resetMetric as well?

}
final String taskId = metric.metricName().tags().getOrDefault("task-id", "");

final String queryId = "";
Contributor:

we should be able to get this from the task id or the thread id, but it depends on whether we're using the new consolidated runtime stuff or not.

currently the thread-id tag will have the form _confluent-ksql-<server id>query_<query ID>-<uuid>-StreamThread-0 for persistent queries, and the form _confluent-ksql-<server id>transient_<query ID>-<uuid>-StreamThread-0 for transient queries.

once the new runtime changes go in, you would need to look in the task ID. @ableegoldman can you advise on how @lct45 would get the query ID from the task ID for the new runtime?
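
For reference, a minimal sketch of pulling the query ID out of that thread-id tag for the current (pre-consolidated-runtime) naming scheme; the regex mirrors the one this PR ends up using further down, and the helper class name is made up:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class QueryIdParser {
  // Captures <query ID> from thread names of the form
  //   _confluent-ksql-<server id>query_<query ID>-<uuid>-StreamThread-0
  //   _confluent-ksql-<server id>transient_<query ID>-<uuid>-StreamThread-0
  private static final Pattern QUERY_ID_PATTERN =
      Pattern.compile("(?<=query_|transient_)(.*?)(?=-)");

  static String queryIdFromThreadId(final String threadId) {
    final Matcher matcher = QUERY_ID_PATTERN.matcher(threadId);
    return matcher.find() ? matcher.group(1) : "";
  }
}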

);
metricsSeen.put(queryId, new ArrayList<>());
}
final TaskStorageMetric newMetric = new TaskStorageMetric(
Contributor:

I think you can simplify this bit with a few changes:

  • make metricsSeen a map from query id to a map from task id to TaskStorageMetric (e.g. Map<String, Map<String, TaskStorageMetric>>)
  • there's no need for resetAndUpdateMetric - you just need to call add on TaskStorageMetric and the value will get replaced in the map. Also technically that case should never actually happen - consider adding a warning log

Then this bit can just be:

final TaskStorageMetric taskMetric;
if (!metricsSeen.get(queryId).containsKey(taskId)) {
    taskMetric = new TaskStorageMetric(...);
    metricsSeen.get(queryId).put(taskId, taskMetric);
    metricRegistry.addMetric(...);
} else {
    taskMetric = metricsSeen.get(queryId).get(taskId);
}
taskMetric.add(metric);

This should simplify removal too since you don't have to iterate the whole list.

Contributor Author (@lct45):

What case should never happen, calling add on an existing TaskStorageMetric? The rest of this makes sense and makes it cleaner.

Matcher matcher = pattern.matcher(queryIdTag);
final String queryId = matcher.find() ? matcher.group(1) : "";
// if we haven't seen a task for this query yet
if (!metricsSeen.containsKey(queryId)) {
Contributor:

Everything from this line down to the end of this method needs to be in a synchronized block. I wouldn't synchronize the whole method, though, because most of the time it's called it doesn't need to do anything (since most metrics will fail the initial check).
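
In other words, something shaped roughly like this minimal sketch (field and helper names are placeholders; only the cheap name check stays outside the lock):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.metrics.KafkaMetric;

final class MetricChangeSketch {
  private final Map<String, List<String>> metricsSeen = new ConcurrentHashMap<>();

  public void metricChange(final KafkaMetric metric) {
    // Most metrics fail this check, so we return without ever taking the lock.
    if (!metric.metricName().name().equals("total-sst-files-size")) {
      return;
    }
    final String queryId = getQueryId(metric);
    final String taskId = metric.metricName().tags().getOrDefault("task-id", "");
    synchronized (metricsSeen) {
      // Everything that reads or mutates the shared bookkeeping (and registers
      // new gauges with the metric registry) happens while holding the lock.
      metricsSeen.computeIfAbsent(queryId, q -> new ArrayList<>()).add(taskId);
    }
  }

  private static String getQueryId(final KafkaMetric metric) {
    // Placeholder: the real listener parses the query ID out of the thread-id tag.
    return metric.metricName().tags().getOrDefault("thread-id", "");
  }
}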

Contributor Author (@lct45):

Synchronized on metricsSeen, not metricsRegistry, right?

this.metricName = metricName;
}

private void add(final KafkaMetric metric) {
Contributor:

add, remove, and getValue need to be synchronized - streams threads may be creating/removing state stores while telemetry is reading the value

Contributor:

never mind - noticed this is a ConcurrentHashMap

}
}

final BigInteger computeQueryMetric(final String queryId) {
Contributor:

this needs to be synchronized with metric addition and removal.

Contributor:

Also, to be extra careful I would call metricValue outside of the lock. metricValue is going to call into Streams, which could in theory (though very unlikely) try to take a lock that another stream thread holds while waiting to get our lock (because that thread is trying to register a metric).

Contributor Author (@lct45):

Just to clarify, is this still true since the metric map is a concurrent hash map?

final MetricName nodeTotal =
metricRegistry.metricName("node-storage-total", METRIC_GROUP);
final MetricName nodeUsed =
metricRegistry.metricName("node-storage-used", METRIC_GROUP);
Contributor Author (@lct45):

@rodesai I can't remember what we landed on for node metrics - IIRC calculated storage used by doing (f.getTotalSpace - f.getFreeSpace) isn't actually accurate for what we want. Given that, do we just want to report f.getFreeSpace? Or f.getFreeSpace and f.getTotalSpace?

Contributor Author (@lct45):

Additionally for naming these + the other storage metrics, do we want to add a unit suffix? eg _bytes

Contributor:

> IIRC calculated storage used by doing (f.getTotalSpace - f.getFreeSpace) isn't actually accurate for what we want. Given that, do we just want to report f.getFreeSpace? Or f.getFreeSpace and f.getTotalSpace?

I thought we determined it was probably accurate, and the disparity we saw was very small (18MB) and probably explained by the workload continuing to run between the time you printed the metrics and the time you looked at df. So we decided to get a PR out and we can test on a cloud instance rather than on our macbooks.

And yeah a _bytes suffix makes sense - good call

final String queryId = getQueryId(metric);

// if we haven't seen a task for this query yet
synchronized (metricsSeen) {
Contributor:

can you move this into another method like private synchronized void handleNewSstFilesSizeMetric(final KafkaMetric metric, final String taskId, final String queryId) {...?

ditto for the other side of this in metricRemoval
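
Sketched out, that refactor might look like the following (handler names are taken from the comments here; the bodies are placeholders). Note that a synchronized method locks on this rather than on metricsSeen, so the add and removal handlers need to live on the same object to serialize against each other:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;

final class HandlerRefactorSketch {
  private final Map<String, Map<String, KafkaMetric>> metricsSeen = new HashMap<>();

  public void metricChange(final KafkaMetric metric) {
    if (!metric.metricName().name().equals("total-sst-files-size")) {
      return;
    }
    final String queryId = metric.metricName().tags().getOrDefault("thread-id", ""); // placeholder lookup
    final String taskId = metric.metricName().tags().getOrDefault("task-id", "");
    handleNewSstFilesSizeMetric(metric, taskId, queryId);
  }

  private synchronized void handleNewSstFilesSizeMetric(
      final KafkaMetric metric,
      final String taskId,
      final String queryId) {
    // Bookkeeping for a newly added state-store metric, serialized with removal below.
    metricsSeen.computeIfAbsent(queryId, q -> new HashMap<>()).put(taskId, metric);
  }

  private synchronized void handleRemovedSstFileSizeMetric(
      final KafkaMetric metric,
      final String taskId,
      final String queryId) {
    final Map<String, KafkaMetric> tasksForQuery = metricsSeen.get(queryId);
    if (tasksForQuery != null) {
      tasksForQuery.remove(taskId);
    }
  }
}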

Contributor Author (@lct45):

And we have to do this because the hashmap is a concurrent hashmap, right? So when we access it we need to do so in a synchronized fashion? Is there something different about using a synchronized method rather than a synchronized block, or is the synchronized method just easier to use?


final String queryId = getQueryId(metric);
final String taskId = metric.metricName().tags().getOrDefault("task-id", "");
final TaskStorageMetric taskMetric = metricsSeen.get(queryId).get(taskId);
Contributor:

move everything below this into another method like:

private synchronized void handleRemovedSstFileSizeMetric(...


private BigInteger computeQueryMetric(final String queryId) {
BigInteger queryMetricSum = BigInteger.ZERO;
for (Map.Entry<String, TaskStorageMetric> entry : metricsSeen.get(queryId).entrySet()) {
Contributor:

We need to synchronize the bit here that iterates over the map. However, we don't want to synchronize the part that gets the metric (to eliminate the risk of a deadlock). So you can write this like:

private BigInteger computeQueryMetric(final String queryId) {
    BigInteger queryMetricSum = BigInteger.ZERO;
    for (final Supplier<BigInteger> gauge : getGaugesForQuery(queryId)) {
        queryMetricSum = queryMetricSum.add(gauge.get());
    }
    return queryMetricSum;
}

private synchronized Collection<Supplier<BigInteger>> getGaugesForQuery(final String queryId) {
    return metricsSeen.get(queryId).values().stream()
       .map(v -> (Supplier<BigInteger>) v::getValue)
       .collect(Collectors.toList());
}

import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;

public class StorageUtilizationMetrics implements MetricsReporter {
Contributor:

nit: StorageUtilizationMetricsReporter.

Contributor:

Also, I'm wondering when we would implement org.apache.kafka.common.metrics.MetricsReporter and when we would implement io.confluent.ksql.internal.MetricsReporter; they seem to serve the same purpose.

Contributor Author (@lct45):

@rodesai may have something more specific here, but IIUC io.confluent.ksql.internal.MetricsReporter is used here to report a ksql metric object (DataPoint), and org.apache.kafka.common.metrics.MetricsReporter is used for KafkaMetrics.

final String queryIdTag = metric.metricName().tags().getOrDefault("thread-id", "");
final Pattern pattern = Pattern.compile("(?<=query_|transient_)(.*?)(?=-)");
final Matcher matcher = pattern.matcher(queryIdTag);
return matcher.find() ? matcher.group(1) : "";
Contributor:

Would we ever not find a match? When that happens we would not get the queryId, and we would silently put "" into metricsSeen. Should we treat it as a bug instead?

Contributor Author (@lct45):

We should always find a match for queries, although my regex skills aren't incredible, so it depends on whether anyone else sees any issues on L 233 😉. But I think you're right, we should treat it as a bug. I'm hesitant to crash the app if the queryId isn't showing up for a metric; on the other hand, if we only log an error we likely won't know that we're missing metrics... We also don't want to continue reporting any of these metrics if we don't have a queryId, IMO, so maybe throwing is the right way to go. LMK what you think.


private static class TaskStorageMetric {
final MetricName metricName;
private final Map<MetricName, KafkaMetric> metrics = new ConcurrentHashMap<>();
Contributor:

Hmm, are we ever going to have more than one metric here? Since we are filtering on metric.metricName().name().equals("total-sst-files-size") in the first place, we would end up with only that metric name, right?

Contributor Author (@lct45):

From what I understand, total-sst-files-size is broken down by store, so we may have multiple stores per task. The store name is included in the tag map, so the metric names would be different if you had multiple stores under one task.

Contributor:

Ah yes, you're right :)

final String taskId
) {
// remove storage metric for this task
taskMetric.remove(metric);
Contributor:

See my other comment: if we only ever have one metric under that task, then we can simplify this a bit.

@lct45 force-pushed the disk_util branch 2 times, most recently from c1e929a to b231ae0 on August 13, 2021 15:57
nodeTotal,
(Gauge<Long>) (config, now) -> baseDir.getTotalSpace()
);
metricRegistry.addMetric(
Contributor:

Can you add a storage_utilization metric that returns ((baseDir.getTotalSpace - baseDir.getFreeSpace) / baseDir.getTotalSpace)?
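
A sketch of how that gauge might be registered, mirroring the node-storage gauges just above (the metric name and the Gauge<Double> lambda are assumptions; baseDir, metricRegistry, and METRIC_GROUP are the fields from the surrounding code):

final MetricName nodeStorageUtilization =
    metricRegistry.metricName("storage-utilization", METRIC_GROUP);
metricRegistry.addMetric(
    nodeStorageUtilization,
    (Gauge<Double>) (config, now) ->
        // fraction of the state directory's filesystem currently in use
        (double) (baseDir.getTotalSpace() - baseDir.getFreeSpace())
            / baseDir.getTotalSpace()
);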

Contributor Author (@lct45):

There should be no risk of division by 0 here, right? Since our baseDir should always exist

Contributor Author (@lct45):

And do you want this all the way up to the metrics api + exposed to users?

Contributor:

> There should be no risk of division by 0 here, right? Since our baseDir should always exist

yeah that should never happen

> And do you want this all the way up to the metrics api + exposed to users?

yeah, the max value of this metric (over all the nodes) is what we want to show users in cloud

@rodesai (Contributor) left a comment:

LGTM

@lct45 force-pushed the disk_util branch 2 times, most recently from a72c538 to a6c9d24 on August 17, 2021 20:07
@guozhangwang (Contributor):

LGTM.

@lct45 force-pushed the disk_util branch 9 times, most recently from c95154a to ed287ab on August 24, 2021 17:25
@rodesai (Contributor) left a comment:

LGTM

@lct45 merged commit 22a8741 into confluentinc:master on Aug 25, 2021
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.streams.StreamsConfig;

public class StorageUtilizationMetricsReporter implements MetricsReporter {


I don't understand why you are implementing a new MetricsReporter here.

A MetricsReporter is for reporting metrics via different mechanisms (JMX, etc.). For defining new metrics, you typically just create a new KafkaMetric and call Metrics.addMetric().
