Skip to content

Commit

Permalink
Total data set size in stats (#70625) (#71057)
Browse files Browse the repository at this point in the history
With shared cache searchable snapshots we have shards that have a size
in S3 that differs from the locally occupied disk space. This commit
introduces `store.total_data_set_size` to node and indices stats, allowing
consumers to distinguish between the two.

Relates #69820
  • Loading branch information
henningandersen authored Mar 31, 2021
1 parent 7bf3780 commit 44ed243
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 47 deletions.
10 changes: 10 additions & 0 deletions docs/reference/cluster/nodes-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,16 @@ Total size of all shards assigned to the node.
(integer)
Total size, in bytes, of all shards assigned to the node.

`total_data_set_size`::
(<<byte-units,byte value>>)
Total data set size of all shards assigned to the node.
This includes the size of shards not stored fully on the node (shared cache searchable snapshots).

`total_data_set_size_in_bytes`::
(integer)
Total data set size, in bytes, of all shards assigned to the node.
This includes the size of shards not stored fully on the node (shared cache searchable snapshots).

`reserved`::
(<<byte-units,byte value>>)
A prediction of how much larger the shard stores on this node will eventually
Expand Down
12 changes: 12 additions & 0 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,16 @@ Total size of all shards assigned to selected nodes.
(integer)
Total size, in bytes, of all shards assigned to selected nodes.

`total_data_set_size`::
(<<byte-units,byte value>>)
Total data set size of all shards assigned to selected nodes.
This includes the size of shards not stored fully on the nodes (shared cache searchable snapshots).

`total_data_set_size_in_bytes`::
(integer)
Total data set size, in bytes, of all shards assigned to selected nodes.
This includes the size of shards not stored fully on the nodes (shared cache searchable snapshots).

`reserved`::
(<<byte-units,byte value>>)
A prediction of how much larger the shard stores will eventually grow due to
Expand Down Expand Up @@ -1238,6 +1248,8 @@ The API returns the following response:
"store": {
"size": "16.2kb",
"size_in_bytes": 16684,
"total_data_set_size": "16.2kb",
"total_data_set_size_in_bytes": 16684,
"reserved": "0b",
"reserved_in_bytes": 0
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
---
"Store stats":
- skip:
version: " - 7.99.99"
reason: "reserved_in_bytes field is not returned in prior versions"
features: [arbitrary_key]
version: " - 7.12.99"
reason: "total_data_set_size added in 7.13"

- do:
nodes.info:
node_id: _master
- set:
nodes._arbitrary_key_: master

- do:
nodes.stats:
metric: [ indices ]
index_metric: [ store ]

- is_false: nodes.$master.discovery
- is_true: nodes.$master.indices.store
- gte: { nodes.$master.indices.store.size_in_bytes: 0 }
- gte: { nodes.$master.indices.store.reserved_in_bytes: -1 }
- set:
nodes.$master.indices.store.size_in_bytes: size_in_bytes
- match: { nodes.$master.indices.store.total_data_set_size_in_bytes: $size_in_bytes }

---
"Store stats bwc":
- skip:
features: [arbitrary_key]
version: " - 7.8.99"
reason: "reserved_in_bytes field is not returned in prior versions"

- do:
nodes.info:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -373,7 +374,7 @@ private long getAvgShardSizeInBytes() throws IOException {
long sum = 0;
int count = 0;
for (IndexShard indexShard : this) {
sum += indexShard.store().stats(0L).sizeInBytes();
sum += indexShard.store().stats(0L, LongUnaryOperator.identity()).sizeInBytes();
count++;
}
if (count == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -1089,15 +1090,16 @@ public GetStats getStats() {
}

public StoreStats storeStats() {
if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) {
// if this shard has no disk footprint then its size is reported as 0
return new StoreStats(0, 0);
}
try {
final RecoveryState recoveryState = this.recoveryState;
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
final long reservedBytes = bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover;
return store.stats(reservedBytes);
if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) {
// if this shard has no disk footprint then its local size is reported as 0
return store.stats(0, size -> 0);
} else {
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
final long reservedBytes = bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover;
return store.stats(reservedBytes, LongUnaryOperator.identity());
}
} catch (IOException e) {
failShard("Failing shard because of exception during storeStats", e);
throw new ElasticsearchException("io exception while building 'store stats'", e);
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.LongUnaryOperator;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

Expand Down Expand Up @@ -346,10 +347,12 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException {

/**
* @param reservedBytes a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}.
* @param localSizeFunction to calculate the local size of the shard based on the shard size.
*/
public StoreStats stats(long reservedBytes) throws IOException {
public StoreStats stats(long reservedBytes, LongUnaryOperator localSizeFunction) throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize(), reservedBytes);
long sizeInBytes = directory.estimateSize();
return new StoreStats(localSizeFunction.applyAsLong(sizeInBytes), sizeInBytes, reservedBytes);
}

/**
Expand Down
28 changes: 26 additions & 2 deletions server/src/main/java/org/elasticsearch/index/store/StoreStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public class StoreStats implements Writeable, ToXContentFragment {
public static final long UNKNOWN_RESERVED_BYTES = -1L;

public static final Version RESERVED_BYTES_VERSION = Version.V_7_9_0;
public static final Version TOTAL_DATA_SET_SIZE_SIZE_VERSION = Version.V_7_13_0;

private long sizeInBytes;
private long totalDataSetSizeInBytes;
private long reservedSize;

public StoreStats() {
Expand All @@ -40,6 +42,11 @@ public StoreStats(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
in.readVLong(); // throttleTimeInNanos
}
if (in.getVersion().onOrAfter(TOTAL_DATA_SET_SIZE_SIZE_VERSION)) {
totalDataSetSizeInBytes = in.readVLong();
} else {
totalDataSetSizeInBytes = sizeInBytes;
}
if (in.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
reservedSize = in.readZLong();
} else {
Expand All @@ -49,19 +56,22 @@ public StoreStats(StreamInput in) throws IOException {

/**
* @param sizeInBytes the size of the store in bytes
* @param totalDataSetSizeInBytes the size of the total data set in bytes, can differ from sizeInBytes for shards using shared cache
* storage
* @param reservedSize a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}.
*/
public StoreStats(long sizeInBytes, long reservedSize) {
public StoreStats(long sizeInBytes, long totalDataSetSizeInBytes, long reservedSize) {
// reservedSize must be either the UNKNOWN_RESERVED_BYTES sentinel (-1) or a non-negative prediction
assert reservedSize == UNKNOWN_RESERVED_BYTES || reservedSize >= 0 : reservedSize;
this.sizeInBytes = sizeInBytes;
// may exceed sizeInBytes for shards not fully stored locally (shared cache searchable snapshots)
this.totalDataSetSizeInBytes = totalDataSetSizeInBytes;
this.reservedSize = reservedSize;
}

/**
 * Accumulates another shard's store stats into this instance. A {@code null}
 * argument is treated as an empty contribution and ignored.
 */
public void add(StoreStats stats) {
if (stats == null) {
return;
}
sizeInBytes += stats.sizeInBytes;
totalDataSetSizeInBytes += stats.totalDataSetSizeInBytes;
// NOTE(review): presumably ignoreIfUnknown neutralizes the UNKNOWN_RESERVED_BYTES sentinel before
// summing, so one unknown operand does not poison the total — confirm in ignoreIfUnknown
reservedSize = ignoreIfUnknown(reservedSize) + ignoreIfUnknown(stats.reservedSize);
}

Expand All @@ -85,6 +95,14 @@ public ByteSizeValue getSize() {
return size();
}

/**
 * @return the total data set size of the store, which can differ from {@code size()} for
 *         shards whose data is not fully stored locally (shared cache searchable snapshots)
 */
public ByteSizeValue totalDataSetSize() {
return new ByteSizeValue(totalDataSetSizeInBytes);
}

/** Getter-style alias for {@link #totalDataSetSize()}. */
public ByteSizeValue getTotalDataSetSize() {
return totalDataSetSize();
}

/**
* A prediction of how much larger this store will eventually grow. For instance, if we are currently doing a peer recovery or restoring
* a snapshot into this store then we can account for the rest of the recovery using this field. A value of {@code -1B} indicates that
Expand All @@ -100,6 +118,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_alpha1)) {
out.writeVLong(0L); // throttleTimeInNanos
}
if (out.getVersion().onOrAfter(TOTAL_DATA_SET_SIZE_SIZE_VERSION)) {
out.writeVLong(totalDataSetSizeInBytes);
}
if (out.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) {
out.writeZLong(reservedSize);
}
Expand All @@ -109,6 +130,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.STORE);
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, size());
builder.humanReadableField(Fields.TOTAL_DATA_SET_SIZE_IN_BYTES, Fields.TOTAL_DATA_SET_SIZE, totalDataSetSize());
builder.humanReadableField(Fields.RESERVED_IN_BYTES, Fields.RESERVED, getReservedSize());
builder.endObject();
return builder;
Expand All @@ -118,6 +140,8 @@ static final class Fields {
static final String STORE = "store";
static final String SIZE = "size";
static final String SIZE_IN_BYTES = "size_in_bytes";
static final String TOTAL_DATA_SET_SIZE = "total_data_set_size";
static final String TOTAL_DATA_SET_SIZE_IN_BYTES = "total_data_set_size_in_bytes";
static final String RESERVED = "reserved";
static final String RESERVED_IN_BYTES = "reserved_in_bytes";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testCreation() {
Path path = createTempDir().resolve("indices").resolve(shardRouting.shardId().getIndex().getUUID())
.resolve(String.valueOf(shardRouting.shardId().id()));
IndexShard indexShard = mock(IndexShard.class);
StoreStats storeStats = new StoreStats(100, 200);
StoreStats storeStats = new StoreStats(100, 150, 200);
when(indexShard.storeStats()).thenReturn(storeStats);
ShardStats shardStats = new ShardStats(shardRouting, new ShardPath(false, path, path, shardRouting.shardId()),
new CommonStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void testErrorCondition() {
assertTrue(
expectThrows(IllegalStateException.class, () ->
TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), state,
new StoreStats(between(1, 100), between(1, 100)), (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000),
between(1, 100)), "target")
).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));

Expand All @@ -82,7 +83,7 @@ public void testErrorCondition() {
TransportResizeAction.prepareCreateIndexRequest(req,
createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).build()).metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null
, "target");
}
Expand All @@ -96,7 +97,7 @@ public void testErrorCondition() {
createClusterState("source", 8, 1,
Settings.builder().put("index.blocks.write", true).put("index.soft_deletes.enabled", true).build())
.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), "target");
});
assertThat(softDeletesError.getMessage(), equalTo("Can't disable [index.soft_deletes.enabled] setting on resize"));
Expand All @@ -117,7 +118,7 @@ public void testErrorCondition() {
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();

TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), "target");
}

Expand All @@ -141,15 +142,17 @@ public void testPassNumRoutingShards() {
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put("index.number_of_shards", 2).build());
IndexMetadata indexMetadata = clusterState.metadata().index("source");
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, new StoreStats(between(1, 100), between(1, 100)),
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata,
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
null, "target");

resizeRequest.getTargetIndexRequest()
.settings(Settings.builder()
.put("index.number_of_routing_shards", randomIntBetween(2, 10))
.put("index.number_of_shards", 2)
.build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, new StoreStats(between(1, 100), between(1, 100)),
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata,
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
null, "target");
}

Expand All @@ -174,7 +177,7 @@ public void testPassNumRoutingShardsAndFail() {
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put("index.number_of_shards", numShards * 2).build());
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)), null, "target");
new StoreStats(between(1, 100), between(0, 100), between(1, 100)), null, "target");

resizeRequest.getTargetIndexRequest()
.settings(Settings.builder()
Expand All @@ -183,7 +186,7 @@ public void testPassNumRoutingShardsAndFail() {
ClusterState finalState = clusterState;
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
() -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState.metadata().index("source"),
new StoreStats(between(1, 100), between(1, 100)), null, "target"));
new StoreStats(between(1, 100), between(0, 100), between(1, 100)), null, "target"));
assertEquals("cannot provide index.number_of_routing_shards on resize", iae.getMessage());
}

Expand Down Expand Up @@ -211,7 +214,8 @@ public void testShrinkIndexSettings() {
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
target.setWaitForActiveShards(activeShardCount);
CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest(
target, clusterState.metadata().index(indexName), new StoreStats(between(1, 100), between(1, 100)), (i) -> stats, "target");
target, clusterState.metadata().index(indexName),
new StoreStats(between(1, 100), between(0, 100), between(1, 100)), (i) -> stats, "target");
assertNotNull(request.recoverFrom());
assertEquals(indexName, request.recoverFrom().getName());
assertEquals("1", request.settings().get("index.number_of_shards"));
Expand Down Expand Up @@ -242,7 +246,8 @@ public void testShrinkWithMaxPrimaryShardSize() {
.settings(Settings.builder().put("index.number_of_shards", 2).build());
assertTrue(
expectThrows(IllegalArgumentException.class, () ->
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, state, new StoreStats(between(1, 100), between(1, 100)),
TransportResizeAction.prepareCreateIndexRequest(resizeRequest, state,
new StoreStats(between(1, 100), between(0, 100), between(1, 100)),
(i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), "target")
).getMessage().startsWith("Cannot set both index.number_of_shards and max_primary_shard_size for the target index"));

Expand All @@ -268,7 +273,7 @@ public void testShrinkWithMaxPrimaryShardSize() {
// each shard's storage will not be greater than the `max_primary_shard_size`
ResizeRequest target1 = new ResizeRequest("target", "source");
target1.setMaxPrimaryShardSize(new ByteSizeValue(2));
StoreStats storeStats = new StoreStats(10, between(1, 100));
StoreStats storeStats = new StoreStats(10, between(0, 100), between(1, 100));
final int targetIndexShardsNum1 = 5;
final ActiveShardCount activeShardCount1 = ActiveShardCount.from(targetIndexShardsNum1);
target1.setWaitForActiveShards(targetIndexShardsNum1);
Expand All @@ -285,7 +290,7 @@ public void testShrinkWithMaxPrimaryShardSize() {
// the shards number of the target index will be equal to the source index's shards number
ResizeRequest target2 = new ResizeRequest("target2", "source");
target2.setMaxPrimaryShardSize(new ByteSizeValue(1));
StoreStats storeStats2 = new StoreStats(100, between(1, 100));
StoreStats storeStats2 = new StoreStats(100, between(0, 100), between(1, 100));
final int targetIndexShardsNum2 = 10;
final ActiveShardCount activeShardCount2 = ActiveShardCount.from(targetIndexShardsNum2);
target2.setWaitForActiveShards(activeShardCount2);
Expand Down
Loading

0 comments on commit 44ed243

Please sign in to comment.