diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java
index 83d34571a159..00dfd5c65b12 100644
--- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java
@@ -18,17 +18,19 @@
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
-import org.elasticsearch.test.disruption.SingleNodeDisruption;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -43,7 +45,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
     }
 
     public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedException {
-        String masterNode = internalCluster().startMasterOnlyNode();
+        internalCluster().startMasterOnlyNode();
         internalCluster().startDataOnlyNodes(3);
         ensureStableCluster(4);
 
@@ -51,7 +53,7 @@ public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedE
         createDataStream(dataStreamName);
 
         // Mark it to lazy rollover
-        new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute().get();
+        safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute());
 
         // Verify that the data stream is marked for rollover and that it has currently one index
         DataStream dataStream = getDataStream(dataStreamName);
@@ -59,9 +61,22 @@ public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedE
         assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1));
 
         // Introduce a disruption to the master node that should delay the rollover execution
-        SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), masterNode, 100, 200, 30000, 60000);
-        internalCluster().setDisruptionScheme(masterNodeDisruption);
-        masterNodeDisruption.startDisrupting();
+        final var barrier = new CyclicBarrier(2);
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .submitUnbatchedStateUpdateTask("block", new ClusterStateUpdateTask() {
+                @Override
+                public ClusterState execute(ClusterState currentState) {
+                    safeAwait(barrier);
+                    safeAwait(barrier);
+                    return currentState;
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail(e);
+                }
+            });
+        safeAwait(barrier);
 
         // Start indexing operations
         int docs = randomIntBetween(5, 10);
@@ -84,10 +99,10 @@ public void onFailure(Exception e) {
         }
 
         // End the disruption so that all pending tasks will complete
-        masterNodeDisruption.stopDisrupting();
+        safeAwait(barrier);
 
         // Wait for all the indexing requests to be processed successfully
-        countDownLatch.await();
+        safeAwait(countDownLatch);
 
         // Verify that the rollover has happened once
        dataStream = getDataStream(dataStreamName);
@@ -96,10 +111,12 @@ public void onFailure(Exception e) {
     }
 
     private DataStream getDataStream(String dataStreamName) {
-        return client().execute(
-            GetDataStreamAction.INSTANCE,
-            new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName })
-        ).actionGet().getDataStreams().get(0).getDataStream();
+        return safeGet(
+            client().execute(
+                GetDataStreamAction.INSTANCE,
+                new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName })
+            )
+        ).getDataStreams().get(0).getDataStream();
     }
 
     private void createDataStream(String dataStreamName) throws InterruptedException, ExecutionException {
@@ -111,10 +128,9 @@ private void createDataStream(String dataStreamName) throws InterruptedException
                 .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
                 .build()
         );
-        final AcknowledgedResponse putComposableTemplateResponse = client().execute(
-            TransportPutComposableIndexTemplateAction.TYPE,
-            putComposableTemplateRequest
-        ).actionGet();
+        final AcknowledgedResponse putComposableTemplateResponse = safeGet(
+            client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest)
+        );
         assertThat(putComposableTemplateResponse.isAcknowledged(), is(true));
 
         final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
@@ -122,8 +138,9 @@ private void createDataStream(String dataStreamName) throws InterruptedException
             TEST_REQUEST_TIMEOUT,
             dataStreamName
         );
-        final AcknowledgedResponse createDataStreamResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
-            .get();
+        final AcknowledgedResponse createDataStreamResponse = safeGet(
+            client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
+        );
         assertThat(createDataStreamResponse.isAcknowledged(), is(true));
     }
 }