Skip to content

Commit

Permalink
Merge pull request #22271 from stuartwdouglas/22251
Browse files Browse the repository at this point in the history
Don't block when returning a Uni
  • Loading branch information
gwenneg authored Dec 20, 2021
2 parents fc71715 + 17e42fa commit b81c20f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,57 +33,59 @@ public class UniValueTest {
public void test() {
// STEP 1
// Action: a method annotated with @CacheResult and returning a Uni is called.
// Expected effect: the method is invoked and an UnresolvedUniValue is cached.
// Expected effect: the method is not invoked, as Uni is lazy
// Verified by: invocations counter and CacheResultInterceptor log.
Uni<String> uni1 = cachedService.cachedMethod(KEY);
assertEquals(1, cachedService.getInvocations());
assertEquals(0, cachedService.getInvocations());

// STEP 2
// Action: same call as STEP 1.
// Expected effect: the method is invoked because the key is associated with a cached UnresolvedUniValue.
// Verified by: invocations counter and CacheResultInterceptor log.
// Expected effect: the method is not invoked, as Uni is lazy
Uni<String> uni2 = cachedService.cachedMethod(KEY);
assertEquals(2, cachedService.getInvocations());
assertEquals(0, cachedService.getInvocations());

// STEP 3
// Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired.
// Expected effect: the UnresolvedUniValue cached during STEP 1 is replaced with the emitted item from this step in the cache.
// Verified by: subscriptions counter and CaffeineCache log.
String emittedItem1 = uni1.await().indefinitely();
assertEquals("1", emittedItem1); // This checks the subscriptions counter value.
//the method would be called to resolve the value
assertEquals(1, cachedService.getInvocations());

// STEP 4
// Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired.
// Expected effect: the emitted item from STEP 3 is replaced with the emitted item from this step in the cache.
// Verified by: subscriptions counter, CaffeineCache log and different objects references between STEPS 3 and 4 emitted items.
String emittedItem2 = uni2.await().indefinitely();
assertTrue(emittedItem1 != emittedItem2);
assertEquals("2", emittedItem2); // This checks the subscriptions counter value.
assertTrue(emittedItem1 == emittedItem2);
assertEquals("1", emittedItem2); // This checks the subscriptions counter value.
assertEquals(1, cachedService.getInvocations());

// STEP 5
// Action: same call as STEP 2 but we immediately subscribe to the returned Uni and wait for an item event to be fired.
// Expected effect: the method is not invoked and the emitted item cached during STEP 4 is returned.
// Verified by: invocations and subscriptions counters, same object reference between STEPS 4 and 5 emitted items.
String emittedItem3 = cachedService.cachedMethod(KEY).await().indefinitely();
assertEquals(2, cachedService.getInvocations());
assertEquals("2", emittedItem3); // This checks the subscriptions counter value.
assertEquals(1, cachedService.getInvocations());
assertEquals("1", emittedItem3); // This checks the subscriptions counter value.
assertTrue(emittedItem2 == emittedItem3);

// STEP 6
// Action: same call as STEP 5 with a different key.
// Expected effect: the method is invoked and an UnresolvedUniValue is cached.
// Verified by: invocations and subscriptions counters, CacheResultInterceptor log and different objects references between STEPS 5 and 6 emitted items.
String emittedItem4 = cachedService.cachedMethod("another-key").await().indefinitely();
assertEquals(3, cachedService.getInvocations());
assertEquals("3", emittedItem4); // This checks the subscriptions counter value.
assertEquals(2, cachedService.getInvocations());
assertEquals("2", emittedItem4); // This checks the subscriptions counter value.
assertTrue(emittedItem3 != emittedItem4);
}

@ApplicationScoped
static class CachedService {

private int invocations;
private int subscriptions;
private volatile int invocations;
private volatile int subscriptions;

@CacheResult(cacheName = "test-cache")
public Uni<String> cachedMethod(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,71 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
}

try {

Uni<Object> cacheValue = cache.get(key, new Function<Object, Object>() {
@Override
public Object apply(Object k) {
try {
if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) {
LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]",
UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName());
return UnresolvedUniValue.INSTANCE;
} else {
return invocationContext.proceed();
final boolean isUni = Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType());
if (isUni) {
Uni<Object> ret = cache.get(key, new Function<Object, Object>() {
@Override
public Object apply(Object k) {
LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]",
UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName());
return UnresolvedUniValue.INSTANCE;
}
}).onItem().transformToUni(o -> {
if (o == UnresolvedUniValue.INSTANCE) {
try {
return ((Uni<Object>) invocationContext.proceed())
.onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue));
} catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
} catch (Throwable e) {
throw new CacheException(e);
} else {
return Uni.createFrom().item(o);
}
});
if (binding.lockTimeout() <= 0) {
return ret;
}
});
return ret.ifNoItem().after(Duration.ofMillis(binding.lockTimeout())).recoverWithUni(() -> {
try {
return (Uni<?>) invocationContext.proceed();
} catch (CacheException e) {
throw e;
} catch (Exception e) {
throw new CacheException(e);
}
});

Object value;
if (binding.lockTimeout() <= 0) {
value = cacheValue.await().indefinitely();
} else {
try {
/*
* If the current thread started the cache value computation, then the computation is already finished since
* it was done synchronously and the following call will never time out.
*/
value = cacheValue.await().atMost(Duration.ofMillis(binding.lockTimeout()));
} catch (TimeoutException e) {
// TODO: Add statistics here to monitor the timeout.
return invocationContext.proceed();
Uni<Object> cacheValue = cache.get(key, new Function<Object, Object>() {
@Override
public Object apply(Object k) {
try {
return invocationContext.proceed();
} catch (CacheException e) {
throw e;
} catch (Throwable e) {
throw new CacheException(e);
}
}
});
Object value;
if (binding.lockTimeout() <= 0) {
value = cacheValue.await().indefinitely();
} else {
try {
/*
* If the current thread started the cache value computation, then the computation is already finished
* since
* it was done synchronously and the following call will never time out.
*/
value = cacheValue.await().atMost(Duration.ofMillis(binding.lockTimeout()));
} catch (TimeoutException e) {
// TODO: Add statistics here to monitor the timeout.
return invocationContext.proceed();
}
}
}

if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) {
return resolveUni(invocationContext, cache, key, value);
} else {
return value;
}

Expand All @@ -90,14 +118,4 @@ public Object apply(Object k) {
}
}
}

private Object resolveUni(InvocationContext invocationContext, AbstractCache cache, Object key, Object value)
throws Exception {
if (value == UnresolvedUniValue.INSTANCE) {
return ((Uni<Object>) invocationContext.proceed())
.onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue));
} else {
return Uni.createFrom().item(value);
}
}
}

0 comments on commit b81c20f

Please sign in to comment.