Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional BlobCacheMetrics, expose BlobCacheMetrics via SharedBlobCacheService #111730

Merged
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5dacb69
Add callback for copy-to-cache metrics, additional BlobCacheMetrics
nicktindall Aug 9, 2024
c4b6487
Update docs/changelog/111730.yaml
nicktindall Aug 9, 2024
ad68e99
Shorten metric names (exceeded max length)
nicktindall Aug 9, 2024
181b958
Align attribute key name
nicktindall Aug 9, 2024
07dfc5a
Remove callback from method not used by stateless
nicktindall Aug 9, 2024
9c8ee42
Add CachePopulationReason.CacheMiss
nicktindall Aug 9, 2024
752b1ef
Revert "Remove callback from method not used by stateless"
nicktindall Aug 9, 2024
c0189f4
NO_OP -> NOOP
nicktindall Aug 9, 2024
7c34720
Remove metrics from copyToCacheFileAligned not used
nicktindall Aug 12, 2024
69d58ec
Merge remote-tracking branch 'origin/main' into feature/ES-9067_expos…
nicktindall Aug 12, 2024
c823f75
Add todo
nicktindall Aug 12, 2024
10a32f6
Protect against divide-by-zero
nicktindall Aug 12, 2024
439152c
Remove unused CachePopulationReason#LoadCommit
nicktindall Aug 12, 2024
f8dfabc
Merge branch 'main' into feature/ES-9067_expose_cache_copy_metrics
nicktindall Aug 13, 2024
5c38dbf
Simplify blob-cache population notifications, extract interface
nicktindall Aug 13, 2024
d504094
Add time/bytes counter, make throughput MB/s
nicktindall Aug 13, 2024
c8e6d28
Fix metric names
nicktindall Aug 13, 2024
03a8f1d
Fix javadocs
nicktindall Aug 13, 2024
47a0a35
Be more defensive when handling metrics
nicktindall Aug 13, 2024
a71d70b
Don't notify when no bytes were copied
nicktindall Aug 13, 2024
95d2e5f
Add test for SharedBytes#copyToCacheFileAligned
nicktindall Aug 14, 2024
e18a283
Fix comment, ensure room for at least a byte
nicktindall Aug 14, 2024
168efe1
Use strings for metric values
nicktindall Aug 14, 2024
f60d341
Take shard ID as a string, to reduce string concatenation
nicktindall Aug 14, 2024
c938e89
Remove BlobCachePopulationListener
nicktindall Aug 15, 2024
8f5967a
Add source to BlobCacheMetrics
nicktindall Aug 15, 2024
45fb734
Add test, extract CachePopulationSource
nicktindall Aug 15, 2024
71c0b2c
Expose BlobCacheMetrics from SharedBlobCacheService
nicktindall Aug 15, 2024
91405f2
Fix test name
nicktindall Aug 15, 2024
03a0140
Delete docs/changelog/111730.yaml
nicktindall Aug 15, 2024
df97c59
Improve metric and attribute names
nicktindall Aug 15, 2024
8295d35
Randomise BlobCacheMetricsTests
nicktindall Aug 15, 2024
a903534
De-duplicate
nicktindall Aug 15, 2024
e8edc63
Fix spotless
nicktindall Aug 15, 2024
81e2776
Add Unknown CachePopulationSource
nicktindall Aug 15, 2024
b17f316
Apply feedback
nicktindall Aug 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,37 @@
package org.elasticsearch.blobcache;

import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class BlobCacheMetrics {
public static final String CACHE_POPULATION_REASON_ATTRIBUTE_KEY = "reason";
public static final String CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY = "source";
public static final String SHARD_ID_ATTRIBUTE_KEY = "shard_id";
public static final String INDEX_ATTRIBUTE_KEY = "index_name";

private final LongCounter cacheMissCounter;
private final LongCounter evictedCountNonZeroFrequency;
private final LongHistogram cacheMissLoadTimes;
private final DoubleHistogram cachePopulateThroughput;
private final LongCounter cachePopulationBytes;
private final LongCounter cachePopulationTime;

public enum CachePopulationReason {
/**
* When warming the cache
*/
Warming,
/**
* When the data we need is not in the cache
*/
CacheMiss
}

This comment was marked as outdated.


public BlobCacheMetrics(MeterRegistry meterRegistry) {
this(
Expand All @@ -33,14 +56,39 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
"es.blob_cache.cache_miss_load_times.histogram",
"The time in milliseconds for populating entries in the blob store resulting from a cache miss, expressed as a histogram.",
"ms"
),
meterRegistry.registerDoubleHistogram(
"es.blob_cache.population.throughput.histogram",
"The throughput when populating the blob store from the cache",
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
"MiB/second"
),
Copy link
Contributor Author

@nicktindall nicktindall Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use MiB/s to work around LongHistogram limit of 131,072

meterRegistry.registerLongCounter(
"es.blob_cache.population.bytes.total",
"The number of bytes that have been loaded into the cache",
"bytes"
),
meterRegistry.registerLongCounter(
"es.blob_cache.population.time.total",
"The time spent copying data into the cache",
"milliseconds"
)
);
}

