diff --git a/CHANGELOG.md b/CHANGELOG.md index b27403096b7ea..25b6c86ea7785 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Deprecated ### Removed ### Fixed +- [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) + ### Security [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java new file mode 100644 index 0000000000000..bc07ef502dc79 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -0,0 +1,277 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +/** + * This test class verifies primary shard relocation with segment replication as replication strategy. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationRelocationIT extends SegmentReplicationIT { + private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); + + private void createIndex() { + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 1) + ).get(); + } + + /** + * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before + * relocation and after relocation documents are indexed and documents are verified + */ + public void testPrimaryRelocation() throws Exception { + final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + createIndex(); + final String replica = internalCluster().startNode(featureFlagSettings()); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(0, 200); + ingestDocs(initialDocCount); + + logger.info("--> verifying count {}", initialDocCount); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> start another node"); + final String newPrimary = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); + ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); + + logger.info("--> state {}", state); + + assertEquals( + state.getRoutingNodes().node(state.nodes().resolveNode(newPrimary).getId()).iterator().next().state(), + ShardRoutingState.STARTED + ); + + final int finalDocCount = initialDocCount; + ingestDocs(finalDocCount); + refresh(INDEX_NAME); + + logger.info("--> verifying count again {}", initialDocCount + finalDocCount); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + /** + * This test verifies the primary relocation behavior when segment replication round fails during recovery. Post + * failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the + * replicas. + */ + public void testPrimaryRelocationWithSegRepFailure() throws Exception { + final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + createIndex(); + final String replica = internalCluster().startNode(featureFlagSettings()); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(1, 100); + ingestDocs(initialDocCount); + + logger.info("--> verifying count {}", initialDocCount); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> start another node"); + final String newPrimary = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, newPrimary), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + throw new OpenSearchCorruptionException("expected"); + } + connection.sendRequest(requestId, action, request, options); + } + ); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + final int finalDocCount = initialDocCount; + ingestDocs(finalDocCount); + refresh(INDEX_NAME); + + logger.info("Verify older primary is still refreshing replica nodes"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + /** + * This test verifies primary recovery behavior with continuous ingestion + * + */ + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) + ).get(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush to have segments on disk"); + client().admin().indices().prepareFlush().execute().actionGet(); + + logger.info("--> index more docs so there are ops in the transaction log"); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 10; i < 20; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + + final String replica = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> relocate the shard from primary to replica"); + ActionFuture relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .execute(); + for (int i = 20; i < 120; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + relocationListener.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> verifying count"); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index dfeda6809b77b..79e488bdd85af 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -63,6 +63,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -751,16 +752,23 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); /** - * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided - * {@link Runnable} is executed after all operations are successfully blocked. + * Completes the relocation. Operations are blocked and current operations are drained before changing state to + * relocated. After all operations are successfully blocked, performSegRep is executed followed by target relocation + * handoff. * - * @param consumer a {@link Runnable} that is executed after operations are blocked + * @param performSegRep a {@link Runnable} that is executed after operations are blocked + * @param consumer a {@link Runnable} that is executed after performSegRep + * @param listener ActionListener * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation * @throws IllegalStateException if the relocation target is no longer part of the replication group * @throws InterruptedException if blocking operations is interrupted */ - public void relocated(final String targetAllocationId, final Consumer consumer) - throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { + public void relocated( + final String targetAllocationId, + final Consumer consumer, + final Consumer performSegRep, + final ActionListener listener + ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { @@ -768,26 +776,33 @@ public void relocated(final String targetAllocationId, final Consumer segRepSyncListener = new StepListener<>(); + performSegRep.accept(segRepSyncListener); + segRepSyncListener.whenComplete(r -> { + /* + * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a + * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. + */ + verifyRelocatingState(); + final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); try { - replicationTracker.abortRelocationHandoff(); - } catch (final Exception inner) { - e.addSuppressed(inner); + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under + // mutex + } + } catch (final Exception e) { + try { + replicationTracker.abortRelocationHandoff(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } + throw e; } - throw e; - } + listener.onResponse(null); + }, listener::onFailure); }); } catch (TimeoutException e) { logger.warn("timed out waiting for relocation hand-off to complete"); @@ -1648,6 +1663,20 @@ static Engine.Searcher wrapSearcher( } } + /** + * Used with segment replication during relocation handoff, this method updates current read only engine to global + * checkpoint followed by changing to writeable engine + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + * + * @opensearch.internal + */ + public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); + } + /** * Wrapper for a non-closing reader * @@ -3379,6 +3408,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } + /** + * With segment replication enabled for primary relocation, recover replica shard initially as read only and + * change to a writeable engine during relocation handoff after a round of segment replication. + */ + boolean isReadOnlyReplica = indexSettings.isSegRepEnabled() + && (shardRouting.primary() == false + || (shardRouting.isRelocationTarget() && recoveryState.getStage() != RecoveryState.Stage.FINALIZE)); return this.engineConfigFactory.newEngineConfig( shardId, @@ -3403,7 +3439,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), - indexSettings.isSegRepEnabled() && shardRouting.primary() == false, + isReadOnlyReplica, translogFactorySupplier.apply(indexSettings, shardRouting) ); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java b/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java new file mode 100644 index 0000000000000..2600097fd0f2a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * Request to force a round of segment replication on primary target + * + * @opensearch.internal + */ +public class ForceSyncRequest extends RecoveryTransportRequest { + private final long recoveryId; + private final ShardId shardId; + + public ForceSyncRequest(long requestSeqNo, long recoveryId, ShardId shardId) { + super(requestSeqNo); + this.recoveryId = recoveryId; + this.shardId = shardId; + } + + public ForceSyncRequest(StreamInput in) throws IOException { + super(in); + this.recoveryId = in.readLong(); + this.shardId = new ShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + } + + public long getRecoveryId() { + return recoveryId; + } + + public ShardId getShardId() { + return shardId; + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index 9ffe61208b78c..4bb6fc1d23b9f 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -101,7 +101,6 @@ && isTargetSameHistory() final StepListener sendFileStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); - final StepListener finalizeStep = new StepListener<>(); if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo()); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 6259842c282bf..e2f5ec76f2bd1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -820,19 +820,34 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis logger ); + final StepListener handoffListener = new StepListener<>(); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); + final Consumer forceSegRepConsumer = shard.indexSettings().isSegRepEnabled() + ? recoveryTarget::forceSegmentFileSync + : res -> res.onResponse(null); // TODO: make relocated async // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done - cancellableThreads.execute(() -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext)); + cancellableThreads.execute( + () -> shard.relocated( + request.targetAllocationId(), + recoveryTarget::handoffPrimaryContext, + forceSegRepConsumer, + handoffListener + ) + ); /* * if the recovery process fails after disabling primary mode on the source shard, both relocation source and * target are failed (see {@link IndexShard#updateRoutingEntry}). */ + } else { + handoffListener.onResponse(null); } - stopWatch.stop(); - logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); - listener.onResponse(null); + handoffListener.whenComplete(res -> { + stopWatch.stop(); + logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); + listener.onResponse(null); + }, listener::onFailure); }, listener::onFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index f03cbb650fccf..cf0fe28ac616c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -263,6 +263,11 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { + throw new UnsupportedOperationException("Method not supported on target!"); + } + @Override public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener listener) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index c750c0e88364b..ef0d4abc44c7d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -53,6 +53,15 @@ public interface RecoveryTargetHandler extends FileChunkWriter { */ void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener); + /** + * Used with Segment replication only + * + * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files + * conflict with replicas when target is promoted as primary. + * @param listener segment replication event listener + */ + void forceSegmentFileSync(ActionListener listener); + /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates * the global checkpoint. diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index e7ae62c1bee7d..5f638103a021c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -45,6 +45,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.transport.EmptyTransportResponseHandler; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; @@ -186,6 +187,23 @@ public void indexTranslogOperations( retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); } + /** + * Used with Segment replication only + * + * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files + * conflict with replicas when target is promoted as primary. + * @param listener segment replication event listener + */ + @Override + public void forceSegmentFileSync(ActionListener listener) { + final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC; + final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); + final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); + } + @Override public void receiveFileInfo( List phase1FileNames, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index b633f0fa3b9a0..85a34878af03f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -21,7 +21,9 @@ import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.ForceSyncRequest; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; @@ -33,9 +35,12 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Map; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; /** @@ -56,6 +61,8 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final Map latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap(); + private final IndicesService indicesService; + // Empty Implementation, only required while Segment Replication is under feature flag. public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() { @Override @@ -80,6 +87,7 @@ private SegmentReplicationTargetService() { recoverySettings = null; onGoingReplications = null; sourceFactory = null; + indicesService = null; } public ReplicationRef get(long replicationId) { @@ -93,18 +101,21 @@ public ReplicationRef get(long replicationId) { */ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; + public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync"; } public SegmentReplicationTargetService( final ThreadPool threadPool, final RecoverySettings recoverySettings, final TransportService transportService, - final SegmentReplicationSourceFactory sourceFactory + final SegmentReplicationSourceFactory sourceFactory, + final IndicesService indicesService ) { this.threadPool = threadPool; this.recoverySettings = recoverySettings; this.onGoingReplications = new ReplicationCollection<>(logger, threadPool); this.sourceFactory = sourceFactory; + this.indicesService = indicesService; transportService.registerRequestHandler( Actions.FILE_CHUNK, @@ -112,6 +123,12 @@ public SegmentReplicationTargetService( FileChunkRequest::new, new FileChunkTransportRequestHandler() ); + transportService.registerRequestHandler( + Actions.FORCE_SYNC, + ThreadPool.Names.GENERIC, + ForceSyncRequest::new, + new ForceSyncTransportRequestHandler() + ); } /** @@ -321,4 +338,60 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha } } } + + class ForceSyncTransportRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { + assert indicesService != null; + final IndexShard indexShard = indicesService.getShardOrNull(request.getShardId()); + startReplication( + ReplicationCheckpoint.empty(request.getShardId()), + indexShard, + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + try { + indexShard.resetToWriteableEngine(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (InterruptedException | TimeoutException | IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + try { + channel.sendResponse(e); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + ); + } + } + } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 5384b3cee8d71..f7ac392463bb2 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1060,7 +1060,8 @@ protected Node( threadPool, recoverySettings, transportService, - new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), + indicesService ) ); b.bind(SegmentReplicationSourceService.class) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index b7678ff1d39ed..1011729b2d58b 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -50,6 +50,7 @@ import org.apache.lucene.util.Constants; import org.junit.Assert; import org.opensearch.Assertions; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -141,6 +142,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; @@ -994,7 +996,22 @@ public void testOperationPermitOnReplicaShards() throws Exception { AllocationId.newRelocation(routing.allocationId()) ); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + indexShard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(indexShard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); engineClosed = false; break; } @@ -1947,7 +1964,22 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1979,7 +2011,22 @@ public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { try { startRecovery.await(); - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown()); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> relocationStarted.countDown(), + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2067,11 +2114,26 @@ public void run() { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + relocated.set(true); + } + + @Override + public void onFailure(Exception e) { + relocated.set(false); + fail(e.toString()); + } + } + ); } catch (InterruptedException e) { throw new RuntimeException(e); } - relocated.set(true); }); // ensure we wait for all primary operation locks to be acquired allPrimaryOperationLocksAcquired.await(); @@ -2102,20 +2164,72 @@ public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedE final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } - public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException { + public void testRelocatedSegRepConsumerError() throws IOException, InterruptedException { + final IndexShard shard = newStartedShard(true); + final ShardRouting originalRouting = shard.routingEntry(); + final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onFailure(new ReplicationFailedException("Segment replication failed")), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fail("Expected failure"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(ExceptionsHelper.unwrapCause(e) instanceof ReplicationFailedException); + assertEquals(e.getMessage(), "Segment replication failed"); + } + } + ); + closeShards(shard); + } + + public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting); - expectThrows( - IllegalIndexShardStateException.class, - () -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}) + shard.relocated( + relocationRouting.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + fail("IllegalIndexShardStateException expected!"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalIndexShardStateException); + } + } ); closeShards(shard); } @@ -2130,13 +2244,28 @@ public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, Thread relocationThread = new Thread(new AbstractRunnable() { @Override public void onFailure(Exception e) { - relocationException.set(e); + fail(e.toString()); } @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + relocationRouting.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + relocationException.set(e); + } + } + ); } }); relocationThread.start(); @@ -2184,20 +2313,48 @@ public void testRelocateMissingTarget() throws Exception { final ShardRouting toNode2 = ShardRoutingHelper.relocate(original, "node_2"); IndexShardTestCase.updateRoutingEntry(shard, toNode2); final AtomicBoolean relocated = new AtomicBoolean(); - final IllegalStateException error = expectThrows( - IllegalStateException.class, - () -> shard.relocated(toNode1.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true)) - ); - assertThat( - error.getMessage(), - equalTo( - "relocation target [" - + toNode1.getTargetRelocatingShard().allocationId().getId() - + "] is no longer part of the replication group" - ) + shard.relocated( + toNode1.getTargetRelocatingShard().allocationId().getId(), + ctx -> relocated.set(true), + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + fail("Expected IllegalStateException!"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalStateException); + assertThat( + e.getMessage(), + equalTo( + "relocation target [" + + toNode1.getTargetRelocatingShard().allocationId().getId() + + "] is no longer part of the replication group" + ) + ); + } + } ); + assertFalse(relocated.get()); - shard.relocated(toNode2.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true)); + shard.relocated( + toNode2.getTargetRelocatingShard().allocationId().getId(), + ctx -> relocated.set(true), + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(relocated.get()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); assertTrue(relocated.get()); closeShards(shard); } @@ -2590,7 +2747,22 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node"); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); - shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); assertTrue(shard.isRelocatedPrimary()); try { IndexShardTestCase.updateRoutingEntry(shard, origRouting); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index ed1f5a9b06dc1..44771faf36871 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; @@ -34,6 +35,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; import org.opensearch.indices.replication.SegmentReplicationSource; @@ -45,6 +47,8 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -55,9 +59,12 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -222,7 +229,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { IndexShard primaryShard = newStartedShard(true); SegmentReplicationTargetService sut; - sut = prepareForReplication(primaryShard); + sut = prepareForReplication(primaryShard, null); SegmentReplicationTargetService spy = spy(sut); // Starting a new shard in PrimaryMode and shard routing primary. @@ -286,6 +293,91 @@ public void testReplicaReceivesGenIncrease() throws Exception { } } + public void testPrimaryRelocation() throws Exception { + final IndexShard primarySource = newStartedShard(true, settings); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "_doc", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); + final IndexShard primaryTarget = newShard( + primarySource.routingEntry().getTargetRelocatingShard(), + settings, + new NRTReplicationEngineFactory() + ); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); + + BiFunction, ActionListener, List> replicatePrimaryFunction = ( + shardList, + listener) -> { + try { + assert shardList.size() >= 2; + final IndexShard primary = shardList.get(0); + return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + } catch (IOException | InterruptedException e) { + listener.onFailure(e); + throw new RuntimeException(e); + } + }; + recoverReplica(primaryTarget, primarySource, true, replicatePrimaryFunction); + + // check that local checkpoint of new primary is properly tracked after primary relocation + assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat( + primaryTarget.getReplicationTracker() + .getTrackedLocalCheckpointForShard(primaryTarget.routingEntry().allocationId().getId()) + .getLocalCheckpoint(), + equalTo(totalOps - 1L) + ); + assertDocCount(primaryTarget, totalOps); + closeShards(primarySource, primaryTarget); + } + + public void testPrimaryRelocationWithSegRepFailure() throws Exception { + final IndexShard primarySource = newStartedShard(true, settings); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "_doc", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); + final IndexShard primaryTarget = newShard( + primarySource.routingEntry().getTargetRelocatingShard(), + settings, + new NRTReplicationEngineFactory() + ); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); + + BiFunction, ActionListener, List> replicatePrimaryFunction = ( + shardList, + listener) -> { + listener.onFailure(new IOException("Expected failure")); + return null; + }; + Exception e = expectThrows( + Exception.class, + () -> recoverReplica( + primaryTarget, + primarySource, + (primary, sourceNode) -> new RecoveryTarget(primary, sourceNode, new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + throw new AssertionError("recovery must fail"); + } + + @Override + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("Expected failure")); + } + }), + true, + true, + replicatePrimaryFunction + ) + ); + assertThat(e, hasToString(containsString("Expected failure"))); + closeShards(primarySource, primaryTarget); + } + public void testReplicaReceivesLowerGeneration() throws Exception { // when a replica gets incoming segments that are lower than what it currently has on disk. @@ -748,7 +840,8 @@ private SegmentReplicationTargetService newTargetService(SegmentReplicationSourc threadPool, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), mock(TransportService.class), - sourceFactory + sourceFactory, + null ); } diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 7761f97769440..0f5bba4f0c332 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -1116,6 +1116,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) {} + @Override + public void forceSegmentFileSync(ActionListener listener) {} + @Override public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 9a9c5b989dea9..25625fbf68a64 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -71,7 +71,7 @@ public void setUp() throws Exception { replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); - sut = prepareForReplication(primaryShard); + sut = prepareForReplication(primaryShard, null); initialCheckpoint = replicaShard.getLatestReplicationCheckpoint(); aheadCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 5732fc5bfa270..415a3d487b790 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1900,7 +1900,8 @@ public void onFailure(final Exception e) { threadPool, recoverySettings, transportService, - new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), + indicesService ), SegmentReplicationSourceService.NO_OP, shardStateAction, diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 20fe47c1d4cc0..ad19473380063 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -544,7 +544,8 @@ public void recoverReplica( targetSupplier, markAsRecovering, inSyncIds, - routingTable + routingTable, + (a, b) -> null ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); computeReplicationTargets(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 0ee8cc1fcbe0c..bc5508e604131 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -101,6 +101,7 @@ import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.recovery.AsyncRecoveryTarget; @@ -176,6 +177,8 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase { private static final AtomicBoolean failOnShardFailures = new AtomicBoolean(true); + private RecoveryTarget recoveryTarget; + private static final Consumer DEFAULT_SHARD_FAILURE_HANDLER = failure -> { if (failOnShardFailures.get()) { throw new AssertionError(failure.reason, failure.cause); @@ -834,18 +837,45 @@ protected DiscoveryNode getFakeDiscoNode(String id) { ); } - /** recovers a replica from the given primary **/ protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener), true, startReplica); + recoverReplica(replica, primary, startReplica, (a, b) -> null); } /** recovers a replica from the given primary **/ + protected void recoverReplica( + IndexShard replica, + IndexShard primary, + boolean startReplica, + BiFunction, ActionListener, List> replicatePrimaryFunction + ) throws IOException { + recoverReplica( + replica, + primary, + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener), + true, + startReplica, + replicatePrimaryFunction + ); + } + protected void recoverReplica( final IndexShard replica, final IndexShard primary, final BiFunction targetSupplier, final boolean markAsRecovering, final boolean markAsStarted + ) throws IOException { + recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a, b) -> null); + } + + /** recovers a replica from the given primary **/ + protected void recoverReplica( + final IndexShard replica, + final IndexShard primary, + final BiFunction targetSupplier, + final boolean markAsRecovering, + final boolean markAsStarted, + final BiFunction, ActionListener, List> replicatePrimaryFunction ) throws IOException { IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId()); newRoutingTable.addShard(primary.routingEntry()); @@ -854,7 +884,7 @@ protected void recoverReplica( } final Set inSyncIds = Collections.singleton(primary.routingEntry().allocationId().getId()); final IndexShardRoutingTable routingTable = newRoutingTable.build(); - recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable); + recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable, replicatePrimaryFunction); if (markAsStarted) { startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); } @@ -877,7 +907,8 @@ protected final void recoverUnstartedReplica( final BiFunction targetSupplier, final boolean markAsRecovering, final Set inSyncIds, - final IndexShardRoutingTable routingTable + final IndexShardRoutingTable routingTable, + final BiFunction, ActionListener, List> replicatePrimaryFunction ) throws IOException { final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); @@ -911,7 +942,7 @@ protected final void recoverUnstartedReplica( recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes)); final RecoverySourceHandler recovery = RecoverySourceHandlerFactory.create( primary, - new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + new AsyncRecoveryTarget(recoveryTarget, threadPool.generic(), primary, replica, replicatePrimaryFunction), request, recoverySettings ); @@ -1226,14 +1257,17 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { * been configured to return the given primaryShard's current segments. * * @param primaryShard {@link IndexShard} - The primary shard to replicate from. + * @param target {@link IndexShard} - The target replica shard in segment replication. */ - public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard) { + public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final IndicesService indicesService = mock(IndicesService.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( threadPool, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), mock(TransportService.class), - sourceFactory + sourceFactory, + indicesService ); final SegmentReplicationSource replicationSource = new SegmentReplicationSource() { @Override @@ -1273,6 +1307,7 @@ public void getSegmentFiles( } }; when(sourceFactory.get(any())).thenReturn(replicationSource); + when(indicesService.getShardOrNull(any())).thenReturn(target); return targetService; } @@ -1284,16 +1319,10 @@ public void getSegmentFiles( * @param replicaShards - Replicas that will be updated. * @return {@link List} List of target components orchestrating replication. */ - public final List replicateSegments(IndexShard primaryShard, List replicaShards) - throws IOException, InterruptedException { - final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard); - return replicateSegments(targetService, primaryShard, replicaShards); - } - public final List replicateSegments( - SegmentReplicationTargetService targetService, IndexShard primaryShard, - List replicaShards + List replicaShards, + ActionListener... listeners ) throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); Map primaryMetadata; @@ -1303,6 +1332,7 @@ public final List replicateSegments( } List ids = new ArrayList<>(); for (IndexShard replica : replicaShards) { + final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( ReplicationCheckpoint.empty(replica.shardId), replica, @@ -1316,7 +1346,13 @@ public void onReplicationDone(SegmentReplicationState state) { assertTrue(recoveryDiff.missing.isEmpty()); assertTrue(recoveryDiff.different.isEmpty()); assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + for (ActionListener listener : listeners) { + listener.onResponse(null); + } } catch (Exception e) { + for (ActionListener listener : listeners) { + listener.onFailure(e); + } throw ExceptionsHelper.convertToRuntime(e); } finally { countDownLatch.countDown(); diff --git a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java index b3ddec889c1e2..45b11c95b4102 100644 --- a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java @@ -38,12 +38,15 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.SegmentReplicationTarget; import java.util.List; import java.util.concurrent.Executor; +import java.util.function.BiFunction; /** * Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}. @@ -52,9 +55,32 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { private final RecoveryTargetHandler target; private final Executor executor; + private final IndexShard primary; + + private final IndexShard replica; + + private final BiFunction, ActionListener, List> replicatePrimaryFunction; + public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { this.executor = executor; this.target = target; + this.primary = null; + this.replica = null; + this.replicatePrimaryFunction = (a, b) -> null; + } + + public AsyncRecoveryTarget( + RecoveryTargetHandler target, + Executor executor, + IndexShard primary, + IndexShard replica, + BiFunction, ActionListener, List> replicatePrimaryFunction + ) { + this.executor = executor; + this.target = target; + this.primary = primary; + this.replica = replica; + this.replicatePrimaryFunction = replicatePrimaryFunction; } @Override @@ -62,6 +88,11 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener target.prepareForTranslogOperations(totalTranslogOps, listener)); } + @Override + public void forceSegmentFileSync(ActionListener listener) { + executor.execute(() -> this.replicatePrimaryFunction.apply(List.of(primary, replica), listener)); + } + @Override public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener listener) { executor.execute(() -> target.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener));