Skip to content

Commit

Permalink
Properly support CompletionStage as a return type in caching extension
Browse files Browse the repository at this point in the history
Fixes: #23816
  • Loading branch information
geoand committed Apr 14, 2022
1 parent 04a67c8 commit abb2a8b
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 107 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package io.quarkus.cache.test.runtime;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.cache.CacheInvalidate;
import io.quarkus.cache.CacheInvalidateAll;
import io.quarkus.cache.CacheResult;
import io.quarkus.test.QuarkusUnitTest;

/**
* Tests the caching annotations on methods returning {@link CompletableFuture}.
*/
public class CompletionStageReturnTypeTest {

private static final String CACHE_NAME_1 = "test-cache-1";
private static final String CACHE_NAME_2 = "test-cache-2";
private static final String KEY_1 = "key-1";
private static final String KEY_2 = "key-2";

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest().withApplicationRoot((jar) -> jar.addClass(CachedService.class));

@Inject
CachedService cachedService;

@Test
void testCacheResult() throws ExecutionException, InterruptedException {
// STEP 1
// Action: a method annotated with @CacheResult and returning a CompletionStage is called.
// Expected effect: the method is invoked, as CompletionStage is eager.
// Verified by: invocations counter.
CompletableFuture<String> cf1 = cachedService.cacheResult1(KEY_1);
assertEquals(1, cachedService.getCacheResultInvocations());

// STEP 2
// Action: same call as STEP 1.
// Expected effect: same as STEP 1 with a different CompletionStage instance returned.
// Verified by: invocations counter and different objects references between STEPS 1 AND 2 results.
CompletableFuture<String> cf2 = cachedService.cacheResult1(KEY_1);
assertEquals(1, cachedService.getCacheResultInvocations());
assertNotSame(cf1, cf2);

// STEP 3
// Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired.
// Expected effect: the method from STEP 1 is invoked and its result is cached.
// Verified by: invocations counter and STEP 4.
String emittedItem1 = cf1.get();
assertEquals(1, cachedService.getCacheResultInvocations());

// STEP 4
// Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired.
// Expected effect: the method from STEP 2 is not invoked and the value cached in STEP 3 is returned.
// Verified by: invocations counter and same object reference between STEPS 3 and 4 emitted items.
String emittedItem2 = cf2.get();
assertEquals(1, cachedService.getCacheResultInvocations());
assertSame(emittedItem1, emittedItem2);

// STEP 5
// Action: same call as STEP 2 with a different key and an immediate subscription.
// Expected effect: the method is invoked and a new item is emitted (also cached).
// Verified by: invocations counter.
String emittedItem3 = cachedService.cacheResult1("another-key").get();
assertEquals(2, cachedService.getCacheResultInvocations());
}

@Test
void testCacheInvalidate() throws ExecutionException, InterruptedException {
// First, let's put some data into the caches.
String value1 = cachedService.cacheResult1(KEY_1).get();
Object value2 = cachedService.cacheResult2(KEY_2).get();

// We will invalidate some data (only KEY_1) in all caches later.
cachedService.cacheInvalidate(KEY_1).get();
// For now, the method that will invalidate the data should not be invoked, as CompletionStage is eager.
assertEquals(1, cachedService.getCacheInvalidateInvocations());

// The data for the second key should still be cached at this point.
Object value4 = cachedService.cacheResult2(KEY_2).get();
assertSame(value2, value4);

// Let's call the methods annotated with @CacheResult again.
String value7 = cachedService.cacheResult1(KEY_1).get();

// The objects references should be different for the invalidated key.
assertNotSame(value1, value7);
}

@Test
void testCacheInvalidateAll() throws ExecutionException, InterruptedException {
// First, let's put some data into the caches.
String value1 = cachedService.cacheResult1(KEY_1).get();
Object value2 = cachedService.cacheResult2(KEY_2).get();

// We will invalidate all the data in all caches later.
cachedService.cacheInvalidateAll().get();

// For now, the method that will invalidate the data should not be invoked, as CompletionStage is eager.
assertEquals(1, cachedService.getCacheInvalidateAllInvocations());

// Let's call the methods annotated with @CacheResult again.
String value3 = cachedService.cacheResult1(KEY_1).get();
Object value4 = cachedService.cacheResult2(KEY_2).get();

// All objects references should be different.
assertNotSame(value1, value3);
assertNotSame(value2, value4);
}

@ApplicationScoped
static class CachedService {

private volatile int cacheResultInvocations;
private volatile int cacheInvalidateInvocations;
private volatile int cacheInvalidateAllInvocations;

@CacheResult(cacheName = CACHE_NAME_1)
public CompletableFuture<String> cacheResult1(String key) {
cacheResultInvocations++;
return CompletableFuture.completedFuture(new String());
}

@CacheResult(cacheName = CACHE_NAME_2)
public CompletableFuture<Object> cacheResult2(String key) {
return CompletableFuture.completedFuture(new Object());
}

@CacheInvalidate(cacheName = CACHE_NAME_1)
@CacheInvalidate(cacheName = CACHE_NAME_2)
public CompletableFuture<Void> cacheInvalidate(String key) {
cacheInvalidateInvocations++;
return CompletableFuture.completedFuture(null);
}

@CacheInvalidateAll(cacheName = CACHE_NAME_1)
@CacheInvalidateAll(cacheName = CACHE_NAME_2)
public CompletableFuture<Void> cacheInvalidateAll() {
cacheInvalidateAllInvocations++;
return CompletableFuture.completedFuture(null);
}

public int getCacheResultInvocations() {
return cacheResultInvocations;
}

public int getCacheInvalidateInvocations() {
return cacheInvalidateInvocations;
}

public int getCacheInvalidateAllInvocations() {
return cacheInvalidateAllInvocations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import javax.inject.Inject;
Expand All @@ -16,6 +17,7 @@

import io.quarkus.arc.runtime.InterceptorBindings;
import io.quarkus.cache.Cache;
import io.quarkus.cache.CacheException;
import io.quarkus.cache.CacheKey;
import io.quarkus.cache.CacheManager;
import io.quarkus.cache.CompositeCacheKey;
Expand All @@ -27,6 +29,7 @@ public abstract class CacheInterceptor {

private static final Logger LOGGER = Logger.getLogger(CacheInterceptor.class);
private static final String PERFORMANCE_WARN_MSG = "Cache key resolution based on reflection calls. Please create a GitHub issue in the Quarkus repository, the maintainers might be able to improve your application performance.";
protected static final String UNHANDLED_ASYNC_RETURN_TYPE_MSG = "Unhandled async return type";

@Inject
CacheManager cacheManager;
Expand Down Expand Up @@ -135,7 +138,44 @@ protected Object getCacheKey(Cache cache, List<Short> cacheKeyParameterPositions
}
}

protected static boolean isUniReturnType(InvocationContext invocationContext) {
return Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType());
protected static ReturnType determineReturnType(Class<?> returnType) {
if (Uni.class.isAssignableFrom(returnType)) {
return ReturnType.Uni;
}
if (CompletionStage.class.isAssignableFrom(returnType)) {
return ReturnType.CompletionStage;
}
return ReturnType.NonAsync;
}

protected Uni<?> asyncInvocationResultToUni(Object invocationResult, ReturnType returnType) {
if (returnType == ReturnType.Uni) {
return (Uni<?>) invocationResult;
} else if (returnType == ReturnType.CompletionStage) {
return Uni.createFrom().completionStage(new Supplier<>() {
@Override
public CompletionStage<?> get() {
return (CompletionStage<?>) invocationResult;
}
});
} else {
throw new CacheException(new IllegalStateException(UNHANDLED_ASYNC_RETURN_TYPE_MSG));
}
}

protected Object createAsyncResult(Uni<Object> cacheValue, ReturnType returnType) {
if (returnType == ReturnType.Uni) {
return cacheValue;
}
if (returnType == ReturnType.CompletionStage) {
return cacheValue.subscribeAsCompletionStage();
}
throw new CacheException(new IllegalStateException(UNHANDLED_ASYNC_RETURN_TYPE_MSG));
}

protected enum ReturnType {
NonAsync,
Uni,
CompletionStage
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,26 @@ public class CacheInvalidateAllInterceptor extends CacheInterceptor {
public Object intercept(InvocationContext invocationContext) throws Exception {
CacheInterceptionContext<CacheInvalidateAll> interceptionContext = getInterceptionContext(invocationContext,
CacheInvalidateAll.class, false);

if (interceptionContext.getInterceptorBindings().isEmpty()) {
// This should never happen.
LOGGER.warn(INTERCEPTOR_BINDINGS_ERROR_MSG);
return invocationContext.proceed();
} else if (isUniReturnType(invocationContext)) {
return invalidateAllNonBlocking(invocationContext, interceptionContext);
} else {
}
ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
if (returnType == ReturnType.NonAsync) {
return invalidateAllBlocking(invocationContext, interceptionContext);

} else {
return invalidateAllNonBlocking(invocationContext, interceptionContext, returnType);
}
}

private Object invalidateAllNonBlocking(InvocationContext invocationContext,
CacheInterceptionContext<CacheInvalidateAll> interceptionContext) {
CacheInterceptionContext<CacheInvalidateAll> interceptionContext,
ReturnType returnType) {
LOGGER.trace("Invalidating all cache entries in a non-blocking way");
return Multi.createFrom().iterable(interceptionContext.getInterceptorBindings())
var uni = Multi.createFrom().iterable(interceptionContext.getInterceptorBindings())
.onItem().transformToUniAndMerge(new Function<CacheInvalidateAll, Uni<? extends Void>>() {
@Override
public Uni<Void> apply(CacheInvalidateAll binding) {
Expand All @@ -53,12 +58,13 @@ public Uni<Void> apply(CacheInvalidateAll binding) {
@Override
public Uni<?> apply(Object ignored) {
try {
return (Uni<Object>) invocationContext.proceed();
return asyncInvocationResultToUni(invocationContext.proceed(), returnType);
} catch (Exception e) {
throw new CacheException(e);
}
}
});
return createAsyncResult(uni, returnType);
}

private Object invalidateAllBlocking(InvocationContext invocationContext,
Expand Down
Loading

0 comments on commit abb2a8b

Please sign in to comment.