Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hardening automatic context propagation #3549

Merged
merged 27 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5bb38a7
Hardening automatic context propagation
chemicL May 30, 2023
6c5a500
Cleared tests from previously removed code
chemicL Jul 27, 2023
75bd4d0
ParallelFlux impl, wrapping Publishers instead of Subscribers only
chemicL Aug 31, 2023
8cbeb28
spotless
chemicL Aug 31, 2023
f50ff50
FluxSubscribeOnValue marked as internal
chemicL Aug 31, 2023
9a9bb89
Sinks
chemicL Sep 6, 2023
2f8a5ac
cleaning up and unifying fuseable vs non-fuseable context restoring i…
chemicL Sep 8, 2023
6bebd69
mono fuseable variant
chemicL Sep 11, 2023
1c1f299
Removed unnecessary fuseable classes
chemicL Sep 11, 2023
031f550
Added Mono.create tests
chemicL Sep 11, 2023
9a17e3a
delayUntil and sinks support
chemicL Sep 12, 2023
772c5fd
Support for repeatWhen, retryWhen, subscribeOnCallable, switchOnFirst…
chemicL Sep 20, 2023
9350f8c
SourceProducer opeartors support (first batch)
chemicL Sep 20, 2023
898e0eb
Remaining source producer wrappings
chemicL Sep 25, 2023
1b15611
Cleanup
chemicL Sep 25, 2023
c7243fb
Replaced unnecessary completion stage handling with Subscriber wrapping
chemicL Sep 25, 2023
479f0c0
Cleanup
chemicL Sep 26, 2023
6f61641
Further cleanup
chemicL Sep 26, 2023
0ef68d7
Removed comment
chemicL Sep 26, 2023
9d6afb4
Internal Attr for INTERNAL_PRODUCER
chemicL Sep 28, 2023
95e776d
Refactoring tests
chemicL Sep 28, 2023
5b3dfbf
Delivering completion signals on another thread
chemicL Sep 28, 2023
b27c77b
Revert ParallelArraySource Operators.toFluxOrMono
chemicL Oct 2, 2023
2c452ee
reverted hooks tests changes
chemicL Oct 3, 2023
88ce2c6
Merge branch 'main' into non-core-flux-and-mono
chemicL Oct 3, 2023
39b699a
new method excludes for japicmp
chemicL Oct 3, 2023
a203598
from(ParallelFlux) is package private
chemicL Oct 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,23 @@ def japicmpReport = tasks.register('japicmpReport') {
}
doLast {
def reportFile = file("${project.buildDir}/reports/japi.txt")
if (reportFile.exists()) {
println "\n **********************************"
println " * /!\\ API compatibility failures *"
println " **********************************"
println "Japicmp report was filtered and interpreted to find the following incompatibilities:"
reportFile.eachLine {
if (it.contains("*") && (!it.contains("***") || it.contains("****")))
println "source incompatible change: $it"
else if (it.contains("!"))
println "binary incompatible change: $it"
}
if (reportFile.exists()) {
println "\n **********************************"
println " * /!\\ API compatibility failures *"
println " **********************************"
println "Japicmp report was filtered and interpreted to find the following incompatibilities:"
reportFile.eachLine {
if (it.contains("*") && (!it.contains("***") || it.contains("****"))) {
println "source incompatible change: $it"
}
else if (it.contains("!")) {
println "binary incompatible change: $it"
}
}
else println "No incompatible change to report"
}
else {
println "No incompatible change to report"
}
}
}

Expand Down Expand Up @@ -252,6 +256,9 @@ task japicmp(type: JapicmpTask) {
classExcludes = [
]
methodExcludes = [
"reactor.core.publisher.Operators#toFluxOrMono(org.reactivestreams.Publisher)",
"reactor.core.publisher.Operators#toFluxOrMono(org.reactivestreams.Publisher[])",
"reactor.core.publisher.ParallelFlux#from(reactor.core.publisher.ParallelFlux)"
]
}

Expand Down
5 changes: 1 addition & 4 deletions reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,14 +18,12 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.regex.Pattern;
Expand All @@ -38,7 +36,6 @@
import reactor.core.scheduler.Scheduler.Worker;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
* A Scannable component exposes state in a non strictly memory consistent way and
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,7 +47,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return getPrefetch();
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return super.scanUnsafe(key);
Copy link
Contributor

@OlegDokuka OlegDokuka Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the minor outcome is that super scan will go through the same attr one more time (hey, potential small perf penalty). So... may be it worth just adding attr explicitly as it was before

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the individual implementations should not duplicate what the parent does, just enhancements and changes should be reported.

Copy link
Contributor

@OlegDokuka OlegDokuka Sep 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is for common cases. Here, in low-level library, that was likely done performance wise. Even it was not important before, now we would have to be sure that by doing common OOP stuff we don't impact performance on the hot path (since now that code is part of the hot path and any redundancy will add significant impact at scale).
Such thing may invisible for latency of a single request, but at scale, especially in the case when just few Netty threads are doing all the work such small Impact in 0,00001% will have %1 overall perf degradation for 100_000 queued requests which would have to be served by the same number of threads. Thus such unthoughtful change may impact someones production.

Copy link
Member Author

@chemicL chemicL Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning you give assumes that 1 thread is handling the queue. If you keep maintaining the queue filled at 100k pending requests constantly it means you're in much bigger trouble. To put things into perspective assume mean time to handle a request is 1ms. That means you'd wait 100s for a request to start being processed. That 0,00001% impact, or 1% overall impact is honestly meaningless to you at that point. Your request is already waiting for 1 minute 40 seconds. It doesn't matter you'd wait an additional second, having the end result being 1 minute 41 seconds.

Putting that discussion aside, as you suggest, I will see whether inlining kicks in, as I'd expect it should with such short methods, but if this is a tricky situation to the JIT, we'll see that in JMH tests, which I'm also going to perform.

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.ACTUAL_METADATA) return !stacktrace.isCheckpoint;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,8 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.RUN_STYLE) return Scannable.from(source).scanUnsafe(key);
if (key == Attr.LIFTER) return liftFunction.name;
return null;

return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,8 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return source;
if (key == Attr.RUN_STYLE) return Scannable.from(source).scanUnsafe(key);
if (key == Attr.LIFTER) return liftFunction.name;
return null;

return super.scanUnsafe(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ final class ContextPropagation {
}
}

static <T> Flux<T> fluxRestoreThreadLocals(Flux<? extends T> flux) {
return new FluxContextWriteRestoringThreadLocals<>(flux, Function.identity());
}

static <T> Mono<T> monoRestoreThreadLocals(Mono<? extends T> mono) {
return new MonoContextWriteRestoringThreadLocals<>(mono, Function.identity());
}

static void configureContextSnapshotFactory(boolean clearMissing) {
if (ContextPropagationSupport.isContextPropagation103OnClasspath) {
globalContextSnapshotFactory = ContextSnapshotFactory.builder()
Expand Down Expand Up @@ -117,6 +125,8 @@ public static Function<Runnable, Runnable> scopePassingOnScheduleHook() {
};
}



/**
* Create a support function that takes a snapshot of thread locals and merges them with the
* provided {@link Context}, resulting in a new {@link Context} which includes entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package reactor.core.publisher;

import java.util.function.Function;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CorePublisher;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.Logger;
import reactor.util.Loggers;

Expand Down Expand Up @@ -65,6 +72,11 @@ static boolean shouldPropagateContextToThreadLocals() {
return isContextPropagationOnClasspath && propagateContextToThreadLocals;
}

static boolean shouldWrapPublisher(Publisher<?> publisher) {
return shouldPropagateContextToThreadLocals() &&
!Scannable.from(publisher).scanOrDefault(InternalProducerAttr.INSTANCE, false);
}

static boolean shouldRestoreThreadLocalsInSomeOperators() {
return isContextPropagationOnClasspath && !propagateContextToThreadLocals;
}
Expand Down
34 changes: 24 additions & 10 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package reactor.core.publisher;

import java.io.File;
import java.lang.reflect.ParameterizedType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -44,6 +46,7 @@
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Expand Down Expand Up @@ -1060,7 +1063,7 @@ public static <I> Flux<I> firstWithValue(Publisher<? extends I> first, Publisher
*/
public static <T> Flux<T> from(Publisher<? extends T> source) {
//duplicated in wrap, but necessary to detect early and thus avoid applying assembly
if (source instanceof Flux) {
if (source instanceof Flux && !ContextPropagationSupport.shouldWrapPublisher(source)) {
@SuppressWarnings("unchecked")
Flux<T> casted = (Flux<T>) source;
return casted;
Expand Down Expand Up @@ -8770,6 +8773,7 @@ public final void subscribe(Subscriber<? super T> actual) {
}
}

subscriber = Operators.restoreContextOnSubscriberIfPublisherNonInternal(publisher, subscriber);
publisher.subscribe(subscriber);
}
catch (Throwable e) {
Expand Down Expand Up @@ -11065,8 +11069,12 @@ static <A, B> BiFunction<A, B, Tuple2<A, B>> tuple2Function() {
*/
@SuppressWarnings("unchecked")
static <I> Flux<I> wrap(Publisher<? extends I> source) {
if (source instanceof Flux) {
return (Flux<I>) source;
boolean shouldWrap = ContextPropagationSupport.shouldWrapPublisher(source);
if (source instanceof Flux) {
if (!shouldWrap) {
return (Flux<I>) source;
}
return ContextPropagation.fluxRestoreThreadLocals((Flux<? extends I>) source);
}

//for scalars we'll instantiate the operators directly to avoid onAssembly
Expand All @@ -11084,16 +11092,22 @@ static <I> Flux<I> wrap(Publisher<? extends I> source) {
}
}

if(source instanceof Mono){
if(source instanceof Fuseable){
return new FluxSourceMonoFuseable<>((Mono<I>)source);
Flux<I> target;
if (source instanceof Mono) {
if (source instanceof Fuseable) {
target = new FluxSourceMonoFuseable<>((Mono<I>) source);
} else {
target = new FluxSourceMono<>((Mono<I>) source);
}
return new FluxSourceMono<>((Mono<I>)source);
} else if (source instanceof Fuseable) {
target = new FluxSourceFuseable<>(source);
} else {
target = new FluxSource<>(source);
}
if(source instanceof Fuseable){
return new FluxSourceFuseable<>(source);
if (shouldWrap) {
return ContextPropagation.fluxRestoreThreadLocals(target);
}
return new FluxSource<>(source);
return target;
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,8 +64,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public Object scanUnsafe(Attr key) {
if (key == Attr.BUFFERED) return array.length;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return SourceProducer.super.scanUnsafe(key);
}

static final class ArraySubscription<T>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -431,7 +431,7 @@ public Object scanUnsafe(Attr key) {
if (key == Attr.ERROR) return errors;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return null;
return InnerOperator.super.scanUnsafe(key);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,6 @@ public T call() throws Exception {
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
return SourceProducer.super.scanUnsafe(key);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -166,6 +166,8 @@ else if (!(actual instanceof QueueSubscription)) {
}
}

Operators.toFluxOrMono(a);

Queue<SourceAndArray> queue = queueSupplier.get();

CombineLatestCoordinator<T, R> coordinator =
Expand All @@ -180,7 +182,7 @@ else if (!(actual instanceof QueueSubscription)) {
public Object scanUnsafe(Attr key) {
if (key == Attr.PREFETCH) return prefetch;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
return SourceProducer.super.scanUnsafe(key);
}

static final class CombineLatestCoordinator<T, R>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (p == null) {
Operators.error(actual, new NullPointerException("The single source Publisher is null"));
} else {
p.subscribe(actual);
Operators.toFluxOrMono(p).subscribe(actual);
}
return;
}
Expand All @@ -83,7 +83,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
public Object scanUnsafe(Attr key) {
if (key == Attr.DELAY_ERROR) return delayError;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
return SourceProducer.super.scanUnsafe(key);
}

/**
Expand Down Expand Up @@ -255,7 +255,7 @@ public void onComplete() {
if (this.cancelled) {
return;
}
p.subscribe(this);
Operators.toFluxOrMono(p).subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down Expand Up @@ -440,7 +440,7 @@ public void onComplete() {
return;
}

p.subscribe(this);
Operators.toFluxOrMono(p).subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down
Loading
Loading