From 2b76583d7491c497e316c456ec84be7c6fda2b67 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Fri, 6 Jan 2023 10:24:57 -0800 Subject: [PATCH] [Segment Replication] Update peer recovery logic for segment replication (#5344) * [Segment Replication] Update peer recovery logic for segment replication Signed-off-by: Suraj Singh * Add integration test for indexing during relocation Signed-off-by: Suraj Singh * Address review comments Signed-off-by: Suraj Singh * Spotless check apply fixes & one failing unit test Signed-off-by: Suraj Singh * Delay force segment replication till relocation handoff Signed-off-by: Suraj Singh * Spotless fix Signed-off-by: Suraj Singh * Unit test fix Signed-off-by: Suraj Singh * Address review comment, move force segrep sync handler to SegRepTargetService Signed-off-by: Suraj Singh * Resolve conflicts and enabled tests Signed-off-by: Suraj Singh * Spotless fix Signed-off-by: Suraj Singh Signed-off-by: Suraj Singh --- CHANGELOG.md | 125 +++++++- .../replication/SegmentReplicationIT.java | 101 ++++--- .../SegmentReplicationRelocationIT.java | 277 ++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 218 +++++++------- .../indices/recovery/ForceSyncRequest.java | 52 ++++ .../LocalStorePeerRecoverySourceHandler.java | 1 - .../recovery/RecoverySourceHandler.java | 23 +- .../indices/recovery/RecoveryTarget.java | 5 + .../recovery/RecoveryTargetHandler.java | 9 + .../recovery/RemoteRecoveryTargetHandler.java | 18 ++ .../SegmentReplicationTargetService.java | 75 ++++- .../main/java/org/opensearch/node/Node.java | 3 +- .../index/shard/IndexShardTests.java | 222 ++++++++++++-- .../SegmentReplicationIndexShardTests.java | 102 ++++++- ...alStorePeerRecoverySourceHandlerTests.java | 3 + .../SegmentReplicationTargetServiceTests.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 3 +- ...enSearchIndexLevelReplicationTestCase.java | 3 +- .../index/shard/IndexShardTestCase.java | 107 ++++--- .../indices/recovery/AsyncRecoveryTarget.java | 31 
++ 20 files changed, 1145 insertions(+), 235 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java create mode 100644 server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f1f630ea4bd25..af5484aa0fd1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,23 +1,132 @@ - # CHANGELOG All notable changes to this project are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on how to add changelog entries. -## [Unreleased 2.5] +## [Unreleased 3.0] ### Added +- Add support of default replica count cluster setting ([#5610](https://github.com/opensearch-project/OpenSearch/pull/5610)) +- Hardened token permissions in GitHub workflows ([#4587](https://github.com/opensearch-project/OpenSearch/pull/4587)) +- Support for HTTP/2 (server-side) ([#3847](https://github.com/opensearch-project/OpenSearch/pull/3847)) +- Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636)) +- Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746)) +- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843)) +- [Test] Add IAE test for deprecated edgeNGram analyzer name ([#5040](https://github.com/opensearch-project/OpenSearch/pull/5040)) +- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) +- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211)) +- Added jackson dependency to server ([#5366] 
(https://github.com/opensearch-project/OpenSearch/pull/5366)) +- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495)) +- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) +- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) +- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) +- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) +- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) +- Add query for initialized extensions ([#5658](https://github.com/opensearch-project/OpenSearch/pull/5658)) + ### Dependencies -- OpenJDK Update (January 2023 Patch releases) ([#6075](https://github.com/opensearch-project/OpenSearch/pull/6075)) -- Bumps `Mockito` from 4.7.0 to 5.1.0, `ByteBuddy` from 1.12.18 to 1.12.22 ([#6089](https://github.com/opensearch-project/OpenSearch/pull/6089)) -- Bumps `joda` from 2.10.13 to 2.12.2 ([#6096](https://github.com/opensearch-project/OpenSearch/pull/6096)) -- Bumps `Netty` from 4.1.86.Final to 4.1.87.Final ([#6130](https://github.com/opensearch-project/OpenSearch/pull/6130)) -- Bumps `Jackson` from 2.14.1 to 2.14.2 ([#6129](https://github.com/opensearch-project/OpenSearch/pull/6129)) +- Bumps `log4j-core` from 2.18.0 to 2.19.0 +- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23 +- Bumps `jettison` from 1.5.0 to 1.5.1 +- Bumps `forbiddenapis` from 3.3 to 3.4 +- Bumps `gson` from 2.9.0 to 2.10 +- Bumps `avro` from 1.11.0 to 1.11.1 +- Bumps `woodstox-core` from 6.3.0 
to 6.3.1 +- Bumps `xmlbeans` from 5.1.0 to 5.1.1 ([#4354](https://github.com/opensearch-project/OpenSearch/pull/4354)) +- Bumps `azure-storage-common` from 12.18.0 to 12.18.1 ([#4164](https://github.com/opensearch-project/OpenSearch/pull/4664)) +- Bumps `org.gradle.test-retry` from 1.4.0 to 1.4.1 ([#4411](https://github.com/opensearch-project/OpenSearch/pull/4411)) +- Bumps `reactor-netty-core` from 1.0.19 to 1.0.22 ([#4447](https://github.com/opensearch-project/OpenSearch/pull/4447)) +- Bumps `reactive-streams` from 1.0.3 to 1.0.4 ([#4488](https://github.com/opensearch-project/OpenSearch/pull/4488)) +- Bumps `com.diffplug.spotless` from 6.10.0 to 6.11.0 ([#4547](https://github.com/opensearch-project/OpenSearch/pull/4547)) +- Bumps `reactor-core` from 3.4.23 to 3.5.1 ([#5604](https://github.com/opensearch-project/OpenSearch/pull/5604)) +- Bumps `jempbox` from 1.8.16 to 1.8.17 ([#4550](https://github.com/opensearch-project/OpenSearch/pull/4550)) +- Bumps `commons-compress` from 1.21 to 1.22 +- Bumps `jcodings` from 1.0.57 to 1.0.58 ([#5233](https://github.com/opensearch-project/OpenSearch/pull/5233)) +- Bumps `google-http-client-jackson2` from 1.35.0 to 1.42.3 ([#5234](https://github.com/opensearch-project/OpenSearch/pull/5234)) +- Bumps `azure-core` from 1.33.0 to 1.34.0 ([#5235](https://github.com/opensearch-project/OpenSearch/pull/5235)) +- Bumps `azure-core-http-netty` from 1.12.4 to 1.12.7 ([#5235](https://github.com/opensearch-project/OpenSearch/pull/5235)) +- Bumps `spock-core` from 2.1-groovy-3.0 to 2.3-groovy-3.0 ([#5315](https://github.com/opensearch-project/OpenSearch/pull/5315)) +- Bumps `json-schema-validator` from 1.0.69 to 1.0.73 ([#5316](https://github.com/opensearch-project/OpenSearch/pull/5316)) +- Bumps `proto-google-common-protos` from 2.8.0 to 2.10.0 ([#5318](https://github.com/opensearch-project/OpenSearch/pull/5318)) +- Update to Gradle 7.6 and JDK-19 ([#4973](https://github.com/opensearch-project/OpenSearch/pull/4973)) +- Update Apache Lucene 
to 9.5.0-snapshot-d5cef1c ([#5570](https://github.com/opensearch-project/OpenSearch/pull/5570)) +- Bump antlr4 from 4.9.3 to 4.11.1 ([#4546](https://github.com/opensearch-project/OpenSearch/pull/4546)) +- Bumps `maven-model` from 3.6.2 to 3.8.6 ([#5599](https://github.com/opensearch-project/OpenSearch/pull/5599)) +- Bumps `maxmind-db` from 2.1.0 to 3.0.0 ([#5601](https://github.com/opensearch-project/OpenSearch/pull/5601)) +- Bumps `protobuf-java` from 3.21.11 to 3.21.12 ([#5603](https://github.com/opensearch-project/OpenSearch/pull/5603)) +- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704)) ### Changed +- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) +- Relax visibility of the HTTP_CHANNEL_KEY and HTTP_SERVER_CHANNEL_KEY to make it possible for the plugins to access associated Netty4HttpChannel / Netty4HttpServerChannel instance ([#4638](https://github.com/opensearch-project/OpenSearch/pull/4638)) +- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725)) +- Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) +- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) +- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) +- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) +- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) + ### Deprecated + ### Removed +- 
Remove deprecated code to add node name into log pattern of log4j property file ([#4568](https://github.com/opensearch-project/OpenSearch/pull/4568)) +- Unused object and import within TransportClusterAllocationExplainAction ([#4639](https://github.com/opensearch-project/OpenSearch/pull/4639)) +- Remove LegacyESVersion.V_7_0_* and V_7_1_* Constants ([#2768](https://https://github.com/opensearch-project/OpenSearch/pull/2768)) +- Remove LegacyESVersion.V_7_2_ and V_7_3_ Constants ([#4702](https://github.com/opensearch-project/OpenSearch/pull/4702)) +- Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703)) +- Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704)) +- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728)) +- Remove deprecated serialization logic from pipeline aggs ([#4847](https://github.com/opensearch-project/OpenSearch/pull/4847)) +- Remove unused private methods ([#4926](https://github.com/opensearch-project/OpenSearch/pull/4926)) +- Remove LegacyESVersion.V_7_8_ and V_7_9_ Constants ([#4855](https://github.com/opensearch-project/OpenSearch/pull/4855)) +- Remove LegacyESVersion.V_7_6_ and V_7_7_ Constants ([#4837](https://github.com/opensearch-project/OpenSearch/pull/4837)) +- Remove LegacyESVersion.V_7_10_ Constants ([#5018](https://github.com/opensearch-project/OpenSearch/pull/5018)) +- Remove Version.V_1_ Constants ([#5021](https://github.com/opensearch-project/OpenSearch/pull/5021)) + ### Fixed +- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) +- Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) +- Reject bulk requests with invalid actions 
([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299)) +- Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) +- Increasing timeout of testQuorumRecovery to 90 seconds from 30 ([#5651](https://github.com/opensearch-project/OpenSearch/pull/5651)) +- [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) + +### Security + +## [Unreleased 2.x] +### Added +- Prevent deletion of snapshots that are backing searchable snapshot indexes ([#5069](https://github.com/opensearch-project/OpenSearch/pull/5069)) +- Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229)) +- Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603)) + +### Dependencies +- Bumps `bcpg-fips` from 1.0.5.1 to 1.0.7.1 +- Bumps `azure-storage-blob` from 12.16.1 to 12.20.0 ([#4995](https://github.com/opensearch-project/OpenSearch/pull/4995)) +- Bumps `commons-compress` from 1.21 to 1.22 ([#5104](https://github.com/opensearch-project/OpenSearch/pull/5104)) +- Bump `opencensus-contrib-http-util` from 0.18.0 to 0.31.1 ([#3633](https://github.com/opensearch-project/OpenSearch/pull/3633)) +- Bump `geoip2` from 3.0.2 to 4.0.0 ([#5634](https://github.com/opensearch-project/OpenSearch/pull/5634)) +- Bump gradle-extra-configurations-plugin from 7.0.0 to 8.0.0 ([#4808](https://github.com/opensearch-project/OpenSearch/pull/4808)) +- Bumps `gradle-info-plugin` from 11.3.3 to 12.0.0 ([#5600](https://github.com/opensearch-project/OpenSearch/pull/5600)) +- Bumps `apache-rat` from 0.13 to 0.15 ([#5675](https://github.com/opensearch-project/OpenSearch/pull/5675)) +- Bumps `reactor-netty` from 1.0.18 to 1.1.1 ([#5676](https://github.com/opensearch-project/OpenSearch/pull/5676)) + +### Changed +- Add support for refresh level durability 
([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253)) +- Integrate remote segment store in the failover flow ([#5579](https://github.com/opensearch-project/OpenSearch/pull/5579)) + +### Deprecated +- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433)) + +### Removed +### Fixed +- Fix 1.x compatibility bug with stored Tasks ([#5412](https://github.com/opensearch-project/OpenSearch/pull/5412)) +- Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462)) +- Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524)) +- Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524)) +- Fix backward compatibility for static cluster manager throttling threshold setting ([#5633](https://github.com/opensearch-project/OpenSearch/pull/5633)) +- Fix index exclusion behavior in snapshot restore and clone APIs ([#5626](https://github.com/opensearch-project/OpenSearch/pull/5626)) + ### Security -[Unreleased 2.5]: https://github.com/opensearch-project/OpenSearch/compare/b8a8b6c4d7fc7a7e32eb2cb68ecad8057a4636ad...2.5 +[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.4...2.x diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 5ab1fc79fa68a..dc634fb10e387 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import 
com.carrotsearch.randomizedtesting.RandomizedTest; -import org.junit.BeforeClass; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; @@ -66,14 +65,9 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationIT extends OpenSearchIntegTestCase { - private static final String INDEX_NAME = "test-idx-1"; - private static final int SHARD_COUNT = 1; - private static final int REPLICA_COUNT = 1; - - @BeforeClass - public static void assumeFeatureFlag() { - assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); - } + protected static final String INDEX_NAME = "test-idx-1"; + protected static final int SHARD_COUNT = 1; + protected static final int REPLICA_COUNT = 1; @Override protected Collection> nodePlugins() { @@ -96,11 +90,35 @@ protected boolean addMockInternalEngine() { return false; } + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + + public void ingestDocs(int docCount) throws Exception { + try ( + BackgroundIndexer indexer = new BackgroundIndexer( + INDEX_NAME, + "_doc", + client(), + -1, + RandomizedTest.scaledRandomIntBetween(2, 5), + false, + random() + ) + ) { + indexer.start(docCount); + waitForDocs(docCount, indexer); + refresh(INDEX_NAME); + waitForReplicaUpdate(); + } + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = 
internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -127,7 +145,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); // start another node, index another doc and replicate. - String nodeC = internalCluster().startNode(); + String nodeC = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); @@ -138,10 +156,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { } public void testRestartPrimary() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); assertEquals(getNodeContainingPrimaryShard().getName(), primary); @@ -167,10 +185,10 @@ public void testRestartPrimary() throws Exception { public void testCancelPrimaryAllocation() throws Exception { // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. 
- final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); final int initialDocCount = 1; @@ -201,10 +219,13 @@ public void testCancelPrimaryAllocation() throws Exception { /** * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery. + * + * TODO: Ignoring this test as its flaky and needs separate fix */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); logger.info("--> creating test index ..."); prepareCreate( @@ -227,7 +248,7 @@ public void testAddNewReplicaFailure() throws Exception { assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); logger.info("--> start empty node to add replica shard"); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); // Mock transport service to add behaviour of throwing corruption exception during segment replication process. 
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( @@ -268,9 +289,10 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -310,8 +332,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } public void testIndexReopenClose() throws Exception { - final String primary = internalCluster().startNode(); - final String replica = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -355,8 +377,8 @@ public void testMultipleShards() throws Exception { .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, indexSettings); ensureGreen(INDEX_NAME); @@ -396,8 +418,8 @@ public void testMultipleShards() throws Exception { } public void testReplicationAfterForceMerge() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = 
internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -441,11 +463,11 @@ public void testReplicationAfterForceMerge() throws Exception { } public void testCancellation() throws Exception { - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); ensureYellow(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( SegmentReplicationSourceService.class, @@ -499,8 +521,9 @@ public void testCancellation() throws Exception { assertDocCounts(docCount, primaryNode); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(INDEX_NAME); @@ -523,7 +546,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { .prepareUpdateSettings(INDEX_NAME) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) ); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); @@ -538,8 +561,8 @@ public void 
testStartReplicaAfterPrimaryIndexesDocs() throws Exception { } public void testDeleteOperations() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -601,10 +624,10 @@ public void testDeleteOperations() throws Exception { } public void testUpdateOperations() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellow(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); final int initialDocCount = scaledRandomIntBetween(0, 200); try ( @@ -705,10 +728,10 @@ public void testDropPrimaryDuringReplication() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings()); + final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings()); createIndex(INDEX_NAME, settings); - internalCluster().startDataOnlyNodes(6); + internalCluster().startDataOnlyNodes(6, featureFlagSettings()); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); @@ -731,7 +754,7 @@ public void testDropPrimaryDuringReplication() throws Exception { ensureYellow(INDEX_NAME); // start another replica. 
- internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(featureFlagSettings()); ensureGreen(INDEX_NAME); // index another doc and refresh - without this the new replica won't catch up. @@ -762,7 +785,7 @@ private void waitForReplicaUpdate() throws Exception { // if we don't have any segments yet, proceed. final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get(); logger.debug("Primary Segments: {}", primaryShardSegments.getSegments()); - if (primaryShardSegments.getSegments().isEmpty() == false) { + if (primaryShardSegments.getSegments().isEmpty() == false && replicaShardSegments != null) { final Map latestPrimarySegments = getLatestSegments(primaryShardSegments); final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get(); for (ShardSegments shardSegments : replicaShardSegments) { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java new file mode 100644 index 0000000000000..bc07ef502dc79 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationRelocationIT.java @@ -0,0 +1,277 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchCorruptionException; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +/** + * This test class verifies primary shard relocation with segment replication as replication strategy. 
+ */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationRelocationIT extends SegmentReplicationIT { + private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); + + private void createIndex() { + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 1) + ).get(); + } + + /** + * This test verifies happy path when primary shard is relocated newly added node (target) in the cluster. Before + * relocation and after relocation documents are indexed and documents are verified + */ + public void testPrimaryRelocation() throws Exception { + final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + createIndex(); + final String replica = internalCluster().startNode(featureFlagSettings()); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(0, 200); + ingestDocs(initialDocCount); + + logger.info("--> verifying count {}", initialDocCount); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> start another node"); + final String newPrimary = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, 
newPrimary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> get the state, verify shard 1 primary moved from node1 to node2"); + ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState(); + + logger.info("--> state {}", state); + + assertEquals( + state.getRoutingNodes().node(state.nodes().resolveNode(newPrimary).getId()).iterator().next().state(), + ShardRoutingState.STARTED + ); + + final int finalDocCount = initialDocCount; + ingestDocs(finalDocCount); + refresh(INDEX_NAME); + + logger.info("--> verifying count again {}", initialDocCount + finalDocCount); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + /** + * This test verifies the primary relocation behavior when segment replication round fails during recovery. Post + * failure, more documents are ingested and verified on replica; which confirms older primary still refreshing the + * replicas. 
+ */ + public void testPrimaryRelocationWithSegRepFailure() throws Exception { + final String oldPrimary = internalCluster().startNode(featureFlagSettings()); + createIndex(); + final String replica = internalCluster().startNode(featureFlagSettings()); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(1, 100); + ingestDocs(initialDocCount); + + logger.info("--> verifying count {}", initialDocCount); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); + + logger.info("--> start another node"); + final String newPrimary = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("3") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. 
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, newPrimary), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + throw new OpenSearchCorruptionException("expected"); + } + connection.sendRequest(requestId, action, request, options); + } + ); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + final int finalDocCount = initialDocCount; + ingestDocs(finalDocCount); + refresh(INDEX_NAME); + + logger.info("Verify older primary is still refreshing replica nodes"); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertHitCount( + client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + assertHitCount( + client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + initialDocCount + finalDocCount + ); + } + + /** + * This test verifies primary recovery behavior with continuous ingestion + * + */ + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { + final String primary = internalCluster().startNode(featureFlagSettings()); + prepareCreate( + INDEX_NAME, + Settings.builder() + .put("index.number_of_shards", 1) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + 
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", -1) + ).get(); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet(); + } + logger.info("--> flush to have segments on disk"); + client().admin().indices().prepareFlush().execute().actionGet(); + + logger.info("--> index more docs so there are ops in the transaction log"); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 10; i < 20; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + + final String replica = internalCluster().startNode(featureFlagSettings()); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> relocate the shard from primary to replica"); + ActionFuture relocationListener = client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, primary, replica)) + .execute(); + for (int i = 20; i < 120; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + relocationListener.actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + logger.info("--> verifying count"); + assertBusy(() -> { + 
client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 120L); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a1a755cd34e9b..5a5356282681e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -59,10 +59,10 @@ import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; import org.opensearch.ExceptionsHelper; -import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; +import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; @@ -118,7 +118,6 @@ import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.NRTReplicationEngine; -import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.engine.RefreshFailedEngineException; import org.opensearch.index.engine.SafeCommitInfo; @@ -160,6 +159,7 @@ import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogFactory; +import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.warmer.ShardIndexWarmerService; import org.opensearch.index.warmer.WarmerStats; @@ 
-206,7 +206,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -320,9 +319,8 @@ Runnable getGlobalCheckpointSyncer() { private final AtomicReference pendingRefreshLocation = new AtomicReference<>(); private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; - private final Store remoteStore; - private final BiFunction translogFactorySupplier; + private final TranslogFactory translogFactory; public IndexShard( final ShardRouting shardRouting, @@ -345,7 +343,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final BiFunction translogFactorySupplier, + final TranslogFactory translogFactory, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore ) throws IOException { @@ -432,7 +430,7 @@ public boolean shouldCache(Query query) { this.refreshPendingLocationListener = new RefreshPendingLocationListener(); this.checkpointPublisher = checkpointPublisher; this.remoteStore = remoteStore; - this.translogFactorySupplier = translogFactorySupplier; + this.translogFactory = translogFactory; } public ThreadPool getThreadPool() { @@ -652,20 +650,17 @@ public void updateShardState( * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. 
*/ final Engine engine = getEngine(); - engine.restoreLocalHistoryFromTranslog( - (resettingEngine, snapshot) -> runTranslogRecovery( - resettingEngine, - snapshot, - Engine.Operation.Origin.LOCAL_RESET, - () -> {} - ) - ); + engine.translogManager() + .restoreLocalHistoryFromTranslog( + engine.getProcessedLocalCheckpoint(), + (snapshot) -> runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {}) + ); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between * sequence numbers in a translog generation in a new primary as it takes the last known sequence number * as a starting point), but it simplifies reasoning about the relationship between primary terms and * translog generations. */ - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @@ -750,16 +745,23 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta private final AtomicBoolean primaryReplicaResyncInProgress = new AtomicBoolean(); /** - * Completes the relocation. Operations are blocked and current operations are drained before changing state to relocated. The provided - * {@link Runnable} is executed after all operations are successfully blocked. + * Completes the relocation. Operations are blocked and current operations are drained before changing state to + * relocated. After all operations are successfully blocked, performSegRep is executed followed by target relocation + * handoff. 
* - * @param consumer a {@link Runnable} that is executed after operations are blocked + * @param performSegRep a {@link Runnable} that is executed after operations are blocked + * @param consumer a {@link Runnable} that is executed after performSegRep + * @param listener ActionListener * @throws IllegalIndexShardStateException if the shard is not relocating due to concurrent cancellation * @throws IllegalStateException if the relocation target is no longer part of the replication group * @throws InterruptedException if blocking operations is interrupted */ - public void relocated(final String targetAllocationId, final Consumer consumer) - throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { + public void relocated( + final String targetAllocationId, + final Consumer consumer, + final Consumer performSegRep, + final ActionListener listener + ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { @@ -767,26 +769,33 @@ public void relocated(final String targetAllocationId, final Consumer segRepSyncListener = new StepListener<>(); + performSegRep.accept(segRepSyncListener); + segRepSyncListener.whenComplete(r -> { + /* + * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a + * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. 
+ */ + verifyRelocatingState(); + final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId); try { - replicationTracker.abortRelocationHandoff(); - } catch (final Exception inner) { - e.addSuppressed(inner); + consumer.accept(primaryContext); + synchronized (mutex) { + verifyRelocatingState(); + replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under + // mutex + } + } catch (final Exception e) { + try { + replicationTracker.abortRelocationHandoff(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } + throw e; } - throw e; - } + listener.onResponse(null); + }, listener::onFailure); }); } catch (TimeoutException e) { logger.warn("timed out waiting for relocation hand-off to complete"); @@ -1273,7 +1282,7 @@ public FieldDataStats fieldDataStats(String... fields) { } public TranslogStats translogStats() { - return getEngine().getTranslogStats(); + return getEngine().translogManager().getTranslogStats(); } public CompletionStats completionStats(String... 
fields) { @@ -1308,7 +1317,7 @@ public void flush(FlushRequest request) { public void trimTranslog() { verifyNotClosed(); final Engine engine = getEngine(); - engine.trimUnreferencedTranslogFiles(); + engine.translogManager().trimUnreferencedTranslogFiles(); } /** @@ -1316,7 +1325,7 @@ public void trimTranslog() { */ public void rollTranslogGeneration() { final Engine engine = getEngine(); - engine.rollTranslogGeneration(); + engine.translogManager().rollTranslogGeneration(); } public void forceMerge(ForceMergeRequest forceMerge) throws IOException { @@ -1388,6 +1397,14 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + private Optional getReplicationEngine() { + if (getEngine() instanceof NRTReplicationEngine) { + return Optional.of((NRTReplicationEngine) getEngine()); + } else { + return Optional.empty(); + } + } + public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { if (getReplicationEngine().isPresent()) { getReplicationEngine().get().updateSegments(infos, seqNo); @@ -1616,6 +1633,20 @@ static Engine.Searcher wrapSearcher( } } + /** + * Used with segment replication during relocation handoff, this method updates current read only engine to global + * checkpoint followed by changing to writeable engine + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + * + * @opensearch.internal + */ + public void resetToWriteableEngine() throws IOException, InterruptedException, TimeoutException { + indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); + } + /** * Wrapper for a non-closing reader * @@ -1761,10 +1792,10 @@ private long recoverLocallyUpToGlobalCheckpoint() { return safeCommit.get().localCheckpoint + 1; } try { - final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { + final TranslogRecoveryRunner translogRecoveryRunner = (snapshot) -> { 
recoveryState.getTranslog().totalLocal(snapshot.totalOperations()); final int recoveredOps = runTranslogRecovery( - engine, + getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, recoveryState.getTranslog()::incrementRecoveredOperations @@ -1773,7 +1804,8 @@ private long recoverLocallyUpToGlobalCheckpoint() { return recoveredOps; }; innerOpenEngineAndTranslog(() -> globalCheckpoint); - getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint); + getEngine().translogManager() + .recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), globalCheckpoint); logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint)); } finally { synchronized (engineMutex) { @@ -1850,7 +1882,7 @@ private void validateLocalRecoveryState() { } public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { - getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); + getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo); } /** @@ -1990,11 +2022,11 @@ public void openEngineAndRecoverFromTranslog() throws IOException { maybeCheckIndex(); recoveryState.setStage(RecoveryState.Stage.TRANSLOG); final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog(); - final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> { + final TranslogRecoveryRunner translogRecoveryRunner = (snapshot) -> { translogRecoveryStats.totalOperations(snapshot.totalOperations()); translogRecoveryStats.totalOperationsOnStart(snapshot.totalOperations()); return runTranslogRecovery( - engine, + getEngine(), snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, translogRecoveryStats::incrementRecoveredOperations @@ -2002,12 +2034,13 @@ public void openEngineAndRecoverFromTranslog() throws IOException { }; // Do not load the global checkpoint if this is a remote snapshot index - if (indexSettings.isRemoteSnapshot() 
== false) { + if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings) == false) { loadGlobalCheckpointToReplicationTracker(); } innerOpenEngineAndTranslog(replicationTracker); - getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); + getEngine().translogManager() + .recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE); } /** @@ -2019,7 +2052,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException { recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); - getEngine().skipTranslogRecovery(); + getEngine().translogManager().skipTranslogRecovery(); } private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException { @@ -2060,7 +2093,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t } private boolean assertSequenceNumbersInCommit() throws IOException { - final Map userData = fetchUserData(); + final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint"; assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number"; assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid"; @@ -2075,19 +2108,9 @@ private boolean assertSequenceNumbersInCommit() throws IOException { return true; } - private Map fetchUserData() throws IOException { - if (indexSettings.isRemoteSnapshot() && indexSettings.getExtendedCompatibilitySnapshotVersion() != null) { - // Inefficient method to support reading old Lucene indexes - return Lucene.readSegmentInfosExtendedCompatibility(store.directory(), indexSettings.getExtendedCompatibilitySnapshotVersion()) - .getUserData(); - } else { - return 
SegmentInfos.readLatestCommit(store.directory()).getUserData(); - } - } - private void onNewEngine(Engine newEngine) { assert Thread.holdsLock(engineMutex); - refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation); + refreshListeners.setCurrentRefreshLocationSupplier(newEngine.translogManager()::getTranslogLastWriteLocation); } /** @@ -2371,7 +2394,7 @@ boolean shouldRollTranslogGeneration() { final Engine engine = getEngineOrNull(); if (engine != null) { try { - return engine.shouldRollTranslogGeneration(); + return engine.translogManager().shouldRollTranslogGeneration(); } catch (final AlreadyClosedException e) { // we are already closed, no need to flush or roll } @@ -2424,7 +2447,7 @@ public Closeable acquireHistoryRetentionLock() { /** * Creates a new history snapshot for reading operations since * the provided starting seqno (inclusive) and ending seqno (inclusive) - * The returned snapshot is retrieved from a Lucene index. + * The returned snapshot can be retrieved from either Lucene index or translog files. */ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo, boolean accurateCount) throws IOException { @@ -2432,15 +2455,14 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, } /** - * Creates a new history snapshot from the translog file instead of the lucene index - * - * @deprecated reading history operations from the translog file is deprecated and will be removed in the next release - * - * Use {@link IndexShard#getHistoryOperations(String, long, long, boolean)} instead + * Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication. + * Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases. + * This method should only be invoked if Segment Replication or Remote Store is not enabled. 
*/ - @Deprecated - public Translog.Snapshot getHistoryOperationsFromTranslogFile(String reason, long startingSeqNo, long endSeqNo) throws IOException { - return getEngine().newChangesSnapshotFromTranslogFile(reason, startingSeqNo, endSeqNo, true); + public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException { + assert (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) == false + : "unsupported operation for segment replication enabled indices or remote store backed indices"; + return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true); } /** @@ -2806,24 +2828,11 @@ public long getLocalCheckpoint() { } /** - * Fetch the latest checkpoint that has been processed but not necessarily persisted. This should be used only when Segment Replication is enabled. + * Fetch the latest checkpoint that has been processed but not necessarily persisted. * Also see {@link #getLocalCheckpoint()}. */ public long getProcessedLocalCheckpoint() { - // Returns checkpoint only if the current engine is an instance of NRTReplicationEngine or InternalEngine - return getReplicationEngine().map(NRTReplicationEngine::getProcessedLocalCheckpoint).orElseGet(() -> { - final Engine engine = getEngine(); - assert engine instanceof InternalEngine; - return ((InternalEngine) engine).getProcessedLocalCheckpoint(); - }); - } - - private Optional getReplicationEngine() { - if (getEngine() instanceof NRTReplicationEngine) { - return Optional.of((NRTReplicationEngine) getEngine()); - } else { - return Optional.empty(); - } + return getEngine().getProcessedLocalCheckpoint(); } /** @@ -3239,7 +3248,7 @@ public RetentionLease addPeerRecoveryRetentionLease( ) { assert assertPrimaryMode(); // only needed for BWC reasons involving rolling upgrades from versions that do not support PRRLs: - assert indexSettings.getIndexVersionCreated().before(LegacyESVersion.V_7_4_0) || indexSettings.isSoftDeleteEnabled() == 
false; + assert indexSettings.isSoftDeleteEnabled() == false; return replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } @@ -3347,6 +3356,13 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } + /** + * With segment replication enabled for primary relocation, recover replica shard initially as read only and + * change to a writeable engine during relocation handoff after a round of segment replication. + */ + boolean isReadOnlyReplica = indexSettings.isSegRepEnabled() + && (shardRouting.primary() == false + || (shardRouting.isRelocationTarget() && recoveryState.getStage() != RecoveryState.Stage.FINALIZE)); return this.engineConfigFactory.newEngineConfig( shardId, @@ -3371,8 +3387,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), - indexSettings.isSegRepEnabled() && shardRouting.primary() == false, - translogFactorySupplier.apply(indexSettings, shardRouting) + isReadOnlyReplica, + translogFactory ); } @@ -3682,7 +3698,7 @@ private void innerAcquireReplicaOperationPermit( if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) { resetEngineToGlobalCheckpoint(); } else { - getEngine().rollTranslogGeneration(); + getEngine().translogManager().rollTranslogGeneration(); } }, allowCombineOperationWithPrimaryTermUpdate ? 
operationListener : null); @@ -3736,7 +3752,7 @@ private static AsyncIOProcessor createTranslogSyncProcessor( @Override protected void write(List>> candidates) throws IOException { try { - engineSupplier.get().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); + engineSupplier.get().translogManager().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); } catch (AlreadyClosedException ex) { // that's fine since we already synced everything on engine close - this also is conform with the methods // documentation @@ -3764,14 +3780,14 @@ public final void sync(Translog.Location location, Consumer syncListe public void sync() throws IOException { verifyNotClosed(); - getEngine().syncTranslog(); + getEngine().translogManager().syncTranslog(); } /** * Checks if the underlying storage sync is required. */ public boolean isSyncNeeded() { - return getEngine().isTranslogSyncNeeded(); + return getEngine().translogManager().isTranslogSyncNeeded(); } /** @@ -3955,7 +3971,7 @@ public final boolean hasRefreshPending() { } private void setRefreshPending(Engine engine) { - final Translog.Location lastWriteLocation = engine.getTranslogLastWriteLocation(); + final Translog.Location lastWriteLocation = engine.translogManager().getTranslogLastWriteLocation(); pendingRefreshLocation.updateAndGet(curr -> { if (curr == null || curr.compareTo(lastWriteLocation) <= 0) { return lastWriteLocation; @@ -3971,7 +3987,7 @@ private class RefreshPendingLocationListener implements ReferenceManager.Refresh @Override public void beforeRefresh() { try { - lastWriteLocation = getEngine().getTranslogLastWriteLocation(); + lastWriteLocation = getEngine().translogManager().getTranslogLastWriteLocation(); } catch (AlreadyClosedException exc) { // shard is closed - no location is fine lastWriteLocation = null; @@ -4167,15 +4183,17 @@ public void close() throws IOException { newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); 
onNewEngine(newEngineReference.get()); } - final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( - engine, + final TranslogRecoveryRunner translogRunner = (snapshot) -> runTranslogRecovery( + newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog } ); - newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint); + newEngineReference.get() + .translogManager() + .recoverFromTranslog(translogRunner, newEngineReference.get().getProcessedLocalCheckpoint(), globalCheckpoint); newEngineReference.get().refresh("reset_engine"); synchronized (engineMutex) { verifyNotClosed(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java b/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java new file mode 100644 index 0000000000000..2600097fd0f2a --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/ForceSyncRequest.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.recovery; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * Request to force a round of segment replication on primary target + * + * @opensearch.internal + */ +public class ForceSyncRequest extends RecoveryTransportRequest { + private final long recoveryId; + private final ShardId shardId; + + public ForceSyncRequest(long requestSeqNo, long recoveryId, ShardId shardId) { + super(requestSeqNo); + this.recoveryId = recoveryId; + this.shardId = shardId; + } + + public ForceSyncRequest(StreamInput in) throws IOException { + super(in); + this.recoveryId = in.readLong(); + this.shardId = new ShardId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeLong(recoveryId); + shardId.writeTo(out); + } + + public long getRecoveryId() { + return recoveryId; + } + + public ShardId getShardId() { + return shardId; + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java index 9ffe61208b78c..4bb6fc1d23b9f 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java @@ -101,7 +101,6 @@ && isTargetSameHistory() final StepListener sendFileStep = new StepListener<>(); final StepListener prepareEngineStep = new StepListener<>(); final StepListener sendSnapshotStep = new StepListener<>(); - final StepListener finalizeStep = new StepListener<>(); if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. 
starting at [{}]", request.startingSeqNo()); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 6259842c282bf..e2f5ec76f2bd1 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -820,19 +820,34 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis logger ); + final StepListener handoffListener = new StepListener<>(); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); + final Consumer forceSegRepConsumer = shard.indexSettings().isSegRepEnabled() + ? recoveryTarget::forceSegmentFileSync + : res -> res.onResponse(null); // TODO: make relocated async // this acquires all IndexShard operation permits and will thus delay new recoveries until it is done - cancellableThreads.execute(() -> shard.relocated(request.targetAllocationId(), recoveryTarget::handoffPrimaryContext)); + cancellableThreads.execute( + () -> shard.relocated( + request.targetAllocationId(), + recoveryTarget::handoffPrimaryContext, + forceSegRepConsumer, + handoffListener + ) + ); /* * if the recovery process fails after disabling primary mode on the source shard, both relocation source and * target are failed (see {@link IndexShard#updateRoutingEntry}). 
*/ + } else { + handoffListener.onResponse(null); } - stopWatch.stop(); - logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); - listener.onResponse(null); + handoffListener.whenComplete(res -> { + stopWatch.stop(); + logger.trace("finalizing recovery took [{}]", stopWatch.totalTime()); + listener.onResponse(null); + }, listener::onFailure); }, listener::onFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 62aad1ce263ad..f0918813da62b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -263,6 +263,11 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { + throw new UnsupportedOperationException("Method not supported on target!"); + } + @Override public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSeqNo, ActionListener listener) { ActionListener.completeWith(listener, () -> { diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index c750c0e88364b..ef0d4abc44c7d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -53,6 +53,15 @@ public interface RecoveryTargetHandler extends FileChunkWriter { */ void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener); + /** + * Used with Segment replication only + * + * Forces the target primary node to sync its segment files with the source (old primary). This avoids segment file + * conflicts with replicas when the target is promoted to primary. 
+ * @param listener segment replication event listener + */ + void forceSegmentFileSync(ActionListener listener); + /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates * the global checkpoint. diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index e7ae62c1bee7d..5f638103a021c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -45,6 +45,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.transport.EmptyTransportResponseHandler; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; @@ -186,6 +187,23 @@ public void indexTranslogOperations( retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); } + /** + * Used with Segment replication only + * + * Forces the target primary node to sync its segment files with the source (old primary). This avoids segment file + * conflicts with replicas when the target is promoted to primary.
+ * @param listener segment replication event listener + */ + @Override + public void forceSegmentFileSync(ActionListener listener) { + final String action = SegmentReplicationTargetService.Actions.FORCE_SYNC; + final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); + final ForceSyncRequest request = new ForceSyncRequest(requestSeqNo, recoveryId, shardId); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + final ActionListener responseListener = ActionListener.map(listener, r -> null); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); + } + @Override public void receiveFileInfo( List phase1FileNames, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 8fc53ccd3bc08..0b17a2c4ca7f6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -22,7 +22,9 @@ import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.ForceSyncRequest; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; @@ -33,9 +35,12 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Map; +import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; /** @@ -56,6 +61,8 @@ public class SegmentReplicationTargetService implements IndexEventListener { private final Map latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap(); + private final IndicesService indicesService; + // Empty Implementation, only required while Segment Replication is under feature flag. public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() { @Override @@ -80,6 +87,7 @@ private SegmentReplicationTargetService() { recoverySettings = null; onGoingReplications = null; sourceFactory = null; + indicesService = null; } public ReplicationRef get(long replicationId) { @@ -93,18 +101,21 @@ public ReplicationRef get(long replicationId) { */ public static class Actions { public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; + public static final String FORCE_SYNC = "internal:index/shard/replication/segments_sync"; } public SegmentReplicationTargetService( final ThreadPool threadPool, final RecoverySettings recoverySettings, final TransportService transportService, - final SegmentReplicationSourceFactory sourceFactory + final SegmentReplicationSourceFactory sourceFactory, + final IndicesService indicesService ) { this.threadPool = threadPool; this.recoverySettings = recoverySettings; this.onGoingReplications = new ReplicationCollection<>(logger, threadPool); this.sourceFactory = sourceFactory; + this.indicesService = indicesService; transportService.registerRequestHandler( Actions.FILE_CHUNK, @@ -112,6 +123,12 @@ public SegmentReplicationTargetService( FileChunkRequest::new, new FileChunkTransportRequestHandler() ); + transportService.registerRequestHandler( + Actions.FORCE_SYNC, + ThreadPool.Names.GENERIC, + ForceSyncRequest::new, + new ForceSyncTransportRequestHandler() + ); } /** @@ -320,4 +337,60 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha 
} } } + + class ForceSyncTransportRequestHandler implements TransportRequestHandler { + @Override + public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception { + assert indicesService != null; + final IndexShard indexShard = indicesService.getShardOrNull(request.getShardId()); + startReplication( + ReplicationCheckpoint.empty(request.getShardId()), + indexShard, + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication complete, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + try { + indexShard.resetToWriteableEngine(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (InterruptedException | TimeoutException | IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + logger.trace( + () -> new ParameterizedMessage( + "[shardId {}] [replication id {}] Replication failed, timing data: {}", + indexShard.shardId().getId(), + state.getReplicationId(), + state.getTimingData() + ) + ); + if (sendShardFailure == true) { + indexShard.failShard("replication failure", e); + } + try { + channel.sendResponse(e); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + ); + } + } + } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index b05c58e4d23a4..55a3de98721c1 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1057,7 +1057,8 @@ protected Node( threadPool, recoverySettings, transportService, - new SegmentReplicationSourceFactory(transportService, 
recoverySettings, clusterService) + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), + indicesService ) ); b.bind(SegmentReplicationSourceService.class) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 45c41a64d98b0..5e979b934ebec 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -50,6 +50,7 @@ import org.apache.lucene.util.Constants; import org.junit.Assert; import org.opensearch.Assertions; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -141,6 +142,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; @@ -994,7 +996,22 @@ public void testOperationPermitOnReplicaShards() throws Exception { AllocationId.newRelocation(routing.allocationId()) ); IndexShardTestCase.updateRoutingEntry(indexShard, routing); - indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + indexShard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(indexShard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); engineClosed = false; break; } @@ -1947,7 
+1964,22 @@ public void testLockingBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { latch.countDown(); try { - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1979,7 +2011,22 @@ public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception { Thread recoveryThread = new Thread(() -> { try { startRecovery.await(); - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> relocationStarted.countDown()); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> relocationStarted.countDown(), + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -2067,11 +2114,26 @@ public void run() { AtomicBoolean relocated = new AtomicBoolean(); final Thread recoveryThread = new Thread(() -> { try { - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + relocated.set(true); + } + + @Override + public void onFailure(Exception e) { + relocated.set(false); + fail(e.toString()); + } + } + ); } catch 
(InterruptedException e) { throw new RuntimeException(e); } - relocated.set(true); }); // ensure we wait for all primary operation locks to be acquired allPrimaryOperationLocksAcquired.await(); @@ -2102,20 +2164,72 @@ public void testRelocatedShardCanNotBeRevived() throws IOException, InterruptedE final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, routing); - shard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting)); closeShards(shard); } - public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException { + public void testRelocatedSegRepConsumerError() throws IOException, InterruptedException { + final IndexShard shard = newStartedShard(true); + final ShardRouting originalRouting = shard.routingEntry(); + final ShardRouting routing = ShardRoutingHelper.relocate(originalRouting, "other_node"); + IndexShardTestCase.updateRoutingEntry(shard, routing); + shard.relocated( + routing.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onFailure(new ReplicationFailedException("Segment replication failed")), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + fail("Expected failure"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(ExceptionsHelper.unwrapCause(e) instanceof ReplicationFailedException); + assertEquals(e.getMessage(), 
"Segment replication failed"); + } + } + ); + closeShards(shard); + } + + public void testShardCanNotBeMarkedAsRelocatedIfRelocationCancelled() throws IOException, InterruptedException { final IndexShard shard = newStartedShard(true); final ShardRouting originalRouting = shard.routingEntry(); final ShardRouting relocationRouting = ShardRoutingHelper.relocate(originalRouting, "other_node"); IndexShardTestCase.updateRoutingEntry(shard, relocationRouting); IndexShardTestCase.updateRoutingEntry(shard, originalRouting); - expectThrows( - IllegalIndexShardStateException.class, - () -> shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}) + shard.relocated( + relocationRouting.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + fail("IllegalIndexShardStateException expected!"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalIndexShardStateException); + } + } ); closeShards(shard); } @@ -2130,13 +2244,28 @@ public void testRelocatedShardCanNotBeRevivedConcurrently() throws IOException, Thread relocationThread = new Thread(new AbstractRunnable() { @Override public void onFailure(Exception e) { - relocationException.set(e); + fail(e.toString()); } @Override protected void doRun() throws Exception { cyclicBarrier.await(); - shard.relocated(relocationRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + relocationRouting.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + relocationException.set(e); + } + } + ); } }); relocationThread.start(); @@ -2184,20 
+2313,48 @@ public void testRelocateMissingTarget() throws Exception { final ShardRouting toNode2 = ShardRoutingHelper.relocate(original, "node_2"); IndexShardTestCase.updateRoutingEntry(shard, toNode2); final AtomicBoolean relocated = new AtomicBoolean(); - final IllegalStateException error = expectThrows( - IllegalStateException.class, - () -> shard.relocated(toNode1.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true)) - ); - assertThat( - error.getMessage(), - equalTo( - "relocation target [" - + toNode1.getTargetRelocatingShard().allocationId().getId() - + "] is no longer part of the replication group" - ) + shard.relocated( + toNode1.getTargetRelocatingShard().allocationId().getId(), + ctx -> relocated.set(true), + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + fail("Expected IllegalStateException!"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalStateException); + assertThat( + e.getMessage(), + equalTo( + "relocation target [" + + toNode1.getTargetRelocatingShard().allocationId().getId() + + "] is no longer part of the replication group" + ) + ); + } + } ); + assertFalse(relocated.get()); - shard.relocated(toNode2.getTargetRelocatingShard().allocationId().getId(), ctx -> relocated.set(true)); + shard.relocated( + toNode2.getTargetRelocatingShard().allocationId().getId(), + ctx -> relocated.set(true), + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(relocated.get()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); assertTrue(relocated.get()); closeShards(shard); } @@ -2590,7 +2747,22 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc assertThat(shard.state(), equalTo(IndexShardState.STARTED)); ShardRouting inRecoveryRouting = 
ShardRoutingHelper.relocate(origRouting, "some_node"); IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting); - shard.relocated(inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}); + shard.relocated( + inRecoveryRouting.getTargetRelocatingShard().allocationId().getId(), + primaryContext -> {}, + r -> r.onResponse(null), + new ActionListener() { + @Override + public void onResponse(Void unused) { + assertTrue(shard.isRelocatedPrimary()); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + } + ); assertTrue(shard.isRelocatedPrimary()); try { IndexShardTestCase.updateRoutingEntry(shard, origRouting); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 11fae987174f6..c46f97b5ec785 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -11,7 +11,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; -import org.opensearch.OpenSearchException; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; @@ -34,6 +34,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; import org.opensearch.indices.replication.SegmentReplicationSource; @@ -44,6 +45,9 @@ import 
org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -54,9 +58,12 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -215,7 +222,7 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { IndexShard primaryShard = newStartedShard(true); SegmentReplicationTargetService sut; - sut = prepareForReplication(primaryShard); + sut = prepareForReplication(primaryShard, null); SegmentReplicationTargetService spy = spy(sut); // Starting a new shard in PrimaryMode and shard routing primary. 
@@ -279,6 +286,91 @@ public void testReplicaReceivesGenIncrease() throws Exception { } } + public void testPrimaryRelocation() throws Exception { + final IndexShard primarySource = newStartedShard(true, settings); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "_doc", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); + final IndexShard primaryTarget = newShard( + primarySource.routingEntry().getTargetRelocatingShard(), + settings, + new NRTReplicationEngineFactory() + ); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); + + BiFunction, ActionListener, List> replicatePrimaryFunction = ( + shardList, + listener) -> { + try { + assert shardList.size() >= 2; + final IndexShard primary = shardList.get(0); + return replicateSegments(primary, shardList.subList(1, shardList.size()), listener); + } catch (IOException | InterruptedException e) { + listener.onFailure(e); + throw new RuntimeException(e); + } + }; + recoverReplica(primaryTarget, primarySource, true, replicatePrimaryFunction); + + // check that local checkpoint of new primary is properly tracked after primary relocation + assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat( + primaryTarget.getReplicationTracker() + .getTrackedLocalCheckpointForShard(primaryTarget.routingEntry().allocationId().getId()) + .getLocalCheckpoint(), + equalTo(totalOps - 1L) + ); + assertDocCount(primaryTarget, totalOps); + closeShards(primarySource, primaryTarget); + } + + public void testPrimaryRelocationWithSegRepFailure() throws Exception { + final IndexShard primarySource = newStartedShard(true, settings); + int totalOps = randomInt(10); + for (int i = 0; i < totalOps; i++) { + indexDoc(primarySource, "_doc", Integer.toString(i)); + } + IndexShardTestCase.updateRoutingEntry(primarySource, 
primarySource.routingEntry().relocate(randomAlphaOfLength(10), -1)); + final IndexShard primaryTarget = newShard( + primarySource.routingEntry().getTargetRelocatingShard(), + settings, + new NRTReplicationEngineFactory() + ); + updateMappings(primaryTarget, primarySource.indexSettings().getIndexMetadata()); + + BiFunction, ActionListener, List> replicatePrimaryFunction = ( + shardList, + listener) -> { + listener.onFailure(new IOException("Expected failure")); + return null; + }; + Exception e = expectThrows( + Exception.class, + () -> recoverReplica( + primaryTarget, + primarySource, + (primary, sourceNode) -> new RecoveryTarget(primary, sourceNode, new ReplicationListener() { + @Override + public void onDone(ReplicationState state) { + throw new AssertionError("recovery must fail"); + } + + @Override + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("Expected failure")); + } + }), + true, + true, + replicatePrimaryFunction + ) + ); + assertThat(e, hasToString(containsString("Expected failure"))); + closeShards(primarySource, primaryTarget); + } + public void testReplicaReceivesLowerGeneration() throws Exception { // when a replica gets incoming segments that are lower than what it currently has on disk. 
@@ -741,7 +833,8 @@ private SegmentReplicationTargetService newTargetService(SegmentReplicationSourc threadPool, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), mock(TransportService.class), - sourceFactory + sourceFactory, + null ); } @@ -783,8 +876,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertTrue(e instanceof CancellableThreads.ExecutionCancelledException); + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { assertFalse(sendShardFailure); assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage()); latch.countDown(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java index 7761f97769440..0f5bba4f0c332 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java @@ -1116,6 +1116,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) {} + @Override + public void forceSegmentFileSync(ActionListener listener) {} + @Override public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 7437cb22e44d1..51afaef315f26 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ 
b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -70,7 +70,7 @@ public void setUp() throws Exception { replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); - sut = prepareForReplication(primaryShard); + sut = prepareForReplication(primaryShard, null); initialCheckpoint = replicaShard.getLatestReplicationCheckpoint(); aheadCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 5732fc5bfa270..415a3d487b790 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1900,7 +1900,8 @@ public void onFailure(final Exception e) { threadPool, recoverySettings, transportService, - new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService), + indicesService ), SegmentReplicationSourceService.NO_OP, shardStateAction, diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 20fe47c1d4cc0..ad19473380063 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -544,7 +544,8 @@ public void recoverReplica( targetSupplier, markAsRecovering, inSyncIds, - routingTable + routingTable, + (a, b) -> null ); OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, 
primary, inSyncIds, routingTable); computeReplicationTargets(); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index f01a189e974bf..341598ffa7b88 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -38,9 +38,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.junit.Assert; -import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -62,7 +60,6 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; @@ -99,9 +96,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.InternalTranslogFactory; -import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogFactory; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.recovery.AsyncRecoveryTarget; @@ -125,12 +121,11 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.CopyState; import 
org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; -import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; import org.opensearch.snapshots.Snapshot; import org.opensearch.test.DummyShardLock; @@ -176,6 +171,8 @@ public abstract class IndexShardTestCase extends OpenSearchTestCase { private static final AtomicBoolean failOnShardFailures = new AtomicBoolean(true); + private RecoveryTarget recoveryTarget; + private static final Consumer DEFAULT_SHARD_FAILURE_HANDLER = failure -> { if (failOnShardFailures.get()) { throw new AssertionError(failure.reason, failure.cause); @@ -189,7 +186,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { throw new AssertionError(e); } }; @@ -553,17 +550,6 @@ protected IndexShard newShard( if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); } - - final BiFunction translogFactorySupplier = (settings, shardRouting) -> { - if (settings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { - return new RemoteBlobStoreInternalTranslogFactory( - this::createRepositoriesService, - threadPool, - settings.getRemoteStoreTranslogRepository() - ); - } - return new InternalTranslogFactory(); - }; indexShard = new IndexShard( routing, indexSettings, @@ -585,7 +571,7 @@ protected 
IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - translogFactorySupplier, + new InternalTranslogFactory(), checkpointPublisher, remoteStore ); @@ -599,18 +585,6 @@ protected IndexShard newShard( return indexShard; } - protected RepositoriesService createRepositoriesService() { - RepositoriesService repositoriesService = Mockito.mock(RepositoriesService.class); - BlobStoreRepository repository = Mockito.mock(BlobStoreRepository.class); - when(repository.basePath()).thenReturn(new BlobPath()); - BlobStore blobStore = Mockito.mock(BlobStore.class); - BlobContainer blobContainer = Mockito.mock(BlobContainer.class); - when(blobStore.blobContainer(any())).thenReturn(blobContainer); - when(repository.blobStore()).thenReturn(blobStore); - when(repositoriesService.repository(any(String.class))).thenReturn(repository); - return repositoriesService; - } - protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build(); @@ -834,18 +808,45 @@ protected DiscoveryNode getFakeDiscoNode(String id) { ); } - /** recovers a replica from the given primary **/ protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException { - recoverReplica(replica, primary, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener), true, startReplica); + recoverReplica(replica, primary, startReplica, (a, b) -> null); } /** recovers a replica from the given primary **/ + protected void recoverReplica( + IndexShard replica, + IndexShard primary, + boolean startReplica, + BiFunction, ActionListener, List> replicatePrimaryFunction + ) throws IOException { + recoverReplica( + replica, + primary, + (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener), + true, + startReplica, + replicatePrimaryFunction + ); + } + protected void recoverReplica( 
final IndexShard replica, final IndexShard primary, final BiFunction targetSupplier, final boolean markAsRecovering, final boolean markAsStarted + ) throws IOException { + recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a, b) -> null); + } + + /** recovers a replica from the given primary **/ + protected void recoverReplica( + final IndexShard replica, + final IndexShard primary, + final BiFunction targetSupplier, + final boolean markAsRecovering, + final boolean markAsStarted, + final BiFunction, ActionListener, List> replicatePrimaryFunction ) throws IOException { IndexShardRoutingTable.Builder newRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId()); newRoutingTable.addShard(primary.routingEntry()); @@ -854,7 +855,7 @@ protected void recoverReplica( } final Set inSyncIds = Collections.singleton(primary.routingEntry().allocationId().getId()); final IndexShardRoutingTable routingTable = newRoutingTable.build(); - recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable); + recoverUnstartedReplica(replica, primary, targetSupplier, markAsRecovering, inSyncIds, routingTable, replicatePrimaryFunction); if (markAsStarted) { startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable); } @@ -877,7 +878,8 @@ protected final void recoverUnstartedReplica( final BiFunction targetSupplier, final boolean markAsRecovering, final Set inSyncIds, - final IndexShardRoutingTable routingTable + final IndexShardRoutingTable routingTable, + final BiFunction, ActionListener, List> replicatePrimaryFunction ) throws IOException { final DiscoveryNode pNode = getFakeDiscoNode(primary.routingEntry().currentNodeId()); final DiscoveryNode rNode = getFakeDiscoNode(replica.routingEntry().currentNodeId()); @@ -911,7 +913,7 @@ protected final void recoverUnstartedReplica( recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes)); final RecoverySourceHandler recovery = 
RecoverySourceHandlerFactory.create( primary, - new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), + new AsyncRecoveryTarget(recoveryTarget, threadPool.generic(), primary, replica, replicatePrimaryFunction), request, recoverySettings ); @@ -1226,14 +1228,17 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) { * been configured to return the given primaryShard's current segments. * * @param primaryShard {@link IndexShard} - The primary shard to replicate from. + * @param target {@link IndexShard} - The target replica shard in segment replication. */ - public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard) { + public final SegmentReplicationTargetService prepareForReplication(IndexShard primaryShard, IndexShard target) { final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final IndicesService indicesService = mock(IndicesService.class); final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService( threadPool, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), mock(TransportService.class), - sourceFactory + sourceFactory, + indicesService ); final SegmentReplicationSource replicationSource = new SegmentReplicationSource() { @Override @@ -1273,6 +1278,7 @@ public void getSegmentFiles( } }; when(sourceFactory.get(any())).thenReturn(replicationSource); + when(indicesService.getShardOrNull(any())).thenReturn(target); return targetService; } @@ -1284,16 +1290,10 @@ public void getSegmentFiles( * @param replicaShards - Replicas that will be updated. * @return {@link List} List of target components orchestrating replication. 
*/ - public final List replicateSegments(IndexShard primaryShard, List replicaShards) - throws IOException, InterruptedException { - final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard); - return replicateSegments(targetService, primaryShard, replicaShards); - } - public final List replicateSegments( - SegmentReplicationTargetService targetService, IndexShard primaryShard, - List replicaShards + List replicaShards, + ActionListener... listeners ) throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); Map primaryMetadata; @@ -1303,6 +1303,7 @@ public final List replicateSegments( } List ids = new ArrayList<>(); for (IndexShard replica : replicaShards) { + final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( ReplicationCheckpoint.empty(replica.shardId), replica, @@ -1316,7 +1317,13 @@ public void onReplicationDone(SegmentReplicationState state) { assertTrue(recoveryDiff.missing.isEmpty()); assertTrue(recoveryDiff.different.isEmpty()); assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); + for (ActionListener listener : listeners) { + listener.onResponse(null); + } } catch (Exception e) { + for (ActionListener listener : listeners) { + listener.onFailure(e); + } throw ExceptionsHelper.convertToRuntime(e); } finally { countDownLatch.countDown(); @@ -1324,7 +1331,11 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { logger.error("Unexpected replication failure in test", e); Assert.fail("test replication should not fail: " + e); } diff --git 
a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java index b3ddec889c1e2..45b11c95b4102 100644 --- a/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java +++ b/test/framework/src/main/java/org/opensearch/indices/recovery/AsyncRecoveryTarget.java @@ -38,12 +38,15 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.SegmentReplicationTarget; import java.util.List; import java.util.concurrent.Executor; +import java.util.function.BiFunction; /** * Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}. 
@@ -52,9 +55,32 @@ public class AsyncRecoveryTarget implements RecoveryTargetHandler { private final RecoveryTargetHandler target; private final Executor executor; + private final IndexShard primary; + + private final IndexShard replica; + + private final BiFunction, ActionListener, List> replicatePrimaryFunction; + public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) { this.executor = executor; this.target = target; + this.primary = null; + this.replica = null; + this.replicatePrimaryFunction = (a, b) -> null; + } + + public AsyncRecoveryTarget( + RecoveryTargetHandler target, + Executor executor, + IndexShard primary, + IndexShard replica, + BiFunction, ActionListener, List> replicatePrimaryFunction + ) { + this.executor = executor; + this.target = target; + this.primary = primary; + this.replica = replica; + this.replicatePrimaryFunction = replicatePrimaryFunction; } @Override @@ -62,6 +88,11 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener target.prepareForTranslogOperations(totalTranslogOps, listener)); } + @Override + public void forceSegmentFileSync(ActionListener listener) { + executor.execute(() -> this.replicatePrimaryFunction.apply(List.of(primary, replica), listener)); + } + @Override public void finalizeRecovery(long globalCheckpoint, long trimAboveSeqNo, ActionListener listener) { executor.execute(() -> target.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, listener));