diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 970217444e20e..f7d0de08ed65c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -8,12 +8,15 @@ package org.opensearch.indices.replication; +import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -23,17 +26,21 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { private static final int SHARD_COUNT = 1; private static final int REPLICA_COUNT = 1; + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) + .build(); + } + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(); final String nodeB = internalCluster().startNode(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) - .put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true) - .build() - ); + createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); final int initialDocCount = scaledRandomIntBetween(0, 200); @@ -56,4 +63,64 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); } + + public void testReplicationAfterForceMerge() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) { + waitForDocs(initialDocCount, indexer); + } + flush(INDEX_NAME); + // wait a short amount of time to give replication a chance to complete. + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int totalDocs = initialDocCount + additionalDocCount; + try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) { + waitForDocs(additionalDocCount, indexer); + } + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + // This case tests that replicas preserve these files so the local store is not corrupt. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs); + } + + public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception { + final String nodeA = internalCluster().startNode(); + createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); + ensureGreen(INDEX_NAME); + + // Index a doc to create the first set of segments. _s1.si + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si) + flushAndRefresh(INDEX_NAME); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1); + + // Index to create another segment + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get(); + + // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk. + // This case tests that we are still sending these older segments to replicas so the index on disk is not corrupt. + client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get(); + refresh(INDEX_NAME); + + final String nodeB = internalCluster().startNode(); + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + ensureGreen(INDEX_NAME); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 70df9b14812de..86b3b835c88e5 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {}; + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {} public long getProcessedLocalCheckpoint() { return 0L; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index c86628225f3c7..11ec8c573b4a2 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -62,10 +62,6 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.Weight; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.BufferedChecksumIndexInput; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; @@ -119,7 +115,6 @@ import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -341,21 +336,13 @@ public InternalEngine(EngineConfig engineConfig) { } @Override - public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { - assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos"; - SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); - assert gen == infos.getGeneration(); + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + assert engineConfig.isReadOnly() : "Only replicas should update Infos"; externalReaderManager.internalReaderManager.updateSegments(infos); externalReaderManager.maybeRefresh(); localCheckpointTracker.markSeqNoAsProcessed(seqNo); } - private ChecksumIndexInput toIndexInput(byte[] input) { - return new BufferedChecksumIndexInput( - new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") - ); - } - private LocalCheckpointTracker createLocalCheckpointTracker( BiFunction localCheckpointTrackerSupplier ) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a792a8914c817..b1258fe3f1769 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -166,7 +166,6 @@ import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.indices.replication.copy.ReplicationFailedException; import org.opensearch.indices.replication.copy.SegmentReplicationState; -import org.opensearch.indices.replication.copy.TrackShardResponse; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1349,6 +1348,10 @@ public void rollTranslogGeneration() { } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { + if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) { + // With segment replication enabled, replicas do not perform this operation. + return; + } verifyActive(); if (logger.isTraceEnabled()) { logger.trace("force merge with {}", forceMerge); @@ -1444,8 +1447,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { ); } - public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException { - getEngine().updateCurrentInfos(infosBytes, gen, seqNo); + public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException { + getEngine().updateCurrentInfos(infos, seqNo); } /** @@ -1952,6 +1955,9 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { // TODO: Segrep - fix initial recovery stages from ReplicationTarget. if (indexSettings.isSegrepEnabled() == false) { recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + } else { + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); } loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); @@ -3011,25 +3017,14 @@ public void startRecovery( try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); if (indexSettings.isSegrepEnabled()) { - IndexShard indexShard = this; - segmentReplicationReplicaService.prepareForReplication( + // Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary + // and started with the latest set of segments. + segmentReplicationReplicaService.startRecovery( this, recoveryState.getTargetNode(), recoveryState.getSourceNode(), - new ActionListener() { - @Override - public void onResponse(TrackShardResponse unused) { - segRepListener.onReplicationDone(segRepState); - recoveryState.getIndex().setFileDetailsComplete(); - finalizeRecovery(); - postRecovery("Shard setup complete."); - } - - @Override - public void onFailure(Exception e) { - segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true); - } - } + replicationSource, + segRepListener ); } else { peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); @@ -3669,14 +3664,12 @@ public synchronized void onNewCheckpoint( return; } if (isReplicating()) { - logger.info("Ignore - shard is currently replicating to a checkpoint"); + logger.debug("Ignore - shard is currently replicating to a checkpoint"); return; } try { - markAsReplicating(); final ReplicationCheckpoint checkpoint = request.getCheckpoint(); logger.trace("Received new checkpoint {}", checkpoint); - // TODO: segrep - these are the states set after we perform our initial store recovery. segmentReplicationReplicaService.startReplication( checkpoint, this, @@ -3684,7 +3677,6 @@ public synchronized void onNewCheckpoint( new SegmentReplicationReplicaService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { - markReplicationComplete(); logger.debug("Replication complete to {}", getLatestReplicationCheckpoint()); } @@ -3694,7 +3686,6 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { - markReplicationComplete(); logger.error("Failure", e); } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 34da6ea942140..e89abab2d1c49 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -64,6 +64,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.Streams; @@ -672,26 +673,7 @@ private static void failIfCorrupted(Directory directory) throws IOException { public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException { metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - for (String existingFile : directory.listAll()) { - if (Store.isAutogenerated(existingFile) || sourceMetadata.contains(existingFile)) { - // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete - // checksum) - continue; - } - try { - directory.deleteFile(reason, existingFile); - // FNF should not happen since we hold a write lock? - } catch (IOException ex) { - if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { - // TODO do we need to also fail this if we can't delete the pending commit file? - // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit - // point around? - throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); - } - logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); - // ignore, we don't really care, will get deleted later on - } - } + cleanupFiles(reason, sourceMetadata, null); directory.syncMetaData(); final Store.MetadataSnapshot metadataOrEmpty = getMetadata((IndexCommit) null); verifyAfterCleanup(sourceMetadata, metadataOrEmpty); @@ -700,6 +682,55 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } + /** + * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. + * In this case files from both snapshots must be preserved. + * @param reason the reason for this cleanup operation logged for each deleted file + * @param remoteSnapshot The remote snapshot sent from primary shards. + * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + */ + public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException { + // fetch a snapshot from the latest on disk Segments_N file. This can be behind + // the passed in local in memory snapshot, so we want to ensure files it references are not removed. + metadataLock.writeLock().lock(); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null); + cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata); + verifyAfterCleanup(remoteSnapshot, localSnapshot); + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + throws IOException { + assert metadataLock.isWriteLockedByCurrentThread(); + for (String existingFile : directory.listAll()) { + if (Store.isAutogenerated(existingFile) + || remoteSnapshot.contains(existingFile) + || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete + // checksum) + continue; + } + try { + directory.deleteFile(reason, existingFile); + // FNF should not happen since we hold a write lock? + } catch (IOException ex) { + if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { + // TODO do we need to also fail this if we can't delete the pending commit file? + // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit + // point around? + throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e09add708b14c..1a61e1b717f70 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -767,7 +767,7 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final l @Override public void onReplicationDone(final SegmentReplicationState state) { - logger.info("Shard setup complete, ready for segment copy."); + logger.trace("Shard setup complete, ready for segment copy."); shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java index b1334818e3b23..ea4c4a04338ab 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationReplicaService.java @@ -15,6 +15,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListenerResponseHandler; +import org.opensearch.action.StepListener; import org.opensearch.action.support.RetryableAction; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.node.DiscoveryNode; @@ -30,6 +31,7 @@ import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardRecoveryException; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; @@ -149,7 +151,7 @@ private void setupReplicaShard(IndexShard indexShard) throws IndexShardRecoveryE ); store.associateIndexWithNewTranslog(translogUUID); indexShard.persistRetentionLeases(); - indexShard.openEngineAndRecoverFromTranslog(); + indexShard.openEngineAndSkipTranslogRecovery(); } catch (EngineException | IOException e) { throw new IndexShardRecoveryException(indexShard.shardId(), "failed to start replica shard", e); } finally { @@ -163,6 +165,7 @@ public void startReplication( PrimaryShardReplicationSource source, final SegmentReplicationListener listener ) { + indexShard.markAsReplicating(); final long replicationId = onGoingReplications.startReplication( checkpoint, indexShard, @@ -202,6 +205,31 @@ private void doReplication(final long replicationId) { } } + /** + * Start the recovery of a shard using Segment Replication. This method will first setup the shard and then start segment copy. + * + * @param indexShard {@link IndexShard} The target IndexShard. + * @param targetNode {@link DiscoveryNode} The IndexShard's DiscoveryNode + * @param sourceNode {@link DiscoveryNode} The source node. + * @param replicationSource {@link PrimaryShardReplicationSource} The source from where segments will be retrieved. + * @param replicationListener {@link ReplicationListener} listener. + */ + public void startRecovery( + IndexShard indexShard, + DiscoveryNode targetNode, + DiscoveryNode sourceNode, + PrimaryShardReplicationSource replicationSource, + SegmentReplicationListener replicationListener + ) { + indexShard.markAsReplicating(); + StepListener trackShardListener = new StepListener<>(); + trackShardListener.whenComplete( + r -> { startReplication(indexShard.getLatestReplicationCheckpoint(), indexShard, replicationSource, replicationListener); }, + e -> { replicationListener.onFailure(indexShard.getReplicationState(), new ReplicationFailedException(indexShard, e), true); } + ); + prepareForReplication(indexShard, targetNode, sourceNode, trackShardListener); + } + class ReplicationRunner extends AbstractRunnable { final long replicationId; @@ -258,6 +286,12 @@ private ReplicationResponseHandler(final long id, final IndexShard shard, final public void onResponse(ReplicationResponse replicationResponse) { // final TimeValue replicationTime = new TimeValue(timer.time()); logger.trace("Replication complete {}", replicationId); + if (shard.state() != IndexShardState.STARTED) { + // The first time our shard is set up we need to mark its recovery complete. + shard.recoveryState().getIndex().setFileDetailsComplete(); + shard.finalizeRecovery(); + shard.postRecovery("Shard setup complete."); + } onGoingReplications.markReplicationAsDone(replicationId); } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java index bacafbde36cfc..77f8643a5dce1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/TransportCheckpointInfoResponse.java @@ -11,31 +11,39 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.copy.ReplicationCheckpoint; import org.opensearch.transport.TransportResponse; import java.io.IOException; +import java.util.Set; public class TransportCheckpointInfoResponse extends TransportResponse { private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot snapshot; private final byte[] infosBytes; + // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos + // but are still referenced by the latest commit point (Segments_N). + private final Set pendingDeleteFiles; public TransportCheckpointInfoResponse( final ReplicationCheckpoint checkpoint, final Store.MetadataSnapshot snapshot, - final byte[] infosBytes + final byte[] infosBytes, + final Set additionalFiles ) { this.checkpoint = checkpoint; this.snapshot = snapshot; this.infosBytes = infosBytes; + this.pendingDeleteFiles = additionalFiles; } public TransportCheckpointInfoResponse(StreamInput in) throws IOException { this.checkpoint = new ReplicationCheckpoint(in); this.snapshot = new Store.MetadataSnapshot(in); this.infosBytes = in.readByteArray(); + this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); } @Override @@ -43,6 +51,7 @@ public void writeTo(StreamOutput out) throws IOException { checkpoint.writeTo(out); snapshot.writeTo(out); out.writeByteArray(infosBytes); + out.writeCollection(pendingDeleteFiles); } public ReplicationCheckpoint getCheckpoint() { @@ -57,4 +66,7 @@ public byte[] getInfosBytes() { return infosBytes; } + public Set getPendingDeleteFiles() { + return pendingDeleteFiles; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java index 7d57cf8188d16..1fa599c4bda49 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/CopyState.java @@ -8,35 +8,59 @@ package org.opensearch.indices.replication.copy; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashSet; +import java.util.Set; public class CopyState extends AbstractRefCounted { private final GatedCloseable segmentInfosRef; private final ReplicationCheckpoint checkpoint; private final Store.MetadataSnapshot metadataSnapshot; + private final GatedCloseable commitRef; + private final Set pendingDeleteFiles; private final byte[] infosBytes; + private final ShardId shardId; CopyState(IndexShard shard) throws IOException { super("replication-nrt-state"); + this.shardId = shard.shardId(); this.segmentInfosRef = shard.getLatestSegmentInfosSafe(); final SegmentInfos segmentInfos = segmentInfosRef.get(); this.checkpoint = new ReplicationCheckpoint( - shard.shardId(), + shardId, shard.getOperationPrimaryTerm(), segmentInfos.getGeneration(), shard.getProcessedLocalCheckpoint() ); + + // Send files that are merged away in the latest SegmentInfos but not in the latest on disk Segments_N. + // This ensures that the store on replicas is in sync with the store on primaries. + final GatedCloseable lastIndexCommit = shard.acquireLastIndexCommit(false); + final Store.MetadataSnapshot metadata = shard.store().getMetadata(lastIndexCommit.get()); this.metadataSnapshot = shard.store().getMetadata(segmentInfos); + final Store.RecoveryDiff diff = metadata.recoveryDiff(this.metadataSnapshot); + this.pendingDeleteFiles = new HashSet<>(diff.missing); + if (this.pendingDeleteFiles.isEmpty()) { + // If there are no additional files we can release the last commit immediately. + lastIndexCommit.close(); + this.commitRef = null; + } else { + this.commitRef = lastIndexCommit; + } + ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); try (ByteBuffersIndexOutput tmpIndexOutput = new ByteBuffersIndexOutput(buffer, "temporary", "temporary")) { segmentInfos.write(tmpIndexOutput); @@ -56,10 +80,17 @@ public byte[] getInfosBytes() { return infosBytes; } + public Set getPendingDeleteFiles() { + return pendingDeleteFiles; + } + @Override protected void closeInternal() { try { segmentInfosRef.close(); + if (commitRef != null) { + commitRef.close(); + } } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java index 2d44dfe6104e6..ee45994be1450 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/PrimaryShardReplicationHandler.java @@ -70,8 +70,9 @@ import java.util.Arrays; import java.util.Comparator; import java.util.Deque; +import java.util.HashSet; import java.util.List; -import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; @@ -157,12 +158,10 @@ public void sendFiles(CopyState copyState, ActionListener list final StepListener sendFileStep = new StepListener<>(); try { - // TODO: Segrep - Need validation here. - final Store.MetadataSnapshot metadataSnapshot = copyState.getMetadataSnapshot(); - final Map metadataMap = metadataSnapshot.asMap(); + Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); final StoreFileMetadata[] storeFileMetadata = request.getFilesToFetch() .stream() - .filter(file -> metadataMap.containsKey(file.name())) + .filter(file -> storeFiles.contains(file.name())) .toArray(StoreFileMetadata[]::new); sendFiles(shard.store(), storeFileMetadata, sendFileStep); } catch (final Exception e) { diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java index d787dc76aaef0..4f0818a71bbc3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationPrimaryService.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.action.support.ChannelActionListener; @@ -56,12 +57,14 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; +import org.opensearch.transport.Transports; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; /** * Request handlers for the primary shard during segment copy. @@ -130,7 +133,7 @@ public boolean hasCheckpoint(ReplicationCheckpoint checkpoint) { return checkpointCopyState.containsKey(checkpoint); } - public void removeCopyState(ReplicationCheckpoint checkpoint) { + public synchronized void removeCopyState(ReplicationCheckpoint checkpoint) { final Optional nrtCopyState = Optional.ofNullable(checkpointCopyState.get(checkpoint)); nrtCopyState.ifPresent((state) -> { if (state.decRef()) { @@ -141,7 +144,7 @@ public void removeCopyState(ReplicationCheckpoint checkpoint) { } } - private CopyState getCopyState(ReplicationCheckpoint checkpoint) throws IOException { + private synchronized CopyState getCopyState(ReplicationCheckpoint checkpoint) throws IOException { if (commitCache.hasCheckpoint(checkpoint)) { final CopyState copyState = commitCache.getCopyStateForCheckpoint(checkpoint); copyState.incRef(); @@ -165,7 +168,12 @@ public void messageReceived(StartReplicationRequest request, TransportChannel ch logger.trace("Received request for checkpoint {}", checkpoint); final CopyState copyState = getCopyState(checkpoint); channel.sendResponse( - new TransportCheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataSnapshot(), copyState.getInfosBytes()) + new TransportCheckpointInfoResponse( + copyState.getCheckpoint(), + copyState.getMetadataSnapshot(), + copyState.getInfosBytes(), + copyState.getPendingDeleteFiles() + ) ); } } @@ -228,7 +236,23 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } final StepListener addRetentionLeaseStep = new StepListener<>(); - final StepListener responseListener = new StepListener<>(); + final Consumer onFailure = e -> { + assert Transports.assertNotTransportThread(this + "[onFailure]"); + logger.error( + new ParameterizedMessage( + "Error marking shard {} as tracked for allocation ID {}", + shardId, + request.getTargetAllocationId() + ), + e + ); + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send back failure on track shard request", inner); + } + }; PrimaryShardReplicationHandler.runUnderPrimaryPermit( () -> shard.cloneLocalPeerRecoveryRetentionLease( request.getTargetNode().getId(), @@ -247,7 +271,6 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, new CancellableThreads(), logger ); - PrimaryShardReplicationHandler.runUnderPrimaryPermit( () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), shardId + " marking " + targetAllocationId + " as in sync", @@ -255,15 +278,8 @@ public void messageReceived(TrackShardRequest request, TransportChannel channel, new CancellableThreads(), logger ); - PrimaryShardReplicationHandler.runUnderPrimaryPermit( - () -> shard.markAllocationIdAsInSync(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), - shardId + " marking " + targetAllocationId + " as in sync", - shard, - new CancellableThreads(), - logger - ); channel.sendResponse(new TrackShardResponse()); - }, responseListener::onFailure); + }, onFailure); } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java index d3179692a2bb4..2c8d1e19c8c92 100644 --- a/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/copy/SegmentReplicationTarget.java @@ -35,24 +35,31 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.Directory; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.common.lucene.Lucene; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoveryIndex; -import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.SegmentReplicationReplicaService; import org.opensearch.indices.replication.checkpoint.TransportCheckpointInfoResponse; import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -93,17 +100,17 @@ protected String getPrefix() { @Override protected void onDone() { - // might need to do something on index shard here. + indexShard.markReplicationComplete(); } @Override protected void onCancel(String reason) { - // TBD + indexShard.markReplicationComplete(); } @Override protected void onFail(OpenSearchException e, boolean sendShardFailure) { - // TBD + indexShard.markReplicationComplete(); } /** @@ -135,7 +142,33 @@ public void startReplication(ActionListener listener) { finalizeListener.whenComplete(r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); } - public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, ActionListener listener) { + private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListener getFilesListener) + throws IOException { + final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); + Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); + final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata); + logger.debug("Recovery diff {}", diff); + final List filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) + .collect(Collectors.toList()); + + Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); + final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() + .stream() + .filter(f -> storeFiles.contains(f.name()) == false) + .collect(Collectors.toSet()); + + filesToFetch.addAll(pendingDeleteFiles); + + for (StoreFileMetadata file : filesToFetch) { + state.getIndex().addFileDetail(file.name(), file.length(), false); + } + if (filesToFetch.isEmpty()) { + getFilesListener.onResponse(new GetFilesResponse()); + } + source.getFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, getFilesListener); + } + + private void finalizeReplication(TransportCheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { ActionListener.completeWith(listener, () -> { // first, we go and move files that were created with the recovery id suffix to // the actual names, its ok if we have a corrupted index here, since we have replicas @@ -144,7 +177,6 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, final Store store = store(); store.incRef(); try { - store.cleanupAndVerify("recovery CleanFilesRequestHandler", checkpointInfo.getSnapshot()); if (indexShard.getRetentionLeases().leases().isEmpty()) { // if empty, may be a fresh IndexShard, so write an empty leases file to disk indexShard.persistRetentionLeases(); @@ -152,13 +184,24 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, } else { assert indexShard.assertRetentionLeasesPersisted(); } - final long segmentsGen = checkpointInfo.getCheckpoint().getSegmentsGen(); - // force an fsync if we are receiving a new gen. - if (segmentsGen > indexShard.getLatestSegmentInfos().getGeneration()) { - final Directory directory = store().directory(); - directory.sync(Arrays.asList(directory.listAll())); - } - indexShard.updateCurrentInfos(segmentsGen, checkpointInfo.getInfosBytes(), checkpointInfo.getCheckpoint().getSeqNo()); + + // Deserialize the new SegmentInfos object sent from the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); + // clean up the local store of old segment files + // and validate the latest segment infos against the snapshot sent from the primary shard. + store.cleanupAndVerify( + "finalize - clean with in memory infos", + checkpointInfoResponse.getSnapshot(), + store.getMetadata(infos) + ); + + // Update the current infos reference on the Engine's reader. + indexShard.updateCurrentInfos(infos, responseCheckpoint.getSeqNo()); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are @@ -189,25 +232,18 @@ public void finalizeReplication(TransportCheckpointInfoResponse checkpointInfo, }); } - private void getFiles(TransportCheckpointInfoResponse checkpointInfo, StepListener getFilesListener) - throws IOException { - final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); - Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); - final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata); - logger.debug("Recovery diff {}", diff); - final List filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) - .collect(Collectors.toList()); - for (StoreFileMetadata file : filesToFetch) { - state.getIndex().addFileDetail(file.name(), file.length(), false); - } - if (filesToFetch.isEmpty()) { - getFilesListener.onResponse(new GetFilesResponse()); - } - source.getFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, getFilesListener); + /** + * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput( + new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") + ); } private Store.MetadataSnapshot getMetadataSnapshot() throws IOException { - if (indexShard.recoveryState().getStage() == RecoveryState.Stage.INIT) { + if (indexShard.state().equals(IndexShardState.STARTED) == false) { return Store.MetadataSnapshot.EMPTY; } return store.getMetadata(indexShard.getLatestSegmentInfos());