From 1b7b2a1a7a9d4ac0c30bd56264c1f09c60ec074f Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 17 Feb 2022 10:24:38 +0000 Subject: [PATCH] Use static empty store files metadata (#84034) In a large cluster we expect most nodes not to have a copy of most shards, but today during replica shard allocation we create a new (and nontrivial) object for each node that has no copy of a shard. With this commit we check at deserialization time whether the response is empty and, if so, avoid the unnecessary instantiation. Relates #77466 --- docs/changelog/84034.yaml | 5 ++ .../org/elasticsearch/index/store/Store.java | 44 ++++++--------- .../recovery/RecoveryCleanFilesRequest.java | 2 +- .../recovery/StartRecoveryRequest.java | 2 +- .../TransportNodesListShardStoreMetadata.java | 55 ++++++++++--------- .../gateway/ReplicaShardAllocatorTests.java | 1 - .../elasticsearch/index/store/StoreTests.java | 51 ++++++++++++++--- .../PutCcrRestoreSessionAction.java | 2 +- 8 files changed, 96 insertions(+), 66 deletions(-) create mode 100644 docs/changelog/84034.yaml diff --git a/docs/changelog/84034.yaml b/docs/changelog/84034.yaml new file mode 100644 index 0000000000000..30065232231ef --- /dev/null +++ b/docs/changelog/84034.yaml @@ -0,0 +1,5 @@ +pr: 84034 +summary: Use static empty store files metadata +area: Allocation +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 5aabb13e957e4..28b6452acc5ed 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; @@ -759,25 +760,17 @@ public String toString() { */ public static final class MetadataSnapshot implements Iterable, Writeable { private final Map metadata; - - public static final MetadataSnapshot EMPTY = new MetadataSnapshot(); - private final Map commitUserData; - private final long numDocs; + public static final MetadataSnapshot EMPTY = new MetadataSnapshot(emptyMap(), emptyMap(), 0L); + public MetadataSnapshot(Map metadata, Map commitUserData, long numDocs) { this.metadata = metadata; this.commitUserData = commitUserData; this.numDocs = numDocs; } - MetadataSnapshot() { - metadata = emptyMap(); - commitUserData = emptyMap(); - numDocs = 0; - } - MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException { LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger); metadata = loadedMetadata.fileMetadata; @@ -786,26 +779,21 @@ public MetadataSnapshot(Map metadata, Map metadata = new HashMap<>(); - for (int i = 0; i < size; i++) { - StoreFileMetadata meta = new StoreFileMetadata(in); - metadata.put(meta.name(), meta); - } - Map commitUserData = new HashMap<>(); - int num = in.readVInt(); - for (int i = num; i > 0; i--) { - commitUserData.put(in.readString(), in.readString()); + public static MetadataSnapshot readFrom(StreamInput in) throws IOException { + final int metadataSize = in.readVInt(); + final Map metadata = metadataSize == 0 ? emptyMap() : Maps.newMapWithExpectedSize(metadataSize); + for (int i = 0; i < metadataSize; i++) { + final var storeFileMetadata = new StoreFileMetadata(in); + metadata.put(storeFileMetadata.name(), storeFileMetadata); } + final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString); + final var numDocs = in.readLong(); - this.metadata = unmodifiableMap(metadata); - this.commitUserData = unmodifiableMap(commitUserData); - this.numDocs = in.readLong(); - assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles(); + if (metadataSize == 0 && commitUserData.size() == 0 && numDocs == 0) { + return MetadataSnapshot.EMPTY; + } else { + return new MetadataSnapshot(metadata, commitUserData, numDocs); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java index 2e6ba419752f3..d5997938e715b 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java @@ -43,7 +43,7 @@ public RecoveryCleanFilesRequest( super(in); recoveryId = in.readLong(); shardId = new ShardId(in); - snapshotFiles = new Store.MetadataSnapshot(in); + snapshotFiles = Store.MetadataSnapshot.readFrom(in); totalTranslogOps = in.readVInt(); globalCheckpoint = in.readZLong(); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index fd6d287f5f5fa..a19388ca26126 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -40,7 +40,7 @@ public StartRecoveryRequest(StreamInput in) throws IOException { targetAllocationId = in.readString(); sourceNode = new DiscoveryNode(in); targetNode = new DiscoveryNode(in); - metadataSnapshot = new Store.MetadataSnapshot(in); + metadataSnapshot = Store.MetadataSnapshot.readFrom(in); primaryRelocation = in.readBoolean(); startingSeqNo = in.readLong(); if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) { diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java index d28249eefc716..016841af9601d 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -45,12 +46,13 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyList; + public class TransportNodesListShardStoreMetadata extends TransportNodesAction< TransportNodesListShardStoreMetadata.Request, TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata, @@ -132,7 +134,6 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep if (indexShard != null) { try { final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata( - shardId, indexShard.snapshotStoreMetadata(), indexShard.getPeerRecoveryRetentionLeases() ); @@ -140,10 +141,10 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep return storeFilesMetadata; } catch (org.apache.lucene.index.IndexNotFoundException e) { logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + return StoreFilesMetadata.EMPTY; } catch (IOException e) { logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e); - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + return StoreFilesMetadata.EMPTY; } } } @@ -166,7 +167,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep } final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); if (shardPath == null) { - return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList()); + return StoreFilesMetadata.EMPTY; } // note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means: // 1) a shard is being constructed, which means the master will not use a copy of this replica @@ -180,7 +181,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep ); // We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when // we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard. - return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList()); + return new StoreFilesMetadata(metadataSnapshot, emptyList()); } finally { TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); if (exists) { @@ -192,37 +193,43 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep } public static class StoreFilesMetadata implements Iterable, Writeable { - private final ShardId shardId; private final Store.MetadataSnapshot metadataSnapshot; private final List peerRecoveryRetentionLeases; - public StoreFilesMetadata( - ShardId shardId, - Store.MetadataSnapshot metadataSnapshot, - List peerRecoveryRetentionLeases - ) { - this.shardId = shardId; + private static final ShardId FAKE_SHARD_ID = new ShardId("_na_", "_na_", 0); + public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList()); + + public StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List peerRecoveryRetentionLeases) { this.metadataSnapshot = metadataSnapshot; this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases; } - public StoreFilesMetadata(StreamInput in) throws IOException { - this.shardId = new ShardId(in); - this.metadataSnapshot = new Store.MetadataSnapshot(in); - this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + public static StoreFilesMetadata readFrom(StreamInput in) throws IOException { + if (in.getVersion().before(Version.V_8_2_0)) { + new ShardId(in); + } + final var metadataSnapshot = Store.MetadataSnapshot.readFrom(in); + final var peerRecoveryRetentionLeases = in.readList(RetentionLease::new); + if (metadataSnapshot == Store.MetadataSnapshot.EMPTY && peerRecoveryRetentionLeases.isEmpty()) { + return EMPTY; + } else { + return new StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases); + } } @Override public void writeTo(StreamOutput out) throws IOException { - shardId.writeTo(out); + if (out.getVersion().before(Version.V_8_2_0)) { + // no compatible version cares about the shard ID, we can just make one up + FAKE_SHARD_ID.writeTo(out); + + // NB only checked this for versions back to 7.17.0, we are assuming that we don't use this with earlier versions: + assert out.getVersion().onOrAfter(Version.V_7_17_0) : out.getVersion(); + } metadataSnapshot.writeTo(out); out.writeList(peerRecoveryRetentionLeases); } - public ShardId shardId() { - return this.shardId; - } - public boolean isEmpty() { return metadataSnapshot.size() == 0; } @@ -267,8 +274,6 @@ public String syncId() { @Override public String toString() { return "StoreFilesMetadata{" - + ", shardId=" - + shardId + ", metadataSnapshot{size=" + metadataSnapshot.size() + ", syncId=" @@ -385,7 +390,7 @@ public static class NodeStoreFilesMetadata extends BaseNodeResponse { public NodeStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException { super(in, node); - storeFilesMetadata = new StoreFilesMetadata(in); + storeFilesMetadata = StoreFilesMetadata.readFrom(in); } public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) { diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index d386e080606a7..758da73def503 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -684,7 +684,6 @@ TestAllocator addData( data.put( node, new TransportNodesListShardStoreMetadata.StoreFilesMetadata( - shardId, new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), peerRecoveryRetentionLeases ) diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index f5a5d6ed4c009..efc1b846492b8 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -80,7 +80,10 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongUnaryOperator; +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; @@ -93,6 +96,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; public class StoreTests extends ESTestCase { @@ -918,7 +922,7 @@ public void testMetadataSnapshotStreaming() throws Exception { ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); in.setVersion(targetNodeVersion); - Store.MetadataSnapshot inMetadataSnapshot = new Store.MetadataSnapshot(in); + Store.MetadataSnapshot inMetadataSnapshot = Store.MetadataSnapshot.readFrom(in); Map origEntries = new HashMap<>(); origEntries.putAll(outMetadataSnapshot.asMap()); for (Map.Entry entry : inMetadataSnapshot.asMap().entrySet()) { @@ -928,6 +932,21 @@ public void testMetadataSnapshotStreaming() throws Exception { assertThat(inMetadataSnapshot.getCommitUserData(), equalTo(outMetadataSnapshot.getCommitUserData())); } + public void testEmptyMetadataSnapshotStreaming() throws Exception { + var outMetadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY : new Store.MetadataSnapshot(emptyMap(), emptyMap(), 0L); + var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT); + + var outBuffer = new ByteArrayOutputStream(); + var out = new OutputStreamStreamOutput(outBuffer); + out.setVersion(targetNodeVersion); + outMetadataSnapshot.writeTo(out); + + var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + var in = new InputStreamStreamInput(inBuffer); + in.setVersion(targetNodeVersion); + assertThat(Store.MetadataSnapshot.readFrom(in), sameInstance(Store.MetadataSnapshot.EMPTY)); + } + protected Store.MetadataSnapshot createMetadataSnapshot() { StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString()); StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("no_segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString()); @@ -978,21 +997,16 @@ public void testStreamStoreFilesMetadata() throws Exception { ); } TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata = - new TransportNodesListShardStoreMetadata.StoreFilesMetadata( - new ShardId("test", "_na_", 0), - metadataSnapshot, - peerRecoveryRetentionLeases - ); + new TransportNodesListShardStoreMetadata.StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); - org.elasticsearch.Version targetNodeVersion = randomVersion(random()); + org.elasticsearch.Version targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT); out.setVersion(targetNodeVersion); outStoreFileMetadata.writeTo(out); ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); InputStreamStreamInput in = new InputStreamStreamInput(inBuffer); in.setVersion(targetNodeVersion); - TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata = - new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in); + var inStoreFileMetadata = TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in); Iterator outFiles = outStoreFileMetadata.iterator(); for (StoreFileMetadata inFile : inStoreFileMetadata) { assertThat(inFile.name(), equalTo(outFiles.next().name())); @@ -1001,6 +1015,25 @@ public void testStreamStoreFilesMetadata() throws Exception { assertThat(outStoreFileMetadata.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases)); } + public void testStreamEmptyStoreFilesMetadata() throws Exception { + var outStoreFileMetadata = randomBoolean() + ? TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY + : new TransportNodesListShardStoreMetadata.StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList()); + var outBuffer = new ByteArrayOutputStream(); + var out = new OutputStreamStreamOutput(outBuffer); + var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT); + out.setVersion(targetNodeVersion); + outStoreFileMetadata.writeTo(out); + + var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); + var in = new InputStreamStreamInput(inBuffer); + in.setVersion(targetNodeVersion); + assertThat( + TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in), + sameInstance(TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY) + ); + } + public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException { IndexWriterConfig iwc = newIndexWriterConfig(); final ShardId shardId = new ShardId("index", "_na_", 1); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index eb8038755d782..d116106b044ae 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -114,7 +114,7 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse { PutCcrRestoreSessionResponse(StreamInput in) throws IOException { super(in); node = new DiscoveryNode(in); - storeFileMetadata = new Store.MetadataSnapshot(in); + storeFileMetadata = Store.MetadataSnapshot.readFrom(in); mappingVersion = in.readVLong(); }