From 87d5728d8d3fb5c9f1984f5b4933ed2b7e24e45c Mon Sep 17 00:00:00 2001
From: Clement Escoffier <clement.escoffier@gmail.com>
Date: Tue, 14 Nov 2023 14:09:44 +0100
Subject: [PATCH] Handle duplicated context in the CacheResultInterceptor

- Capture the context when calling the interceptor
- Make sure the item is emitted on the captured context

(cherry picked from commit f781b7fd106fd2d90656ac78d21f8e01374f0b68)
---
 .../cache/runtime/CacheResultInterceptor.java | 49 +++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java
index dadc40e07d033..e6e1397b53dd4 100644
--- a/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java
+++ b/extensions/cache/runtime/src/main/java/io/quarkus/cache/runtime/CacheResultInterceptor.java
@@ -1,6 +1,7 @@
 package io.quarkus.cache.runtime;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -16,6 +17,10 @@
 import io.smallrye.mutiny.Multi;
 import io.smallrye.mutiny.TimeoutException;
 import io.smallrye.mutiny.Uni;
+import io.vertx.core.Context;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.impl.ContextInternal;
 
 @CacheResult(cacheName = "") // The `cacheName` attribute is @Nonbinding.
 @Interceptor
@@ -53,6 +58,7 @@ public Object intercept(InvocationContext invocationContext) throws Throwable {
         try {
             ReturnType returnType = determineReturnType(invocationContext.getMethod().getReturnType());
             if (returnType != ReturnType.NonAsync) {
+                Context context = Vertx.currentContext();
                 Uni<Object> cacheValue = cache.getAsync(key, new Function<Object, Uni<Object>>() {
                     @SuppressWarnings("unchecked")
                     @Override
@@ -65,11 +71,54 @@ public Uni<Object> apply(Object key) {
                             throw new CacheException(e);
                         }
                     }
+                }).emitOn(new Executor() {
+                    // We need make sure we go back to the original context when the cache value is computed.
+                    // Otherwise, we would always emit on the context having computed the value, which could
+                    // break the duplicated context isolation.
+                    @Override
+                    public void execute(Runnable command) {
+                        Context ctx = Vertx.currentContext();
+                        if (context == null) {
+                            // We didn't capture a context
+                            if (ctx == null) {
+                                // We are not on a context => we can execute immediately.
+                                command.run();
+                            } else {
+                                // We are on a context.
+                                // We cannot continue on the current context as we may share a duplicated context.
+                                // We need a new one. Note that duplicate() does not duplicate the duplicated context,
+                                // but the root context.
+                                ((ContextInternal) ctx).duplicate()
+                                        .runOnContext(new Handler<Void>() {
+                                            @Override
+                                            public void handle(Void ignored) {
+                                                command.run();
+                                            }
+                                        });
+                            }
+                        } else {
+                            // We captured a context.
+                            if (ctx == context) {
+                                // We are on the same context => we can execute immediately
+                                command.run();
+                            } else {
+                                // 1) We are not on a context (ctx == null) => we need to switch to the captured context.
+                                // 2) We are on a different context (ctx != null) => we need to switch to the captured context.
+                                context.runOnContext(new Handler<Void>() {
+                                    @Override
+                                    public void handle(Void ignored) {
+                                        command.run();
+                                    }
+                                });
+                            }
+                        }
+                    }
                 });
 
                 if (binding.lockTimeout() <= 0) {
                     return createAsyncResult(cacheValue, returnType);
                 }
+                // IMPORTANT: The item/failure are emitted on the captured context.
                 cacheValue = cacheValue.ifNoItem().after(Duration.ofMillis(binding.lockTimeout()))
                         .recoverWithUni(new Supplier<Uni<?>>() {
                             @Override