diff --git a/context-propagation/src/main/java/io/smallrye/mutiny/context/DefaultContextPropagationInterceptor.java b/context-propagation/src/main/java/io/smallrye/mutiny/context/DefaultContextPropagationInterceptor.java index 4dfcd1c4c..29df3ad30 100644 --- a/context-propagation/src/main/java/io/smallrye/mutiny/context/DefaultContextPropagationInterceptor.java +++ b/context-propagation/src/main/java/io/smallrye/mutiny/context/DefaultContextPropagationInterceptor.java @@ -1,7 +1,5 @@ package io.smallrye.mutiny.context; -import org.eclipse.microprofile.context.spi.ContextManagerProvider; - import io.smallrye.context.SmallRyeThreadContext; /** @@ -9,12 +7,8 @@ */ public class DefaultContextPropagationInterceptor extends BaseContextPropagationInterceptor { - static final SmallRyeThreadContext THREAD_CONTEXT = (SmallRyeThreadContext) ContextManagerProvider.instance() - .getContextManager() - .newThreadContextBuilder().build(); - @Override protected SmallRyeThreadContext getThreadContext() { - return THREAD_CONTEXT; + return SmallRyeThreadContext.getCurrentThreadContextOrPropagatedContexts(); } } diff --git a/context-propagation/src/main/java/io/smallrye/mutiny/context/MutinyContextManagerExtension.java b/context-propagation/src/main/java/io/smallrye/mutiny/context/MutinyContextManagerExtension.java index e6af3fb2b..75ce74aab 100644 --- a/context-propagation/src/main/java/io/smallrye/mutiny/context/MutinyContextManagerExtension.java +++ b/context-propagation/src/main/java/io/smallrye/mutiny/context/MutinyContextManagerExtension.java @@ -1,17 +1,26 @@ package io.smallrye.mutiny.context; +import java.util.concurrent.CompletableFuture; +import java.util.function.UnaryOperator; + import org.eclipse.microprofile.context.ThreadContext; import org.eclipse.microprofile.context.spi.ContextManager; import org.eclipse.microprofile.context.spi.ContextManagerExtension; +import io.smallrye.context.SmallRyeThreadContext; import io.smallrye.mutiny.infrastructure.Infrastructure; public class MutinyContextManagerExtension implements ContextManagerExtension { @Override public void setup(ContextManager manager) { - ThreadContext threadContext = manager.newThreadContextBuilder().build(); - Infrastructure.setCompletableFutureWrapper(threadContext::withContextCapture); + Infrastructure.setCompletableFutureWrapper(new UnaryOperator>() { + @Override + public CompletableFuture apply(CompletableFuture t) { + ThreadContext threadContext = SmallRyeThreadContext.getCurrentThreadContextOrPropagatedContexts(); + return threadContext.withContextCapture(t); + } + }); } } diff --git a/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java b/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java index 1b8f6a8d4..5fb82b6f3 100644 --- a/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java +++ b/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java @@ -8,8 +8,11 @@ import java.util.List; import java.util.concurrent.*; +import org.eclipse.microprofile.context.ThreadContext; import org.junit.jupiter.api.*; +import io.smallrye.context.CleanAutoCloseable; +import io.smallrye.context.SmallRyeThreadContext; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.AssertSubscriber; @@ -293,4 +296,39 @@ public void testScan() { .assertItems(0, 1, 3, 6, 10, 15, 21, 28, 36, 45); } + + @Test + public void testContextOverride() { + MyContext ctx = MyContext.get(); + assertThat(ctx).isNotNull(); + SmallRyeThreadContext emptyContext = SmallRyeThreadContext.builder().cleared(ThreadContext.ALL_REMAINING) + .propagated(ThreadContext.NONE).build(); + Multi multi; + // remove context propagation in this scope + try (CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(emptyContext)) { + multi = Multi.createFrom() + .item(() -> { + assertThat(MyContext.get()).isNull(); + return 2; + }) + .runSubscriptionOn(executor) + .map(r -> { + assertThat(MyContext.get()).isNull(); + return r; + }); + } + + Uni latch = Multi.createFrom(). emitter(emitter -> new Thread(() -> { + try { + int result = multi.toUni().await().indefinitely(); + emitter.emit(result); + emitter.complete(); + } catch (Throwable t) { + emitter.fail(t); + } + }).start()).toUni(); + + int result = latch.await().indefinitely(); + assertThat(result).isEqualTo(2); + } } diff --git a/context-propagation/src/test/java/io/smallrye/mutiny/context/UniContextPropagationTest.java b/context-propagation/src/test/java/io/smallrye/mutiny/context/UniContextPropagationTest.java index a82873f0e..48cd13eac 100644 --- a/context-propagation/src/test/java/io/smallrye/mutiny/context/UniContextPropagationTest.java +++ b/context-propagation/src/test/java/io/smallrye/mutiny/context/UniContextPropagationTest.java @@ -10,8 +10,11 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import org.eclipse.microprofile.context.ThreadContext; import org.junit.jupiter.api.*; +import io.smallrye.context.CleanAutoCloseable; +import io.smallrye.context.SmallRyeThreadContext; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.Cancellable; @@ -436,4 +439,37 @@ public void testCache() { assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(4); } + @Test + public void testContextOverride() { + MyContext ctx = MyContext.get(); + assertThat(ctx).isNotNull(); + SmallRyeThreadContext emptyContext = SmallRyeThreadContext.builder().cleared(ThreadContext.ALL_REMAINING) + .propagated(ThreadContext.NONE).build(); + Uni uni; + // remove context propagation in this scope + try (CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(emptyContext)) { + uni = Uni.createFrom() + .item(() -> { + assertThat(MyContext.get()).isNull(); + return 2; + }) + .runSubscriptionOn(executor) + .map(r -> { + assertThat(MyContext.get()).isNull(); + return r; + }); + } + + Uni latch = Uni.createFrom().emitter(emitter -> new Thread(() -> { + try { + int result = uni.await().indefinitely(); + emitter.complete(result); + } catch (Throwable t) { + emitter.fail(t); + } + }).start()); + + int result = latch.await().indefinitely(); + assertThat(result).isEqualTo(2); + } }