Skip to content

Commit

Permalink
Introduce assertNoSuccessListener utility (elastic#108547)
Browse files Browse the repository at this point in the history
Similar to `assertNoFailureListener`, sometimes we need a listener which
asserts it is not completed successfully.
  • Loading branch information
DaveCTurner authored May 13, 2024
1 parent 605b618 commit c22ec19
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;

Expand Down Expand Up @@ -295,17 +296,9 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
assertSame(node, joiningNode);
assertEquals(JoinValidationService.JOIN_VALIDATE_ACTION_NAME, action);

final var listener = new ActionListener<TransportResponse>() {
@Override
public void onResponse(TransportResponse transportResponse) {
fail("should not succeed");
}

@Override
public void onFailure(Exception e) {
handleError(requestId, new RemoteTransportException(node.getName(), node.getAddress(), action, e));
}
};
final ActionListener<TransportResponse> listener = assertNoSuccessListener(
e -> handleError(requestId, new RemoteTransportException(node.getName(), node.getAddress(), action, e))
);

try (var ignored = NamedWriteableRegistryTests.ignoringUnknownNamedWriteables(); var out = new BytesStreamOutput()) {
request.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener;
import static org.elasticsearch.cluster.service.MasterService.MAX_TASK_DESCRIPTION_CHARS;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -1041,30 +1042,22 @@ public void onFailure(Exception e) {
threadContext.putHeader(testContextHeaderName, testContextHeaderValue);
final var expectFailure = randomBoolean();
final var taskComplete = new AtomicBoolean();
final var task = new Task(expectFailure, testResponseHeaderValue, new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
throw new AssertionError("should not succeed");
final var task = new Task(expectFailure, testResponseHeaderValue, assertNoSuccessListener(e -> {
assertEquals(testContextHeaderValue, threadContext.getHeader(testContextHeaderName));
assertEquals(List.of(testResponseHeaderValue), threadContext.getResponseHeaders().get(testResponseHeaderName));
assertThat(e, instanceOf(FailedToCommitClusterStateException.class));
assertThat(e.getMessage(), equalTo(publicationFailedExceptionMessage));
if (expectFailure) {
assertThat(e.getSuppressed().length, greaterThan(0));
var suppressed = e.getSuppressed()[0];
assertThat(suppressed, instanceOf(ElasticsearchException.class));
assertThat(suppressed.getMessage(), equalTo(taskFailedExceptionMessage));
}

@Override
public void onFailure(Exception e) {
assertEquals(testContextHeaderValue, threadContext.getHeader(testContextHeaderName));
assertEquals(List.of(testResponseHeaderValue), threadContext.getResponseHeaders().get(testResponseHeaderName));
assertThat(e, instanceOf(FailedToCommitClusterStateException.class));
assertThat(e.getMessage(), equalTo(publicationFailedExceptionMessage));
if (expectFailure) {
assertThat(e.getSuppressed().length, greaterThan(0));
var suppressed = e.getSuppressed()[0];
assertThat(suppressed, instanceOf(ElasticsearchException.class));
assertThat(suppressed.getMessage(), equalTo(taskFailedExceptionMessage));
}
assertNotNull(publishedState.get());
assertNotSame(stateBeforeFailure, publishedState.get());
assertTrue(taskComplete.compareAndSet(false, true));
publishFailureCountdown.countDown();
}
});
assertNotNull(publishedState.get());
assertNotSame(stateBeforeFailure, publishedState.get());
assertTrue(taskComplete.compareAndSet(false, true));
publishFailureCountdown.countDown();
}));

queue.submitTask("test", task, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.elasticsearch.test.ESTestCase.fail;

public class ActionTestUtils {

Expand Down Expand Up @@ -77,6 +80,27 @@ public static <T> ActionListener<T> assertNoFailureListener(CheckedConsumer<T, E
return ActionListener.wrap(consumer, ESTestCase::fail);
}

public static <T> ActionListener<T> assertNoSuccessListener(Consumer<Exception> consumer) {
return new ActionListener<>() {
@Override
public void onResponse(T result) {
fail(null, "unexpected success with result [%s] while expecting to handle failure with [%s]", result, consumer);
}

@Override
public void onFailure(Exception e) {
try {
consumer.accept(e);
} catch (Exception e2) {
if (e2 != e) {
e2.addSuppressed(e);
}
fail(e2, "unexpected failure in onFailure handler for [%s]", consumer);
}
}
};
}

public static ResponseListener wrapAsRestResponseListener(ActionListener<Response> listener) {
return new ResponseListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import static org.elasticsearch.ExceptionsHelper.unwrapCause;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener;
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
import static org.elasticsearch.test.ESTestCase.asInstanceOf;
import static org.elasticsearch.test.ESTestCase.randomInt;
Expand All @@ -37,7 +38,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Utility plugin that captures the invocation of an action on a node after the task has been registered with the {@link TaskManager},
Expand Down Expand Up @@ -128,19 +128,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
if (capturingListener != null) {
final var cancellableTask = asInstanceOf(CancellableTask.class, task);
capturingListener.addListener(assertNoFailureListener(captured -> {
cancellableTask.addListener(() -> chain.proceed(task, action, request, new ActionListener<>() {
@Override
public void onResponse(Response response) {
fail("cancelled action should not succeed, but got " + response);
}

@Override
public void onFailure(Exception e) {
assertThat(unwrapCause(e), instanceOf(TaskCancelledException.class));
listener.onFailure(e);
captured.countDownLatch().countDown();
}
}));
cancellableTask.addListener(() -> chain.proceed(task, action, request, assertNoSuccessListener(e -> {
assertThat(unwrapCause(e), instanceOf(TaskCancelledException.class));
listener.onFailure(e);
captured.countDownLatch().countDown();
})));
assertFalse(cancellableTask.isCancelled());
captured.doCancel().run();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
import static org.elasticsearch.xpack.ml.DefaultMachineLearningExtension.ANALYTICS_DEST_INDEX_ALLOWED_SETTINGS;
import static org.hamcrest.Matchers.arrayContaining;
Expand Down Expand Up @@ -334,10 +335,7 @@ private Map<String, Object> testCreateDestinationIndex(DataFrameAnalysis analysi
clock,
config,
ANALYTICS_DEST_INDEX_ALLOWED_SETTINGS,
ActionListener.wrap(
response -> fail("should not succeed"),
e -> assertThat(e.getMessage(), Matchers.matchesRegex(finalErrorMessage))
)
assertNoSuccessListener(e -> assertThat(e.getMessage(), Matchers.matchesRegex(finalErrorMessage)))
);

return null;
Expand Down Expand Up @@ -578,8 +576,7 @@ public void testCreateDestinationIndex_ResultsFieldsExistsInSourceIndex() {
clock,
config,
ANALYTICS_DEST_INDEX_ALLOWED_SETTINGS,
ActionListener.wrap(
response -> fail("should not succeed"),
assertNoSuccessListener(
e -> assertThat(
e.getMessage(),
equalTo("A field that matches the dest.results_field [ml] already exists; please set a different results_field")
Expand Down

0 comments on commit c22ec19

Please sign in to comment.