BlobCacheMetrics(LongCounter cacheMissCounter, LongCounter evictedCountNonZeroFrequency, LongHistogram cacheMissLoadTimes) {
/**
 * Package-private constructor taking pre-built instruments directly. Used by the
 * {@code MeterRegistry}-based constructor and by tests that supply recording instruments.
 *
 * @param cacheMissCounter             counts cache misses
 * @param evictedCountNonZeroFrequency counts evictions of entries with non-zero frequency
 * @param cacheMissLoadTimes           histogram of cache-miss population times (ms)
 * @param cachePopulateThroughput      histogram of cache-population throughput (MiB/second)
 * @param cachePopulationBytes         total bytes copied into the cache
 * @param cachePopulationTime          total time spent copying into the cache (ms)
 */
BlobCacheMetrics(
    LongCounter cacheMissCounter,
    LongCounter evictedCountNonZeroFrequency,
    LongHistogram cacheMissLoadTimes,
    DoubleHistogram cachePopulateThroughput,
    LongCounter cachePopulationBytes,
    LongCounter cachePopulationTime
) {
    // Plain field wiring; validation is left to the registering MeterRegistry.
    this.cachePopulationTime = cachePopulationTime;
    this.cachePopulationBytes = cachePopulationBytes;
    this.cachePopulateThroughput = cachePopulateThroughput;
    this.cacheMissLoadTimes = cacheMissLoadTimes;
    this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency;
    this.cacheMissCounter = cacheMissCounter;
}

public static BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
Expand All @@ -56,4 +104,55 @@ public LongCounter getEvictedCountNonZeroFrequency() {
/**
 * The histogram of times (in milliseconds) taken to populate entries from the blob store
 * after a cache miss (see the "es.blob_cache.cache_miss_load_times.histogram" metric).
 *
 * @return the cache-miss load-time histogram
 */
public LongHistogram getCacheMissLoadTimes() {
return cacheMissLoadTimes;
}

/**
* Record the various cache population metrics after a chunk is copied to the cache
*
* @param totalBytesCopied The total number of bytes copied
* @param totalCopyTimeNanos The time taken to copy the bytes in nanoseconds
* @param index The index being loaded
* @param shardId The ID of the shard being loaded
* @param cachePopulationReason The reason for the cache being populated
* @param cachePopulationSource The source from which the data is being loaded
*/
public void recordCachePopulationMetrics(
int totalBytesCopied,
long totalCopyTimeNanos,
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
String index,
int shardId,
CachePopulationReason cachePopulationReason,
CachePopulationSource cachePopulationSource
) {
Map<String, Object> metricAttributes = Map.of(
INDEX_ATTRIBUTE_KEY,
index,
SHARD_ID_ATTRIBUTE_KEY,
shardId,
CACHE_POPULATION_REASON_ATTRIBUTE_KEY,
cachePopulationReason.name(),
CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY,
cachePopulationSource.name()
);
ywangd marked this conversation as resolved.
Show resolved Hide resolved
assert totalBytesCopied > 0 : "We shouldn't be recording zero-sized copies";
cachePopulationBytes.incrementBy(totalBytesCopied, metricAttributes);

// This is almost certainly paranoid, but if we had a very fast/small copy with a very coarse nanosecond timer it might happen?
if (totalCopyTimeNanos > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could add a warning log in the else branch similar to how we log a warning if s3 metric does not have a valid request time metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in b17f316

I couldn't find the warning you were referring to, but I did add one

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request);

cachePopulateThroughput.record(toMebibytesPerSecond(totalBytesCopied, totalCopyTimeNanos), metricAttributes);
cachePopulationTime.incrementBy(TimeUnit.NANOSECONDS.toMillis(totalCopyTimeNanos), metricAttributes);
}
}

/**
* Calculate throughput as MiB/second
*
* @param totalBytes The total number of bytes transferred
* @param totalNanoseconds The time to transfer in nanoseconds
* @return The throughput as MiB/second
*/
private double toMebibytesPerSecond(int totalBytes, long totalNanoseconds) {
double totalSeconds = totalNanoseconds / 1_000_000_000.0;
double totalMegabytes = totalBytes / 1_048_576.0;
return totalMegabytes / totalSeconds;
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used Mebibytes because that's what ByteSizeValue#ofMb uses

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache;

/**
 * The places we populate the cache from
 */
public enum CachePopulationSource {
    /**
     * When loading data from the blob-store
     */
    BlobStore,
    /**
     * When fetching data from a peer node
     */
    Peer,
    /**
     * When the source of the data cannot be determined
     * (added in this PR's follow-up commit "Add Unknown CachePopulationSource")
     */
    Unknown
}
Copy link
Contributor Author

@nicktindall nicktindall Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This got extracted out to make it easier to use in InputStreamWithSource in stateless, you could argue that CachePopulationReason should be extracted also for consistency, and I would be open to that, but it's not used elsewhere yet so I left it in BlobCacheMetrics.

Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ public static long calculateCacheSize(Settings settings, long totalFsSize) {
.getBytes();
}

/**
 * Expose this service's {@link BlobCacheMetrics} so callers can record cache-population
 * metrics against the same instruments (the purpose of this PR: "expose BlobCacheMetrics
 * via SharedBlobCacheService").
 *
 * @return the metrics instance used by this cache service
 */
public BlobCacheMetrics getBlobCacheMetrics() {
return blobCacheMetrics;
}

/**
 * @return the configured cache range size
 *         (units not visible in this excerpt — presumably bytes; TODO confirm against field declaration)
 */
public int getRangeSize() {
return rangeSize;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.concurrent.TimeUnit;

public class BlobCacheMetricsTests extends ESTestCase {

private RecordingMeterRegistry recordingMeterRegistry;
private BlobCacheMetrics metrics;

@Before
public void createMetrics() {
recordingMeterRegistry = new RecordingMeterRegistry();
metrics = new BlobCacheMetrics(recordingMeterRegistry);
}

public void testRecordCachePopulationMetricsRecordsThroughput() {
int mebiBytesSent = randomIntBetween(1, 4);
int secondsTaken = randomIntBetween(1, 5);
String indexName = randomAlphaOfLength(10);
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
int shardId = randomIntBetween(0, 10);
BlobCacheMetrics.CachePopulationReason cachePopulationReason = randomFrom(BlobCacheMetrics.CachePopulationReason.values());
CachePopulationSource cachePopulationSource = randomFrom(CachePopulationSource.values());
metrics.recordCachePopulationMetrics(
Math.toIntExact(ByteSizeValue.ofMb(mebiBytesSent).getBytes()),
TimeUnit.SECONDS.toNanos(secondsTaken),
indexName,
shardId,
cachePopulationReason,
cachePopulationSource
);
Measurement throughputMeasurement = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.DOUBLE_HISTOGRAM, "es.blob_cache.population.throughput.histogram")
.get(0);
assertEquals(throughputMeasurement.getDouble(), (double) mebiBytesSent / secondsTaken, 0.0);
assertExpectedAttributesPresent(throughputMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource);
}

public void testRecordCachePopulationMetricsRecordsTotalBytes() {
int mebiBytesSent = randomIntBetween(1, 4);
int secondsTaken = randomIntBetween(1, 5);
String indexName = randomAlphaOfLength(10);
int shardId = randomIntBetween(0, 10);
BlobCacheMetrics.CachePopulationReason cachePopulationReason = randomFrom(BlobCacheMetrics.CachePopulationReason.values());
CachePopulationSource cachePopulationSource = randomFrom(CachePopulationSource.values());
metrics.recordCachePopulationMetrics(
Math.toIntExact(ByteSizeValue.ofMb(mebiBytesSent).getBytes()),
TimeUnit.SECONDS.toNanos(secondsTaken),
indexName,
shardId,
cachePopulationReason,
cachePopulationSource
);
Measurement totalBytesMeasurement = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.population.bytes.total")
.get(0);
assertEquals(totalBytesMeasurement.getLong(), ByteSizeValue.ofMb(mebiBytesSent).getBytes());
nicktindall marked this conversation as resolved.
Show resolved Hide resolved
assertExpectedAttributesPresent(totalBytesMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource);
}

public void testRecordCachePopulationMetricsRecordsTotalTime() {
int mebiBytesSent = randomIntBetween(1, 4);
int secondsTaken = randomIntBetween(1, 5);
String indexName = randomAlphaOfLength(10);
int shardId = randomIntBetween(0, 10);
BlobCacheMetrics.CachePopulationReason cachePopulationReason = randomFrom(BlobCacheMetrics.CachePopulationReason.values());
CachePopulationSource cachePopulationSource = randomFrom(CachePopulationSource.values());
metrics.recordCachePopulationMetrics(
Math.toIntExact(ByteSizeValue.ofMb(mebiBytesSent).getBytes()),
TimeUnit.SECONDS.toNanos(secondsTaken),
indexName,
shardId,
cachePopulationReason,
cachePopulationSource
);
Measurement totalTimeMeasurement = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.population.time.total")
.get(0);
assertEquals(totalTimeMeasurement.getLong(), TimeUnit.SECONDS.toMillis(secondsTaken));
assertExpectedAttributesPresent(totalTimeMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource);
}

private static void assertExpectedAttributesPresent(
Measurement measurement,
int shardId,
String indexName,
BlobCacheMetrics.CachePopulationReason cachePopulationReason,
CachePopulationSource cachePopulationSource
) {
assertEquals(measurement.attributes().get(BlobCacheMetrics.SHARD_ID_ATTRIBUTE_KEY), shardId);
assertEquals(measurement.attributes().get(BlobCacheMetrics.INDEX_ATTRIBUTE_KEY), indexName);
assertEquals(measurement.attributes().get(BlobCacheMetrics.CACHE_POPULATION_REASON_ATTRIBUTE_KEY), cachePopulationReason.name());
assertEquals(measurement.attributes().get(BlobCacheMetrics.CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY), cachePopulationSource.name());
}
}