Skip to content

Commit

Permalink
Use static empty store files metadata (#84034)
Browse files Browse the repository at this point in the history
In a large cluster we expect most nodes not to have a copy of most
shards, but today during replica shard allocation we create a new (and
nontrivial) object for each node that has no copy of a shard. With this
commit we check at deserialization time whether the response is empty
and, if so, avoid the unnecessary instantiation.

Relates #77466
  • Loading branch information
DaveCTurner authored Feb 17, 2022
1 parent e271450 commit 1b7b2a1
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 66 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84034.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84034
summary: Use static empty store files metadata
area: Allocation
type: enhancement
issues: []
44 changes: 16 additions & 28 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
Expand Down Expand Up @@ -759,25 +760,17 @@ public String toString() {
*/
public static final class MetadataSnapshot implements Iterable<StoreFileMetadata>, Writeable {
private final Map<String, StoreFileMetadata> metadata;

public static final MetadataSnapshot EMPTY = new MetadataSnapshot();

private final Map<String, String> commitUserData;

private final long numDocs;

public static final MetadataSnapshot EMPTY = new MetadataSnapshot(emptyMap(), emptyMap(), 0L);

public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, String> commitUserData, long numDocs) {
this.metadata = metadata;
this.commitUserData = commitUserData;
this.numDocs = numDocs;
}

MetadataSnapshot() {
metadata = emptyMap();
commitUserData = emptyMap();
numDocs = 0;
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
metadata = loadedMetadata.fileMetadata;
Expand All @@ -786,26 +779,21 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
}

/**
* Read from a stream.
*/
public MetadataSnapshot(StreamInput in) throws IOException {
final int size = in.readVInt();
Map<String, StoreFileMetadata> metadata = new HashMap<>();
for (int i = 0; i < size; i++) {
StoreFileMetadata meta = new StoreFileMetadata(in);
metadata.put(meta.name(), meta);
}
Map<String, String> commitUserData = new HashMap<>();
int num = in.readVInt();
for (int i = num; i > 0; i--) {
commitUserData.put(in.readString(), in.readString());
public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
final int metadataSize = in.readVInt();
final Map<String, StoreFileMetadata> metadata = metadataSize == 0 ? emptyMap() : Maps.newMapWithExpectedSize(metadataSize);
for (int i = 0; i < metadataSize; i++) {
final var storeFileMetadata = new StoreFileMetadata(in);
metadata.put(storeFileMetadata.name(), storeFileMetadata);
}
final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
final var numDocs = in.readLong();

this.metadata = unmodifiableMap(metadata);
this.commitUserData = unmodifiableMap(commitUserData);
this.numDocs = in.readLong();
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
if (metadataSize == 0 && commitUserData.size() == 0 && numDocs == 0) {
return MetadataSnapshot.EMPTY;
} else {
return new MetadataSnapshot(metadata, commitUserData, numDocs);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public RecoveryCleanFilesRequest(
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
snapshotFiles = new Store.MetadataSnapshot(in);
snapshotFiles = Store.MetadataSnapshot.readFrom(in);
totalTranslogOps = in.readVInt();
globalCheckpoint = in.readZLong();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public StartRecoveryRequest(StreamInput in) throws IOException {
targetAllocationId = in.readString();
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
primaryRelocation = in.readBoolean();
startingSeqNo = in.readLong();
if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -45,12 +46,13 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;

public class TransportNodesListShardStoreMetadata extends TransportNodesAction<
TransportNodesListShardStoreMetadata.Request,
TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata,
Expand Down Expand Up @@ -132,18 +134,17 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
if (indexShard != null) {
try {
final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata(
shardId,
indexShard.snapshotStoreMetadata(),
indexShard.getPeerRecoveryRetentionLeases()
);
exists = true;
return storeFilesMetadata;
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e);
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
return StoreFilesMetadata.EMPTY;
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e);
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
return StoreFilesMetadata.EMPTY;
}
}
}
Expand All @@ -166,7 +167,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
}
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
if (shardPath == null) {
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
return StoreFilesMetadata.EMPTY;
}
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the master will not use a copy of this replica
Expand All @@ -180,7 +181,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
);
// We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
// we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList());
return new StoreFilesMetadata(metadataSnapshot, emptyList());
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {
Expand All @@ -192,37 +193,43 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
}

public static class StoreFilesMetadata implements Iterable<StoreFileMetadata>, Writeable {
private final ShardId shardId;
private final Store.MetadataSnapshot metadataSnapshot;
private final List<RetentionLease> peerRecoveryRetentionLeases;

public StoreFilesMetadata(
ShardId shardId,
Store.MetadataSnapshot metadataSnapshot,
List<RetentionLease> peerRecoveryRetentionLeases
) {
this.shardId = shardId;
private static final ShardId FAKE_SHARD_ID = new ShardId("_na_", "_na_", 0);
public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());

public StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List<RetentionLease> peerRecoveryRetentionLeases) {
this.metadataSnapshot = metadataSnapshot;
this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases;
}

public StoreFilesMetadata(StreamInput in) throws IOException {
this.shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
public static StoreFilesMetadata readFrom(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_8_2_0)) {
new ShardId(in);
}
final var metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
final var peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
if (metadataSnapshot == Store.MetadataSnapshot.EMPTY && peerRecoveryRetentionLeases.isEmpty()) {
return EMPTY;
} else {
return new StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
if (out.getVersion().before(Version.V_8_2_0)) {
// no compatible version cares about the shard ID, we can just make one up
FAKE_SHARD_ID.writeTo(out);

// NB only checked this for versions back to 7.17.0, we are assuming that we don't use this with earlier versions:
assert out.getVersion().onOrAfter(Version.V_7_17_0) : out.getVersion();
}
metadataSnapshot.writeTo(out);
out.writeList(peerRecoveryRetentionLeases);
}

public ShardId shardId() {
return this.shardId;
}

public boolean isEmpty() {
return metadataSnapshot.size() == 0;
}
Expand Down Expand Up @@ -267,8 +274,6 @@ public String syncId() {
@Override
public String toString() {
return "StoreFilesMetadata{"
+ ", shardId="
+ shardId
+ ", metadataSnapshot{size="
+ metadataSnapshot.size()
+ ", syncId="
Expand Down Expand Up @@ -385,7 +390,7 @@ public static class NodeStoreFilesMetadata extends BaseNodeResponse {

public NodeStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
storeFilesMetadata = new StoreFilesMetadata(in);
storeFilesMetadata = StoreFilesMetadata.readFrom(in);
}

public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,6 @@ TestAllocator addData(
data.put(
node,
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
shardId,
new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
peerRecoveryRetentionLeases
)
Expand Down
51 changes: 42 additions & 9 deletions server/src/test/java/org/elasticsearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongUnaryOperator;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -93,6 +96,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;

public class StoreTests extends ESTestCase {

Expand Down Expand Up @@ -918,7 +922,7 @@ public void testMetadataSnapshotStreaming() throws Exception {
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
Store.MetadataSnapshot inMetadataSnapshot = new Store.MetadataSnapshot(in);
Store.MetadataSnapshot inMetadataSnapshot = Store.MetadataSnapshot.readFrom(in);
Map<String, StoreFileMetadata> origEntries = new HashMap<>();
origEntries.putAll(outMetadataSnapshot.asMap());
for (Map.Entry<String, StoreFileMetadata> entry : inMetadataSnapshot.asMap().entrySet()) {
Expand All @@ -928,6 +932,21 @@ public void testMetadataSnapshotStreaming() throws Exception {
assertThat(inMetadataSnapshot.getCommitUserData(), equalTo(outMetadataSnapshot.getCommitUserData()));
}

public void testEmptyMetadataSnapshotStreaming() throws Exception {
var outMetadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY : new Store.MetadataSnapshot(emptyMap(), emptyMap(), 0L);
var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);

var outBuffer = new ByteArrayOutputStream();
var out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(targetNodeVersion);
outMetadataSnapshot.writeTo(out);

var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
var in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
assertThat(Store.MetadataSnapshot.readFrom(in), sameInstance(Store.MetadataSnapshot.EMPTY));
}

protected Store.MetadataSnapshot createMetadataSnapshot() {
StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString());
StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("no_segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString());
Expand Down Expand Up @@ -978,21 +997,16 @@ public void testStreamStoreFilesMetadata() throws Exception {
);
}
TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata =
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
new ShardId("test", "_na_", 0),
metadataSnapshot,
peerRecoveryRetentionLeases
);
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
org.elasticsearch.Version targetNodeVersion = randomVersion(random());
org.elasticsearch.Version targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
out.setVersion(targetNodeVersion);
outStoreFileMetadata.writeTo(out);
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata =
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in);
var inStoreFileMetadata = TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in);
Iterator<StoreFileMetadata> outFiles = outStoreFileMetadata.iterator();
for (StoreFileMetadata inFile : inStoreFileMetadata) {
assertThat(inFile.name(), equalTo(outFiles.next().name()));
Expand All @@ -1001,6 +1015,25 @@ public void testStreamStoreFilesMetadata() throws Exception {
assertThat(outStoreFileMetadata.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases));
}

public void testStreamEmptyStoreFilesMetadata() throws Exception {
var outStoreFileMetadata = randomBoolean()
? TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY
: new TransportNodesListShardStoreMetadata.StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());
var outBuffer = new ByteArrayOutputStream();
var out = new OutputStreamStreamOutput(outBuffer);
var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
out.setVersion(targetNodeVersion);
outStoreFileMetadata.writeTo(out);

var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
var in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
assertThat(
TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in),
sameInstance(TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY)
);
}

public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
final ShardId shardId = new ShardId("index", "_na_", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse {
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
super(in);
node = new DiscoveryNode(in);
storeFileMetadata = new Store.MetadataSnapshot(in);
storeFileMetadata = Store.MetadataSnapshot.readFrom(in);
mappingVersion = in.readVLong();
}

Expand Down

0 comments on commit 1b7b2a1

Please sign in to comment.