Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
FroMage committed Jul 17, 2020
1 parent cdd6167 commit 0850664
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 42 deletions.
43 changes: 26 additions & 17 deletions core/src/main/java/io/smallrye/context/CompletionStageWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
public class CompletionStageWrapper<T> implements CompletionStage<T>, Contextualized {
private final CompletionStage<T> f;
private final SmallRyeThreadContext context;
private final Executor executor;

public CompletionStageWrapper(SmallRyeThreadContext context, CompletionStage<T> f) {
public CompletionStageWrapper(SmallRyeThreadContext context, CompletionStage<T> f, Executor executor) {
this.context = context;
this.f = f;
this.executor = executor;
}

private void checkDefaultExecutor() {
throw new UnsupportedOperationException("Async methods not supported when no executor is specified");
if (executor == null)
throw new UnsupportedOperationException("Async methods not supported when no executor is specified");
}

@Override
public CompletableFuture<T> toCompletableFuture() {
// FIXME: propagate an executor too here?
return context.withContextCapture(f.toCompletableFuture(), null);
return context.withContextCapture(f.toCompletableFuture(), executor);
}

@Override
Expand All @@ -42,7 +44,7 @@ public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends
@Override
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
checkDefaultExecutor();
return context.withContextCapture(f.handleAsync(context.contextualFunctionUnlessContextualized(fn)));
return context.withContextCapture(f.handleAsync(context.contextualFunctionUnlessContextualized(fn), executor));
}

@Override
Expand All @@ -58,7 +60,7 @@ public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn) {
@Override
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
checkDefaultExecutor();
return context.withContextCapture(f.thenApplyAsync(context.contextualFunctionUnlessContextualized(fn)));
return context.withContextCapture(f.thenApplyAsync(context.contextualFunctionUnlessContextualized(fn), executor));
}

@Override
Expand All @@ -74,7 +76,7 @@ public CompletionStage<Void> thenAccept(Consumer<? super T> action) {
@Override
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) {
checkDefaultExecutor();
return context.withContextCapture(f.thenAcceptAsync(context.contextualConsumerUnlessContextualized(action)));
return context.withContextCapture(f.thenAcceptAsync(context.contextualConsumerUnlessContextualized(action), executor));
}

@Override
Expand All @@ -90,7 +92,7 @@ public CompletionStage<Void> thenRun(Runnable action) {
@Override
public CompletionStage<Void> thenRunAsync(Runnable action) {
checkDefaultExecutor();
return context.withContextCapture(f.thenRunAsync(context.contextualRunnableUnlessContextualized(action)));
return context.withContextCapture(f.thenRunAsync(context.contextualRunnableUnlessContextualized(action), executor));
}

@Override
Expand All @@ -108,7 +110,8 @@ public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
checkDefaultExecutor();
return context.withContextCapture(f.thenCombineAsync(other, context.contextualFunctionUnlessContextualized(fn)));
return context
.withContextCapture(f.thenCombineAsync(other, context.contextualFunctionUnlessContextualized(fn), executor));
}

@Override
Expand All @@ -128,7 +131,8 @@ public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> oth
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
checkDefaultExecutor();
return context.withContextCapture(f.thenAcceptBothAsync(other, context.contextualConsumerUnlessContextualized(action)));
return context.withContextCapture(
f.thenAcceptBothAsync(other, context.contextualConsumerUnlessContextualized(action), executor));
}

@Override
Expand All @@ -146,7 +150,8 @@ public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable act
@Override
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
checkDefaultExecutor();
return context.withContextCapture(f.runAfterBothAsync(other, context.contextualRunnableUnlessContextualized(action)));
return context.withContextCapture(
f.runAfterBothAsync(other, context.contextualRunnableUnlessContextualized(action), executor));
}

@Override
Expand All @@ -163,7 +168,8 @@ public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,
@Override
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
checkDefaultExecutor();
return context.withContextCapture(f.applyToEitherAsync(other, context.contextualFunctionUnlessContextualized(fn)));
return context
.withContextCapture(f.applyToEitherAsync(other, context.contextualFunctionUnlessContextualized(fn), executor));
}

@Override
Expand All @@ -181,7 +187,8 @@ public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Co
@Override
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
checkDefaultExecutor();
return context.withContextCapture(f.acceptEitherAsync(other, context.contextualConsumerUnlessContextualized(action)));
return context.withContextCapture(
f.acceptEitherAsync(other, context.contextualConsumerUnlessContextualized(action), executor));
}

@Override
Expand All @@ -199,7 +206,8 @@ public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable a
@Override
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
checkDefaultExecutor();
return context.withContextCapture(f.runAfterEitherAsync(other, context.contextualRunnableUnlessContextualized(action)));
return context.withContextCapture(
f.runAfterEitherAsync(other, context.contextualRunnableUnlessContextualized(action), executor));
}

@Override
Expand All @@ -216,7 +224,7 @@ public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends Completi
@Override
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
checkDefaultExecutor();
return context.withContextCapture(f.thenComposeAsync(context.contextualFunctionUnlessContextualized(fn)));
return context.withContextCapture(f.thenComposeAsync(context.contextualFunctionUnlessContextualized(fn), executor));
}

