From 7b9ed28f90559de1ed5e5629e9d38f0d6c79ec52 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 27 Jun 2022 20:59:28 -0700 Subject: [PATCH] Test opening engine after initial copy. Signed-off-by: Marc Handalian --- .../replication/SegmentReplicationIT.java | 155 ++++++++++++++++++ .../opensearch/common/util/FeatureFlags.java | 3 +- .../index/engine/NRTReplicationEngine.java | 5 + .../opensearch/index/shard/IndexShard.java | 31 +++- .../org/opensearch/index/store/Store.java | 3 +- .../opensearch/indices/IndicesService.java | 4 +- .../cluster/IndicesClusterStateService.java | 10 +- .../recovery/RecoverySourceHandler.java | 1 + .../SegmentReplicationSourceFactory.java | 4 +- .../SegmentReplicationSourceHandler.java | 77 +++++++-- .../replication/SegmentReplicationTarget.java | 39 ++++- .../SegmentReplicationTargetService.java | 44 ++++- ...actIndicesClusterStateServiceTestCase.java | 2 + ...ClusterStateServiceRandomUpdatesTests.java | 9 + .../snapshots/SnapshotResiliencyTests.java | 1 + 15 files changed, 361 insertions(+), 27 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java new file mode 100644 index 0000000000000..843df54aac1a5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -0,0 +1,155 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import org.junit.Assert; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.segments.IndexShardSegments; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; +import org.opensearch.index.engine.Segment; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.BackgroundIndexer; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationIT extends OpenSearchIntegTestCase { + + private static final String INDEX_NAME = "test-idx-1"; + private static final int SHARD_COUNT = 1; + private static final int REPLICA_COUNT = 1; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { + final String nodeA = internalCluster().startNode(); + final String nodeB = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + + final int initialDocCount = scaledRandomIntBetween(0, 200); + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(initialDocCount); + waitForDocs(initialDocCount, indexer); + refresh(INDEX_NAME); + + // wait a short amount of time to give replication a chance to complete. + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + final int additionalDocCount = scaledRandomIntBetween(0, 200); + final int expectedHitCount = initialDocCount + additionalDocCount; + indexer.start(additionalDocCount); + waitForDocs(expectedHitCount, indexer); + + flushAndRefresh(INDEX_NAME); + Thread.sleep(1000); + assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); + + ensureGreen(INDEX_NAME); + assertSegmentStats(REPLICA_COUNT); + } + } + + private void assertSegmentStats(int numberOfReplicas) { + client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() { + @Override + public void onResponse(IndicesSegmentResponse indicesSegmentResponse) { + + List segmentsByIndex = indicesSegmentResponse.getIndices() + .values() + .stream() // get list of IndexSegments + .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group + .map(IndexShardSegments::getShards) // get list of segments across replication group + .collect(Collectors.toList()); + + // There will be an entry in the list for each index. + for (ShardSegments[] replicationGroupSegments : segmentsByIndex) { + + // Separate Primary & replica shards ShardSegments. + final Map> segmentListMap = Arrays.stream(replicationGroupSegments) + .collect(Collectors.groupingBy(s -> s.getShardRouting().primary())); + final List primaryShardSegmentsList = segmentListMap.get(true); + final List replicaShardSegments = segmentListMap.get(false); + + assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1); + final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); + + // create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on + // replicas. + final Map primarySegmentsMap = primaryShardSegments.getSegments() + .stream() + .collect(Collectors.toMap(Segment::getName, Function.identity())); + // For every replica, ensure that its segments are in the same state as on the primary. + // It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a + // list comparison. + // This equality check includes search/committed properties on the Segment. Combined with docCount checks, + // this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest + // commit point, even if they are not searchable. + assertEquals( + "There should be a ShardSegment entry for each replica in the replicationGroup", + numberOfReplicas, + replicaShardSegments.size() + ); + + for (ShardSegments shardSegment : replicaShardSegments) { + for (Segment replicaSegment : shardSegment.getSegments()) { + final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName()); + assertEquals("Replica's segment should be identical to primary's version", replicaSegment, primarySegment); + } + } + } + } + + @Override + public void onFailure(Exception e) { + Assert.fail("Error fetching segment stats"); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index fa39dc9ac5aa0..5cce4ac0d1929 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -35,6 +35,7 @@ public class FeatureFlags { * and false otherwise. */ public static boolean isEnabled(String featureFlagName) { - return "true".equalsIgnoreCase(System.getProperty(featureFlagName)); + return true; +// return "true".equalsIgnoreCase(System.getProperty(featureFlagName)); } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index e4f4bbbba8f16..fe63604981d4d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -59,10 +59,13 @@ public class NRTReplicationEngine extends Engine { public NRTReplicationEngine(EngineConfig engineConfig) { super(engineConfig); + logger.info("Creating NRTReplicationEngine"); store.incRef(); NRTReplicationReaderManager readerManager = null; try { + logger.info("NRTEngine dir files {}", Arrays.asList(store.directory().listAll())); lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + logger.info("NRTEngine files {}", lastCommittedSegmentInfos.files(true)); readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId)); final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit( this.lastCommittedSegmentInfos.getUserData().entrySet() @@ -93,6 +96,8 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th this.lastCommittedSegmentInfos = infos; rollTranslogGeneration(); } + logger.info("ADVANCING PROCESSED SEQ NO TO {}", seqNo); + logger.info("persisted seq no {}", localCheckpointTracker.getPersistedCheckpoint()); localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d25847dde235c..fa0b46cf53677 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -161,9 +161,9 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1396,6 +1396,15 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti * Returns the lastest Replication Checkpoint that shard received */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { + if (isActive() == false) { + return new ReplicationCheckpoint( + shardId, + getOperationPrimaryTerm(), + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED + ); + } try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { return Optional.ofNullable(snapshot.get()) .map( @@ -1944,7 +1953,12 @@ public void openEngineAndRecoverFromTranslog() throws IOException { */ public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; - recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + if (indexSettings.isSegRepEnabled() == false) { + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + } else { + recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); + recoveryState.setStage(RecoveryState.Stage.TRANSLOG); + } loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); getEngine().skipTranslogRecovery(); @@ -1970,6 +1984,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). + logger.info("ENGINE FACTORY TYPE {}", engineFactory.getClass()); final Engine newEngine = engineFactory.newReadWriteEngine(config); onNewEngine(newEngine); currentEngineReference.set(newEngine); @@ -2975,6 +2990,7 @@ public void startRecovery( RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService, RecoveryListener recoveryListener, + SegmentReplicationTargetService segmentReplicationTargetService, RepositoriesService repositoriesService, Consumer mappingUpdateConsumer, IndicesService indicesService @@ -3004,7 +3020,16 @@ public void startRecovery( case PEER: try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); - recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); + if (indexSettings.isSegRepEnabled()) { + // Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary + // and started with the latest set of segments. + segmentReplicationTargetService.recoverShard( + this, + recoveryListener + ); + } else { + recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); + } } catch (Exception e) { failShard("corrupted preexisting index", e); recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 2309004c0777d..25ecb2080ae3b 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -105,6 +105,7 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -1003,7 +1004,7 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director // version is written since 3.1+: we should have already hit IndexFormatTooOld. throw new IllegalArgumentException("expected valid version value: " + info.info.toString()); } - if (version.onOrAfter(maxVersion)) { + if (maxVersion == null || version.onOrAfter(maxVersion)) { maxVersion = version; } for (String file : info.files()) { diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index b2f6f10c19638..8271cb56f5079 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -139,6 +139,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; @@ -847,6 +848,7 @@ public IndexShard createShard( final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final RecoveryListener recoveryListener, + final SegmentReplicationTargetService segmentReplicationTargetService, final RepositoriesService repositoriesService, final Consumer onShardFailure, final Consumer globalCheckpointSyncer, @@ -867,7 +869,7 @@ public IndexShard createShard( repositoriesService ); indexShard.addShardFailureCallback(onShardFailure); - indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { + indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, segmentReplicationTargetService, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "mapping update consumer only required by local shards recovery"; client.admin() diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index 7233b6893b03e..37304a033a5e6 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -80,6 +80,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.RepositoriesService; @@ -120,6 +121,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final ClusterService clusterService; private final ThreadPool threadPool; private final PeerRecoveryTargetService recoveryTargetService; + private final SegmentReplicationTargetService segmentReplicationTargetService; private final ShardStateAction shardStateAction; private final NodeMappingRefreshAction nodeMappingRefreshAction; @@ -148,6 +150,7 @@ public IndicesClusterStateService( final ClusterService clusterService, final ThreadPool threadPool, final PeerRecoveryTargetService recoveryTargetService, + final SegmentReplicationTargetService segmentReplicationTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction nodeMappingRefreshAction, final RepositoriesService repositoriesService, @@ -166,6 +169,7 @@ public IndicesClusterStateService( threadPool, checkpointPublisher, recoveryTargetService, + segmentReplicationTargetService, shardStateAction, nodeMappingRefreshAction, repositoriesService, @@ -186,6 +190,7 @@ public IndicesClusterStateService( final ThreadPool threadPool, final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, + final SegmentReplicationTargetService segmentReplicationTargetService, final ShardStateAction shardStateAction, final NodeMappingRefreshAction nodeMappingRefreshAction, final RepositoriesService repositoriesService, @@ -198,11 +203,12 @@ public IndicesClusterStateService( ) { this.settings = settings; this.checkpointPublisher = checkpointPublisher; - this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService); + this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, segmentReplicationTargetService, searchService, snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; this.recoveryTargetService = recoveryTargetService; + this.segmentReplicationTargetService = segmentReplicationTargetService; this.shardStateAction = shardStateAction; this.nodeMappingRefreshAction = nodeMappingRefreshAction; this.repositoriesService = repositoriesService; @@ -634,6 +640,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR checkpointPublisher, recoveryTargetService, new RecoveryListener(shardRouting, primaryTerm, this), + segmentReplicationTargetService, repositoriesService, failedShardHandler, globalCheckpointSyncer, @@ -992,6 +999,7 @@ T createShard( SegmentReplicationCheckpointPublisher checkpointPublisher, PeerRecoveryTargetService recoveryTargetService, RecoveryListener recoveryListener, + SegmentReplicationTargetService segmentReplicationTargetService, RepositoriesService repositoriesService, Consumer onShardFailure, Consumer globalCheckpointSyncer, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 9e219db5a4c96..3bc85279213da 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -219,6 +219,7 @@ && isTargetSameHistory() && shard.hasCompleteHistoryOperations(PEER_RECOVERY_NAME, request.startingSeqNo()) && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); + logger.info("isSequenceNoBased {}", isSequenceNumberBasedRecovery); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's // possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold. diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index afbb80d263805..dd8f7c9f18321 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -39,11 +39,11 @@ public SegmentReplicationSourceFactory( public SegmentReplicationSource get(IndexShard shard) { return new PrimaryShardReplicationSource( - clusterService.localNode(), + shard.recoveryState().getTargetNode(), shard.routingEntry().allocationId().getId(), transportService, recoverySettings, - getPrimaryNode(shard.shardId()) + shard.recoveryState().getSourceNode() ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index fdabd48c62929..d1450c669cd30 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -12,6 +12,8 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; +import org.opensearch.action.support.ThreadedActionListener; +import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -20,6 +22,7 @@ import org.opensearch.common.util.concurrent.ListenableFuture; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.RunUnderPrimaryPermit; @@ -39,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import static org.opensearch.index.seqno.ReplicationTracker.getPeerRecoveryRetentionLeaseId; + /** * Orchestrates sending requested segment files to a target shard. * @@ -112,17 +117,17 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene }; RunUnderPrimaryPermit.run(() -> { - final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); - ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); - if (targetShardRouting == null) { - logger.debug( - "delaying replication of {} as it is not listed as assigned to target node {}", - shard.shardId(), - request.getTargetNode() - ); - throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); - } - }, + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying replication of {} as it is not listed as assigned to target node {}", + shard.shardId(), + request.getTargetNode() + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + }, shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", shard, cancellableThreads, @@ -141,16 +146,64 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene resources.add(transfer); transfer.start(); + final StepListener addRetentionLeaseStep = new StepListener<>(); + final String targetAllocationId = request.getTargetAllocationId(); sendFileStep.whenComplete(r -> { + final boolean contains = shard.getRetentionLeases().contains(getPeerRecoveryRetentionLeaseId(request.getTargetNode().getId())); + if (contains == false) { + RunUnderPrimaryPermit.run( + () -> shard.cloneLocalPeerRecoveryRetentionLease( + request.getTargetNode().getId(), + new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC, addRetentionLeaseStep, false) + ), + "Add retention lease step", + shard, + new CancellableThreads(), + logger + ); + } else { + addRetentionLeaseStep.onResponse(new ReplicationResponse()); + } + }, onFailure); + final StepListener lastStep = new StepListener<>(); + + addRetentionLeaseStep.whenComplete(r -> { + RunUnderPrimaryPermit.run( + () -> shard.initiateTracking(targetAllocationId), + shard.shardId() + " initiating tracking of " + targetAllocationId, + shard, + new CancellableThreads(), + logger + ); + RunUnderPrimaryPermit.run( + () -> shard.updateLocalCheckpointForShard(targetAllocationId, SequenceNumbers.NO_OPS_PERFORMED), + shard.shardId() + " marking " + targetAllocationId + " as in sync", + shard, + new CancellableThreads(), + logger + ); + RunUnderPrimaryPermit.run( + () -> shard.markAllocationIdAsInSync(targetAllocationId, request.getCheckpoint().getSeqNo()), + shard.shardId() + " marking " + targetAllocationId + " as in sync", + shard, + cancellableThreads, + logger + ); + lastStep.onResponse(null); + }, onFailure); + lastStep.whenComplete(r -> { try { future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); } finally { IOUtils.close(resources); } }, onFailure); - } catch (Exception e) { + + } catch ( + Exception e) { IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); } + } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index fb68e59f3b2ef..222f7e57f218e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -24,9 +24,12 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.MultiFileWriter; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; @@ -36,6 +39,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -43,6 +47,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY; + /** * Represents the target of a replication event. * @@ -53,6 +59,7 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final ReplicationCheckpoint checkpoint; private final SegmentReplicationSource source; private final SegmentReplicationState state; + private final boolean isEmptyShard; protected final MultiFileWriter multiFileWriter; public SegmentReplicationTarget( @@ -62,10 +69,18 @@ public SegmentReplicationTarget( ReplicationListener listener ) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); + boolean isEmptyShard1; this.checkpoint = checkpoint; this.source = source; this.state = new SegmentReplicationState(stateIndex); this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); + try { + isEmptyShard1 = store.directory().listAll().length == 0; + } catch (IOException e) { + fail(new OpenSearchException("Unable to read directory", e), true); + isEmptyShard1 = true; + } + this.isEmptyShard = isEmptyShard1; } @Override @@ -163,7 +178,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { + private synchronized void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { ActionListener.completeWith(listener, () -> { multiFileWriter.renameAllTempFiles(); final Store store = store(); @@ -201,6 +216,24 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); + if (isEmptyShard) { + final SegmentInfos lastCommitted = store.readLastCommittedSegmentsInfo(); + final long localCheckpoint = Long.parseLong(lastCommitted.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final String uuid = lastCommitted.userData.get(TRANSLOG_UUID_KEY); + Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), + shardId(), + localCheckpoint, + indexShard.getPendingPrimaryTerm(), + uuid, + FileChannel::open + ); + } + if (indexShard.state() != IndexShardState.STARTED) { + indexShard.persistRetentionLeases(); + indexShard.openEngineAndSkipTranslogRecovery(); + } + logger.info("AFTER COPY {}", Arrays.asList(store.directory().listAll())); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { @@ -252,7 +285,7 @@ private ChecksumIndexInput toIndexInput(byte[] input) { } Store.MetadataSnapshot getMetadataSnapshot() throws IOException { - if (indexShard.getSegmentInfosSnapshot() == null) { + if (isEmptyShard) { return Store.MetadataSnapshot.EMPTY; } return store.getMetadata(indexShard.getSegmentInfosSnapshot().get()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index c44b27911bb7a..e2d0ee82b65fe 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -13,12 +13,21 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.WriteStateException; +import org.opensearch.index.engine.EngineException; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardRecoveryException; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; +import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; @@ -31,6 +40,8 @@ import org.opensearch.transport.TransportRequestHandler; import org.opensearch.transport.TransportService; +import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; /** @@ -87,8 +98,9 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh /** * Invoked when a new checkpoint is received from a primary shard. * It checks if a new checkpoint should be processed or not and starts replication if needed. - * @param receivedCheckpoint received checkpoint that is checked for processing - * @param replicaShard replica shard on which checkpoint is received + * + * @param receivedCheckpoint received checkpoint that is checked for processing + * @param replicaShard replica shard on which checkpoint is received */ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { @@ -103,7 +115,8 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { @Override - public void onReplicationDone(SegmentReplicationState state) {} + public void onReplicationDone(SegmentReplicationState state) { + } @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { @@ -117,6 +130,31 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept } } + public void recoverShard(IndexShard indexShard, RecoveryListener recoveryListener) { + indexShard.prepareForIndexRecovery(); + startReplication( + indexShard.getLatestReplicationCheckpoint(), + indexShard, + new SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + if (indexShard.state() != IndexShardState.STARTED) { + // The first time our shard is set up we need to mark its recovery complete. + indexShard.recoveryState().getIndex().setFileDetailsComplete(); + indexShard.finalizeRecovery(); + indexShard.postRecovery("Shard setup complete."); + } + recoveryListener.onDone(indexShard.recoveryState()); + } + + @Override + public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + recoveryListener.onFailure(indexShard.recoveryState(), e, sendShardFailure); + } + } + ); + } + public void startReplication( final ReplicationCheckpoint checkpoint, final IndexShard indexShard, diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 0619e3e3f62a2..176c2191558d3 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -59,6 +59,7 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; @@ -257,6 +258,7 @@ public MockIndexShard createShard( final SegmentReplicationCheckpointPublisher checkpointPublisher, final PeerRecoveryTargetService recoveryTargetService, final RecoveryListener recoveryListener, + final SegmentReplicationTargetService segmentReplicationTargetService, final RepositoriesService repositoriesService, final Consumer onShardFailure, final Consumer globalCheckpointSyncer, diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index d38d31f3ef43b..c73f64ca142c0 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -66,6 +66,8 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.TestThreadPool; @@ -556,6 +558,12 @@ private IndicesClusterStateService createIndicesClusterStateService( null, clusterService ); + final SegmentReplicationTargetService segmentReplicationTargetService = new SegmentReplicationTargetService( + threadPool, + null, + transportService, + new SegmentReplicationSourceFactory(transportService, null, clusterService) + ); final ShardStateAction shardStateAction = mock(ShardStateAction.class); final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class); return new IndicesClusterStateService( @@ -565,6 +573,7 @@ private IndicesClusterStateService createIndicesClusterStateService( threadPool, SegmentReplicationCheckpointPublisher.EMPTY, recoveryTargetService, + segmentReplicationTargetService, shardStateAction, null, repositoriesService, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 68a6af25a7c82..b17b2a2bc7066 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1843,6 +1843,7 @@ public void onFailure(final Exception e) { clusterService, threadPool, new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService), + null, shardStateAction, new NodeMappingRefreshAction(transportService, metadataMappingService), repositoriesService,