Skip to content

Commit

Permalink
Merge #3307 into 3.5.1
Browse files Browse the repository at this point in the history
Signed-off-by: OlegDokuka <[email protected]>
  • Loading branch information
Oleh Dokuka committed Dec 12, 2022
2 parents 69450bb + cfe1979 commit 558173d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ public Flux<Long> generateCompanion(Flux<RetrySignal> t) {
//short-circuit delay == 0 case
if (nextBackoff.isZero()) {
return RetrySpec.applyHooks(copy, Mono.just(iteration),
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry);
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv);
}

ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down Expand Up @@ -602,9 +602,8 @@ public Flux<Long> generateCompanion(Flux<RetrySignal> t) {
jitter = random.nextLong(lowBound, highBound);
}
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff,
backoffSchedulerSupplier.get()),
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry);
return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, backoffSchedulerSupplier.get()),
syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv);
})
.contextWrite(c -> Context.empty())
);
Expand Down
7 changes: 4 additions & 3 deletions reactor-core/src/main/java/reactor/util/retry/RetrySpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ else if (iteration >= maxAttempts) {
return Mono.error(retryExhaustedGenerator.apply(this, copy));
}
else {
return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry);
return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry, cv);
}
})
.contextWrite(c -> Context.empty())
Expand All @@ -389,7 +389,8 @@ static <T> Mono<T> applyHooks(RetrySignal copyOfSignal,
final Consumer<RetrySignal> doPreRetry,
final Consumer<RetrySignal> doPostRetry,
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPreRetry,
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry) {
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry,
final ContextView cv) {
if (doPreRetry != NO_OP_CONSUMER) {
try {
doPreRetry.accept(copyOfSignal);
Expand All @@ -410,6 +411,6 @@ static <T> Mono<T> applyHooks(RetrySignal copyOfSignal,
Mono<Void> preRetryMono = asyncPreRetry == NO_OP_BIFUNCTION ? Mono.empty() : asyncPreRetry.apply(copyOfSignal, Mono.empty());
Mono<Void> postRetryMono = asyncPostRetry != NO_OP_BIFUNCTION ? asyncPostRetry.apply(copyOfSignal, postRetrySyncMono) : postRetrySyncMono;

return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn);
return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn).contextWrite(cv);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.assertj.core.api.Assertions;
Expand All @@ -39,16 +41,19 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;
import reactor.test.scheduler.VirtualTimeScheduler;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.function.Tuple2;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
import reactor.util.retry.RetrySpec;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.BDDMockito.given;

public class FluxRetryWhenTest {

Expand All @@ -58,6 +63,45 @@ public class FluxRetryWhenTest {
Flux<Integer> rangeError = Flux.concat(Flux.range(1, 2),
Flux.error(new RuntimeException("forced failure 0")));

@Test
// https://github.com/reactor/reactor-core/issues/3314
void ensuresContextIsRestoredInRetryFunctions() {
PublisherProbe<Void> doBeforeRetryProbe = PublisherProbe.empty();
AtomicReference<ContextView> capturedContext = new AtomicReference<>();

RetrySpec spec = Retry.max(1)
.doBeforeRetryAsync(
retrySignal ->
Mono.deferContextual(cv -> {
capturedContext.set(cv);
return doBeforeRetryProbe.mono();
})
);

Context context = Context.of("test", "test");

Mono.defer(new Supplier<Mono<?>>() {
int index = 0;

@Override
public Mono<?> get() {
if (index++ == 0) {
return Mono.error(new RuntimeException());
} else {
return Mono.just("someValue");
}
}
})
.retryWhen(spec)
.contextWrite(context)
.as(StepVerifier::create)
.expectNext("someValue")
.verifyComplete();

doBeforeRetryProbe.assertWasSubscribed();
assertThat(capturedContext).hasValueMatching(c -> c.hasKey("test"));
}

@Test
//https://github.com/reactor/reactor-core/issues/3253
public void shouldFailWhenOnErrorContinueEnabled() {
Expand Down

0 comments on commit 558173d

Please sign in to comment.