From 7120304aabb7e0c458f2e650a82d3a0ce99c5e50 Mon Sep 17 00:00:00 2001 From: Gwenneg Lepage Date: Wed, 3 Jul 2024 00:07:39 +0200 Subject: [PATCH 1/2] Handle duplicated Vert.x context in CaffeineCacheImpl --- .../cache/runtime/CacheResultInterceptor.java | 48 ----------------- .../runtime/caffeine/CaffeineCacheImpl.java | 52 ++++++++++++++++++- 2 files changed, 51 insertions(+), 49 deletions(-) diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java index f635d965a518e..f16d6674ee7ed 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java @@ -1,7 +1,6 @@ package io.quarkus.cache.runtime; import java.time.Duration; -import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.Supplier; @@ -17,10 +16,6 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TimeoutException; import io.smallrye.mutiny.Uni; -import io.vertx.core.Context; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding. @Interceptor @@ -58,7 +53,6 @@ public Object intercept(InvocationContext invocationContext) throws Throwable { try { ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType()); if (returnType != ReturnType.NonAsync) { - Context context = Vertx.currentContext(); Uni cacheValue = cache.getAsync(key, new Function>() { @SuppressWarnings("unchecked") @Override @@ -76,48 +70,6 @@ public Uni apply(Object key) { public Uni apply(Throwable throwable) { return cache.invalidate(key).replaceWith(throwable); } - }).emitOn(new Executor() { - // We need make sure we go back to the original context when the cache value is computed. - // Otherwise, we would always emit on the context having computed the value, which could - // break the duplicated context isolation. - @Override - public void execute(Runnable command) { - Context ctx = Vertx.currentContext(); - if (context == null) { - // We didn't capture a context - if (ctx == null) { - // We are not on a context => we can execute immediately. - command.run(); - } else { - // We are on a context. - // We cannot continue on the current context as we may share a duplicated context. - // We need a new one. Note that duplicate() does not duplicate the duplicated context, - // but the root context. - ((ContextInternal) ctx).duplicate() - .runOnContext(new Handler() { - @Override - public void handle(Void ignored) { - command.run(); - } - }); - } - } else { - // We captured a context. - if (ctx == context) { - // We are on the same context => we can execute immediately - command.run(); - } else { - // 1) We are not on a context (ctx == null) => we need to switch to the captured context. - // 2) We are on a different context (ctx != null) => we need to switch to the captured context. - context.runOnContext(new Handler() { - @Override - public void handle(Void ignored) { - command.run(); - } - }); - } - } - } }); if (binding.lockTimeout() <= 0) { diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java index dff31eb8dab1e..278c1ebc3ab15 100644 --- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java +++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/caffeine/CaffeineCacheImpl.java @@ -8,6 +8,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -26,6 +27,10 @@ import io.quarkus.cache.runtime.AbstractCache; import io.quarkus.cache.runtime.NullValueConverter; import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; /** * This class is an internal Quarkus cache implementation using Caffeine. Do not use it explicitly from your Quarkus @@ -99,6 +104,7 @@ public CompletionStage get() { @Override public Uni getAsync(K key, Function> valueLoader) { Objects.requireNonNull(key, NULL_KEYS_NOT_SUPPORTED_MSG); + Context context = Vertx.currentContext(); return Uni.createFrom() .completionStage(new Supplier>() { @Override @@ -119,7 +125,51 @@ public CompletableFuture apply(Object key) { recorder.doRecord(key); return result; } - }).map(fromCacheValue()); + }) + .map(fromCacheValue()) + .emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + Context ctx = Vertx.currentContext(); + if (context == null) { + // We didn't capture a context + if (ctx == null) { + // We are not on a context => we can execute immediately. + command.run(); + } else { + // We are on a context. + // We cannot continue on the current context as we may share a duplicated context. + // We need a new one. Note that duplicate() does not duplicate the duplicated context, + // but the root context. + ((ContextInternal) ctx).duplicate() + .runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } else { + // We captured a context. + if (ctx == context) { + // We are on the same context => we can execute immediately + command.run(); + } else { + // 1) We are not on a context (ctx == null) => we need to switch to the captured context. + // 2) We are on a different context (ctx != null) => we need to switch to the captured context. + context.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } + } + }); } @Override From dca21394fb812f6423cc82a67fce67516e8d8b83 Mon Sep 17 00:00:00 2001 From: Katia Aresti Date: Fri, 5 Jul 2024 20:13:42 +0200 Subject: [PATCH 2/2] Update Infinispan Cache Implementation to propagate Vert.x context correctly (#298) --- .../runtime/InfinispanCacheImpl.java | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java b/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java index a92f6f4e70fd7..cc0ea14720eb6 100644 --- a/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java +++ b/extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java @@ -5,6 +5,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -24,6 +25,10 @@ import io.quarkus.infinispan.client.runtime.InfinispanClientUtil; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; /** * This class is an internal Quarkus cache implementation using Infinispan. @@ -111,6 +116,8 @@ public Uni get(K key, Function valueLoader) { @Override public Uni getAsync(K key, Function> valueLoader) { + Context context = Vertx.currentContext(); + return Uni.createFrom().completionStage(CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> { if (ex1 != null) { return CompletableFuture.failedFuture(ex1); @@ -145,7 +152,49 @@ public Uni getAsync(K key, Function> valueLoader) { } }); return resultAsync; - })); + })).emitOn(new Executor() { + // We need make sure we go back to the original context when the cache value is computed. + // Otherwise, we would always emit on the context having computed the value, which could + // break the duplicated context isolation. + @Override + public void execute(Runnable command) { + Context ctx = Vertx.currentContext(); + if (context == null) { + // We didn't capture a context + if (ctx == null) { + // We are not on a context => we can execute immediately. + command.run(); + } else { + // We are on a context. + // We cannot continue on the current context as we may share a duplicated context. + // We need a new one. Note that duplicate() does not duplicate the duplicated context, + // but the root context. + ((ContextInternal) ctx).duplicate() + .runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } else { + // We captured a context. + if (ctx == context) { + // We are on the same context => we can execute immediately + command.run(); + } else { + // 1) We are not on a context (ctx == null) => we need to switch to the captured context. + // 2) We are on a different context (ctx != null) => we need to switch to the captured context. + context.runOnContext(new Handler() { + @Override + public void handle(Void ignored) { + command.run(); + } + }); + } + } + } + }); } @Override