Skip to content

Commit

Permalink
Simplify Store$MetadataSnapshot and friends
Browse files Browse the repository at this point in the history
`TransportNodesListShardStoreMetadata$StoreFilesMetadata` and
`Store$MetadataSnapshot` are both morally-speaking records, and
`LoadedMetadata` is really the same as `MetadataSnapshot`. This commit
turns them into real records, gets rid of the unnecessary extra class,
and renames some of the accessors.

Spotted while working on elastic#84034
  • Loading branch information
DaveCTurner committed Feb 24, 2022
1 parent 24155ce commit 2201d70
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1072,10 +1072,10 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
logger.info(
"--> start recovery request: starting seq_no {}, commit {}",
startRecoveryRequest.startingSeqNo(),
startRecoveryRequest.metadataSnapshot().getCommitUserData()
startRecoveryRequest.metadataSnapshot().commitUserData()
);
SequenceNumbers.CommitInfo commitInfoAfterLocalRecovery = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
startRecoveryRequest.metadataSnapshot().getCommitUserData().entrySet()
startRecoveryRequest.metadataSnapshot().commitUserData().entrySet()
);
assertThat(commitInfoAfterLocalRecovery.localCheckpoint, equalTo(lastSyncedGlobalCheckpoint));
assertThat(commitInfoAfterLocalRecovery.maxSeqNo, equalTo(lastSyncedGlobalCheckpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2913,7 +2913,7 @@ private void doCheckIndex() throws IOException {
throw e;
}
final List<String> checkedFiles = new ArrayList<>(metadata.size());
for (Map.Entry<String, StoreFileMetadata> entry : metadata.asMap().entrySet()) {
for (Map.Entry<String, StoreFileMetadata> entry : metadata.fileMetadataMap().entrySet()) {
try {
Store.checkIntegrity(entry.getValue(), store.directory());
if (corrupt == null) {
Expand Down
141 changes: 51 additions & 90 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t
java.util.concurrent.locks.Lock lock = lockDirectory ? metadataLock.writeLock() : metadataLock.readLock();
lock.lock();
try (Closeable ignored = lockDirectory ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : () -> {}) {
return new MetadataSnapshot(commit, directory, logger);
return MetadataSnapshot.loadFromIndexCommit(commit, directory, logger);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
Expand Down Expand Up @@ -448,7 +448,7 @@ public static MetadataSnapshot readMetadataSnapshot(
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return MetadataSnapshot.loadFromIndexCommit(null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
Expand Down Expand Up @@ -756,80 +756,24 @@ public String toString() {
* change concurrently for safety reasons.
*
* @see StoreFileMetadata
*
* @param numDocs the number of documents in this store snapshot
*/
public static final class MetadataSnapshot implements Iterable<StoreFileMetadata>, Writeable {
private final Map<String, StoreFileMetadata> metadata;
private final Map<String, String> commitUserData;
private final long numDocs;
public record MetadataSnapshot(Map<String, StoreFileMetadata> fileMetadataMap, Map<String, String> commitUserData, long numDocs)
implements
Iterable<StoreFileMetadata>,
Writeable {

public static final MetadataSnapshot EMPTY = new MetadataSnapshot(emptyMap(), emptyMap(), 0L);

public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, String> commitUserData, long numDocs) {
this.metadata = metadata;
this.commitUserData = commitUserData;
this.numDocs = numDocs;
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
}

public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
final Map<String, StoreFileMetadata> metadata = in.readMapValues(StoreFileMetadata::new, StoreFileMetadata::name);
final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
final var numDocs = in.readLong();

if (metadata.size() == 0 && commitUserData.size() == 0 && numDocs == 0) {
return MetadataSnapshot.EMPTY;
} else {
return new MetadataSnapshot(metadata, commitUserData, numDocs);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMapValues(metadata);
out.writeMap(commitUserData, StreamOutput::writeString, StreamOutput::writeString);
out.writeLong(numDocs);
}

/**
* Returns the number of documents in this store snapshot
*/
public long getNumDocs() {
return numDocs;
}

@Nullable
public org.elasticsearch.Version getCommitVersion() {
String version = commitUserData.get(ES_VERSION);
return version == null ? null : org.elasticsearch.Version.fromString(version);
}

static class LoadedMetadata {
final Map<String, StoreFileMetadata> fileMetadata;
final Map<String, String> userData;
static MetadataSnapshot loadFromIndexCommit(IndexCommit commit, Directory directory, Logger logger) throws IOException {
final long numDocs;

LoadedMetadata(Map<String, StoreFileMetadata> fileMetadata, Map<String, String> userData, long numDocs) {
this.fileMetadata = fileMetadata;
this.userData = userData;
this.numDocs = numDocs;
}
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
final Map<String, StoreFileMetadata> metadataByFile = new HashMap<>();
final Map<String, String> commitUserData;
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
commitUserData = Map.copyOf(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
Expand All @@ -849,7 +793,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
checksumFromLuceneFile(
directory,
file,
builder,
metadataByFile,
logger,
version.toString(),
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file)),
Expand All @@ -864,7 +808,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
checksumFromLuceneFile(
directory,
segmentsFile,
builder,
metadataByFile,
logger,
maxVersion.toString(),
true,
Expand All @@ -886,16 +830,41 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
ex
);
Lucene.checkSegmentInfoIntegrity(directory);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException cex) {
cex.addSuppressed(ex);
throw cex;
} catch (Exception inner) {
inner.addSuppressed(ex);
throw inner;
}
throw ex;
}
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
final var metadataSnapshot = new MetadataSnapshot(unmodifiableMap(metadataByFile), commitUserData, numDocs);
assert metadataSnapshot.fileMetadataMap.isEmpty() || metadataSnapshot.numSegmentFiles() == 1
: "numSegmentFiles: " + metadataSnapshot.numSegmentFiles();
return metadataSnapshot;
}

public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
final Map<String, StoreFileMetadata> metadata = in.readMapValues(StoreFileMetadata::new, StoreFileMetadata::name);
final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
final var numDocs = in.readLong();

if (metadata.size() == 0 && commitUserData.size() == 0 && numDocs == 0) {
return MetadataSnapshot.EMPTY;
} else {
return new MetadataSnapshot(metadata, commitUserData, numDocs);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMapValues(fileMetadataMap);
out.writeMap(commitUserData, StreamOutput::writeString, StreamOutput::writeString);
out.writeLong(numDocs);
}

@Nullable
public org.elasticsearch.Version getCommitVersion() {
String version = commitUserData.get(ES_VERSION);
return version == null ? null : org.elasticsearch.Version.fromString(version);
}

private static void checksumFromLuceneFile(
Expand Down Expand Up @@ -956,15 +925,11 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)

@Override
public Iterator<StoreFileMetadata> iterator() {
return metadata.values().iterator();
return fileMetadataMap.values().iterator();
}

public StoreFileMetadata get(String name) {
return metadata.get(name);
}

public Map<String, StoreFileMetadata> asMap() {
return metadata;
return fileMetadataMap.get(name);
}

private static final String SEGMENT_INFO_EXTENSION = "si";
Expand Down Expand Up @@ -1079,13 +1044,13 @@ public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) {
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
assert recoveryDiff.size() == metadata.size()
assert recoveryDiff.size() == fileMetadataMap.size()
: "some files are missing: recoveryDiff is ["
+ recoveryDiff
+ "] comparing: ["
+ metadata
+ fileMetadataMap
+ "] to ["
+ targetSnapshot.metadata
+ targetSnapshot.fileMetadataMap
+ "]";
return recoveryDiff;
}
Expand All @@ -1094,11 +1059,7 @@ public RecoveryDiff recoveryDiff(final MetadataSnapshot targetSnapshot) {
* Returns the number of files in this snapshot
*/
public int size() {
return metadata.size();
}

public Map<String, String> getCommitUserData() {
return commitUserData;
return fileMetadataMap.size();
}

/**
Expand All @@ -1112,7 +1073,7 @@ public String getHistoryUUID() {
* Returns true iff this metadata contains the given file.
*/
public boolean contains(String existingFile) {
return metadata.containsKey(existingFile);
return fileMetadataMap.containsKey(existingFile);
}

/**
Expand All @@ -1124,7 +1085,7 @@ public StoreFileMetadata getSegmentsFile() {
return file;
}
}
assert metadata.isEmpty();
assert fileMetadataMap.isEmpty();
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public static StartRecoveryRequest getStartRecoveryRequest(
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final String expectedTranslogUUID = metadataSnapshot.commitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,10 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
for (String name : snapshot.getFileNames()) {
final StoreFileMetadata md = recoverySourceMetadata.get(name);
if (md == null) {
logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());
logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.fileMetadataMap());
throw new CorruptIndexException(
"Snapshot differs from actual index - maybe index was removed metadata has "
+ recoverySourceMetadata.asMap().size()
+ recoverySourceMetadata.fileMetadataMap().size()
+ " files",
name
);
Expand Down Expand Up @@ -624,11 +624,11 @@ void recoverFilesFromSourceAndSnapshot(
}

for (StoreFileMetadata md : shardRecoveryPlan.getSourceFilesToRecover()) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
if (request.metadataSnapshot().fileMetadataMap().containsKey(md.name())) {
logger.trace(
"recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
md.name(),
request.metadataSnapshot().asMap().get(md.name()),
request.metadataSnapshot().fileMetadataMap().get(md.name()),
md
);
} else {
Expand All @@ -638,11 +638,11 @@ void recoverFilesFromSourceAndSnapshot(

for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : shardRecoveryPlan.getSnapshotFilesToRecover()) {
final StoreFileMetadata md = fileInfo.metadata();
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
if (request.metadataSnapshot().fileMetadataMap().containsKey(md.name())) {
logger.trace(
"recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
md.name(),
request.metadataSnapshot().asMap().get(md.name()),
request.metadataSnapshot().fileMetadataMap().get(md.name()),
md
);
} else {
Expand Down Expand Up @@ -986,32 +986,32 @@ boolean canSkipPhase1(Store.MetadataSnapshot source, Store.MetadataSnapshot targ
if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) {
return false;
}
if (source.getNumDocs() != target.getNumDocs()) {
if (source.numDocs() != target.numDocs()) {
throw new IllegalStateException(
"try to recover "
+ request.shardId()
+ " from primary shard with sync id but number "
+ "of docs differ: "
+ source.getNumDocs()
+ source.numDocs()
+ " ("
+ request.sourceNode().getName()
+ ", primary) vs "
+ target.getNumDocs()
+ target.numDocs()
+ "("
+ request.targetNode().getName()
+ ")"
);
}
SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.getCommitUserData().entrySet());
SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.getCommitUserData().entrySet());
SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.commitUserData().entrySet());
SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.commitUserData().entrySet());
if (sourceSeqNos.localCheckpoint != targetSeqNos.localCheckpoint || targetSeqNos.maxSeqNo != sourceSeqNos.maxSeqNo) {
final String message = "try to recover "
+ request.shardId()
+ " with sync id but "
+ "seq_no stats are mismatched: ["
+ source.getCommitUserData()
+ source.commitUserData()
+ "] vs ["
+ target.getCommitUserData()
+ target.commitUserData()
+ "]";
assert false : message;
throw new IllegalStateException(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,14 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
}
}

public static class StoreFilesMetadata implements Iterable<StoreFileMetadata>, Writeable {
private final Store.MetadataSnapshot metadataSnapshot;
private final List<RetentionLease> peerRecoveryRetentionLeases;
public record StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List<RetentionLease> peerRecoveryRetentionLeases)
implements
Iterable<StoreFileMetadata>,
Writeable {

private static final ShardId FAKE_SHARD_ID = new ShardId("_na_", "_na_", 0);
public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());

public StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List<RetentionLease> peerRecoveryRetentionLeases) {
this.metadataSnapshot = metadataSnapshot;
this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases;
}

public static StoreFilesMetadata readFrom(StreamInput in) throws IOException {
if (in.getVersion().before(Version.V_8_2_0)) {
new ShardId(in);
Expand Down Expand Up @@ -240,11 +236,11 @@ public Iterator<StoreFileMetadata> iterator() {
}

public boolean fileExists(String name) {
return metadataSnapshot.asMap().containsKey(name);
return metadataSnapshot.fileMetadataMap().containsKey(name);
}

public StoreFileMetadata file(String name) {
return metadataSnapshot.asMap().get(name);
return metadataSnapshot.fileMetadataMap().get(name);
}

/**
Expand All @@ -260,10 +256,6 @@ public long getPeerRecoveryRetentionLeaseRetainingSeqNo(DiscoveryNode node) {
.orElse(-1L);
}

public List<RetentionLease> peerRecoveryRetentionLeases() {
return peerRecoveryRetentionLeases;
}

/**
* @return commit sync id if exists, else null
*/
Expand Down
Loading

0 comments on commit 2201d70

Please sign in to comment.