diff --git a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java index ee5150c97fb4f..ad6c396df69a1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/SegmentReplicationPressureIT.java @@ -17,6 +17,7 @@ import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.replication.SegmentReplicationBaseIT; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.rest.RestStatus; import org.opensearch.test.OpenSearchIntegTestCase; @@ -52,12 +53,24 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + @Override + public Settings indexSettings() { + // we want to control refreshes + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT) + .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.refresh_interval", -1) + .build(); + } + @Override protected Collection> nodePlugins() { return asList(MockTransportService.TestPlugin.class); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671") public void testWritesRejected() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -76,6 +89,10 @@ public void testWritesRejected() throws Exception { indexingThread.start(); indexingThread.join(); latch.await(); + + indexDoc(); + totalDocs.incrementAndGet(); + refresh(INDEX_NAME); // index again while we are stale. assertBusy(() -> { expectThrows(OpenSearchRejectedExecutionException.class, () -> { @@ -90,6 +107,7 @@ public void testWritesRejected() throws Exception { // index another doc showing there is no pressure enforced. indexDoc(); + refresh(INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); verifyStoreContent(); } @@ -98,7 +116,6 @@ public void testWritesRejected() throws Exception { * This test ensures that a replica can be added while the index is under write block. * Ensuring that only write requests are blocked. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671") public void testAddReplicaWhileWritesBlocked() throws Exception { final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -118,6 +135,9 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { indexingThread.start(); indexingThread.join(); latch.await(); + indexDoc(); + totalDocs.incrementAndGet(); + refresh(INDEX_NAME); // index again while we are stale. assertBusy(() -> { expectThrows(OpenSearchRejectedExecutionException.class, () -> { @@ -142,6 +162,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception { // index another doc showing there is no pressure enforced. indexDoc(); + refresh(INDEX_NAME); waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {})); verifyStoreContent(); } @@ -258,7 +279,7 @@ private void assertFailedRequests(BulkResponse response) { } private void indexDoc() { - client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get(); + client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").execute().actionGet(); } private void assertEqualSegmentInfosVersion(List replicaNames, IndexShard primaryShard) {