Skip to content

Commit

Permalink
Replace IntermittentLongGCDisruption with blocking cluster state upda…
Browse files Browse the repository at this point in the history
…tes (#115075)

In JDK 23 `Thread.resume` has been removed this means that we cannot use
`IntermittentLongGCDisruption` that depends on it. 

We simulate the master node disruption with a `CyclicBarrier` that
blocks cluster state updates.

Closes: #115045

The backport will close:
#112634
  • Loading branch information
gmarouli authored Oct 18, 2024
1 parent 6f60880 commit 5dec36e
Showing 1 changed file with 36 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
import org.elasticsearch.test.disruption.SingleNodeDisruption;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.equalTo;
Expand All @@ -43,25 +45,38 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

public void testRolloverIsExecutedOnce() throws ExecutionException, InterruptedException {
String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(3);
ensureStableCluster(4);

String dataStreamName = "my-data-stream";
createDataStream(dataStreamName);

// Mark it to lazy rollover
new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute().get();
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(true).execute());

// Verify that the data stream is marked for rollover and that it has currently one index
DataStream dataStream = getDataStream(dataStreamName);
assertThat(dataStream.rolloverOnWrite(), equalTo(true));
assertThat(dataStream.getBackingIndices().getIndices().size(), equalTo(1));

// Introduce a disruption to the master node that should delay the rollover execution
SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption(random(), masterNode, 100, 200, 30000, 60000);
internalCluster().setDisruptionScheme(masterNodeDisruption);
masterNodeDisruption.startDisrupting();
final var barrier = new CyclicBarrier(2);
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.submitUnbatchedStateUpdateTask("block", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
safeAwait(barrier);
safeAwait(barrier);
return currentState;
}

@Override
public void onFailure(Exception e) {
fail(e);
}
});
safeAwait(barrier);

// Start indexing operations
int docs = randomIntBetween(5, 10);
Expand All @@ -84,10 +99,10 @@ public void onFailure(Exception e) {
}

// End the disruption so that all pending tasks will complete
masterNodeDisruption.stopDisrupting();
safeAwait(barrier);

// Wait for all the indexing requests to be processed successfully
countDownLatch.await();
safeAwait(countDownLatch);

// Verify that the rollover has happened once
dataStream = getDataStream(dataStreamName);
Expand All @@ -96,10 +111,12 @@ public void onFailure(Exception e) {
}

private DataStream getDataStream(String dataStreamName) {
return client().execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName })
).actionGet().getDataStreams().get(0).getDataStream();
return safeGet(
client().execute(
GetDataStreamAction.INSTANCE,
new GetDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName })
)
).getDataStreams().get(0).getDataStream();
}

private void createDataStream(String dataStreamName) throws InterruptedException, ExecutionException {
Expand All @@ -111,19 +128,19 @@ private void createDataStream(String dataStreamName) throws InterruptedException
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
.build()
);
final AcknowledgedResponse putComposableTemplateResponse = client().execute(
TransportPutComposableIndexTemplateAction.TYPE,
putComposableTemplateRequest
).actionGet();
final AcknowledgedResponse putComposableTemplateResponse = safeGet(
client().execute(TransportPutComposableIndexTemplateAction.TYPE, putComposableTemplateRequest)
);
assertThat(putComposableTemplateResponse.isAcknowledged(), is(true));

final CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(
TEST_REQUEST_TIMEOUT,
TEST_REQUEST_TIMEOUT,
dataStreamName
);
final AcknowledgedResponse createDataStreamResponse = client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
.get();
final AcknowledgedResponse createDataStreamResponse = safeGet(
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
);
assertThat(createDataStreamResponse.isAcknowledged(), is(true));
}
}

0 comments on commit 5dec36e

Please sign in to comment.