Skip to content

Commit

Permalink
Merge #3665 into 3.6.2
Browse files Browse the repository at this point in the history
Signed-off-by: OlegDokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Dec 14, 2023
2 parents c60f682 + 24f04bc commit 5077243
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.ForkJoinPool;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.II_Result;
import reactor.core.scheduler.Schedulers;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

public class FluxPublishOnStressTest {

@JCStressTest
@Outcome(id = {"0, 1", "0, 0"}, expect = ACCEPTABLE, desc = "no errors propagated after cancellation because of disposed worker")
@State
public static class FluxPublishOnOnNextAndCancelRaceStressTest {
final StressSubscription<Integer> upstream = new StressSubscription<>(null);
final StressSubscriber<Integer> downstream = new StressSubscriber<>();
final Scheduler scheduler =
Schedulers.fromExecutorService(ForkJoinPool.commonPool());

final FluxPublishOn.PublishOnSubscriber<Integer> publishOnSubscriber =
new FluxPublishOn.PublishOnSubscriber<>(downstream,
scheduler,
scheduler.createWorker(), true, 32, 12, Queues.get(32));


{
publishOnSubscriber.onSubscribe(upstream);
}

@Actor
public void produce() {
publishOnSubscriber.onNext(1);
publishOnSubscriber.onNext(2);
publishOnSubscriber.onNext(3);
publishOnSubscriber.onNext(4);
}

@Actor
public void cancel() {
publishOnSubscriber.cancel();
}

@Arbiter
public void arbiter(II_Result result) {
result.r1 = downstream.onErrorCalls.get();
result.r2 = downstream.droppedErrors.size();
}
}

@JCStressTest
@Outcome(id = {"0, 1", "0, 0"}, expect = ACCEPTABLE, desc = "no errors propagated after cancellation because of disposed worker")
@State
public static class FluxPublishOnConditionalOnNextAndCancelRaceStressTest {
final StressSubscription<Integer> upstream = new StressSubscription<>(null);
final ConditionalStressSubscriber<Integer> downstream = new ConditionalStressSubscriber<>();
final Scheduler scheduler =
Schedulers.fromExecutorService(ForkJoinPool.commonPool());

final FluxPublishOn.PublishOnConditionalSubscriber<Integer> publishOnSubscriber =
new FluxPublishOn.PublishOnConditionalSubscriber<>(downstream,
scheduler,
scheduler.createWorker(), true, 32, 12, Queues.get(32));


{
publishOnSubscriber.onSubscribe(upstream);
}

@Actor
public void produce() {
publishOnSubscriber.onNext(1);
publishOnSubscriber.onNext(2);
publishOnSubscriber.onNext(3);
publishOnSubscriber.onNext(4);
}

@Actor
public void cancel() {
publishOnSubscriber.cancel();
}

@Arbiter
public void arbiter(II_Result result) {
result.r1 = downstream.onErrorCalls.get();
result.r2 = downstream.droppedErrors.size();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -324,6 +324,12 @@ void trySchedule(
// In all other modes we are free to discard queue immediately since there is no racing on pooling
Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
}

if (cancelled) {
Operators.onErrorDropped(ree, actual.currentContext());
return;
}

actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,
actual.currentContext()));
}
Expand Down Expand Up @@ -884,6 +890,12 @@ void trySchedule(
// In all other modes we are free to discard queue immediately since there is no racing on pooling
Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
}

if (cancelled) {
Operators.onErrorDropped(ree, actual.currentContext());
return;
}

actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,
actual.currentContext()));
}
Expand Down

0 comments on commit 5077243

Please sign in to comment.