Skip to content

Commit

Permalink
Hardening automatic context propagation (#3549)
Browse files Browse the repository at this point in the history
This change addresses Flux, Mono, and Publisher implementations that are
implemented outside of reactor-core for the purposes of automatic restoration
of ThreadLocal state from the Context.

Arbitrary Flux, Mono, or raw Publisher implementations can deliver signals from
any Thread they desire, with no need to use Scheduler-controlled Threads. The
implementations in reactor-core, whenever they switch Threads or when they
receive signals out-of-band (e.g. request(n)) restore the ThreadLocal state.
Recently, factories for Flux and Mono creation from external sources, such as a
raw Publisher, or Future implementations, have added guards via a dedicated
ThreadLocal-restoring Subscriber to deal with signals from the sources that are
not performing the restoration. However, custom Flux, Mono, and Publisher
sources not wrapped via from() method can deliver signals without the
ThreadLocal state restored. This is the case with reactor-netty's Flux
implementation delivering signals from Netty's event loop. Also, while wrapping
the sources obtained via Flux.from and Mono.from factories works fine, plenty
of operators, such as flatMap, accept a raw Publisher and don't provide any
guard against changing Thread, which means these operators need to be
addressed.

This change applies wrapping of either the Producer or Subscriber. It is driven
by a Scannable flag on the Publisher. We do it in these scenarios:

1. Wrap the Publisher when we assemble an operator at runtime of another
operator (flatMap for example).
2. Wrap the Subscriber at operators that allow
the user to directly emit signals from a different Thread (Sinks for example).
3. Wrap the Subscriber when subscribe(Subscriber) from RS Publisher is called
and the source is not protected.

The following changes have been introduced:

* Added an internal Scannable attribute: INTERNAL_PRODUCER, which allows
  determining if a Flux or Mono is a reactor-core implementation.
* Reviewed Scannable implementations to report INTERNAL_PRODUCER properly for
  internal implementations.
* Introduced new methods in Operators class, which allow
  wrapping the Subscriber with a context restoration guard whenever the
  Publisher is not an INTERNAL_PRODUCER and allow wrapping the Publisher
  directly by applying an operator that restores ThreadLocals when signals
  travel downstream.
* In Flux, Mono, InternalFluxPublisher,
  InternalMonoPublisher, FluxFromMonoOperator, MonoFromFluxOperator,
  MonoFromPublisher, the subscribe(Subscriber) method from Publisher uses
  Subscriber wrapping as a last-resort chance to restore ThreadLocals. In case
  subscribe(CoreSubscriber) is used, we have no way to intercept it, as it is
  abstract, so we need other means of wrapping too.
* Flux.from, Mono.from, Mono.fromDirect and their respective wrap() methods
  also apply the wrapping.
* In operators which receive a Publisher from the user during runtime (for
  instance flatMap receives it as a return value of a lambda provided by the
  user), either Operators.toFluxOrMono is used or direct type wrapping when it
  is known what is expected.
* Added plenty of tests that depict the issue for
  the identified cases that are able to accept a Publisher that can switch a
  Thread when delivering signals.

Resolves #3478.
  • Loading branch information
chemicL authored Oct 3, 2023
1 parent 9b6a53e commit 12926b9
Show file tree
Hide file tree
Showing 134 changed files with 2,316 additions and 751 deletions.
31 changes: 19 additions & 12 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,23 @@ def japicmpReport = tasks.register('japicmpReport') {
}
doLast {
def reportFile = file("${project.buildDir}/reports/japi.txt")
if (reportFile.exists()) {
println "\n **********************************"
println " * /!\\ API compatibility failures *"
println " **********************************"
println "Japicmp report was filtered and interpreted to find the following incompatibilities:"
reportFile.eachLine {
if (it.contains("*") && (!it.contains("***") || it.contains("****")))
println "source incompatible change: $it"
else if (it.contains("!"))
println "binary incompatible change: $it"
}
if (reportFile.exists()) {
println "\n **********************************"
println " * /!\\ API compatibility failures *"
println " **********************************"
println "Japicmp report was filtered and interpreted to find the following incompatibilities:"
reportFile.eachLine {
if (it.contains("*") && (!it.contains("***") || it.contains("****"))) {
println "source incompatible change: $it"
}
else if (it.contains("!")) {
println "binary incompatible change: $it"
}
}
else println "No incompatible change to report"
}
else {
println "No incompatible change to report"
}
}
}

Expand Down Expand Up @@ -252,6 +256,9 @@ task japicmp(type: JapicmpTask) {
classExcludes = [
]
methodExcludes = [
"reactor.core.publisher.Operators#toFluxOrMono(org.reactivestreams.Publisher)",
"reactor.core.publisher.Operators#toFluxOrMono(org.reactivestreams.Publisher[])",
"reactor.core.publisher.ParallelFlux#from(reactor.core.publisher.ParallelFlux)"
]
}

Expand Down
5 changes: 1 addition & 4 deletions reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,14 +18,12 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand All @@ -38,7 +36,6 @@
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
* A Scannable component exposes state in a non strictly memory consistent way and
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,7 +47,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return getPrefetch();
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.ACTUAL_METADATA) return !stacktrace.isCheckpoint;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,8 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.RUN_STYLE) return Scannable.from(source).scanUnsafe(key);
if (key == Attr.LIFTER) return liftFunction.name;
return null;

return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,8 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.RUN_STYLE) return Scannable.from(source).scanUnsafe(key);
if (key == Attr.LIFTER) return liftFunction.name;
return null;