@Override
Expand All @@ -233,7 +241,8 @@ public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable>
@Override
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
checkDefaultExecutor();
return context.withContextCapture(f.whenCompleteAsync(context.contextualConsumerUnlessContextualized(action)));
return context
.withContextCapture(f.whenCompleteAsync(context.contextualConsumerUnlessContextualized(action), executor));
}

@Override
Expand All @@ -256,4 +265,4 @@ public int hashCode() {
public boolean equals(Object obj) {
return f.equals(obj);
}
}
}
6 changes: 3 additions & 3 deletions core/src/main/java/io/smallrye/context/JdkSpecific.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ public class JdkSpecific {

public interface Contract {
public <T> CompletionStage<T> newCompletionStageWrapper(SmallRyeThreadContext threadContext,
CompletionStage<T> future);
CompletionStage<T> future, Executor executor);

public <T> CompletableFuture<T> newCompletableFutureWrapper(SmallRyeThreadContext threadContext,
CompletableFuture<T> future, Executor executor, boolean minimal);
}

public static <T> CompletionStage<T> newCompletionStageWrapper(SmallRyeThreadContext threadContext,
CompletionStage<T> future) {
return impl.newCompletionStageWrapper(threadContext, future);
CompletionStage<T> future, Executor executor) {
return impl.newCompletionStageWrapper(threadContext, future, executor);
}

public static <T> CompletableFuture<T> newCompletableFutureWrapper(SmallRyeThreadContext threadContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,87 @@ public static Builder builder() {
return SmallRyeContextManagerProvider.instance().getContextManager().newManagedExecutorBuilder();
}

//
// Extras

/**
* <p>
* Returns a new <code>CompletableFuture</code> that is completed by the completion of the
* specified stage.
* </p>
*
* <p>
* The new completable future is backed by the ManagedExecutor upon which copy is invoked,
* which serves as the default asynchronous execution facility
* for the new stage and all dependent stages created from it, and so forth.
* </p>
*
* <p>
* When dependent stages are created from the new completable future, thread context is captured
* and/or cleared by the ManagedExecutor. This guarantees that the action
* performed by each stage always runs under the thread context of the code that creates the stage,
* unless the user explicitly overrides by supplying a pre-contextualized action.
* </p>
*
* <p>
* Invocation of this method does not impact thread context propagation for the supplied
* completable future or any dependent stages created from it, other than the new dependent
* completable future that is created by this method.
* </p>
*
* @param <T> completable future result type.
* @param stage a completable future whose completion triggers completion of the new completable
* future that is created by this method.
* @return the new completable future.
*/
public <T> CompletableFuture<T> copy(CompletableFuture<T> stage) {
return threadContext.withContextCapture(stage, this);
}

/**
* <p>
* Returns a new <code>CompletionStage</code> that is completed by the completion of the
* specified stage.
* </p>
*
* <p>
* The new completable future is backed by the ManagedExecutor upon which copy is invoked,
* which serves as the default asynchronous execution facility
* for the new stage and all dependent stages created from it, and so forth.
* </p>
*
* <p>
* When dependent stages are created from the new completable future, thread context is captured
* and/or cleared by the ManagedExecutor. This guarantees that the action
* performed by each stage always runs under the thread context of the code that creates the stage,
* unless the user explicitly overrides by supplying a pre-contextualized action.
* </p>
*
* <p>
* Invocation of this method does not impact thread context propagation for the supplied
* stage or any dependent stages created from it, other than the new dependent
* completion stage that is created by this method.
* </p>
*
* @param <T> completion stage result type.
* @param stage a completion stage whose completion triggers completion of the new stage
* that is created by this method.
* @return the new completion stage.
*/
public <T> CompletionStage<T> copy(CompletionStage<T> stage) {
return threadContext.withContextCapture(stage, this);
}

/**
* Returns a <code>ThreadContext</code> which has the same propagation settings as this <code>ManagedExecutor</code>,
* which uses this <code>ManagedExecutor</code> as its default executor.
*
* @return a ThreadContext with the same propagation settings as this ManagedExecutor.
*/
public SmallRyeThreadContext getThreadContext() {
return threadContext;
}

public static class Builder implements ManagedExecutor.Builder {

private SmallRyeContextManager manager;
Expand Down Expand Up @@ -258,7 +339,7 @@ public SmallRyeManagedExecutor build() {
else
executor = SmallRyeManagedExecutor.newThreadPoolExecutor(maxAsync, maxQueued);
return new SmallRyeManagedExecutor(maxAsync, maxQueued,
new SmallRyeThreadContext(manager, propagated, SmallRyeContextManager.NO_STRING, cleared),
new SmallRyeThreadContext(manager, propagated, SmallRyeContextManager.NO_STRING, cleared, null, executor),
executor, injectionPointName);
}

Expand Down
Loading

0 comments on commit 0850664

Please sign in to comment.