Prevent deletion of files still referenced by the on-disk segments_N file.
This change updates the cleanup and validation steps that run after a replication event to prevent
deleting files that are still referenced by either the on-disk segments_N file or the in-memory SegmentInfos.
After cleanup, it validates the latest in-memory SegmentInfos against the metadata sent from the primary shard.

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Mar 4, 2022
1 parent 4d25000 commit 973ae50
Showing 6 changed files with 115 additions and 27 deletions.
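
At a high level, the replica-side flow after this change is: deserialize the primary's SegmentInfos from the checkpoint response, clean up the store while preserving files referenced by either the primary's metadata or the local segments, then point the engine's reader at the new infos. A condensed sketch, using the names from the finalizeReplication changes in the last file of this diff (error handling and store ref-counting omitted):

// Read the SegmentInfos the primary serialized into the checkpoint response.
SegmentInfos infos = SegmentInfos.readCommit(
    store.directory(),
    toIndexInput(checkpointInfoResponse.getInfosBytes()), // wraps the byte[] in a ChecksumIndexInput
    responseCheckpoint.getSegmentsGen()
);
// Delete only files referenced by neither the primary's snapshot nor the local segments, then verify.
store.cleanupAndVerify("finalizeReplication", checkpointInfoResponse.getSnapshot(), store.getMetadata(infos));
// Install the new SegmentInfos on the replica engine's reader and mark the checkpoint seqNo as processed.
indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo());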
@@ -56,4 +56,38 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
waitForDocs(initialDocCount, indexer);
}
flush(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}
}
@@ -236,7 +236,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {}

public long getProcessedLocalCheckpoint() {
return 0L;
@@ -63,10 +63,6 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
@@ -120,7 +116,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
@@ -345,21 +340,13 @@ public InternalEngine(EngineConfig engineConfig) {
}

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
assert engineConfig.isPrimary() == false : "Only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
}

private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos")
);
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier
) throws IOException {
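
On the replica, applying the primary's SegmentInfos comes down to three steps, as in the InternalEngine override above; an annotated restatement of that override (same code as the diff, with comments added):

@Override
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
    assert engineConfig.isPrimary() == false : "Only replicas should update Infos";
    externalReaderManager.internalReaderManager.updateSegments(infos); // install the new point-in-time view
    externalReaderManager.maybeRefresh();                              // make the new segments visible to searches
    localCheckpointTracker.markSeqNoAsProcessed(seqNo);                // advance the processed local checkpoint
}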
@@ -1522,8 +1522,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
);
}

public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infosBytes, gen, seqNo);
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infos, seqNo);
}

/**
44 changes: 44 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -702,6 +702,50 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* It is used for segment replication, where the in-memory SegmentInfos can be ahead of the on-disk segments_N file;
* in that case files from both snapshots must be preserved.
* @param reason the reason for this cleanup operation, logged for each deleted file
* @param remoteSnapshot the remote snapshot sent from the primary shard
* @param localSnapshot the local snapshot built from the in-memory SegmentInfos
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
// Fetch a snapshot from the latest on-disk segments_N file. This can be behind
// the passed-in local in-memory snapshot, so we must ensure that files it references are not removed.
final Store.MetadataSnapshot onDiskMetadata = getMetadata((IndexCommit) null);
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile) || remoteSnapshot.contains(existingFile) || onDiskMetadata.contains(existingFile)) {
// keep autogenerated files, plus anything referenced by the primary's snapshot or the latest on-disk commit
continue;
}
try {
logger.info("Deleting file {}-{}", shardId, existingFile);
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS)
|| existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)
|| existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
verifyAfterCleanup(remoteSnapshot, localSnapshot);
} finally {
metadataLock.writeLock().unlock();
}
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
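The deletion rule in the new cleanupAndVerify overload above reduces to a single predicate per file; a condensed sketch (illustrative only, using the method's own variable names):

// A file survives cleanup if any of these hold; otherwise it is deleted.
boolean keep = Store.isAutogenerated(existingFile)       // files the Store never deletes
    || remoteSnapshot.contains(existingFile)              // referenced by the primary's metadata snapshot
    || onDiskMetadata.contains(existingFile);              // referenced by the latest on-disk segments_N
// After the loop, verifyAfterCleanup(remoteSnapshot, localSnapshot) compares the local in-memory
// snapshot against the primary's metadata and throws IllegalStateException if they diverge.
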
@@ -36,7 +36,11 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
@@ -56,6 +60,7 @@
import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -138,7 +143,7 @@ public Store store() {
return store;
}

public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, ActionListener<Void> listener) {
public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
ActionListener.completeWith(listener, () -> {
// first, we go and move files that were created with the recovery id suffix to
// the actual names; it's ok if we have a corrupted index here, since we have replicas
@@ -147,21 +152,29 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo,
final Store store = store();
store.incRef();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", checkpointInfo.getSnapshot());
if (indexShard.getRetentionLeases().leases().isEmpty()) {
// if empty, may be a fresh IndexShard, so write an empty leases file to disk
indexShard.persistRetentionLeases();
assert indexShard.loadRetentionLeases().leases().isEmpty();
} else {
assert indexShard.assertRetentionLeasesPersisted();
}
final long segmentsGen = checkpointInfo.getCheckpoint().getSegmentsGen();
// force an fsync if we are receiving a new gen.
if (segmentsGen > indexShard.getLatestSegmentInfos().getGeneration()) {
final Directory directory = store().directory();
directory.sync(Arrays.asList(directory.listAll()));
}
indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes(), checkpointInfo.getCheckpoint().getSeqNo());

// Deserialize the new SegmentInfos object sent from the primary.
final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();

SegmentInfos infos = SegmentInfos.readCommit(
store.directory(),
toIndexInput(checkpointInfoResponse.getInfosBytes()),
responseCheckpoint.getSegmentsGen()
);

// clean up the local store of old segment files
// and validate the latest segment infos against the snapshot sent from the primary shard.
store.cleanupAndVerify("finalizeReplication", checkpointInfoResponse.getSnapshot(), store.getMetadata(infos));

// Update the current infos reference on the Engine's reader.
indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo());
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not been checksummed and they are
@@ -192,6 +205,16 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo,
});
}

/**
* This method wraps the byte[] containing the primary's SegmentInfos in a Lucene {@link ChecksumIndexInput} that can be
* passed to SegmentInfos.readCommit.
*/
private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos")
);
}

public long getReplicationId() {
return replicationId;
}
