Skip to content

Commit

Permalink
GH-395 - Avoid event publication completion for failed CompletableFut…
Browse files Browse the repository at this point in the history
…ures.

We now explicitly handle CompletableFuture instances returned from transactional event listeners by registering the completion handlers only on success, the debug logging on failure and immediately return the decorated instance.

Previously, a failed CompletableFuture instance would still have the publication marked completed as it doesn't cause any exception being thrown and thus ultimately ending up in the code path that issues the completion.
  • Loading branch information
odrotbohm committed Nov 28, 2023
1 parent 5bb16e4 commit da43c72
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.modulith.events.support;

import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.aopalliance.aop.Advice;
Expand All @@ -31,6 +32,7 @@
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.modulith.events.core.EventPublicationRegistry;
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
import org.springframework.transaction.event.TransactionPhase;
Expand Down Expand Up @@ -164,25 +166,30 @@ public Object invoke(MethodInvocation invocation) throws Throwable {

Object result = null;
var method = invocation.getMethod();
var argument = invocation.getArguments()[0];

try {

result = invocation.proceed();
} catch (Exception o_O) {

if (LOG.isDebugEnabled()) {
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
} else {
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
method, o_O.getMessage());
if (result instanceof CompletableFuture<?> future) {

return future
.thenAccept(it -> markCompleted(method, argument))
.exceptionallyCompose(it -> {
handleFailure(method, it);
return CompletableFuture.failedFuture(it);
});
}

} catch (Throwable o_O) {

handleFailure(method, o_O);

throw o_O;
}

// Mark publication complete if the method is a transactional event listener.
String adapterId = ADAPTERS.get(method).getListenerId();
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
registry.get().markCompleted(invocation.getArguments()[0], identifier);
markCompleted(method, argument);

return result;
}
Expand All @@ -196,6 +203,27 @@ public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 10;
}

@Nullable
private static Void handleFailure(Method method, Throwable o_O) {

if (LOG.isDebugEnabled()) {
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
} else {
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
method, o_O.getMessage());
}

return null;
}

private void markCompleted(Method method, Object event) {

// Mark publication complete if the method is a transactional event listener.
String adapterId = ADAPTERS.get(method).getListenerId();
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
registry.get().markCompleted(event, identifier);
}

private static TransactionalApplicationListenerMethodAdapter createAdapter(Method method) {
return new TransactionalApplicationListenerMethodAdapter(null, method.getDeclaringClass(), method);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import org.junit.jupiter.api.Test;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.context.event.EventListener;
import org.springframework.modulith.events.core.EventPublicationRegistry;
import org.springframework.scheduling.annotation.Async;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

Expand Down Expand Up @@ -59,6 +61,34 @@ void doesNotTriggerCompletionForNonEventListener() {
assertNonCompletion(SomeEventListener::nonEventListener);
}

@Test // GH-395
void doesNotTriggerCompletionOnFailedCompletableFuture() throws Throwable {

var result = createProxyFor(bean).asyncWithResult(true);

assertThat(result.isDone()).isFalse();
verify(registry, never()).markCompleted(any(), any());

Thread.sleep(500);

assertThat(result.isCompletedExceptionally()).isTrue();
verify(registry, never()).markCompleted(any(), any());
}

@Test // GH-395
void marksLazilyComputedCompletableFutureAsCompleted() throws Throwable {

var result = createProxyFor(bean).asyncWithResult(false);

assertThat(result.isDone()).isFalse();
verify(registry, never()).markCompleted(any(), any());

Thread.sleep(500);

assertThat(result.isCompletedExceptionally()).isFalse();
verify(registry).markCompleted(any(), any());
}

private void assertCompletion(BiConsumer<SomeEventListener, Object> consumer) {
assertCompletion(consumer, true);
}
Expand All @@ -78,11 +108,12 @@ private void assertCompletion(BiConsumer<SomeEventListener, Object> consumer, bo
verify(registry, times(expected ? 1 : 0)).markCompleted(any(), any());
}

private Object createProxyFor(Object bean) {
@SuppressWarnings("unchecked")
private <T> T createProxyFor(T bean) {

ProxyFactory factory = new ProxyFactory(bean);
factory.addAdvisor(new CompletionRegisteringAdvisor(() -> registry));
return factory.getProxy();
return (T) factory.getProxy();
}

static class SomeEventListener {
Expand All @@ -97,5 +128,22 @@ void onAfterRollback(Object object) {}
void simpleEventListener(Object object) {}

void nonEventListener(Object object) {}

@Async
@TransactionalEventListener
CompletableFuture<?> asyncWithResult(boolean fail) {

return CompletableFuture.completedFuture(new Object())
.thenComposeAsync(it -> {

try {
Thread.sleep(200);
} catch (InterruptedException e) {}

return fail
? CompletableFuture.failedFuture(new IllegalArgumentException())
: CompletableFuture.completedFuture(it);
});
}
}
}

0 comments on commit da43c72

Please sign in to comment.