From 7d925654bb2bdf9ebdac79242800942eb7494f8d Mon Sep 17 00:00:00 2001
From: Vacha Shah
Date: Mon, 1 Aug 2022 14:13:37 -0700
Subject: [PATCH] Upgrade dependencies (#4047)

* Upgrading dependencies for hadoop and aws-java-sdk

Signed-off-by: Vacha Shah

* Fixing precommit

Signed-off-by: Vacha Shah

* Upgrading transitive dependencies

Signed-off-by: Vacha Shah

* Excluding transitive dependencies

Signed-off-by: Vacha Shah

(cherry picked from commit 88f5537b10a77cd1a7aa32990df54e581f1ea369)
---
 buildSrc/build.gradle                          |   2 +
 buildSrc/version.properties                    |   2 +-
 .../aws-java-sdk-core-1.12.247.jar.sha1        |   1 -
 .../aws-java-sdk-core-1.12.270.jar.sha1        |   1 +
 .../aws-java-sdk-ec2-1.12.247.jar.sha1         |   1 -
 .../aws-java-sdk-ec2-1.12.270.jar.sha1         |   1 +
 plugins/repository-hdfs/build.gradle           |  10 +-
 .../licenses/hadoop-client-api-3.3.1.jar.sha1  |   1 -
 .../licenses/hadoop-client-api-3.3.3.jar.sha1  |   1 +
 .../hadoop-client-runtime-3.3.1.jar.sha1       |   1 -
 .../hadoop-client-runtime-3.3.3.jar.sha1       |   1 +
 .../licenses/hadoop-hdfs-3.3.1.jar.sha1        |   1 -
 .../licenses/hadoop-hdfs-3.3.3.jar.sha1        |   1 +
 .../aws-java-sdk-core-1.12.247.jar.sha1        |   1 -
 .../aws-java-sdk-core-1.12.270.jar.sha1        |   1 +
 .../aws-java-sdk-s3-1.12.247.jar.sha1          |   1 -
 .../aws-java-sdk-s3-1.12.270.jar.sha1          |   1 +
 .../aws-java-sdk-sts-1.12.247.jar.sha1         |   1 -
 .../aws-java-sdk-sts-1.12.270.jar.sha1         |   1 +
 .../licenses/jmespath-java-1.12.247.jar.sha1   |   1 -
 .../licenses/jmespath-java-1.12.270.jar.sha1   |   1 +
 .../replication/SegmentReplicationIT.java      | 306 ++++++++++++++++++
 test/fixtures/hdfs-fixture/build.gradle        |   5 +-
 23 files changed, 326 insertions(+), 17 deletions(-)
 delete mode 100644 plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.247.jar.sha1
 create mode 100644 plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.270.jar.sha1
 delete mode 100644 plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.247.jar.sha1
 create mode 100644 plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.270.jar.sha1
 delete mode 100644 plugins/repository-hdfs/licenses/hadoop-client-api-3.3.1.jar.sha1
 create mode 100644 plugins/repository-hdfs/licenses/hadoop-client-api-3.3.3.jar.sha1
 delete mode 100644 plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.1.jar.sha1
 create mode 100644 plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.3.jar.sha1
 delete mode 100644 plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.1.jar.sha1
 create mode 100644 plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.3.jar.sha1
 delete mode 100644 plugins/repository-s3/licenses/aws-java-sdk-core-1.12.247.jar.sha1
 create mode 100644 plugins/repository-s3/licenses/aws-java-sdk-core-1.12.270.jar.sha1
 delete mode 100644 plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.247.jar.sha1
 create mode 100644 plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.270.jar.sha1
 delete mode 100644 plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.247.jar.sha1
 create mode 100644 plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.270.jar.sha1
 delete mode 100644 plugins/repository-s3/licenses/jmespath-java-1.12.247.jar.sha1
 create mode 100644 plugins/repository-s3/licenses/jmespath-java-1.12.270.jar.sha1
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle
index 8092153c9f685..0b6b51443fe5a 100644
--- a/buildSrc/build.gradle
+++ b/buildSrc/build.gradle
@@ -112,6 +112,8 @@ dependencies {
   api 'commons-io:commons-io:2.7'
   api "net.java.dev.jna:jna:5.11.0"
   api 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2'
+  api 'org.jdom:jdom2:2.0.6.1'
+  api 'org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.7.10'
   api 'de.thetaphi:forbiddenapis:3.3'
   api 'com.avast.gradle:gradle-docker-compose-plugin:0.14.12'
   api 'org.apache.maven:maven-model:3.6.2'
diff --git a/buildSrc/version.properties b/buildSrc/version.properties
index fb1c6519efdfb..90898265e9303 100644
--- a/buildSrc/version.properties
+++ b/buildSrc/version.properties
@@ -32,7 +32,7 @@ commonslogging = 1.2
 commonscodec = 1.13
 
 # plugin dependencies
-aws = 1.12.247
+aws = 1.12.270
 
 # when updating this version, you need to ensure compatibility with:
 #  - plugins/ingest-attachment (transitive dependency, check the upstream POM)
diff --git a/plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.247.jar.sha1 b/plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.247.jar.sha1
deleted file mode 100644
index 5b3f4a3511769..0000000000000
--- a/plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.247.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-70f59d940c965a899f69743ec36a8eb099f539ef
\ No newline at end of file
diff --git a/plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.270.jar.sha1 b/plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.270.jar.sha1
new file mode 100644
index 0000000000000..ce40f68b3e229
--- /dev/null
+++ b/plugins/discovery-ec2/licenses/aws-java-sdk-core-1.12.270.jar.sha1
@@ -0,0 +1 @@
+8f0cc2cc1b41c51e2117f5b1ce6530febf99d4ba
\ No newline at end of file
diff --git a/plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.247.jar.sha1 b/plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.247.jar.sha1
deleted file mode 100644
index 03505417f3e26..0000000000000
--- a/plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.247.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-30120ff6617fb653d525856480d7ba99528d875d
\ No newline at end of file
diff --git a/plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.270.jar.sha1 b/plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.270.jar.sha1
new file mode 100644
index 0000000000000..ac00f6d4e8038
--- /dev/null
+++ b/plugins/discovery-ec2/licenses/aws-java-sdk-ec2-1.12.270.jar.sha1
@@ -0,0 +1 @@
+6d0ce44b33006e163c25f394f869e6b3a51aefc5
\ No newline at end of file
diff --git a/plugins/repository-hdfs/build.gradle b/plugins/repository-hdfs/build.gradle
index 1ad8c3f82f1eb..5534e2acf6a61 100644
--- a/plugins/repository-hdfs/build.gradle
+++ b/plugins/repository-hdfs/build.gradle
@@ -48,7 +48,7 @@ opensearchplugin {
 }
 
 versions << [
-  'hadoop3': '3.3.1'
+  'hadoop3': '3.3.3'
 ]
 
 testFixtures.useFixture ":test:fixtures:krb5kdc-fixture", "hdfs"
@@ -60,7 +60,9 @@ configurations {
 dependencies {
   api "org.apache.hadoop:hadoop-client-api:${versions.hadoop3}"
   runtimeOnly "org.apache.hadoop:hadoop-client-runtime:${versions.hadoop3}"
-  api "org.apache.hadoop:hadoop-hdfs:${versions.hadoop3}"
+  api("org.apache.hadoop:hadoop-hdfs:${versions.hadoop3}") {
+    exclude module: 'jetty-server'
+  }
   api 'org.apache.htrace:htrace-core4:4.2.0-incubating'
   api "org.apache.logging.log4j:log4j-core:${versions.log4j}"
   api 'org.apache.avro:avro:1.10.2'
@@ -438,10 +440,6 @@ thirdPartyAudit {
     'org.apache.hadoop.shaded.org.apache.curator.shaded.com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper',
     'org.apache.hadoop.shaded.org.apache.curator.shaded.com.google.common.util.concurrent.AbstractFuture$UnsafeAtomicHelper$1',
     'org.apache.hadoop.shaded.org.xbill.DNS.spi.DNSJavaNameServiceDescriptor',
-    'org.apache.hadoop.shaded.org.xerial.snappy.pure.PureJavaSnappy',
-    'org.apache.hadoop.shaded.org.xerial.snappy.pure.SnappyRawCompressor',
-    'org.apache.hadoop.shaded.org.xerial.snappy.pure.SnappyRawDecompressor',
-    'org.apache.hadoop.shaded.org.xerial.snappy.pure.UnsafeUtil',
     'org.apache.avro.reflect.FieldAccessUnsafe',
     'org.apache.avro.reflect.FieldAccessUnsafe$UnsafeBooleanField',
diff --git a/plugins/repository-hdfs/licenses/hadoop-client-api-3.3.1.jar.sha1 b/plugins/repository-hdfs/licenses/hadoop-client-api-3.3.1.jar.sha1
deleted file mode 100644
index dc2f20e310d30..0000000000000
--- a/plugins/repository-hdfs/licenses/hadoop-client-api-3.3.1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-4b9c9cdd9967495838fb521001699c4c9dddf183
\ No newline at end of file
diff --git a/plugins/repository-hdfs/licenses/hadoop-client-api-3.3.3.jar.sha1 b/plugins/repository-hdfs/licenses/hadoop-client-api-3.3.3.jar.sha1
new file mode 100644
index 0000000000000..8df133d0bd106
--- /dev/null
+++ b/plugins/repository-hdfs/licenses/hadoop-client-api-3.3.3.jar.sha1
@@ -0,0 +1 @@
+d0593aed2d4df9bcee507550913d29d589ebd84a
\ No newline at end of file
diff --git a/plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.1.jar.sha1 b/plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.1.jar.sha1
deleted file mode 100644
index feb37ecc90255..0000000000000
--- a/plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-f3a55d882328ee87a1054f99d62ba987fa9029a4
\ No newline at end of file
diff --git a/plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.3.jar.sha1 b/plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.3.jar.sha1
new file mode 100644
index 0000000000000..f980eebc7a46c
--- /dev/null
+++ b/plugins/repository-hdfs/licenses/hadoop-client-runtime-3.3.3.jar.sha1
@@ -0,0 +1 @@
+52619ecfb0225d7ae67b15264521064824ac57ca
\ No newline at end of file
diff --git a/plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.1.jar.sha1 b/plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.1.jar.sha1
deleted file mode 100644
index 66c98cf7ec291..0000000000000
--- a/plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-5da7f270cb6564e099e0d2d424285a24fca62bd2
\ No newline at end of file
diff --git a/plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.3.jar.sha1 b/plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.3.jar.sha1
new file mode 100644
index 0000000000000..463b7415e4c4b
--- /dev/null
+++ b/plugins/repository-hdfs/licenses/hadoop-hdfs-3.3.3.jar.sha1
@@ -0,0 +1 @@
+d4d199760c11d47f90e12fe3882e2b24c77e4eb5
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/aws-java-sdk-core-1.12.247.jar.sha1 b/plugins/repository-s3/licenses/aws-java-sdk-core-1.12.247.jar.sha1
deleted file mode 100644
index 5b3f4a3511769..0000000000000
--- a/plugins/repository-s3/licenses/aws-java-sdk-core-1.12.247.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-70f59d940c965a899f69743ec36a8eb099f539ef
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/aws-java-sdk-core-1.12.270.jar.sha1 b/plugins/repository-s3/licenses/aws-java-sdk-core-1.12.270.jar.sha1
new file mode 100644
index 0000000000000..ce40f68b3e229
--- /dev/null
+++ b/plugins/repository-s3/licenses/aws-java-sdk-core-1.12.270.jar.sha1
@@ -0,0 +1 @@
+8f0cc2cc1b41c51e2117f5b1ce6530febf99d4ba
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.247.jar.sha1 b/plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.247.jar.sha1
deleted file mode 100644
index 2d32399f871b4..0000000000000
--- a/plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.247.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-648c59d979e2792b4aa8f444a4748abd62a65783
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.270.jar.sha1 b/plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.270.jar.sha1
new file mode 100644
index 0000000000000..73b9b4cd8d410
--- /dev/null
+++ b/plugins/repository-s3/licenses/aws-java-sdk-s3-1.12.270.jar.sha1
@@ -0,0 +1 @@
+2901cdd72a7f0d940b2bd4e1bcdb606d5d33736f
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.247.jar.sha1 b/plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.247.jar.sha1
deleted file mode 100644
index 31ce4a4e6a5cb..0000000000000
--- a/plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.247.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-3e77a7409ccf7ef3c3d342897dd75590147d2ffe
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.270.jar.sha1 b/plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.270.jar.sha1
new file mode 100644
index 0000000000000..3b77226cdd5d2
--- /dev/null
+++ b/plugins/repository-s3/licenses/aws-java-sdk-sts-1.12.270.jar.sha1
@@ -0,0 +1 @@
+aeffa3e8d9471377adf1108e21dab92f4f13edb3
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/jmespath-java-1.12.247.jar.sha1 b/plugins/repository-s3/licenses/jmespath-java-1.12.247.jar.sha1
deleted file mode 100644
index fd71f57f9a5fc..0000000000000
--- a/plugins/repository-s3/licenses/jmespath-java-1.12.247.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-a1f7acde495f815af705490f2a37b3758299a8e4
\ No newline at end of file
diff --git a/plugins/repository-s3/licenses/jmespath-java-1.12.270.jar.sha1 b/plugins/repository-s3/licenses/jmespath-java-1.12.270.jar.sha1
new file mode 100644
index 0000000000000..a50e3040575c3
--- /dev/null
+++ b/plugins/repository-s3/licenses/jmespath-java-1.12.270.jar.sha1
@@ -0,0 +1 @@
+5bd3e1976e3b3b94c30e4868af9a5bfc4221e24a
\ No newline at end of file
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
new file mode 100644
index 0000000000000..a1cc0148dcdac
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -0,0 +1,306 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.apache.lucene.index.SegmentInfos;
+import org.junit.BeforeClass;
+import org.opensearch.action.admin.indices.segments.IndexShardSegments;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
+import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
+import org.opensearch.action.admin.indices.segments.ShardSegments;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.Index;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.IndexService;
+import org.opensearch.index.engine.Segment;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.test.BackgroundIndexer;
+import org.opensearch.test.OpenSearchIntegTestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class SegmentReplicationIT extends OpenSearchIntegTestCase {
+
+    private static final String INDEX_NAME = "test-idx-1";
+    private static final int SHARD_COUNT = 1;
+    private static final int REPLICA_COUNT = 1;
+
+    @BeforeClass
+    public static void assumeFeatureFlag() {
+        assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
+            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
+            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+            .build();
+    }
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+        createIndex(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+
+        final int initialDocCount = scaledRandomIntBetween(0, 200);
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
+            waitForDocs(initialDocCount, indexer);
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            final int additionalDocCount = scaledRandomIntBetween(0, 200);
+            final int expectedHitCount = initialDocCount + additionalDocCount;
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            flushAndRefresh(INDEX_NAME);
+            waitForReplicaUpdate();
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
+        }
+    }
+
+    public void testReplicationAfterForceMerge() throws Exception {
+        final String nodeA = internalCluster().startNode();
+        final String nodeB = internalCluster().startNode();
+        createIndex(INDEX_NAME);
+        ensureGreen(INDEX_NAME);
+
+        final int initialDocCount = scaledRandomIntBetween(0, 200);
+        final int additionalDocCount = scaledRandomIntBetween(0, 200);
+        final int expectedHitCount = initialDocCount + additionalDocCount;
+        try (
+            BackgroundIndexer indexer = new BackgroundIndexer(
+                INDEX_NAME,
+                "_doc",
+                client(),
+                -1,
+                RandomizedTest.scaledRandomIntBetween(2, 5),
+                false,
+                random()
+            )
+        ) {
+            indexer.start(initialDocCount);
+            waitForDocs(initialDocCount, indexer);
+
+            flush(INDEX_NAME);
+            waitForReplicaUpdate();
+            // wait a short amount of time to give replication a chance to complete.
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
+
+            // Index a second set of docs so we can merge into one segment.
+            indexer.start(additionalDocCount);
+            waitForDocs(expectedHitCount, indexer);
+
+            // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
+            client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
+            refresh(INDEX_NAME);
+            waitForReplicaUpdate();
+            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
+
+            ensureGreen(INDEX_NAME);
+            assertSegmentStats(REPLICA_COUNT);
+        }
+    }
+
+    public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
+        final String primaryNode = internalCluster().startNode();
+        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
+        ensureGreen(INDEX_NAME);
+
+        // Index a doc to create the first set of segments. _s1.si
+        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+        // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
+        flushAndRefresh(INDEX_NAME);
+        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
+
+        // Index to create another segment
+        client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").get();
+
+        // Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
+        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
+        refresh(INDEX_NAME);
+
+        assertAcked(
+            client().admin()
+                .indices()
+                .prepareUpdateSettings(INDEX_NAME)
+                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
+        );
+        final String replicaNode = internalCluster().startNode();
+        ensureGreen(INDEX_NAME);
+
+        client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
+
+        waitForReplicaUpdate();
+        assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
+        assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
+
+        final Index index = resolveIndex(INDEX_NAME);
+        IndexShard primaryShard = getIndexShard(index, primaryNode);
+        IndexShard replicaShard = getIndexShard(index, replicaNode);
+        assertEquals(
+            primaryShard.translogStats().estimatedNumberOfOperations(),
+            replicaShard.translogStats().estimatedNumberOfOperations()
+        );
+        assertSegmentStats(REPLICA_COUNT);
+    }
+
+    private void assertSegmentStats(int numberOfReplicas) throws IOException {
+        final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();
+
+        List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
+
+        // There will be an entry in the list for each index.
+        for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+
+            // Separate Primary & replica shards ShardSegments.
+            final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+            final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+            final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+            assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
+            final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+            final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+
+            assertEquals(
+                "There should be a ShardSegment entry for each replica in the replicationGroup",
+                numberOfReplicas,
+                replicaShardSegments.size()
+            );
+
+            for (ShardSegments shardSegment : replicaShardSegments) {
+                final Map<String, Segment> latestReplicaSegments = getLatestSegments(shardSegment);
+                for (Segment replicaSegment : latestReplicaSegments.values()) {
+                    final Segment primarySegment = latestPrimarySegments.get(replicaSegment.getName());
+                    assertEquals(replicaSegment.getGeneration(), primarySegment.getGeneration());
+                    assertEquals(replicaSegment.getNumDocs(), primarySegment.getNumDocs());
+                    assertEquals(replicaSegment.getDeletedDocs(), primarySegment.getDeletedDocs());
+                    assertEquals(replicaSegment.getSize(), primarySegment.getSize());
+                }
+
+                // Fetch the IndexShard for this replica and try and build its SegmentInfos from the previous commit point.
+                // This ensures the previous commit point is not wiped.
+                final ShardRouting replicaShardRouting = shardSegment.getShardRouting();
+                ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
+                final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
+                final Index index = resolveIndex(INDEX_NAME);
+                IndexShard indexShard = getIndexShard(index, replicaNode.getName());
+                final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
+                // calls to readCommit will fail if a valid commit point and all its segments are not in the store.
+                SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
+            }
+        }
+    }
+
+    /**
+     * Waits until the replica is caught up to the latest primary segments gen.
+     * @throws Exception if assertion fails
+     */
+    private void waitForReplicaUpdate() throws Exception {
+        // wait until the replica has the latest segment generation.
+        assertBusy(() -> {
+            final IndicesSegmentResponse indicesSegmentResponse = client().admin()
+                .indices()
+                .segments(new IndicesSegmentsRequest())
+                .actionGet();
+            List<ShardSegments[]> segmentsByIndex = getShardSegments(indicesSegmentResponse);
+            for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {
+                final Map<Boolean, List<ShardSegments>> segmentListMap = segmentsByShardType(replicationGroupSegments);
+                final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
+                final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
+
+                final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
+                final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
+                final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
+                for (ShardSegments shardSegments : replicaShardSegments) {
+                    final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
+                        .stream()
+                        .anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
+                    assertTrue(isReplicaCaughtUpToPrimary);
+                }
+            }
+        });
+    }
+
+    private IndexShard getIndexShard(Index index, String node) {
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+        IndexService indexService = indicesService.indexServiceSafe(index);
+        final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
+        return indexService.getShard(shardId.get());
+    }
+
+    private List<ShardSegments[]> getShardSegments(IndicesSegmentResponse indicesSegmentResponse) {
+        return indicesSegmentResponse.getIndices()
+            .values()
+            .stream() // get list of IndexSegments
+            .flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
+            .map(IndexShardSegments::getShards) // get list of segments across replication group
+            .collect(Collectors.toList());
+    }
+
+    private Map<String, Segment> getLatestSegments(ShardSegments segments) {
+        final Long latestPrimaryGen = segments.getSegments().stream().map(Segment::getGeneration).max(Long::compare).get();
+        return segments.getSegments()
+            .stream()
+            .filter(s -> s.getGeneration() == latestPrimaryGen)
+            .collect(Collectors.toMap(Segment::getName, Function.identity()));
+    }
+
+    private Map<Boolean, List<ShardSegments>> segmentsByShardType(ShardSegments[] replicationGroupSegments) {
+        return Arrays.stream(replicationGroupSegments).collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
+    }
+}
diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle
index 2ff444c03b123..73aca2a6ca02b 100644
--- a/test/fixtures/hdfs-fixture/build.gradle
+++ b/test/fixtures/hdfs-fixture/build.gradle
@@ -33,7 +33,9 @@ apply plugin: 'opensearch.java'
 group = 'hdfs'
 
 dependencies {
-  api "org.apache.hadoop:hadoop-minicluster:3.3.2"
+  api("org.apache.hadoop:hadoop-minicluster:3.3.3") {
+    exclude module: 'websocket-client'
+  }
   api "org.apache.commons:commons-compress:1.21"
   api "commons-codec:commons-codec:${versions.commonscodec}"
   api "org.apache.logging.log4j:log4j-core:${versions.log4j}"
@@ -44,4 +46,5 @@ dependencies {
   api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
   api 'net.minidev:json-smart:2.4.8'
   api "org.mockito:mockito-core:${versions.mockito}"
+  api "com.google.protobuf:protobuf-java:3.21.2"
 }