From d51a440e94b0f4b34b8521218496de14ac29bf80 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 3 Mar 2022 16:53:16 -0800 Subject: [PATCH 1/3] Ensure replica's store always contains the previous commit point. This change: 1. Updates the cleanup and validation steps after a replication event occurs to prevent deleting files still referenced by either the on disk segments_N file or the in memory SegmentInfos. 2. Sends a metadata diff of on disk segments with each copy event. This allows replicas that are multiple commit points behind to catch up. 3. Updates initial recovery in IndexShard to copy segments before lighting up as active. This fixes a bug where replicas could not be added after the primary. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 82 +++++++++++++++-- .../org/opensearch/index/engine/Engine.java | 2 +- .../index/engine/InternalEngine.java | 17 +--- .../opensearch/index/shard/IndexShard.java | 36 +++----- .../org/opensearch/index/store/Store.java | 43 +++++++++ .../cluster/IndicesClusterStateService.java | 2 +- .../SegmentReplicationReplicaService.java | 35 ++++++- .../TransportCheckpointInfoResponse.java | 12 ++- .../indices/replication/copy/CopyState.java | 35 +++++++ .../copy/PrimaryShardReplicationHandler.java | 9 +- .../SegmentReplicationPrimaryService.java | 28 +++--- .../copy/SegmentReplicationTarget.java | 91 +++++++++++++------ 12 files changed, 297 insertions(+), 95 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 970217444e20e..16d42a8264d7d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -8,12 +8,15 @@ package org.opensearch.indices.replication; +import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -23,17 +26,21 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { private static final int SHARD_COUNT = 1; private static final int REPLICA_COUNT = 1; + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build(); + } + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) - .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) - .build() - ); + 
createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); @@ -56,4 +63,63 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } + + public void testReplicationAfterForceMerge() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + waitForDocs(initialDocCount, indexer); + } + flush(INDEX_NAME); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int totalDocs = initialDocCount + additionalDocCount; + try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { + waitForDocs(additionalDocCount, indexer); + } + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + // This case tests that replicas preserve these files so the local store is not corrupt. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); + } + + public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { + final String nodeA = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(INDEX_NAME); + + // Index a doc to create the first set of segments. _s1.si + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si) + flushAndRefresh(INDEX_NAME); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + // Index to create another segment + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + // This case tests that we are still sending these older segments to replicas so the index on disk is not corrupt. 
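For reference, the replica-side handling this test exercises is condensed below from SegmentReplicationTarget.getFiles() later in this patch (a sketch, not a verbatim excerpt; getAdditionalFiles is renamed getPendingDeleteFiles in patch 3): on top of the usual recovery diff, the replica also fetches any files the primary has merged away but still references from its commit point, skipping those it already has on disk.

    // Sketch of the replica-side fetch filter (names follow patch 1).
    Set<String> storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll()));
    Set<StoreFileMetadata> additionalFiles = checkpointInfo.getAdditionalFiles()
        .stream()
        .filter(f -> storeFiles.contains(f.name()) == false) // skip files already present locally
        .collect(Collectors.toSet());
    filesToFetch.addAll(additionalFiles);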
+ client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + refresh(INDEX_NAME); + + final String nodeB = internalCluster().startNode(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + ensureGreen(INDEX_NAME); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 70df9b14812de..86b3b835c88e5 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {}; + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {} public long getProcessedLocalCheckpoint() { return 0L; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index c86628225f3c7..11ec8c573b4a2 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -62,10 +62,6 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.BufferedChecksumIndexInput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; @@ -119,7 +115,6 @@ import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -341,21 +336,13 @@ public InternalEngine(EngineConfig engineConfig) { } @Override - public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { - assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos"; - SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); - assert gen == infos.getGeneration(); + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + assert engineConfig.isReadOnly() : "Only replicas should update Infos"; externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); localCheckpointTracker.markSeqNoAsProcessed(seqNo); } - private ChecksumIndexInput toIndexInput(byte[] input) { - return new BufferedChecksumIndexInput( - new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") - ); - } - private LocalCheckpointTracker createLocalCheckpointTracker( BiFunction localCheckpointTrackerSupplier ) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a792a8914c817..2a9eac42e31b4 
100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -166,7 +166,6 @@ import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.SegmentReplicationState; -import org.opensearch.indices.replication.copy.TrackShardResponse; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1349,6 +1348,9 @@ public void rollTranslogGeneration() { } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { + if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) { + return; + } verifyActive(); if (logger.isTraceEnabled()) { logger.trace("force merge with {}", forceMerge); @@ -1444,8 +1446,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { ); } - public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException { - getEngine().updateCurrentInfos(infosBytes, gen, seqNo); + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + getEngine().updateCurrentInfos(infos, seqNo); } /** @@ -1952,6 +1954,9 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { // TODO: Segrep - fix initial recovery stages from ReplicationTarget. if (indexSettings.isSegrepEnabled() == false) { recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + } else { + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); } loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); @@ -3011,25 +3016,13 @@ public void startRecovery( try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); if (indexSettings.isSegrepEnabled()) { - IndexShard indexShard = this; - segmentReplicationReplicaService.prepareForReplication( + markAsReplicating(); + segmentReplicationReplicaService.startRecovery( this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), - new ActionListener() { - @Override - public void onResponse(TrackShardResponse unused) { - segRepListener.onReplicationDone(segRepState); - recoveryState.getIndex().setFileDetailsComplete(); - finalizeRecovery(); - postRecovery("Shard setup complete."); - } - - @Override - public void onFailure(Exception e) { - segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true); - } - } + replicationSource, + segRepListener ); } else { peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); @@ -3669,14 +3662,13 @@ public synchronized void onNewCheckpoint( return; } if (isReplicating()) { - logger.info("Ignore - shard is currently replicating to a checkpoint"); + logger.debug("Ignore - shard is currently replicating to a checkpoint"); return; } try { markAsReplicating(); final ReplicationCheckpoint checkpoint = request.getCheckpoint(); logger.trace("Received new checkpoint {}", checkpoint); - // TODO: segrep - these are the states set after we perform our initial store recovery. 
segmentReplicationReplicaService.startReplication( checkpoint, this, @@ -3684,7 +3676,6 @@ public synchronized void onNewCheckpoint( new SegmentReplicationReplicaService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { - markReplicationComplete(); logger.debug("Replication complete to {}", getLatestReplicationCheckpoint()); } @@ -3694,7 +3685,6 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { - markReplicationComplete(); logger.error("Failure", e); } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 34da6ea942140..2dea831be5d7b 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -700,6 +700,49 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } + /** + * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. + * In this case files from both snapshots must be preserved. + * @param reason the reason for this cleanup operation logged for each deleted file + * @param remoteSnapshot The remote snapshot sent from primary shards. + * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + */ + public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException { + // fetch a snapshot from the latest on disk Segments_N file. This can be behind + // the passed in local in memory snapshot, so we want to ensure files it references are not removed. + final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null); + metadataLock.writeLock().lock(); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + for (String existingFile : directory.listAll()) { + if (Store.isAutogenerated(existingFile) + || remoteSnapshot.contains(existingFile) + || latestCommitPointMetadata.contains(existingFile)) { + // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete + // checksum) + continue; + } + try { + directory.deleteFile(reason, existingFile); + // FNF should not happen since we hold a write lock? + } catch (IOException ex) { + if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { + // TODO do we need to also fail this if we can't delete the pending commit file? + // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit + // point around? 
+ throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + verifyAfterCleanup(remoteSnapshot, localSnapshot); + } finally { + metadataLock.writeLock().unlock(); + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e09add708b14c..1a61e1b717f70 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -767,7 +767,7 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final l @Override public void onReplicationDone(final SegmentReplicationState state) { - logger.info("Shard setup complete, ready for segment copy."); + logger.trace("Shard setup complete, ready for segment copy."); shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index b1334818e3b23..3a10afef67e85 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -15,6 +15,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListenerResponseHandler; +import org.opensearch.action.StepListener; import org.opensearch.action.support.RetryableAction; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.node.DiscoveryNode; @@ -30,6 +31,7 @@ import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardRecoveryException; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; @@ -149,7 +151,7 @@ private void setupReplicaShard(IndexShard indexShard) throws IndexShardRecoveryE ); store.associateIndexWithNewTranslog(translogUUID); indexShard.persistRetentionLeases(); - indexShard.openEngineAndRecoverFromTranslog(); + indexShard.openEngineAndSkipTranslogRecovery(); } catch (EngineException | IOException e) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to start replica shard", e); } finally { @@ -202,6 +204,31 @@ private void doReplication(final long replicationId) { } } + /** + * Start the recovery of a shard using Segment Replication. This method will first setup the shard and then start segment copy. + * + * @param indexShard {@link IndexShard} The target IndexShard. + * @param targetNode {@link DiscoveryNode} The IndexShard's DiscoveryNode + * @param sourceNode {@link DiscoveryNode} The source node. 
+ * @param replicationSource {@link PrimaryShardReplicationSource} The source from where segments will be retrieved. + * @param replicationListener {@link ReplicationListener} listener. + */ + public void startRecovery( + IndexShard indexShard, + DiscoveryNode targetNode, + DiscoveryNode sourceNode, + PrimaryShardReplicationSource replicationSource, + SegmentReplicationListener replicationListener + ) { + indexShard.markAsReplicating(); + StepListener trackShardListener = new StepListener<>(); + trackShardListener.whenComplete( + r -> { startReplication(indexShard.getLatestReplicationCheckpoint(), indexShard, replicationSource, replicationListener); }, + e -> { replicationListener.onFailure(indexShard.getReplicationState(), new ReplicationFailedException(indexShard, e), true); } + ); + prepareForReplication(indexShard, targetNode, sourceNode, trackShardListener); + } + class ReplicationRunner extends AbstractRunnable { final long replicationId; @@ -258,6 +285,12 @@ private ReplicationResponseHandler(final long id, final IndexShard shard, final public void onResponse(ReplicationResponse replicationResponse) { // final TimeValue replicationTime = new TimeValue(timer.time()); logger.trace("Replication complete {}", replicationId); + if (shard.state() != IndexShardState.STARTED) { + // The first time our shard is set up we need to mark its recovery complete. + shard.recoveryState().getIndex().setFileDetailsComplete(); + shard.finalizeRecovery(); + shard.postRecovery("Shard setup complete."); + } onGoingReplications.markReplicationAsDone(replicationId); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java index bacafbde36cfc..29b4b3c5707c5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java @@ -11,31 +11,37 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.transport.TransportResponse; import java.io.IOException; +import java.util.Set; public class TransportCheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot snapshot; private final byte[] infosBytes; + private final Set additionalFiles; public TransportCheckpointInfoResponse( final ReplicationCheckpoint checkpoint, final Store.MetadataSnapshot snapshot, - final byte[] infosBytes + final byte[] infosBytes, + final Set additionalFiles ) { this.checkpoint = checkpoint; this.snapshot = snapshot; this.infosBytes = infosBytes; + this.additionalFiles = additionalFiles; } public TransportCheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.snapshot = new Store.MetadataSnapshot(in); this.infosBytes = in.readByteArray(); + this.additionalFiles = in.readSet(StoreFileMetadata::new); } @Override @@ -43,6 +49,7 @@ public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); snapshot.writeTo(out); out.writeByteArray(infosBytes); + out.writeCollection(additionalFiles); } public ReplicationCheckpoint getCheckpoint() 
{ @@ -57,4 +64,7 @@ public byte[] getInfosBytes() { return infosBytes; } + public Set getAdditionalFiles() { + return additionalFiles; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java index 7d57cf8188d16..05b467bd50abb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java @@ -8,26 +8,39 @@ package org.opensearch.indices.replication.copy; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashSet; +import java.util.Set; public class CopyState extends AbstractRefCounted { + private static final Logger logger = LogManager.getLogger(CopyState.class); + private final GatedCloseable segmentInfosRef; private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot metadataSnapshot; + private final GatedCloseable commitRef; + private final Set pendingDeleteFiles; private final byte[] infosBytes; + private final ShardId shardId; CopyState(IndexShard shard) throws IOException { super("replication-nrt-state"); + this.shardId = shard.shardId(); this.segmentInfosRef = shard.getLatestSegmentInfosSafe(); final SegmentInfos segmentInfos = segmentInfosRef.get(); this.checkpoint = new ReplicationCheckpoint( @@ -36,7 +49,22 @@ public class CopyState extends AbstractRefCounted { segmentInfos.getGeneration(), shard.getProcessedLocalCheckpoint() ); + + // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. + // This ensures that the store on replicas is in sync with the store on primaries. + final GatedCloseable lastIndexCommit = shard.acquireLastIndexCommit(false); + final Store.MetadataSnapshot metadata = shard.store().getMetadata(lastIndexCommit.get()); this.metadataSnapshot = shard.store().getMetadata(segmentInfos); + final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot); + this.pendingDeleteFiles = new HashSet<>(diff.missing); + if (this.pendingDeleteFiles.isEmpty()) { + // If there are no additional files we can release the last commit immediately. 
+ lastIndexCommit.close(); + this.commitRef = null; + } else { + this.commitRef = lastIndexCommit; + } + ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); try (ByteBuffersIndexOutput tmpIndexOutput = new ByteBuffersIndexOutput(buffer, "temporary", "temporary")) { segmentInfos.write(tmpIndexOutput); @@ -56,10 +84,17 @@ public byte[] getInfosBytes() { return infosBytes; } + public Set getPendingDeleteFiles() { + return pendingDeleteFiles; + } + @Override protected void closeInternal() { try { segmentInfosRef.close(); + if (commitRef != null) { + commitRef.close(); + } } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java index 2d44dfe6104e6..ee45994be1450 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java @@ -70,8 +70,9 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Deque; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; @@ -157,12 +158,10 @@ public void sendFiles(CopyState copyState, ActionListener list final StepListener sendFileStep = new StepListener<>(); try { - // TODO: Segrep - Need validation here. - final Store.MetadataSnapshot metadataSnapshot = copyState.getMetadataSnapshot(); - final Map metadataMap = metadataSnapshot.asMap(); + Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); final StoreFileMetadata[] storeFileMetadata = request.getFilesToFetch() .stream() - .filter(file -> metadataMap.containsKey(file.name())) + .filter(file -> storeFiles.contains(file.name())) .toArray(StoreFileMetadata[]::new); sendFiles(shard.store(), storeFileMetadata, sendFileStep); } catch (final Exception e) { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java index d787dc76aaef0..58ca0c0745c56 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java @@ -56,12 +56,14 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; +import org.opensearch.transport.Transports; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; /** * Request handlers for the primary shard during segment copy. 
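Between CopyState above and SegmentReplicationTarget below, the in-memory SegmentInfos travels as a plain byte[]. A minimal sketch of that round trip, assembled from the code in this patch (variable names are illustrative):

    // Primary side (CopyState): serialize the current SegmentInfos into a byte buffer.
    ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
    try (ByteBuffersIndexOutput tmpIndexOutput = new ByteBuffersIndexOutput(buffer, "temporary", "temporary")) {
        segmentInfos.write(tmpIndexOutput);
    }
    byte[] infosBytes = buffer.toArrayCopy();

    // Replica side (SegmentReplicationTarget): wrap the bytes in a checksummed input and
    // re-read the commit at the generation carried by the checkpoint.
    ChecksumIndexInput input = new BufferedChecksumIndexInput(
        new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(infosBytes))), "SegmentInfos")
    );
    SegmentInfos infos = SegmentInfos.readCommit(store.directory(), input, checkpoint.getSegmentsGen());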
@@ -130,7 +132,7 @@ public boolean hasCheckpoint(ReplicationCheckpoint checkpoint) { return checkpointCopyState.containsKey(checkpoint); } - public void removeCopyState(ReplicationCheckpoint checkpoint) { + public synchronized void removeCopyState(ReplicationCheckpoint checkpoint) { final Optional nrtCopyState = Optional.ofNullable(checkpointCopyState.get(checkpoint)); nrtCopyState.ifPresent((state) -> { if (state.decRef()) { @@ -141,7 +143,7 @@ public void removeCopyState(ReplicationCheckpoint checkpoint) { } } - private CopyState getCopyState(ReplicationCheckpoint checkpoint) throws IOException { + private synchronized CopyState getCopyState(ReplicationCheckpoint checkpoint) throws IOException { if (commitCache.hasCheckpoint(checkpoint)) { final CopyState copyState = commitCache.getCopyStateForCheckpoint(checkpoint); copyState.incRef(); @@ -165,7 +167,12 @@ public void messageReceived(StartReplicationRequest request, TransportChannel ch logger.trace("Received request for checkpoint {}", checkpoint); final CopyState copyState = getCopyState(checkpoint); channel.sendResponse( - new TransportCheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataSnapshot(), copyState.getInfosBytes()) + new TransportCheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) ); } } @@ -228,7 +235,10 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } final StepListener addRetentionLeaseStep = new StepListener<>(); - final StepListener responseListener = new StepListener<>(); + final Consumer onFailure = e -> { + assert Transports.assertNotTransportThread(this + "[onFailure]"); + logger.error("Failure", e); + }; PrimaryShardReplicationHandler.runUnderPrimaryPermit( () -> shard.cloneLocalPeerRecoveryRetentionLease( request.getTargetNode().getId(), @@ -247,7 +257,6 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, new CancellableThreads(), logger ); - PrimaryShardReplicationHandler.runUnderPrimaryPermit( () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), shardId + " marking " + targetAllocationId + " as in sync", @@ -255,15 +264,8 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, new CancellableThreads(), logger ); - PrimaryShardReplicationHandler.runUnderPrimaryPermit( - () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), - shardId + " marking " + targetAllocationId + " as in sync", - shard, - new CancellableThreads(), - logger - ); channel.sendResponse(new TrackShardResponse()); - }, responseListener::onFailure); + }, onFailure); } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index d3179692a2bb4..4796fae328edb 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -35,24 +35,31 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.Directory; +import 
org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.common.lucene.Lucene; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoveryIndex; -import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse; import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -135,7 +142,33 @@ public void startReplication(ActionListener listener) { finalizeListener.whenComplete(r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); } - public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, ActionListener listener) { + private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListener getFilesListener) + throws IOException { + final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); + Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); + final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata); + logger.debug("Recovery diff {}", diff); + final List filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) + .collect(Collectors.toList()); + + Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); + final Set additionalFiles = checkpointInfo.getAdditionalFiles() + .stream() + .filter(f -> storeFiles.contains(f.name()) == false) + .collect(Collectors.toSet()); + + filesToFetch.addAll(additionalFiles); + + for (StoreFileMetadata file : filesToFetch) { + state.getIndex().addFileDetail(file.name(), file.length(), false); + } + if (filesToFetch.isEmpty()) { + getFilesListener.onResponse(new GetFilesResponse()); + } + source.getFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, getFilesListener); + } + + private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { ActionListener.completeWith(listener, () -> { // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas @@ -144,7 +177,6 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, final Store store = store(); store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", checkpointInfo.getSnapshot()); if (indexShard.getRetentionLeases().leases().isEmpty()) { // if empty, may be a fresh IndexShard, so write an empty leases file to disk indexShard.persistRetentionLeases(); @@ -152,13 +184,24 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, } else { assert 
indexShard.assertRetentionLeasesPersisted(); } - final long segmentsGen = checkpointInfo.getCheckpoint().getSegmentsGen(); - // force an fsync if we are receiving a new gen. - if (segmentsGen > indexShard.getLatestSegmentInfos().getGeneration()) { - final Directory directory = store().directory(); - directory.sync(Arrays.asList(directory.listAll())); - } - indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes(), checkpointInfo.getCheckpoint().getSeqNo()); + + // Deserialize the new SegmentInfos object sent primary the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); + // clean up the local store of old segment files + // and validate the latest segment infos against the snapshot sent from the primary shard. + store.cleanupAndVerify( + "finalize - clean with in memory infos", + checkpointInfoResponse.getSnapshot(), + store.getMetadata(infos) + ); + + // Update the current infos reference on the Engine's reader. + indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo()); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are @@ -184,30 +227,24 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, throw rfe; } finally { store.decRef(); + indexShard.markReplicationComplete(); } return null; }); } - private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListener getFilesListener) - throws IOException { - final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); - Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); - final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata); - logger.debug("Recovery diff {}", diff); - final List filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) - .collect(Collectors.toList()); - for (StoreFileMetadata file : filesToFetch) { - state.getIndex().addFileDetail(file.name(), file.length(), false); - } - if (filesToFetch.isEmpty()) { - getFilesListener.onResponse(new GetFilesResponse()); - } - source.getFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, getFilesListener); + /** + * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput( + new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") + ); } private Store.MetadataSnapshot getMetadataSnapshot() throws IOException { - if (indexShard.recoveryState().getStage() == RecoveryState.Stage.INIT) { + if (indexShard.state().equals(IndexShardState.STARTED) == false) { return Store.MetadataSnapshot.EMPTY; } return store.getMetadata(indexShard.getLatestSegmentInfos()); From 7d5ab35033164154c4a224b36b69f56d0b7f9337 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 22 Mar 2022 16:47:53 -0700 Subject: [PATCH 2/3] Changes from PR feedback. 
Signed-off-by: Marc Handalian --- .../main/java/org/opensearch/index/shard/IndexShard.java | 2 -- .../replication/SegmentReplicationReplicaService.java | 1 + .../org/opensearch/indices/replication/copy/CopyState.java | 6 +----- .../indices/replication/copy/SegmentReplicationTarget.java | 7 +++---- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2a9eac42e31b4..b0f46029ccfaf 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3016,7 +3016,6 @@ public void startRecovery( try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); if (indexSettings.isSegrepEnabled()) { - markAsReplicating(); segmentReplicationReplicaService.startRecovery( this, recoveryState.getTargetNode(), @@ -3666,7 +3665,6 @@ public synchronized void onNewCheckpoint( return; } try { - markAsReplicating(); final ReplicationCheckpoint checkpoint = request.getCheckpoint(); logger.trace("Received new checkpoint {}", checkpoint); segmentReplicationReplicaService.startReplication( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index 3a10afef67e85..ea4c4a04338ab 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -165,6 +165,7 @@ public void startReplication( PrimaryShardReplicationSource source, final SegmentReplicationListener listener ) { + indexShard.markAsReplicating(); final long replicationId = onGoingReplications.startReplication( checkpoint, indexShard, diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java index 05b467bd50abb..1fa599c4bda49 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java @@ -8,8 +8,6 @@ package org.opensearch.indices.replication.copy; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; @@ -28,8 +26,6 @@ public class CopyState extends AbstractRefCounted { - private static final Logger logger = LogManager.getLogger(CopyState.class); - private final GatedCloseable segmentInfosRef; private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot metadataSnapshot; @@ -44,7 +40,7 @@ public class CopyState extends AbstractRefCounted { this.segmentInfosRef = shard.getLatestSegmentInfosSafe(); final SegmentInfos segmentInfos = segmentInfosRef.get(); this.checkpoint = new ReplicationCheckpoint( - shard.shardId(), + shardId, shard.getOperationPrimaryTerm(), segmentInfos.getGeneration(), shard.getProcessedLocalCheckpoint() diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index 4796fae328edb..b34207c5fd355 100644 --- 
a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -100,17 +100,17 @@ protected String getPrefix() { @Override protected void onDone() { - // might need to do something on index shard here. + indexShard.markReplicationComplete(); } @Override protected void onCancel(String reason) { - // TBD + indexShard.markReplicationComplete(); } @Override protected void onFail(OpenSearchException e, boolean sendShardFailure) { - // TBD + indexShard.markReplicationComplete(); } /** @@ -227,7 +227,6 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR throw rfe; } finally { store.decRef(); - indexShard.markReplicationComplete(); } return null; }); From 169cfc4333050947549f2df2be52af46b5297d61 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 24 Mar 2022 13:38:38 -0700 Subject: [PATCH 3/3] More PR review changes. - Updated TrackShardRequestHandler to send error case back to replicas. - Renamed additionalFiles to pendingDeleteFiles in TransportCheckpointInfoResponse. - Refactored Store.cleanupAndVerify methods to remove duplication. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 1 + .../opensearch/index/shard/IndexShard.java | 3 + .../org/opensearch/index/store/Store.java | 74 ++++++++----------- .../TransportCheckpointInfoResponse.java | 14 ++-- .../SegmentReplicationPrimaryService.java | 16 +++- .../copy/SegmentReplicationTarget.java | 6 +- 6 files changed, 61 insertions(+), 53 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 16d42a8264d7d..f7d0de08ed65c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -75,6 +75,7 @@ public void testReplicationAfterForceMerge() throws Exception { waitForDocs(initialDocCount, indexer); } flush(INDEX_NAME); + // wait a short amount of time to give replication a chance to complete. Thread.sleep(1000); assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b0f46029ccfaf..b1258fe3f1769 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1349,6 +1349,7 @@ public void rollTranslogGeneration() { public void forceMerge(ForceMergeRequest forceMerge) throws IOException { if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) { + // With segment replication enabled, replicas do not perform this operation. return; } verifyActive(); @@ -3016,6 +3017,8 @@ public void startRecovery( try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); if (indexSettings.isSegrepEnabled()) { + // Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary + // and started with the latest set of segments. 
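The "Recovery" the comment above describes is the two-step flow added to SegmentReplicationReplicaService in patch 1; a condensed sketch (names follow the patch):

    // Step 1: ask the primary to track this shard (TrackShardRequest); step 2: once tracked,
    // replicate up to the primary's latest checkpoint.
    StepListener<TrackShardResponse> trackShardListener = new StepListener<>();
    trackShardListener.whenComplete(
        r -> startReplication(indexShard.getLatestReplicationCheckpoint(), indexShard, replicationSource, replicationListener),
        e -> replicationListener.onFailure(indexShard.getReplicationState(), new ReplicationFailedException(indexShard, e), true)
    );
    prepareForReplication(indexShard, targetNode, sourceNode, trackShardListener);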
segmentReplicationReplicaService.startRecovery( this, recoveryState.getTargetNode(), diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 2dea831be5d7b..e89abab2d1c49 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -64,6 +64,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.Streams; @@ -672,26 +673,7 @@ private static void failIfCorrupted(Directory directory) throws IOException { public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException { metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - for (String existingFile : directory.listAll()) { - if (Store.isAutogenerated(existingFile) || sourceMetadata.contains(existingFile)) { - // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete - // checksum) - continue; - } - try { - directory.deleteFile(reason, existingFile); - // FNF should not happen since we hold a write lock? - } catch (IOException ex) { - if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { - // TODO do we need to also fail this if we can't delete the pending commit file? - // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit - // point around? - throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); - } - logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); - // ignore, we don't really care, will get deleted later on - } - } + cleanupFiles(reason, sourceMetadata, null); directory.syncMetaData(); final Store.MetadataSnapshot metadataOrEmpty = getMetadata((IndexCommit) null); verifyAfterCleanup(sourceMetadata, metadataOrEmpty); @@ -712,37 +694,43 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException { // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. - final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null); metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - for (String existingFile : directory.listAll()) { - if (Store.isAutogenerated(existingFile) - || remoteSnapshot.contains(existingFile) - || latestCommitPointMetadata.contains(existingFile)) { - // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete - // checksum) - continue; - } - try { - directory.deleteFile(reason, existingFile); - // FNF should not happen since we hold a write lock? - } catch (IOException ex) { - if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { - // TODO do we need to also fail this if we can't delete the pending commit file? 
- // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit - // point around? - throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); - } - logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); - // ignore, we don't really care, will get deleted later on - } - } + final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null); + cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata); verifyAfterCleanup(remoteSnapshot, localSnapshot); } finally { metadataLock.writeLock().unlock(); } } + private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + throws IOException { + assert metadataLock.isWriteLockedByCurrentThread(); + for (String existingFile : directory.listAll()) { + if (Store.isAutogenerated(existingFile) + || remoteSnapshot.contains(existingFile) + || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete + // checksum) + continue; + } + try { + directory.deleteFile(reason, existingFile); + // FNF should not happen since we hold a write lock? + } catch (IOException ex) { + if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { + // TODO do we need to also fail this if we can't delete the pending commit file? + // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit + // point around? + throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java index 29b4b3c5707c5..77f8643a5dce1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java @@ -23,7 +23,9 @@ public class TransportCheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot snapshot; private final byte[] infosBytes; - private final Set additionalFiles; + // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos + // but are still referenced by the latest commit point (Segments_N). 
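Concretely, the pending-delete set described in the comment above is derived in CopyState (patch 1) by diffing the last commit point against the in-memory infos; a minimal sketch:

    // Files referenced by the on-disk segments_N but already merged away in the
    // in-memory SegmentInfos land in diff.missing and become pendingDeleteFiles.
    Store.MetadataSnapshot commitMetadata = store.getMetadata(lastIndexCommit.get());
    Store.MetadataSnapshot infosMetadata = store.getMetadata(segmentInfos);
    Store.RecoveryDiff diff = commitMetadata.recoveryDiff(infosMetadata);
    Set<StoreFileMetadata> pendingDeleteFiles = new HashSet<>(diff.missing);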
+ private final Set pendingDeleteFiles; public TransportCheckpointInfoResponse( final ReplicationCheckpoint checkpoint, @@ -34,14 +36,14 @@ public TransportCheckpointInfoResponse( this.checkpoint = checkpoint; this.snapshot = snapshot; this.infosBytes = infosBytes; - this.additionalFiles = additionalFiles; + this.pendingDeleteFiles = additionalFiles; } public TransportCheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.snapshot = new Store.MetadataSnapshot(in); this.infosBytes = in.readByteArray(); - this.additionalFiles = in.readSet(StoreFileMetadata::new); + this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); } @Override @@ -49,7 +51,7 @@ public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); snapshot.writeTo(out); out.writeByteArray(infosBytes); - out.writeCollection(additionalFiles); + out.writeCollection(pendingDeleteFiles); } public ReplicationCheckpoint getCheckpoint() { @@ -64,7 +66,7 @@ public byte[] getInfosBytes() { return infosBytes; } - public Set getAdditionalFiles() { - return additionalFiles; + public Set getPendingDeleteFiles() { + return pendingDeleteFiles; } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java index 58ca0c0745c56..4f0818a71bbc3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.action.support.ChannelActionListener; @@ -237,7 +238,20 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, final StepListener addRetentionLeaseStep = new StepListener<>(); final Consumer onFailure = e -> { assert Transports.assertNotTransportThread(this + "[onFailure]"); - logger.error("Failure", e); + logger.error( + new ParameterizedMessage( + "Error marking shard {} as tracked for allocation ID {}", + shardId, + request.getTargetAllocationId() + ), + e + ); + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send back failure on track shard request", inner); + } }; PrimaryShardReplicationHandler.runUnderPrimaryPermit( () -> shard.cloneLocalPeerRecoveryRetentionLease( diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index b34207c5fd355..2c8d1e19c8c92 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -152,12 +152,12 @@ private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListen .collect(Collectors.toList()); Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); - final Set additionalFiles = checkpointInfo.getAdditionalFiles() + final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() .stream() .filter(f -> storeFiles.contains(f.name()) == 
false) .collect(Collectors.toSet()); - filesToFetch.addAll(additionalFiles); + filesToFetch.addAll(pendingDeleteFiles); for (StoreFileMetadata file : filesToFetch) { state.getIndex().addFileDetail(file.name(), file.length(), false); @@ -185,7 +185,7 @@ private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoR assert indexShard.assertRetentionLeasesPersisted(); } - // Deserialize the new SegmentInfos object sent primary the primary. + // Deserialize the new SegmentInfos object sent from the primary. final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); SegmentInfos infos = SegmentInfos.readCommit( store.directory(),
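Taken together, the replica's finalize step introduced by this series reduces to three moves; a condensed sketch of SegmentReplicationTarget.finalizeReplication() from patch 1:

    // 1. Rebuild the primary's SegmentInfos from the bytes in the checkpoint-info response.
    SegmentInfos infos = SegmentInfos.readCommit(
        store.directory(),
        toIndexInput(checkpointInfoResponse.getInfosBytes()),
        responseCheckpoint.getSegmentsGen()
    );
    // 2. Drop files referenced by neither the primary's snapshot nor the local commit point,
    //    then verify the rebuilt infos against the snapshot sent from the primary.
    store.cleanupAndVerify("finalize - clean with in memory infos", checkpointInfoResponse.getSnapshot(), store.getMetadata(infos));
    // 3. Swap the new infos into the engine's reader and mark its seqNo as processed.
    indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo());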