Use static empty store files metadata #84034

Merged
Changes from 2 commits
5 changes: 5 additions & 0 deletions docs/changelog/84034.yaml
@@ -0,0 +1,5 @@
pr: 84034
summary: Use static empty store files metadata
area: Allocation
type: enhancement
issues: []
43 changes: 15 additions & 28 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -759,25 +759,17 @@ public String toString() {
*/
public static final class MetadataSnapshot implements Iterable<StoreFileMetadata>, Writeable {
private final Map<String, StoreFileMetadata> metadata;

public static final MetadataSnapshot EMPTY = new MetadataSnapshot();

private final Map<String, String> commitUserData;

private final long numDocs;

public static final MetadataSnapshot EMPTY = new MetadataSnapshot(emptyMap(), emptyMap(), 0L);

public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, String> commitUserData, long numDocs) {
this.metadata = metadata;
this.commitUserData = commitUserData;
this.numDocs = numDocs;
}

MetadataSnapshot() {
metadata = emptyMap();
commitUserData = emptyMap();
numDocs = 0;
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
metadata = loadedMetadata.fileMetadata;
@@ -786,26 +778,21 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
}

/**
* Read from a stream.
*/
public MetadataSnapshot(StreamInput in) throws IOException {
final int size = in.readVInt();
Map<String, StoreFileMetadata> metadata = new HashMap<>();
for (int i = 0; i < size; i++) {
StoreFileMetadata meta = new StoreFileMetadata(in);
metadata.put(meta.name(), meta);
}
Map<String, String> commitUserData = new HashMap<>();
int num = in.readVInt();
for (int i = num; i > 0; i--) {
commitUserData.put(in.readString(), in.readString());
public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
final int metadataSize = in.readVInt();
final Map<String, StoreFileMetadata> metadata = metadataSize == 0 ? emptyMap() : new HashMap<>();
[Review thread on this line]

Contributor: NIT: we might use org.elasticsearch.common.util.Maps#newMapWithExpectedSize to avoid resizing the map.

idegtiarenko (Contributor), Feb 16, 2022: Also, maybe it is worth generalizing this pattern of reading with something like Map<K, V> readMapFromList(Writeable.Reader<V> valueReader, Function<V, K> keyCreator) throws IOException, which would internally handle map creation, initialization, and sizing? I think this is not the only place where we create a map from a list.

Author (Contributor): ++ see 488a05b

Looks like we are not properly sizing the map produced by StreamInput#readMap and StreamInput#readOrderedMap either. I'm not touching that in this PR, nor the map-from-list thing, but it seems reasonable I think.

idegtiarenko (Contributor), Feb 16, 2022: #84045 is a PR for pre-sizing maps in readMap. We could discuss if there are any concerns around that topic.

Contributor: I could also open a followup PR on map-from-list.
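
A possible shape for the suggested helper (a sketch only: the name, signature, and use of Maps#newMapWithExpectedSize follow the comments above, and are not code from this PR or from 488a05b):

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;

// Hypothetical helper: reads a length-prefixed list of values and indexes them by a
// derived key, pre-sizing the map so it never has to rehash while being filled.
static <K, V> Map<K, V> readMapFromList(StreamInput in, Writeable.Reader<V> valueReader, Function<V, K> keyCreator)
    throws IOException {
    final int size = in.readVInt();
    if (size == 0) {
        return Collections.emptyMap();
    }
    final Map<K, V> map = Maps.newMapWithExpectedSize(size);
    for (int i = 0; i < size; i++) {
        final V value = valueReader.read(in);
        map.put(keyCreator.apply(value), value);
    }
    return Collections.unmodifiableMap(map);
}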

for (int i = 0; i < metadataSize; i++) {
final var storeFileMetadata = new StoreFileMetadata(in);
metadata.put(storeFileMetadata.name(), storeFileMetadata);
}
final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
final var numDocs = in.readLong();

this.metadata = unmodifiableMap(metadata);
this.commitUserData = unmodifiableMap(commitUserData);
this.numDocs = in.readLong();
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
if (metadataSize == 0 && commitUserData.size() == 0 && numDocs == 0) {
return MetadataSnapshot.EMPTY;
} else {
return new MetadataSnapshot(metadata, commitUserData, numDocs);
}
}

@Override
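
For context, the shape of the change above reduced to a minimal sketch (simplified and hypothetical; the real MetadataSnapshot carries three fields and the assertions shown above): deserialization goes through a static readFrom factory, which hands back the shared EMPTY constant instead of allocating a fresh object when the payload is empty.

import java.io.IOException;
import java.util.Map;
import org.elasticsearch.common.io.stream.StreamInput;

// Simplified illustration of the shared-empty-instance pattern; not the actual
// Store.MetadataSnapshot implementation.
final class Snapshot {
    static final Snapshot EMPTY = new Snapshot(Map.of());

    private final Map<String, String> entries;

    private Snapshot(Map<String, String> entries) {
        this.entries = entries;
    }

    static Snapshot readFrom(StreamInput in) throws IOException {
        final Map<String, String> entries = in.readMap(StreamInput::readString, StreamInput::readString);
        // Return the canonical EMPTY so callers and tests can rely on reference
        // equality, as the new sameInstance(...) assertions below do.
        return entries.isEmpty() ? EMPTY : new Snapshot(entries);
    }
}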
@@ -43,7 +43,7 @@ public RecoveryCleanFilesRequest(
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
snapshotFiles = new Store.MetadataSnapshot(in);
snapshotFiles = Store.MetadataSnapshot.readFrom(in);
totalTranslogOps = in.readVInt();
globalCheckpoint = in.readZLong();
}
@@ -40,7 +40,7 @@ public StartRecoveryRequest(StreamInput in) throws IOException {
targetAllocationId = in.readString();
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);
metadataSnapshot = new Store.MetadataSnapshot(in);
metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
primaryRelocation = in.readBoolean();
startingSeqNo = in.readLong();
if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) {
@@ -10,6 +10,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
@@ -45,12 +46,13 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyList;

public class TransportNodesListShardStoreMetadata extends TransportNodesAction<
TransportNodesListShardStoreMetadata.Request,
TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata,
@@ -132,18 +134,17 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
if (indexShard != null) {
try {
final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata(
shardId,
indexShard.snapshotStoreMetadata(),
indexShard.getPeerRecoveryRetentionLeases()
);
exists = true;
return storeFilesMetadata;
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e);
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
return StoreFilesMetadata.EMPTY;
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e);
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
return StoreFilesMetadata.EMPTY;
}
}
}
@@ -166,7 +167,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
}
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
if (shardPath == null) {
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
return StoreFilesMetadata.EMPTY;
}
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the master will not use a copy of this replica
@@ -180,7 +181,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
);
// We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
// we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList());
return new StoreFilesMetadata(metadataSnapshot, emptyList());
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {
@@ -192,37 +193,43 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
}

public static class StoreFilesMetadata implements Iterable<StoreFileMetadata>, Writeable {
private final ShardId shardId;
private final Store.MetadataSnapshot metadataSnapshot;
private final List<RetentionLease> peerRecoveryRetentionLeases;

public StoreFilesMetadata(
ShardId shardId,
Store.MetadataSnapshot metadataSnapshot,
List<RetentionLease> peerRecoveryRetentionLeases
) {
this.shardId = shardId;
private static final ShardId FAKE_SHARD_ID = new ShardId("_na_", "_na_", 0);
public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());

public StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List<RetentionLease> peerRecoveryRetentionLeases) {
this.metadataSnapshot = metadataSnapshot;
this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases;
}

public StoreFilesMetadata(StreamInput in) throws IOException {
this.shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
public static StoreFilesMetadata readFrom(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_8_2_0)) {
new ShardId(in);
}
final var metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
final var peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
if (metadataSnapshot == Store.MetadataSnapshot.EMPTY && peerRecoveryRetentionLeases.isEmpty()) {
return EMPTY;
} else {
return new StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
if (out.getVersion().before(Version.V_8_2_0)) {
// no compatible version cares about the shard ID, we can just make one up
FAKE_SHARD_ID.writeTo(out);

// NB only checked this for versions back to 7.17.0, we are assuming that we don't use this with earlier versions:
assert out.getVersion().onOrAfter(Version.V_7_17_0) : out.getVersion();
}
metadataSnapshot.writeTo(out);
out.writeList(peerRecoveryRetentionLeases);
}

public ShardId shardId() {
return this.shardId;
}

public boolean isEmpty() {
return metadataSnapshot.size() == 0;
}
@@ -267,8 +274,6 @@ public String syncId() {
@Override
public String toString() {
return "StoreFilesMetadata{"
+ ", shardId="
+ shardId
+ ", metadataSnapshot{size="
+ metadataSnapshot.size()
+ ", syncId="
@@ -385,7 +390,7 @@ public static class NodeStoreFilesMetadata extends BaseNodeResponse {

public NodeStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
storeFilesMetadata = new StoreFilesMetadata(in);
storeFilesMetadata = StoreFilesMetadata.readFrom(in);
}

public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) {
@@ -684,7 +684,6 @@ TestAllocator addData(
data.put(
node,
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
shardId,
new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
peerRecoveryRetentionLeases
)
51 changes: 42 additions & 9 deletions server/src/test/java/org/elasticsearch/index/store/StoreTests.java
@@ -80,7 +80,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongUnaryOperator;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
@@ -93,6 +96,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;

public class StoreTests extends ESTestCase {

@@ -918,7 +922,7 @@ public void testMetadataSnapshotStreaming() throws Exception {
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
Store.MetadataSnapshot inMetadataSnapshot = new Store.MetadataSnapshot(in);
Store.MetadataSnapshot inMetadataSnapshot = Store.MetadataSnapshot.readFrom(in);
Map<String, StoreFileMetadata> origEntries = new HashMap<>();
origEntries.putAll(outMetadataSnapshot.asMap());
for (Map.Entry<String, StoreFileMetadata> entry : inMetadataSnapshot.asMap().entrySet()) {
@@ -928,6 +932,21 @@ public void testMetadataSnapshotStreaming() throws Exception {
assertThat(inMetadataSnapshot.getCommitUserData(), equalTo(outMetadataSnapshot.getCommitUserData()));
}

public void testEmptyMetadataSnapshotStreaming() throws Exception {
var outMetadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY : new Store.MetadataSnapshot(emptyMap(), emptyMap(), 0L);
var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);

var outBuffer = new ByteArrayOutputStream();
var out = new OutputStreamStreamOutput(outBuffer);
out.setVersion(targetNodeVersion);
outMetadataSnapshot.writeTo(out);

var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
var in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
assertThat(Store.MetadataSnapshot.readFrom(in), sameInstance(Store.MetadataSnapshot.EMPTY));
}

protected Store.MetadataSnapshot createMetadataSnapshot() {
StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString());
StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("no_segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString());
@@ -978,21 +997,16 @@ public void testStreamStoreFilesMetadata() throws Exception {
);
}
TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata =
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
new ShardId("test", "_na_", 0),
metadataSnapshot,
peerRecoveryRetentionLeases
);
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
org.elasticsearch.Version targetNodeVersion = randomVersion(random());
org.elasticsearch.Version targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
out.setVersion(targetNodeVersion);
outStoreFileMetadata.writeTo(out);
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata =
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in);
var inStoreFileMetadata = TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in);
Iterator<StoreFileMetadata> outFiles = outStoreFileMetadata.iterator();
for (StoreFileMetadata inFile : inStoreFileMetadata) {
assertThat(inFile.name(), equalTo(outFiles.next().name()));
@@ -1001,6 +1015,25 @@ public void testStreamStoreFilesMetadata() throws Exception {
assertThat(outStoreFileMetadata.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases));
}

public void testStreamEmptyStoreFilesMetadata() throws Exception {
var outStoreFileMetadata = randomBoolean()
? TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY
: new TransportNodesListShardStoreMetadata.StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());
var outBuffer = new ByteArrayOutputStream();
var out = new OutputStreamStreamOutput(outBuffer);
var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
out.setVersion(targetNodeVersion);
outStoreFileMetadata.writeTo(out);

var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
var in = new InputStreamStreamInput(inBuffer);
in.setVersion(targetNodeVersion);
assertThat(
TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in),
sameInstance(TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY)
);
}

public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
final ShardId shardId = new ShardId("index", "_na_", 1);
@@ -114,7 +114,7 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse {
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
super(in);
node = new DiscoveryNode(in);
storeFileMetadata = new Store.MetadataSnapshot(in);
storeFileMetadata = Store.MetadataSnapshot.readFrom(in);
mappingVersion = in.readVLong();
}
