Skip to content

Commit

Permalink
Avoid self-suppression on grouped action listener (#53262)
Browse files Browse the repository at this point in the history
It can be that a failure is repeated to a grouped action listener. For
example, if the same exception such as a connect transport exception, is
the cause of repeated failures. Previously we were unconditionally
self-suppressing the exception into the first exception, but
self-supressing is not allowed. Thus, we would throw an exception and
the grouped action listener would never complete. This commit addresses
this by guarding against self-suppression.
  • Loading branch information
jasontedor committed Mar 8, 2020
1 parent 1fa12bd commit 4aec5ef
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ public void onResponse(T element) {
@Override
public void onFailure(Exception e) {
if (failure.compareAndSet(null, e) == false) {
failure.accumulateAndGet(e, (previous, current) -> {
previous.addSuppressed(current);
return previous;
failure.accumulateAndGet(e, (current, update) -> {
// we have to avoid self-suppression!
if (update != current) {
current.addSuppressed(update);
}
return current;
});
}
if (countDown.countDown()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class GroupedActionListenerTests extends ESTestCase {

Expand Down Expand Up @@ -139,4 +142,22 @@ public void testConcurrentFailures() throws InterruptedException {
assertThat(exception, instanceOf(IOException.class));
assertEquals(numGroups - 1, exception.getSuppressed().length);
}

/*
* It can happen that the same exception causes a grouped listener to be notified of the failure multiple times. Since we suppress
* additional exceptions into the first exception, we have to guard against suppressing into the same exception, which could occur if we
* are notified of with the same failure multiple times. This test verifies that the guard against self-suppression remains.
*/
public void testRepeatNotificationForTheSameException() {
final AtomicReference<Exception> finalException = new AtomicReference<>();
final GroupedActionListener<Void> listener =
new GroupedActionListener<Void>(ActionListener.wrap(r -> {}, finalException::set), 2, Collections.emptyList());
final Exception e = new Exception();
// repeat notification for the same exception
listener.onFailure(e);
listener.onFailure(e);
assertThat(finalException.get(), not(nullValue()));
assertThat(finalException.get(), equalTo(e));
}

}

0 comments on commit 4aec5ef

Please sign in to comment.