return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ final class ContextPropagation {
}
}

static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux) {
return new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
}

static <T> Mono<T> monoRestoreThreadLocals(Mono<? extends T> mono) {
return new MonoContextWriteRestoringThreadLocals<>(mono, Function.identity());
}

static void configureContextSnapshotFactory(boolean clearMissing) {
if (ContextPropagationSupport.isContextPropagation103OnClasspath) {
globalContextSnapshotFactory = ContextSnapshotFactory.builder()
Expand Down Expand Up @@ -117,6 +125,8 @@ public static Function<Runnable, Runnable> scopePassingOnScheduleHook() {
};
}



/**
* Create a support function that takes a snapshot of thread locals and merges them with the
* provided {@link Context}, resulting in a new {@link Context} which includes entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package reactor.core.publisher;

import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CorePublisher;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;

Expand Down Expand Up @@ -65,6 +72,11 @@ static boolean shouldPropagateContextToThreadLocals() {
return isContextPropagationOnClasspath && propagateContextToThreadLocals;
}

static boolean shouldWrapPublisher(Publisher<?> publisher) {
return shouldPropagateContextToThreadLocals() &&
!Scannable.from(publisher).scanOrDefault(InternalProducerAttr.INSTANCE, false);
}

static boolean shouldRestoreThreadLocalsInSomeOperators() {
return isContextPropagationOnClasspath && !propagateContextToThreadLocals;
}
Expand Down
34 changes: 24 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package reactor.core.publisher;

import java.io.File;
import java.lang.reflect.ParameterizedType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -44,6 +46,7 @@
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Expand Down Expand Up @@ -1060,7 +1063,7 @@ public static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher
*/
public static <T> Flux<T> from(Publisher<? extends T> source) {
//duplicated in wrap, but necessary to detect early and thus avoid applying assembly
if (source instanceof Flux) {
if (source instanceof Flux && !ContextPropagationSupport.shouldWrapPublisher(source)) {
@SuppressWarnings("unchecked")
Flux<T> casted = (Flux<T>) source;
return casted;
Expand Down Expand Up @@ -8770,6 +8773,7 @@ public final void subscribe(Subscriber<? super T> actual) {
}
}

subscriber = Operators.restoreContextOnSubscriberIfPublisherNonInternal(publisher, subscriber);
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Expand Down Expand Up @@ -11065,8 +11069,12 @@ static <A, B> BiFunction<A, B, Tuple2<A, B>> tuple2Function() {
*/
@SuppressWarnings("unchecked")
static <I> Flux<I> wrap(Publisher<? extends I> source) {
if (source instanceof Flux) {
return (Flux<I>) source;
boolean shouldWrap = ContextPropagationSupport.shouldWrapPublisher(source);
if (source instanceof Flux) {
if (!shouldWrap) {
return (Flux<I>) source;
}
return ContextPropagation.fluxRestoreThreadLocals((Flux<? extends I>) source);
}

//for scalars we'll instantiate the operators directly to avoid onAssembly
Expand All @@ -11084,16 +11092,22 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
}
}

if(source instanceof Mono){
if(source instanceof Fuseable){
return new FluxSourceMonoFuseable<>((Mono<I>)source);
Flux<I> target;
if (source instanceof Mono) {
if (source instanceof Fuseable) {
target = new FluxSourceMonoFuseable<>((Mono<I>) source);
} else {
target = new FluxSourceMono<>((Mono<I>) source);
}
return new FluxSourceMono<>((Mono<I>)source);
} else if (source instanceof Fuseable) {
target = new FluxSourceFuseable<>(source);
} else {
target = new FluxSource<>(source);
}
if(source instanceof Fuseable){
return new FluxSourceFuseable<>(source);
if (shouldWrap) {
return ContextPropagation.fluxRestoreThreadLocals(target);
}
return new FluxSource<>(source);
return target;
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,8 +64,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return array.length;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return SourceProducer.super.scanUnsafe(key);
}

static final class ArraySubscription<T>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -431,7 +431,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.ERROR) return errors;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return InnerOperator.super.scanUnsafe(key);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,6 @@ public T call() throws Exception {
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
return SourceProducer.super.scanUnsafe(key);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -166,6 +166,8 @@ else if (!(actual instanceof QueueSubscription)) {
}
}

Operators.toFluxOrMono(a);

Queue<SourceAndArray> queue = queueSupplier.get();

CombineLatestCoordinator<T, R> coordinator =
Expand All @@ -180,7 +182,7 @@ else if (!(actual instanceof QueueSubscription)) {
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return prefetch;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
return SourceProducer.super.scanUnsafe(key);
}

static final class CombineLatestCoordinator<T, R>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (p == null) {
Operators.error(actual, new NullPointerException("The single source Publisher is null"));
} else {
p.subscribe(actual);
Operators.toFluxOrMono(p).subscribe(actual);
}
return;
}
Expand All @@ -83,7 +83,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public Object scanUnsafe(Attr key) {
if (key == Attr.DELAY_ERROR) return delayError;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
return SourceProducer.super.scanUnsafe(key);
}

/**
Expand Down Expand Up @@ -255,7 +255,7 @@ public void onComplete() {
if (this.cancelled) {
return;
}
p.subscribe(this);
Operators.toFluxOrMono(p).subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down Expand Up @@ -440,7 +440,7 @@ public void onComplete() {
return;
}

p.subscribe(this);
Operators.toFluxOrMono(p).subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down
Loading

0 comments on commit 12926b9

Please sign in to comment.