diff --git a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java index 824a65edde1bb..a75ec89895578 100644 --- a/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java +++ b/extensions/cache/deployment/src/test/java/io/quarkus/cache/test/runtime/UniValueTest.java @@ -33,17 +33,16 @@ public class UniValueTest { public void test() { // STEP 1 // Action: a method annotated with @CacheResult and returning a Uni is called. - // Expected effect: the method is invoked and an UnresolvedUniValue is cached. + // Expected effect: the method is not invoked, as Uni is lazy // Verified by: invocations counter and CacheResultInterceptor log. Uni uni1 = cachedService.cachedMethod(KEY); - assertEquals(1, cachedService.getInvocations()); + assertEquals(0, cachedService.getInvocations()); // STEP 2 // Action: same call as STEP 1. - // Expected effect: the method is invoked because the key is associated with a cached UnresolvedUniValue. - // Verified by: invocations counter and CacheResultInterceptor log. + // Expected effect: the method is not invoked, as Uni is lazy Uni uni2 = cachedService.cachedMethod(KEY); - assertEquals(2, cachedService.getInvocations()); + assertEquals(0, cachedService.getInvocations()); // STEP 3 // Action: the Uni returned in STEP 1 is subscribed to and we wait for an item event to be fired. @@ -51,22 +50,25 @@ public void test() { // Verified by: subscriptions counter and CaffeineCache log. String emittedItem1 = uni1.await().indefinitely(); assertEquals("1", emittedItem1); // This checks the subscriptions counter value. + //the method would be called to resolve the value + assertEquals(1, cachedService.getInvocations()); // STEP 4 // Action: the Uni returned in STEP 2 is subscribed to and we wait for an item event to be fired. // Expected effect: the emitted item from STEP 3 is replaced with the emitted item from this step in the cache. // Verified by: subscriptions counter, CaffeineCache log and different objects references between STEPS 3 and 4 emitted items. String emittedItem2 = uni2.await().indefinitely(); - assertTrue(emittedItem1 != emittedItem2); - assertEquals("2", emittedItem2); // This checks the subscriptions counter value. + assertTrue(emittedItem1 == emittedItem2); + assertEquals("1", emittedItem2); // This checks the subscriptions counter value. + assertEquals(1, cachedService.getInvocations()); // STEP 5 // Action: same call as STEP 2 but we immediately subscribe to the returned Uni and wait for an item event to be fired. // Expected effect: the method is not invoked and the emitted item cached during STEP 4 is returned. // Verified by: invocations and subscriptions counters, same object reference between STEPS 4 and 5 emitted items. String emittedItem3 = cachedService.cachedMethod(KEY).await().indefinitely(); - assertEquals(2, cachedService.getInvocations()); - assertEquals("2", emittedItem3); // This checks the subscriptions counter value. + assertEquals(1, cachedService.getInvocations()); + assertEquals("1", emittedItem3); // This checks the subscriptions counter value. assertTrue(emittedItem2 == emittedItem3); // STEP 6 @@ -74,16 +76,16 @@ public void test() { // Expected effect: the method is invoked and an UnresolvedUniValue is cached. // Verified by: invocations and subscriptions counters, CacheResultInterceptor log and different objects references between STEPS 5 and 6 emitted items. String emittedItem4 = cachedService.cachedMethod("another-key").await().indefinitely(); - assertEquals(3, cachedService.getInvocations()); - assertEquals("3", emittedItem4); // This checks the subscriptions counter value. + assertEquals(2, cachedService.getInvocations()); + assertEquals("2", emittedItem4); // This checks the subscriptions counter value. assertTrue(emittedItem3 != emittedItem4); } @ApplicationScoped static class CachedService { - private int invocations; - private int subscriptions; + private volatile int invocations; + private volatile int subscriptions; @CacheResult(cacheName = "test-cache") public Uni cachedMethod(String key) { diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index 034dc77df8971..e780bbda78691 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -42,43 +42,71 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { } try { - - Uni cacheValue = cache.get(key, new Function() { - @Override - public Object apply(Object k) { - try { - if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) { - LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]", - UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName()); - return UnresolvedUniValue.INSTANCE; - } else { - return invocationContext.proceed(); + final boolean isUni = Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType()); + if (isUni) { + Uni ret = cache.get(key, new Function() { + @Override + public Object apply(Object k) { + LOGGER.debugf("Adding %s entry with key [%s] into cache [%s]", + UnresolvedUniValue.class.getSimpleName(), key, binding.cacheName()); + return UnresolvedUniValue.INSTANCE; + } + }).onItem().transformToUni(o -> { + if (o == UnresolvedUniValue.INSTANCE) { + try { + return ((Uni) invocationContext.proceed()) + .onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue)); + } catch (CacheException e) { + throw e; + } catch (Exception e) { + throw new CacheException(e); } - } catch (Throwable e) { - throw new CacheException(e); + } else { + return Uni.createFrom().item(o); } + }); + if (binding.lockTimeout() <= 0) { + return ret; } - }); + return ret.ifNoItem().after(Duration.ofMillis(binding.lockTimeout())).recoverWithUni(() -> { + try { + return (Uni) invocationContext.proceed(); + } catch (CacheException e) { + throw e; + } catch (Exception e) { + throw new CacheException(e); + } + }); - Object value; - if (binding.lockTimeout() <= 0) { - value = cacheValue.await().indefinitely(); } else { - try { - /* - * If the current thread started the cache value computation, then the computation is already finished since - * it was done synchronously and the following call will never time out. - */ - value = cacheValue.await().atMost(Duration.ofMillis(binding.lockTimeout())); - } catch (TimeoutException e) { - // TODO: Add statistics here to monitor the timeout. - return invocationContext.proceed(); + Uni cacheValue = cache.get(key, new Function() { + @Override + public Object apply(Object k) { + try { + return invocationContext.proceed(); + } catch (CacheException e) { + throw e; + } catch (Throwable e) { + throw new CacheException(e); + } + } + }); + Object value; + if (binding.lockTimeout() <= 0) { + value = cacheValue.await().indefinitely(); + } else { + try { + /* + * If the current thread started the cache value computation, then the computation is already finished + * since + * it was done synchronously and the following call will never time out. + */ + value = cacheValue.await().atMost(Duration.ofMillis(binding.lockTimeout())); + } catch (TimeoutException e) { + // TODO: Add statistics here to monitor the timeout. + return invocationContext.proceed(); + } } - } - - if (Uni.class.isAssignableFrom(invocationContext.getMethod().getReturnType())) { - return resolveUni(invocationContext, cache, key, value); - } else { return value; } @@ -90,14 +118,4 @@ public Object apply(Object k) { } } } - - private Object resolveUni(InvocationContext invocationContext, AbstractCache cache, Object key, Object value) - throws Exception { - if (value == UnresolvedUniValue.INSTANCE) { - return ((Uni) invocationContext.proceed()) - .onItem().call(emittedValue -> cache.replaceUniValue(key, emittedValue)); - } else { - return Uni.createFrom().item(value); - } - } }