From 25ebdf46103c50b82b809981bc27b49e93e2304a Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Thu, 24 Oct 2024 20:53:07 +0300 Subject: [PATCH] Replace IntermittentLongGCDisruption with blocking cluster state updates (#115075) (#115580) In JDK 23 `Thread.resume` has been removed this means that we cannot use `IntermittentLongGCDisruption` that depends on it. We simulate the master node disruption with a `CyclicBarrier` that blocks cluster state updates. Closes: https://github.com/elastic/elasticsearch/issues/115045 The backport will close: https://github.com/elastic/elasticsearch/issues/112634 (cherry picked from commit 5dec36e9fba460654faa447b09b1ddd6b6589920) # Conflicts: # modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java --- .../LazyRolloverDuringDisruptionIT.java | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java index 89d576e74be2..a6354c49a5d0 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java @@ -17,17 +17,19 @@ import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.disruption.IntermittentLongGCDisruption; -import org.elasticsearch.test.disruption.SingleNodeDisruption; import org.elasticsearch.xcontent.XContentType; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import static org.hamcrest.Matchers.equalTo; @@ -42,7 +44,7 @@ protected Collection> nodePlugins() { } public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedException { - String masterNode = internalCluster().startMasterOnlyNode(); + internalCluster().startMasterOnlyNode(); internalCluster().startDataOnlyNodes(3); ensureStableCluster(4); @@ -50,7 +52,7 @@ public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedE createDataStream(dataStreamName); // Mark it to lazy rollover - new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute().get(); + safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute()); // Verify that the data stream is marked for rollover and that it has currently one index DataStream dataStream = getDataStream(dataStreamName); @@ -58,9 +60,22 @@ public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedE assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1)); // Introduce a disruption to the master node that should delay the rollover execution - SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), masterNode, 100, 200, 30000, 60000); - internalCluster().setDisruptionScheme(masterNodeDisruption); - masterNodeDisruption.startDisrupting(); + final var barrier = new CyclicBarrier(2); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .submitUnbatchedStateUpdateTask("block", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + safeAwait(barrier); + safeAwait(barrier); + return currentState; + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + }); + safeAwait(barrier); // Start indexing operations int docs = randomIntBetween(5, 10); @@ -83,10 +98,10 @@ public void onFailure(Exception e) { } // End the disruption so that all pending tasks will complete - masterNodeDisruption.stopDisrupting(); + safeAwait(barrier); // Wait for all the indexing requests to be processed successfully - countDownLatch.await(); + safeAwait(countDownLatch); // Verify that the rollover has happened once dataStream = getDataStream(dataStreamName); @@ -95,8 +110,7 @@ public void onFailure(Exception e) { } private DataStream getDataStream(String dataStreamName) { - return client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName })) - .actionGet() + return safeGet(client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName }))) .getDataStreams() .get(0) .getDataStream(); @@ -111,15 +125,15 @@ private void createDataStream(String dataStreamName) throws InterruptedException .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) .build() ); - final AcknowledgedResponse putComposableTemplateResponse = client().execute( - TransportPutComposableIndexTemplateAction.TYPE, - putComposableTemplateRequest - ).actionGet(); + final AcknowledgedResponse putComposableTemplateResponse = safeGet( + client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest) + ); assertThat(putComposableTemplateResponse.isAcknowledged(), is(true)); final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName); - final AcknowledgedResponse createDataStreamResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest) - .get(); + final AcknowledgedResponse createDataStreamResponse = safeGet( + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest) + ); assertThat(createDataStreamResponse.isAcknowledged(), is(true)); } }