Replace IntermittentLongGCDisruption with blocking cluster state updates (#115075) (#115580)

In JDK 23, `Thread.resume` has been removed, which means we can no longer use
`IntermittentLongGCDisruption`, which depends on it.

Instead, we simulate the master node disruption with a `CyclicBarrier` that
blocks cluster state updates.
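
The core of the approach is a two-party `CyclicBarrier` that the blocking task
awaits twice: once to signal that it is now holding up the cluster state queue,
and once to be released by the test. A minimal sketch of that handshake, using
plain threads as a stand-in for the `ClusterService` task and `safeAwait`
helpers used in the diff below (class and variable names here are illustrative
only):

    import java.util.concurrent.CyclicBarrier;

    public class BarrierBlockSketch {
        public static void main(String[] args) throws Exception {
            CyclicBarrier barrier = new CyclicBarrier(2);

            // Stand-in for the cluster state update task that holds the queue.
            Thread blockingTask = new Thread(() -> {
                try {
                    barrier.await(); // 1st rendezvous: signal that the block is in place
                    barrier.await(); // 2nd rendezvous: stay blocked until released
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            blockingTask.start();

            barrier.await(); // wait until the task is blocking further updates
            // ... submit the work that must queue up behind the blocked task ...
            barrier.await(); // release the task; the queued work can now proceed
            blockingTask.join();
        }
    }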

Closes: #115045

The backport will close:
#112634

(cherry picked from commit 5dec36e)

# Conflicts:
#	modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java
gmarouli authored Oct 24, 2024
1 parent 0a3b617 commit 25ebdf4
Showing 1 changed file with 31 additions and 17 deletions.
modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/LazyRolloverDuringDisruptionIT.java
@@ -17,17 +17,19 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
-import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.equalTo;
@@ -42,25 +44,38 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedException {
-String masterNode = internalCluster().startMasterOnlyNode();
+internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(3);
ensureStableCluster(4);

String dataStreamName = "my-data-stream";
createDataStream(dataStreamName);

// Mark it to lazy rollover
-new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute().get();
+safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute());

// Verify that the data stream is marked for rollover and that it has currently one index
DataStream dataStream = getDataStream(dataStreamName);
assertThat(dataStream.rolloverOnWrite(), equalTo(true));
assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1));

// Introduce a disruption to the master node that should delay the rollover execution
-SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), masterNode, 100, 200, 30000, 60000);
-internalCluster().setDisruptionScheme(masterNodeDisruption);
-masterNodeDisruption.startDisrupting();
+final var barrier = new CyclicBarrier(2);
+internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+    .submitUnbatchedStateUpdateTask("block", new ClusterStateUpdateTask() {
+        @Override
+        public ClusterState execute(ClusterState currentState) {
+            safeAwait(barrier);
+            safeAwait(barrier);
+            return currentState;
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            fail(e);
+        }
+    });
+safeAwait(barrier);

// Start indexing operations
int docs = randomIntBetween(5, 10);
@@ -83,10 +98,10 @@ public void onFailure(Exception e) {
}

// End the disruption so that all pending tasks will complete
-masterNodeDisruption.stopDisrupting();
+safeAwait(barrier);

// Wait for all the indexing requests to be processed successfully
-countDownLatch.await();
+safeAwait(countDownLatch);

// Verify that the rollover has happened once
dataStream = getDataStream(dataStreamName);
@@ -95,8 +110,7 @@ public void onFailure(Exception e) {
}

private DataStream getDataStream(String dataStreamName) {
-return client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName }))
-    .actionGet()
+return safeGet(client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName })))
.getDataStreams()
.get(0)
.getDataStream();
@@ -111,15 +125,15 @@ private void createDataStream(String dataStreamName) throws InterruptedException
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
.build()
);
-final AcknowledgedResponse putComposableTemplateResponse = client().execute(
-    TransportPutComposableIndexTemplateAction.TYPE,
-    putComposableTemplateRequest
-).actionGet();
+final AcknowledgedResponse putComposableTemplateResponse = safeGet(
+    client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest)
+);
assertThat(putComposableTemplateResponse.isAcknowledged(), is(true));

final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
-final AcknowledgedResponse createDataStreamResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
-    .get();
+final AcknowledgedResponse createDataStreamResponse = safeGet(
+    client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
+);
assertThat(createDataStreamResponse.isAcknowledged(), is(true));
}
}
