From bca1cdea29e7de9a0d1bf8c977e54665029fa020 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 13 Dec 2022 09:49:51 -0800 Subject: [PATCH] Address review comments Signed-off-by: Suraj Singh --- CHANGELOG.md | 2 +- .../opensearch/index/shard/IndexShardIT.java | 4 +- .../replication/SegmentReplicationIT.java | 181 +----------- .../SegmentReplicationRelocationIT.java | 276 ++++++++++++++++++ .../routing/IndexShardRoutingTable.java | 5 - .../opensearch/common/util/FeatureFlags.java | 2 +- .../org/opensearch/index/IndexService.java | 7 +- .../opensearch/index/shard/IndexShard.java | 62 +--- .../opensearch/indices/IndicesService.java | 12 +- .../cluster/IndicesClusterStateService.java | 9 +- .../indices/recovery/ForceSyncRequest.java | 52 ++++ .../LocalStorePeerRecoverySourceHandler.java | 1 - .../recovery/PeerRecoveryTargetService.java | 30 +- .../recovery/RecoverySourceHandler.java | 25 +- .../indices/recovery/RecoveryTarget.java | 90 +++++- .../recovery/RecoveryTargetHandler.java | 9 + .../recovery/RemoteRecoveryTargetHandler.java | 17 ++ .../main/java/org/opensearch/node/Node.java | 41 +-- .../RemoveCorruptedShardDataCommandTests.java | 4 +- .../SegmentReplicationIndexShardTests.java | 8 +- ...dicesLifecycleListenerSingleNodeTests.java | 4 +- ...actIndicesClusterStateServiceTestCase.java | 4 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- ...alStorePeerRecoverySourceHandlerTests.java | 16 +- .../recovery/ReplicationCollectionTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 8 +- ...enSearchIndexLevelReplicationTestCase.java | 52 ++-- .../index/shard/IndexShardTestCase.java | 143 ++++----- .../indices/recovery/AsyncRecoveryTarget.java | 19 ++ 29 files changed, 665 insertions(+), 424 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java create mode 100644 server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java diff --git a/CHANGELOG.md 
b/CHANGELOG.md index 638f3e5d19710..6857b76d3c939 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) -- [Segment Replication] Update peer recovery logic for segment replication ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 @@ -95,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299)) - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) +- [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 50828c79566aa..959b04b7861a3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -85,7 +85,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.recovery.RecoveryState; -import 
org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; @@ -702,8 +701,7 @@ public static final IndexShard newIndexShard( cbs, new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, - null, - SegmentReplicationTargetService.NO_OP + null ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index e87da06b7bfca..4c4665fae275f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -11,14 +11,11 @@ import com.carrotsearch.randomizedtesting.RandomizedTest; import org.junit.BeforeClass; import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; -import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; -import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; @@ -27,9 +24,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; 
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; -import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; @@ -51,7 +46,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Arrays; import java.util.List; @@ -72,14 +66,13 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { - private static final String INDEX_NAME = "test-idx-1"; - private static final int SHARD_COUNT = 1; - private static final int REPLICA_COUNT = 1; + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; @BeforeClass public static void assumeFeatureFlag() { - // assumeTrue("Segment replication Feature flag is enabled", - // Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); } @Override @@ -103,8 +96,6 @@ protected boolean addMockInternalEngine() { return false; } - private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); - public void ingestDocs(int docCount) throws Exception { try ( BackgroundIndexer indexer = new BackgroundIndexer( @@ -124,166 +115,6 @@ public void ingestDocs(int docCount) throws Exception { } } - /** - * This test relocates a primary shard to a newly added node in the cluster. Before relocation and after relocation - * we index documents. We don't perform any flush before relocation is done. 
- */ - public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception { - logger.info("--> starting [primary] ..."); - final String old_primary = internalCluster().startNode(); - - logger.info("--> creating test index ..."); - prepareCreate( - INDEX_NAME, - Settings.builder() - .put("index.number_of_shards", 1) - .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) - .put("index.number_of_replicas", 1) - ).get(); - - final String replica = internalCluster().startNode(); - - ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(0, 200); - ingestDocs(initialDocCount); - - logger.info("--> verifying count {}", initialDocCount); - assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); - - logger.info("--> start another node"); - final String new_primary = internalCluster().startNode(); - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("3") - .execute() - .actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - - ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); - logger.info("--> cluster state before relocation {}", state); - - logger.info("--> relocate the shard"); - - client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary)) - .execute() - .actionGet(); - clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .setTimeout(ACCEPTABLE_RELOCATION_TIME) - .execute() - .actionGet(); - 
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - - logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - // assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); - assertThat( - state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(), - equalTo(ShardRoutingState.STARTED) - ); - - final int finalDocCount = 1; - client().prepareIndex(INDEX_NAME).setId("201").setSource("bar", "baz").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - refresh(INDEX_NAME); - waitForReplicaUpdate(); - - logger.info("--> verifying count again {} + {}", initialDocCount, finalDocCount); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertHitCount( - client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); - assertHitCount( - client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), - initialDocCount + finalDocCount - ); - - } - - public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { - logger.info("--> starting [primary] ..."); - final String primary = internalCluster().startNode(); - - logger.info("--> creating test index ..."); - prepareCreate( - INDEX_NAME, - Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).put("index.refresh_interval", -1) - ).get(); - - logger.info("--> index 10 docs"); - for (int i = 0; i < 10; i++) { - client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); - } - logger.info("--> flush so we have an actual index"); - client().admin().indices().prepareFlush().execute().actionGet(); - logger.info("--> index more docs so we have something in the translog"); - final List> pendingIndexResponses = new 
ArrayList<>(); - for (int i = 10; i < 20; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i) - .execute() - ); - } - - logger.info("--> start another node"); - final String replica = internalCluster().startNode(); - ClusterHealthResponse clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNodes("2") - .execute() - .actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - - logger.info("--> relocate the shard from primary to replica"); - ActionFuture relocationListener = client().admin() - .cluster() - .prepareReroute() - .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) - .execute(); - logger.info("--> index 100 docs while relocating"); - for (int i = 20; i < 120; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i) - .execute() - ); - } - relocationListener.actionGet(); - clusterHealthResponse = client().admin() - .cluster() - .prepareHealth() - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .setTimeout(ACCEPTABLE_RELOCATION_TIME) - .execute() - .actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - - logger.info("--> verifying count"); - assertBusy(() -> { - client().admin().indices().prepareRefresh().execute().actionGet(); - assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); - }, 1, TimeUnit.MINUTES); - assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), 120); - } - public void testPrimaryStopped_ReplicaPromoted() throws Exception { final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -950,7 +781,7 @@ private void waitForReplicaUpdate() throws 
Exception { // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false) { + if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { @@ -965,7 +796,7 @@ private void waitForReplicaUpdate() throws Exception { }); } - private IndexShard getIndexShard(String node) { + protected IndexShard getIndexShard(String node) { final Index index = resolveIndex(INDEX_NAME); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); IndexService indexService = indicesService.indexServiceSafe(index); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java new file mode 100644 index 0000000000000..b87c9d07d04c4 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -0,0 +1,276 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +/** + * This test class verifies primary shard relocation with segment replication as replication strategy. 
+ */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationRelocationIT extends SegmentReplicationIT { + private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); + + private void createIndex() { + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 1) + ).get(); + } + + /** + * This test verifies the happy path when the primary shard is relocated to a newly added node (target) in the cluster. + * Documents are indexed before and after relocation, and the document count is verified. + */ + public void testSimplePrimaryRelocationWithoutFlushBeforeRelocation() throws Exception { + final String old_primary = internalCluster().startNode(); + createIndex(); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(0, 200); + ingestDocs(initialDocCount); + + logger.info("--> verifying count {}", initialDocCount); + assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> start another node"); + final String new_primary = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary)) + .execute() + 
.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); + ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); + + logger.info("--> state {}", state); + + assertThat( + state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(), + equalTo(ShardRoutingState.STARTED) + ); + + final int finalDocCount = initialDocCount; + ingestDocs(finalDocCount); + + logger.info("--> verifying count again {}", initialDocCount + finalDocCount); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + /** + * This test verifies the primary to primary relocation behavior when segment replication round fails on new primary. + * Post failure, test asserts that the old primary continues segment replication rounds to refresh replicas. 
+ */ + public void testPrimaryRelocationWithSegRepFailure() throws Exception { + final String old_primary = internalCluster().startNode(); + createIndex(); + final String replica = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(0, 200); + ingestDocs(initialDocCount); + + logger.info("--> verifying count {}", initialDocCount); + assertHitCount(client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> start another node"); + final String new_primary = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. 
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + old_primary + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, new_primary), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + throw new OpenSearchCorruptionException("expected"); + } + connection.sendRequest(requestId, action, request, options); + } + ); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + final IndexShard indexShard = getIndexShard(old_primary); + logger.info("Verify old primary shard is not disabled to perform segrep on replicas"); + assertFalse(indexShard.isBlockInternalCheckPointRefresh()); + + final int finalDocCount = initialDocCount; + ingestDocs(finalDocCount); + + logger.info("Verify all documents are available on both old primary and replica"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(old_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { + final String primary = internalCluster().startNode(); + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + 
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) + ).get(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush to have segments on disk"); + client().admin().indices().prepareFlush().execute().actionGet(); + + logger.info("--> index more docs so there are ops in the transaction log"); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 10; i < 20; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + + final String replica = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> relocate the shard from primary to replica"); + ActionFuture relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .execute(); + for (int i = 20; i < 120; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + relocationListener.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); 
+ + logger.info("--> verifying count"); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 3767304e3827c..9026e7068e9fe 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -32,8 +32,6 @@ package org.opensearch.cluster.routing; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.Nullable; @@ -97,8 +95,6 @@ public class IndexShardRoutingTable implements Iterable { */ final List allInitializingShards; - private static final Logger logger = LogManager.getLogger(IndexShardRoutingTable.class); - IndexShardRoutingTable(ShardId shardId, List shards) { this.shardId = shardId; this.shuffler = new RotationShardShuffler(Randomness.get().nextInt()); @@ -868,7 +864,6 @@ public Builder removeShard(ShardRouting shardEntry) { public IndexShardRoutingTable build() { // don't allow more than one shard copy with same id to be allocated to same node - logger.info("--> shards {}", shards); assert distinctNodes(shards) : "more than one shard with same id assigned to same node (shards: " + shards + ")"; return new IndexShardRoutingTable(shardId, Collections.unmodifiableList(new ArrayList<>(shards))); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java 
index b432b30068f0d..31dd621f678ad 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -49,6 +49,6 @@ public class FeatureFlags { * and false otherwise. */ public static boolean isEnabled(String featureFlagName) { - return true;// "true".equalsIgnoreCase(System.getProperty(featureFlagName)); + return "true".equalsIgnoreCase(System.getProperty(featureFlagName)); } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 208f81a236270..92f957633db84 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -95,7 +95,6 @@ import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.script.ScriptService; @@ -435,8 +434,7 @@ public synchronized IndexShard createShard( final ShardRouting routing, final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, - final SegmentReplicationCheckpointPublisher checkpointPublisher, - final SegmentReplicationTargetService segmentReplicationTargetService + final SegmentReplicationCheckpointPublisher checkpointPublisher ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); /* @@ -553,8 +551,7 @@ public synchronized IndexShard createShard( // TODO Replace with remote translog factory in the follow up PR this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(), this.indexSettings.isSegRepEnabled() ? 
checkpointPublisher : null, - remoteStore, - segmentReplicationTargetService + remoteStore ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 86f9bfbd04098..be2789aab73fa 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -55,7 +55,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -163,11 +162,8 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.SegmentReplicationState; -import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; -import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -329,8 +325,6 @@ public void setBlockInternalCheckPointRefresh(boolean blockInternalCheckPointRef */ private boolean blockInternalCheckPointRefresh; - private final SegmentReplicationTargetService segmentReplicationTargetService; - public IndexShard( final ShardRouting shardRouting, final IndexSettings 
indexSettings, @@ -354,8 +348,7 @@ public IndexShard( final CircuitBreakerService circuitBreakerService, final TranslogFactory translogFactory, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, - @Nullable final Store remoteStore, - @Nullable final SegmentReplicationTargetService segmentReplicationTargetService + @Nullable final Store remoteStore ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -441,43 +434,6 @@ public boolean shouldCache(Query query) { this.checkpointPublisher = checkpointPublisher; this.remoteStore = remoteStore; this.translogFactory = translogFactory; - - this.segmentReplicationTargetService = segmentReplicationTargetService; - } - - /** - * Used with Segment replication only - * - * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files - * conflict with replicas when target is promoted as primary. - * @param listener segment replication event listener - */ - public void performSegmentReplicationRefresh(StepListener listener) { - this.segmentReplicationTargetService.startReplication( - ReplicationCheckpoint.empty(this.shardId()), - this, - new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - try { - indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); - listener.onResponse(null); - } catch (InterruptedException | TimeoutException | IOException e) { - listener.onFailure(e); - throw new RuntimeException(e); - } - } - - @Override - public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.error("segment replication failure post recovery", e); - listener.onFailure(e); - if (sendShardFailure == true) { - failShard("segment replication failure post recovery", e); - } - } - } - ); } public 
ThreadPool getThreadPool() { @@ -1666,6 +1622,10 @@ static Engine.Searcher wrapSearcher( } } + public void resetEngine() throws IOException, InterruptedException, TimeoutException { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); + } + /** * Wrapper for a non-closing reader * @@ -3348,6 +3308,10 @@ private DocumentMapperForType docMapper() { } private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) throws IOException { + return this.newEngineConfig(globalCheckpointSupplier, false); + } + + private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier, boolean forceReadWriteEngine) throws IOException { final Sort indexSort = indexSortSupplier.get(); final Engine.Warmer warmer = reader -> { assert Thread.holdsLock(mutex) == false : "warming engine under mutex"; @@ -3365,16 +3329,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } - - logger.info("--> recoveryState.getStage() {}", recoveryState.getStage()); /** * With segment replication enabled, recover replica shard as read only and primary shard as writeable during * translog recovery state. This condition assumes this method is not called during finalize recovery state. 
*/ boolean isReadOnlyReplica = indexSettings.isSegRepEnabled() - && (shardRouting.primary() == false - || shardRouting.isRelocationTarget() && recoveryState.getStage() != RecoveryState.Stage.TRANSLOG); - + && (shardRouting.primary() == false || (shardRouting.isRelocationTarget() && forceReadWriteEngine == false)); return this.engineConfigFactory.newEngineConfig( shardId, threadPool, @@ -4188,7 +4148,7 @@ public void close() throws IOException { } }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); - newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); + newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker, true))); onNewEngine(newEngineReference.get()); } final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery( diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 6821e4af7a980..204bf4204511e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -140,7 +140,6 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.Node; import org.opensearch.plugins.IndexStorePlugin; @@ -972,21 +971,14 @@ public IndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode, - final SegmentReplicationTargetService segmentReplicationTargetService + final DiscoveryNode sourceNode ) throws IOException { Objects.requireNonNull(retentionLeaseSyncer); 
ensureChangesAllowed(); IndexService indexService = indexService(shardRouting.index()); assert indexService != null; RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); - IndexShard indexShard = indexService.createShard( - shardRouting, - globalCheckpointSyncer, - retentionLeaseSyncer, - checkpointPublisher, - segmentReplicationTargetService - ); + IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer, checkpointPublisher); indexShard.addShardFailureCallback(onShardFailure); indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, mapping -> { assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index eb3f729d25fa1..83f4e0c7cbed9 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -152,8 +152,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final SegmentReplicationTargetService segmentReplicationTargetService; - @Inject public IndicesClusterStateService( final Settings settings, @@ -239,7 +237,6 @@ public IndicesClusterStateService( this.globalCheckpointSyncer = globalCheckpointSyncer; this.retentionLeaseSyncer = Objects.requireNonNull(retentionLeaseSyncer); this.sendRefreshMapping = settings.getAsBoolean("indices.cluster.send_refresh_mapping", true); - this.segmentReplicationTargetService = segmentReplicationTargetService; } @Override @@ -669,8 +666,7 @@ private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardR 
globalCheckpointSyncer, retentionLeaseSyncer, nodes.getLocalNode(), - sourceNode, - segmentReplicationTargetService + sourceNode ); } catch (Exception e) { failAndRemoveShard(shardRouting, true, "failed to create shard", e, state); @@ -1103,8 +1099,7 @@ T createShard( Consumer globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, DiscoveryNode targetNode, - @Nullable DiscoveryNode sourceNode, - @Nullable SegmentReplicationTargetService segmentReplicationTargetService + @Nullable DiscoveryNode sourceNode ) throws IOException; /** diff --git a/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java b/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java new file mode 100644 index 0000000000000..2600097fd0f2a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.recovery; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * Request to force a round of segment replication on primary target + * + * @opensearch.internal + */ +public class ForceSyncRequest extends RecoveryTransportRequest { + private final long recoveryId; + private final ShardId shardId; + + public ForceSyncRequest(long requestSeqNo, long recoveryId, ShardId shardId) { + super(requestSeqNo); + this.recoveryId = recoveryId; + this.shardId = shardId; + } + + public ForceSyncRequest(StreamInput in) throws IOException { + super(in); + this.recoveryId = in.readLong(); + this.shardId = new ShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + } + + public long getRecoveryId() { + return recoveryId; + } + + public ShardId getShardId() { + return shardId; + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index 1cf6acd26ae0a..54be091366213 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -101,7 +101,6 @@ && isTargetSameHistory() final StepListener sendFileStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); - final StepListener finalizeStep = new StepListener<>(); if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. 
starting at [{}]", request.startingSeqNo()); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 4cc599d968624..7f5e84629efad 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -65,6 +65,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogCorruptedException; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; import org.opensearch.indices.replication.common.ReplicationTimer; @@ -111,6 +112,7 @@ public static class Actions { public static final String PREPARE_TRANSLOG = "internal:index/shard/recovery/prepare_translog"; public static final String FINALIZE = "internal:index/shard/recovery/finalize"; public static final String HANDOFF_PRIMARY_CONTEXT = "internal:index/shard/recovery/handoff_primary_context"; + public static final String FORCE_SYNC = "internal:index/shard/recovery/segments_sync"; } private final ThreadPool threadPool; @@ -122,17 +124,21 @@ public static class Actions { private final ReplicationCollection onGoingRecoveries; + private final SegmentReplicationTargetService segmentReplicationTargetService; + public PeerRecoveryTargetService( ThreadPool threadPool, TransportService transportService, RecoverySettings recoverySettings, - ClusterService clusterService + ClusterService clusterService, + SegmentReplicationTargetService segmentReplicationTargetService ) { this.threadPool = threadPool; this.transportService = transportService; this.recoverySettings = recoverySettings; this.clusterService = clusterService; 
this.onGoingRecoveries = new ReplicationCollection<>(logger, threadPool); + this.segmentReplicationTargetService = segmentReplicationTargetService; transportService.registerRequestHandler( Actions.FILES_INFO, @@ -176,6 +182,12 @@ public PeerRecoveryTargetService( RecoveryHandoffPrimaryContextRequest::new, new HandoffPrimaryContextRequestHandler() ); + transportService.registerRequestHandler( + Actions.FORCE_SYNC, + ThreadPool.Names.GENERIC, + ForceSyncRequest::new, + new ForceSyncTransportRequestHandler() + ); } @Override @@ -188,7 +200,7 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) { // create a new recovery status, and process... final long recoveryId = onGoingRecoveries.start( - new RecoveryTarget(indexShard, sourceNode, listener), + new RecoveryTarget(indexShard, sourceNode, listener, segmentReplicationTargetService), recoverySettings.activityTimeout() ); // we fork off quickly here and go async but this is called from the cluster state applier thread too and that can cause @@ -563,6 +575,20 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha } } + class ForceSyncTransportRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { + try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.getRecoveryId(), request.getShardId())) { + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FORCE_SYNC, request); + if (listener == null) { + return; + } + recoveryTarget.forceSegmentFileSync(listener); + } + } + } + class RecoveryRunner extends AbstractRunnable { final long recoveryId; diff --git 
a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index d191cfa657e89..9109879ceac53 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -189,12 +189,19 @@ public void recoverToTarget(ActionListener listener) { } protected void preventRefreshOnReplicas() { - // Block refresh on replicas + // Disable this shard from performing segment replication on replicas if (shard.indexSettings().isSegRepEnabled() && request.isPrimaryRelocation() == true) { shard.setBlockInternalCheckPointRefresh(true); } } + protected void resumeSegmentReplicationRefresh() { + // Enable this shard from performing segment replication on replicas + if (shard.indexSettings().isSegRepEnabled() && request.isPrimaryRelocation() == true) { + shard.setBlockInternalCheckPointRefresh(false); + } + } + protected abstract void innerRecoveryToTarget(ActionListener listener, Consumer onFailure) throws IOException; @@ -815,7 +822,21 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery final StepListener finalizeListener = new StepListener<>(); cancellableThreads.checkForCancel(); - recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener); + final StepListener segRepSyncListener = new StepListener<>(); + // Force a round of segment replication before finalizing the recovery + if (shard.indexSettings().isSegRepEnabled() && request.isPrimaryRelocation() == true) { + recoveryTarget.forceSegmentFileSync(segRepSyncListener); + } else { + segRepSyncListener.onResponse(null); + } + segRepSyncListener.whenComplete( + r -> { recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, 
finalizeListener); }, + e -> { + this.resumeSegmentReplicationRefresh(); + listener.onFailure(e); + } + ); + finalizeListener.whenComplete(r -> { RunUnderPrimaryPermit.run( () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 3047dbf65ee7e..80084eda2156d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -32,13 +32,13 @@ package org.opensearch.indices.recovery; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.opensearch.Assertions; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; -import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.UUIDs; @@ -57,6 +57,9 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.SegmentReplicationState; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; @@ -68,6 +71,7 @@ import java.nio.file.Path; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; import 
static org.opensearch.index.translog.Translog.TRANSLOG_UUID_KEY; @@ -87,12 +91,36 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); + private final SegmentReplicationTargetService segmentReplicationTargetService; + /** * Creates a new recovery target object that represents a recovery to the provided shard. * * @param indexShard local shard where we want to recover to * @param sourceNode source node of the recovery where we recover from * @param listener called when recovery is completed/failed + * @param segmentReplicationTargetService used to force a segment replication round + */ + public RecoveryTarget( + IndexShard indexShard, + DiscoveryNode sourceNode, + ReplicationListener listener, + SegmentReplicationTargetService segmentReplicationTargetService + ) { + super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); + this.sourceNode = sourceNode; + indexShard.recoveryStats().incCurrentAsTarget(); + final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; + this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); + this.segmentReplicationTargetService = segmentReplicationTargetService; + } + + /** + * Creates a new recovery target object that represents a recovery to the provided shard. Used for tests. 
+ * + * @param indexShard local shard where we want to recover to + * @param sourceNode source node of the recovery where we recover from + * @param listener called when recovery is completed/failed */ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) { super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); @@ -100,6 +128,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati indexShard.recoveryStats().incCurrentAsTarget(); final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); + this.segmentReplicationTargetService = SegmentReplicationTargetService.NO_OP; } /** @@ -108,7 +137,7 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati * @return a copy of this recovery target */ public RecoveryTarget retryCopy() { - return new RecoveryTarget(indexShard, sourceNode, listener); + return new RecoveryTarget(indexShard, sourceNode, listener, segmentReplicationTargetService); } public IndexShard indexShard() { @@ -218,17 +247,52 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { + segmentReplicationTargetService.startReplication( + ReplicationCheckpoint.empty(shardId()), + indexShard, + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + try { + indexShard.resetEngine(); + listener.onResponse(null); + } catch (InterruptedException | TimeoutException | IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void 
onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + listener.onFailure(e); + } + } + ); + } + @Override public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener listener) { - final StepListener segrepListener = new StepListener<>(); - // Force segment refresh on primary relocation which is recovered as replica initially & reset engine - if (this.indexShard.indexSettings().isSegRepEnabled() && this.state().getPrimary() == true) { - logger.info("--> Force segment replication event on new primary and switch to ReadWrite (Internal) engine"); - this.indexShard.performSegmentReplicationRefresh(segrepListener); - } else { - segrepListener.onResponse(null); - } - segrepListener.whenComplete(r -> { + ActionListener.completeWith(listener, () -> { indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); // Persist the global checkpoint. 
indexShard.sync(); @@ -248,8 +312,8 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true)); } indexShard.finalizeRecovery(); - listener.onResponse(null); - }, listener::onFailure); + return null; + }); } private boolean hasUncommittedOperations() throws IOException { diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index c750c0e88364b..ef0d4abc44c7d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -53,6 +53,15 @@ public interface RecoveryTargetHandler extends FileChunkWriter { */ void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener); + /** + * Used with Segment replication only + * + * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files + * conflict with replicas when target is promoted as primary. + * @param listener segment replication event listener + */ + void forceSegmentFileSync(ActionListener listener); + /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates * the global checkpoint. 
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index e7ae62c1bee7d..8476d18688aa6 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -186,6 +186,23 @@ public void indexTranslogOperations( retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); } + /** + * Used with Segment replication only + * + * This function is used to force a sync target primary node with source (old primary). This is to avoid segment files + * conflict with replicas when target is promoted as primary. + * @param listener segment replication event listener + */ + @Override + public void forceSegmentFileSync(ActionListener listener) { + final String action = PeerRecoveryTargetService.Actions.FORCE_SYNC; + final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); + final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); + } + @Override public void receiveFileInfo( List phase1FileNames, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 25b821430ce4e..3ab043e091d91 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -987,6 +987,19 @@ protected Node( resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + final 
SegmentReplicationTargetService segmentReplicationTargetService = FeatureFlags.isEnabled(REPLICATION_TYPE) + ? new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ) + : SegmentReplicationTargetService.NO_OP; + + final SegmentReplicationSourceService segmentReplicationSourceService = FeatureFlags.isEnabled(REPLICATION_TYPE) + ? new SegmentReplicationSourceService(indicesService, transportService, recoverySettings) + : SegmentReplicationSourceService.NO_OP; + modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); @@ -1036,23 +1049,17 @@ protected Node( b.bind(PeerRecoverySourceService.class) .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) - .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); - if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { - b.bind(SegmentReplicationTargetService.class) - .toInstance( - new SegmentReplicationTargetService( - threadPool, - recoverySettings, - transportService, - new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) - ) - ); - b.bind(SegmentReplicationSourceService.class) - .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); - } else { - b.bind(SegmentReplicationTargetService.class).toInstance(SegmentReplicationTargetService.NO_OP); - b.bind(SegmentReplicationSourceService.class).toInstance(SegmentReplicationSourceService.NO_OP); - } + .toInstance( + new PeerRecoveryTargetService( + threadPool, + transportService, + recoverySettings, + clusterService, + segmentReplicationTargetService + ) + ); + b.bind(SegmentReplicationTargetService.class).toInstance(segmentReplicationTargetService); + 
b.bind(SegmentReplicationSourceService.class).toInstance(segmentReplicationSourceService); } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 6875765280e92..a6ecb7053f286 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -65,7 +65,6 @@ import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.index.translog.TranslogException; -import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.DummyShardLock; import org.junit.Before; @@ -553,8 +552,7 @@ private IndexShard reopenIndexShard(boolean corrupted) throws IOException { indexShard.getGlobalCheckpointSyncer(), indexShard.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, - null, - SegmentReplicationTargetService.NO_OP + null ); } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index b466d464ad258..5321e5152518a 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -70,10 +70,6 @@ public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelRepli .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - public void testPrimaryRecovery() { - - } - /** * Test that latestReplicationCheckpoint 
returns null only for docrep enabled indices */ @@ -289,15 +285,13 @@ public void testPrimaryPrimaryRelocation() throws Exception { final IndexShard primary = shards.getPrimary(); SegmentReplicationTargetService segmentReplicationTargetService = prepareForReplication(primary); - // final IndexShard replica_1 = shards.getReplicas().get(0); int numDocs = randomIntBetween(10, 100); shards.indexDocs(numDocs); flushShard(primary, false); - // replicateSegments(primary, List.of(replica_1)); IndexShardTestCase.updateRoutingEntry(primary, primary.routingEntry().relocate("s2", -1)); final IndexShard primaryTarget = shards.addReplica(primary.routingEntry().getTargetRelocatingShard(), false); - shards.recoverReplica(primaryTarget); + shards.recoverReplica(primaryTarget, segmentReplicationTargetService); // check that local checkpoint of new primary is properly tracked after primary relocation assertThat(primaryTarget.getLocalCheckpoint(), equalTo(numDocs - 1L)); diff --git a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java index 47affd2cf5e10..0989bf869f18e 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesLifecycleListenerSingleNodeTests.java @@ -49,7 +49,6 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -154,8 +153,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem newRouting, s -> {}, RetentionLeaseSyncer.EMPTY, - 
SegmentReplicationCheckpointPublisher.EMPTY, - SegmentReplicationTargetService.NO_OP + SegmentReplicationCheckpointPublisher.EMPTY ); IndexShardTestCase.updateRoutingEntry(shard, newRouting); assertEquals(5, counter.get()); diff --git a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index b283192f88d68..0619e3e3f62a2 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/server/src/test/java/org/opensearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -59,7 +59,6 @@ import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; -import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.test.OpenSearchTestCase; @@ -263,8 +262,7 @@ public MockIndexShard createShard( final Consumer globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final DiscoveryNode targetNode, - final DiscoveryNode sourceNode, - final SegmentReplicationTargetService segmentReplicationTargetService + final DiscoveryNode sourceNode ) throws IOException { failRandomly(); RecoveryState recoveryState = new RecoveryState(shardRouting, targetNode, sourceNode); diff --git a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index ce4142363a54e..0903ec57df54d 100644 --- a/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ 
b/server/src/test/java/org/opensearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -560,7 +560,8 @@ private IndicesClusterStateService createIndicesClusterStateService( threadPool, transportService, null, - clusterService + clusterService, + SegmentReplicationTargetService.NO_OP ); final ShardStateAction shardStateAction = mock(ShardStateAction.class); final PrimaryReplicaSyncer primaryReplicaSyncer = mock(PrimaryReplicaSyncer.class); diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 8f9890048fdfb..0f5bba4f0c332 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -88,6 +88,7 @@ import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardRelocatedException; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; @@ -121,7 +122,6 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -654,7 +654,7 @@ public void writeFileChunk( IOUtils.close(store); } - public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException, ExecutionException, InterruptedException { + public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOException { final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, 
service); final StartRecoveryRequest request = getStartRecoveryRequest(); final IndexShard shard = mock(IndexShard.class); @@ -737,11 +737,10 @@ void phase2( }; PlainActionFuture future = new PlainActionFuture<>(); - // expectThrows(IndexShardRelocatedException.class, () -> { - handler.recoverToTarget(future); - // future.actionGet(); - // }); - future.get(); + expectThrows(IndexShardRelocatedException.class, () -> { + handler.recoverToTarget(future); + future.actionGet(); + }); assertFalse(phase1Called.get()); assertFalse(prepareTargetForTranslogCalled.get()); assertFalse(phase2Called.get()); @@ -1117,6 +1116,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) {} + @Override + public void forceSegmentFileSync(ActionListener listener) {} + @Override public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {} diff --git a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index 75ac1075e8ee0..b5cbc05f2c0ab 100644 --- a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -38,6 +38,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; @@ -175,7 +176,7 @@ public void testResetRecovery() throws Exception { shards.recoverReplica(shard, (s, n) -> { assertSame(s, newRecoveryRef.get().indexShard()); return newRecoveryRef.get(); - }, false); + }, false, SegmentReplicationTargetService.NO_OP); } shards.assertAllEqual(numDocs); 
assertNull("recovery is done", collection.get(recoveryId)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 663c325db12c2..9d3b4d67f356b 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1892,7 +1892,13 @@ public void onFailure(final Exception e) { indicesService, clusterService, threadPool, - new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService), + new PeerRecoveryTargetService( + threadPool, + transportService, + recoverySettings, + clusterService, + SegmentReplicationTargetService.NO_OP + ), new SegmentReplicationTargetService( threadPool, recoverySettings, diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index a586199c3c2d2..f1d9e1706d9ea 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -242,16 +242,7 @@ protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException protected ReplicationGroup(final IndexMetadata indexMetadata, final SegmentReplicationTargetService segmentReplicationTargetService) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard( - primaryRouting, - indexMetadata, - null, - getEngineFactory(primaryRouting), - () -> {}, - retentionLeaseSyncer, - null, - segmentReplicationTargetService - ); + primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer, null); 
replicas = new CopyOnWriteArrayList<>(); this.indexMetadata = indexMetadata; updateAllocationIDsOnPrimary(); @@ -378,8 +369,7 @@ public IndexShard addReplica(ShardRouting replicaRouting, boolean addReplicaTrac getEngineFactory(replicaRouting), () -> {}, retentionLeaseSyncer, - null, - SegmentReplicationTargetService.NO_OP + null ); if (addReplicaTracking) { addReplica(replica); @@ -400,8 +390,7 @@ public IndexShard addReplica() throws IOException { getEngineFactory(replicaRouting), () -> {}, retentionLeaseSyncer, - null, - SegmentReplicationTargetService.NO_OP + null ); addReplica(replica); return replica; @@ -443,8 +432,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP () -> {}, retentionLeaseSyncer, EMPTY_EVENT_LISTENER, - null, - SegmentReplicationTargetService.NO_OP + null ); replicas.add(newReplica); if (replicationTargets != null) { @@ -530,18 +518,39 @@ public synchronized boolean removeReplica(IndexShard replica) throws IOException } public void recoverReplica(IndexShard replica) throws IOException { - recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener)); + this.recoverReplica(replica, SegmentReplicationTargetService.NO_OP); } public void recoverReplica(IndexShard replica, BiFunction targetSupplier) throws IOException { - recoverReplica(replica, targetSupplier, true); + recoverReplica( + replica, + targetSupplier, + SegmentReplicationTargetService.NO_OP + ); + } + + public void recoverReplica(IndexShard replica, SegmentReplicationTargetService segmentReplicationTargetService) throws IOException { + recoverReplica( + replica, + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, segmentReplicationTargetService), + segmentReplicationTargetService + ); } public void recoverReplica( IndexShard replica, BiFunction targetSupplier, - boolean markAsRecovering + SegmentReplicationTargetService segmentReplicationTargetService + ) throws IOException { + 
recoverReplica(replica, targetSupplier, true, segmentReplicationTargetService); + } + + public void recoverReplica( + IndexShard replica, + BiFunction targetSupplier, + boolean markAsRecovering, + SegmentReplicationTargetService segmentReplicationTargetService ) throws IOException { final IndexShardRoutingTable routingTable = routingTable(Function.identity()); final Set inSyncIds = activeIds(); @@ -551,7 +560,8 @@ public void recoverReplica( targetSupplier, markAsRecovering, inSyncIds, - routingTable + routingTable, + segmentReplicationTargetService ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); computeReplicationTargets(); @@ -566,7 +576,7 @@ public Future asyncRecoverReplica( final BiFunction targetSupplier ) { final FutureTask task = new FutureTask<>(() -> { - recoverReplica(replica, targetSupplier); + recoverReplica(replica, targetSupplier, SegmentReplicationTargetService.NO_OP); return null; }); threadPool.generic().execute(task); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index a7867c727ddc7..10515eca4d4a2 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -108,6 +108,7 @@ import org.opensearch.indices.recovery.RecoverySourceHandlerFactory; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; +import org.opensearch.indices.recovery.RecoveryTargetHandler; import org.opensearch.indices.recovery.StartRecoveryRequest; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; @@ -170,6 +171,8 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase { private static final AtomicBoolean 
failOnShardFailures = new AtomicBoolean(true); + private RecoveryTarget recoveryTarget; + private static final Consumer DEFAULT_SHARD_FAILURE_HANDLER = failure -> { if (failOnShardFailures.get()) { throw new AssertionError(failure.reason, failure.cause); @@ -318,17 +321,7 @@ protected IndexShard newShard( .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("{ \"properties\": {} }"); - return newShard( - shardRouting, - metadata.build(), - null, - engineFactory, - () -> {}, - RetentionLeaseSyncer.EMPTY, - null, - SegmentReplicationTargetService.NO_OP, - listeners - ); + return newShard(shardRouting, metadata.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, null, listeners); } /** @@ -365,11 +358,11 @@ protected IndexShard newShard( IndexMetadata indexMetadata, @Nullable CheckedFunction readerWrapper ) throws IOException { - return newShard(shardId, primary, nodeId, indexMetadata, readerWrapper, () -> {}, SegmentReplicationTargetService.NO_OP); + return newShard(shardId, primary, nodeId, indexMetadata, readerWrapper, () -> {}); } /** - * creates a new initializing shard. The shard will be put in its proper path under the + * creates a new initializing shard. The shard will will be put in its proper path under the * supplied node id. * * @param shardId the shard id to use @@ -382,8 +375,7 @@ protected IndexShard newShard( String nodeId, IndexMetadata indexMetadata, @Nullable CheckedFunction readerWrapper, - Runnable globalCheckpointSyncer, - SegmentReplicationTargetService segmentReplicationTargetService + Runnable globalCheckpointSyncer ) throws IOException { ShardRouting shardRouting = TestShardRouting.newShardRouting( shardId, @@ -399,13 +391,12 @@ protected IndexShard newShard( new InternalEngineFactory(), globalCheckpointSyncer, RetentionLeaseSyncer.EMPTY, - null, - segmentReplicationTargetService + null ); } /** - * creates a new initializing shard. 
The shard will be put in its proper path under the + * creates a new initializing shard. The shard will will be put in its proper path under the * current node id the shard is assigned to. * * @param routing shard routing to use @@ -417,20 +408,9 @@ protected IndexShard newShard( IndexMetadata indexMetadata, @Nullable CheckedFunction indexReaderWrapper, EngineFactory engineFactory, - SegmentReplicationTargetService segmentReplicationTargetService, IndexingOperationListener... listeners ) throws IOException { - return newShard( - routing, - indexMetadata, - indexReaderWrapper, - engineFactory, - () -> {}, - RetentionLeaseSyncer.EMPTY, - null, - segmentReplicationTargetService, - listeners - ); + return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, null, listeners); } /** @@ -450,7 +430,6 @@ protected IndexShard newShard( Runnable globalCheckpointSyncer, RetentionLeaseSyncer retentionLeaseSyncer, Store remoteStore, - SegmentReplicationTargetService segmentReplicationTargetService, IndexingOperationListener... listeners ) throws IOException { // add node id as name to settings for proper logging @@ -469,7 +448,6 @@ protected IndexShard newShard( retentionLeaseSyncer, EMPTY_EVENT_LISTENER, remoteStore, - segmentReplicationTargetService, listeners ); } @@ -499,50 +477,6 @@ protected IndexShard newShard( IndexEventListener indexEventListener, Store remoteStore, IndexingOperationListener... listeners - ) throws IOException { - return this.newShard( - routing, - shardPath, - indexMetadata, - storeProvider, - indexReaderWrapper, - engineFactory, - engineConfigFactory, - globalCheckpointSyncer, - retentionLeaseSyncer, - indexEventListener, - remoteStore, - SegmentReplicationTargetService.NO_OP, - listeners - ); - } - - /** - * creates a new initializing shard. The shard will will be put in its proper path under the - * current node id the shard is assigned to. 
- * @param routing shard routing to use - * @param shardPath path to use for shard data - * @param indexMetadata indexMetadata for the shard, including any mapping - * @param storeProvider an optional custom store provider to use. If null a default file based store will be created - * @param indexReaderWrapper an optional wrapper to be used during search - * @param globalCheckpointSyncer callback for syncing global checkpoints - * @param indexEventListener index event listener - * @param listeners an optional set of listeners to add to the shard - */ - protected IndexShard newShard( - ShardRouting routing, - ShardPath shardPath, - IndexMetadata indexMetadata, - @Nullable CheckedFunction storeProvider, - @Nullable CheckedFunction indexReaderWrapper, - @Nullable EngineFactory engineFactory, - @Nullable EngineConfigFactory engineConfigFactory, - Runnable globalCheckpointSyncer, - RetentionLeaseSyncer retentionLeaseSyncer, - IndexEventListener indexEventListener, - Store remoteStore, - SegmentReplicationTargetService segmentReplicationTargetService, - IndexingOperationListener... listeners ) throws IOException { return newShard( routing, @@ -557,7 +491,6 @@ protected IndexShard newShard( indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, remoteStore, - segmentReplicationTargetService, listeners ); } @@ -587,7 +520,6 @@ protected IndexShard newShard( IndexEventListener indexEventListener, SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable Store remoteStore, - SegmentReplicationTargetService segmentReplicationTargetService, IndexingOperationListener... 
listeners ) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); @@ -648,8 +580,7 @@ protected IndexShard newShard( breakerService, new InternalTranslogFactory(), checkpointPublisher, - remoteStore, - segmentReplicationTargetService + remoteStore ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; @@ -733,7 +664,6 @@ protected IndexShard reinitShard( current.getRetentionLeaseSyncer(), EMPTY_EVENT_LISTENER, remoteStore, - SegmentReplicationTargetService.NO_OP, listeners ); } @@ -873,9 +803,30 @@ protected DiscoveryNode getFakeDiscoNode(String id) { ); } + protected void recoverReplica( + IndexShard replica, + IndexShard primary, + boolean startReplica, + SegmentReplicationTargetService segmentReplicationTargetService + ) throws IOException { + recoverReplica( + replica, + primary, + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, segmentReplicationTargetService), + true, + startReplica + ); + } + /** recovers a replica from the given primary **/ protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener), true, startReplica); + recoverReplica( + replica, + primary, + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, SegmentReplicationTargetService.NO_OP), + true, + startReplica + ); } /** recovers a replica from the given primary **/ @@ -899,6 +850,29 @@ protected void recoverReplica( } } + private RecoveryTargetHandler getRecoveryTarget(RecoveryTarget recoveryTarget) { + return new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()); + } + + protected final void recoverUnstartedReplica( + final IndexShard replica, + final IndexShard primary, + final BiFunction targetSupplier, + final boolean markAsRecovering, + final Set inSyncIds, + final 
IndexShardRoutingTable routingTable + ) throws IOException { + this.recoverUnstartedReplica( + replica, + primary, + targetSupplier, + markAsRecovering, + inSyncIds, + routingTable, + SegmentReplicationTargetService.NO_OP + ); + } + /** * Recovers a replica from the give primary, allow the user to supply a custom recovery target. A typical usage of a custom recovery * target is to assert things in the various stages of recovery. @@ -916,7 +890,8 @@ protected final void recoverUnstartedReplica( final BiFunction targetSupplier, final boolean markAsRecovering, final Set inSyncIds, - final IndexShardRoutingTable routingTable + final IndexShardRoutingTable routingTable, + final SegmentReplicationTargetService segmentReplicationTargetService ) throws IOException { final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); @@ -950,7 +925,7 @@ protected final void recoverUnstartedReplica( recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes)); final RecoverySourceHandler recovery = RecoverySourceHandlerFactory.create( primary, - new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + new AsyncRecoveryTarget(recoveryTarget, threadPool.generic(), segmentReplicationTargetService), request, recoverySettings ); diff --git a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java index b3ddec889c1e2..d778815733a49 100644 --- a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java @@ -41,6 +41,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import 
org.opensearch.indices.replication.SegmentReplicationTargetService; import java.util.List; import java.util.concurrent.Executor; @@ -52,9 +53,22 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { private final RecoveryTargetHandler target; private final Executor executor; + private final SegmentReplicationTargetService segmentReplicationTargetService; + public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { this.executor = executor; this.target = target; + this.segmentReplicationTargetService = SegmentReplicationTargetService.NO_OP; + } + + public AsyncRecoveryTarget( + RecoveryTargetHandler target, + Executor executor, + SegmentReplicationTargetService segmentReplicationTargetService + ) { + this.executor = executor; + this.target = target; + this.segmentReplicationTargetService = segmentReplicationTargetService; } @Override @@ -62,6 +76,11 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener target.prepareForTranslogOperations(totalTranslogOps, listener)); } + @Override + public void forceSegmentFileSync(ActionListener listener) { + executor.execute(() -> target.forceSegmentFileSync(listener)); + } + @Override public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener listener) { executor.execute(() -> target.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener));