From cfe1979e5387979bebbb9e084197917195746184 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Mon, 12 Dec 2022 11:48:36 +0200 Subject: [PATCH] ensures context restored in RetrySpec (#3316) closes https://github.com/reactor/reactor-core/issues/3314 This commit restores context for inner Flux inside concatMap, while the concatMap remains without context. It allows Retry methods to have context while onErrorContinue to be disabled for retry concatMap --- .../reactor/util/retry/RetryBackoffSpec.java | 7 ++- .../java/reactor/util/retry/RetrySpec.java | 7 +-- .../core/publisher/FluxRetryWhenTest.java | 44 +++++++++++++++++++ 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java index 540501b5cf..72baa73abf 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java @@ -574,7 +574,7 @@ public Flux generateCompanion(Flux t) { //short-circuit delay == 0 case if (nextBackoff.isZero()) { return RetrySpec.applyHooks(copy, Mono.just(iteration), - syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry); + syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv); } ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -602,9 +602,8 @@ public Flux generateCompanion(Flux t) { jitter = random.nextLong(lowBound, highBound); } Duration effectiveBackoff = nextBackoff.plusMillis(jitter); - return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, - backoffSchedulerSupplier.get()), - syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry); + return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, backoffSchedulerSupplier.get()), + syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry, cv); }) .contextWrite(c -> Context.empty()) ); diff --git a/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java b/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java index 710ff4de86..dc783537aa 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java @@ -373,7 +373,7 @@ else if (iteration >= maxAttempts) { return Mono.error(retryExhaustedGenerator.apply(this, copy)); } else { - return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry); + return applyHooks(copy, Mono.just(iteration), doPreRetry, doPostRetry, asyncPreRetry, asyncPostRetry, cv); } }) .contextWrite(c -> Context.empty()) @@ -389,7 +389,8 @@ static Mono applyHooks(RetrySignal copyOfSignal, final Consumer doPreRetry, final Consumer doPostRetry, final BiFunction, Mono> asyncPreRetry, - final BiFunction, Mono> asyncPostRetry) { + final BiFunction, Mono> asyncPostRetry, + final ContextView cv) { if (doPreRetry != NO_OP_CONSUMER) { try { doPreRetry.accept(copyOfSignal); @@ -410,6 +411,6 @@ static Mono applyHooks(RetrySignal copyOfSignal, Mono preRetryMono = asyncPreRetry == NO_OP_BIFUNCTION ? Mono.empty() : asyncPreRetry.apply(copyOfSignal, Mono.empty()); Mono postRetryMono = asyncPostRetry != NO_OP_BIFUNCTION ? asyncPostRetry.apply(copyOfSignal, postRetrySyncMono) : postRetrySyncMono; - return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn); + return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn).contextWrite(cv); } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index 1eb0dbad55..87b54cb114 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -24,7 +24,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; @@ -39,6 +41,7 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; +import reactor.test.publisher.PublisherProbe; import reactor.test.scheduler.VirtualTimeScheduler; import reactor.test.subscriber.AssertSubscriber; import reactor.util.context.Context; @@ -46,9 +49,11 @@ import reactor.util.function.Tuple2; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; +import reactor.util.retry.RetrySpec; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.BDDMockito.given; public class FluxRetryWhenTest { @@ -58,6 +63,45 @@ public class FluxRetryWhenTest { Flux rangeError = Flux.concat(Flux.range(1, 2), Flux.error(new RuntimeException("forced failure 0"))); + @Test + // https://github.com/reactor/reactor-core/issues/3314 + void ensuresContextIsRestoredInRetryFunctions() { + PublisherProbe doBeforeRetryProbe = PublisherProbe.empty(); + AtomicReference capturedContext = new AtomicReference<>(); + + RetrySpec spec = Retry.max(1) + .doBeforeRetryAsync( + retrySignal -> + Mono.deferContextual(cv -> { + capturedContext.set(cv); + return doBeforeRetryProbe.mono(); + }) + ); + + Context context = Context.of("test", "test"); + + Mono.defer(new Supplier>() { + int index = 0; + + @Override + public Mono get() { + if (index++ == 0) { + return Mono.error(new RuntimeException()); + } else { + return Mono.just("someValue"); + } + } + }) + .retryWhen(spec) + .contextWrite(context) + .as(StepVerifier::create) + .expectNext("someValue") + .verifyComplete(); + + doBeforeRetryProbe.assertWasSubscribed(); + assertThat(capturedContext).hasValueMatching(c -> c.hasKey("test")); + } + @Test //https://github.com/reactor/reactor-core/issues/3253 public void shouldFailWhenOnErrorContinueEnabled() {