Skip to content

Commit

Permalink
Use the thread-local ThreadContext in order to be able to define whic…
Browse files Browse the repository at this point in the history
…h contexts get propagated

Fixes #626
  • Loading branch information
FroMage committed Jul 21, 2021
1 parent 0e32e0a commit 4e32565
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package io.smallrye.mutiny.context;

import org.eclipse.microprofile.context.spi.ContextManagerProvider;

import io.smallrye.context.SmallRyeThreadContext;

/**
* Provides context propagation by intercepting the user callbacks.
*/
public class DefaultContextPropagationInterceptor extends BaseContextPropagationInterceptor {

static final SmallRyeThreadContext THREAD_CONTEXT = (SmallRyeThreadContext) ContextManagerProvider.instance()
.getContextManager()
.newThreadContextBuilder().build();

@Override
protected SmallRyeThreadContext getThreadContext() {
return THREAD_CONTEXT;
return SmallRyeThreadContext.getCurrentThreadContextOrPropagatedContexts();
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
package io.smallrye.mutiny.context;

import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

import org.eclipse.microprofile.context.ThreadContext;
import org.eclipse.microprofile.context.spi.ContextManager;
import org.eclipse.microprofile.context.spi.ContextManagerExtension;

import io.smallrye.context.SmallRyeThreadContext;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class MutinyContextManagerExtension implements ContextManagerExtension {

@Override
public void setup(ContextManager manager) {
ThreadContext threadContext = manager.newThreadContextBuilder().build();
Infrastructure.setCompletableFutureWrapper(threadContext::withContextCapture);
Infrastructure.setCompletableFutureWrapper(new UnaryOperator<CompletableFuture<?>>() {
@Override
public CompletableFuture<?> apply(CompletableFuture<?> t) {
ThreadContext threadContext = SmallRyeThreadContext.getCurrentThreadContextOrPropagatedContexts();
return threadContext.withContextCapture(t);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
import java.util.List;
import java.util.concurrent.*;

import org.eclipse.microprofile.context.ThreadContext;
import org.junit.jupiter.api.*;

import io.smallrye.context.CleanAutoCloseable;
import io.smallrye.context.SmallRyeThreadContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
Expand Down Expand Up @@ -293,4 +296,39 @@ public void testScan() {
.assertItems(0, 1, 3, 6, 10, 15, 21, 28, 36, 45);

}

@Test
public void testContextOverride() {
MyContext ctx = MyContext.get();
assertThat(ctx).isNotNull();
SmallRyeThreadContext emptyContext = SmallRyeThreadContext.builder().cleared(ThreadContext.ALL_REMAINING)
.propagated(ThreadContext.NONE).build();
Multi<Integer> multi;
// remove context propagation in this scope
try (CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(emptyContext)) {
multi = Multi.createFrom()
.item(() -> {
assertThat(MyContext.get()).isNull();
return 2;
})
.runSubscriptionOn(executor)
.map(r -> {
assertThat(MyContext.get()).isNull();
return r;
});
}

Uni<Integer> latch = Multi.createFrom().<Integer> emitter(emitter -> new Thread(() -> {
try {
int result = multi.toUni().await().indefinitely();
emitter.emit(result);
emitter.complete();
} catch (Throwable t) {
emitter.fail(t);
}
}).start()).toUni();

int result = latch.await().indefinitely();
assertThat(result).isEqualTo(2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.eclipse.microprofile.context.ThreadContext;
import org.junit.jupiter.api.*;

import io.smallrye.context.CleanAutoCloseable;
import io.smallrye.context.SmallRyeThreadContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.Cancellable;
Expand Down Expand Up @@ -436,4 +439,37 @@ public void testCache() {
assertThat(uni.await().atMost(Duration.ofMillis(100))).isEqualTo(4);
}

@Test
public void testContextOverride() {
MyContext ctx = MyContext.get();
assertThat(ctx).isNotNull();
SmallRyeThreadContext emptyContext = SmallRyeThreadContext.builder().cleared(ThreadContext.ALL_REMAINING)
.propagated(ThreadContext.NONE).build();
Uni<Integer> uni;
// remove context propagation in this scope
try (CleanAutoCloseable ac = SmallRyeThreadContext.withThreadContext(emptyContext)) {
uni = Uni.createFrom()
.item(() -> {
assertThat(MyContext.get()).isNull();
return 2;
})
.runSubscriptionOn(executor)
.map(r -> {
assertThat(MyContext.get()).isNull();
return r;
});
}

Uni<Integer> latch = Uni.createFrom().emitter(emitter -> new Thread(() -> {
try {
int result = uni.await().indefinitely();
emitter.complete(result);
} catch (Throwable t) {
emitter.fail(t);
}
}).start());

int result = latch.await().indefinitely();
assertThat(result).isEqualTo(2);
}
}

0 comments on commit 4e32565

Please sign in to comment.