Skip to content

Commit

Permalink
rework to detect resetOnNext by instanceof on the Function
Browse files Browse the repository at this point in the history
the API for retryBackoff still needs to expose the boolean for now
  • Loading branch information
simonbasle committed Dec 4, 2019
1 parent 9aeb77b commit fcd92c0
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 188 deletions.
50 changes: 1 addition & 49 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7228,54 +7228,6 @@ public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
return onAssembly(new FluxRetryWhen<>(this, whenFactory));
}

/**
* Retries this {@link Flux} when a companion sequence signals
* an item in response to this {@link Flux} error signal
* <p>If the companion sequence signals when the {@link Flux} is active, the retry
* attempt is suppressed and any terminal signal will terminate the {@link Flux} source with the same signal
* immediately.
*
* <p>
* <img class="marble" src="doc-files/marbles/retryWhenForFlux.svg" alt="">
* <p>
* Note that if the companion {@link Publisher} created by the {@code whenFactory}
* emits {@link Context} as trigger objects, these {@link Context} will REPLACE the
* operator's own Context. <strong>Please be careful there</strong>: replacing the
* Context means that some keys you don't own could be removed, breaking libraries
* that depend on them. As a result, the recommended approach is to always create such
* a {@link Context} trigger by starting from the original Context (ensuring the trigger
* contains all the keys from the original, unless you absolutely know you want to
* remove one of these keys):
* <pre><code>
* .retryWhen(errorCurrentAttempt -> errorCurrentAttempt
* .flatMap(e -> Mono.subscriberContext().map(ctx -> Tuples.of(e, ctx)))
* .flatMap(t2 -> {
* Throwable lastError = t2.getT1();
* Context ctx = t2.getT2();
* int rl = ctx.getOrDefault("retriesLeft", 0);
* if (rl > 0) {
* // /!\ THE ctx.put HERE IS THE ESSENTIAL PART /!\
* return Mono.just(ctx.put("retriesLeft", rl - 1)
* .put("lastError", lastError));
* } else {
* return Mono.<Context>error(new IllegalStateException("retries exhausted", lastError));
* }
* })
* )
* </code></pre>
*
* @param whenFactory the {@link Function} that returns the associated {@link Publisher}
* companion, given a {@link Flux} that signals each onError as a {@link Throwable}.
* @param resetOnNext true to re-apply the factory the first time an element is received <strong>after</strong> a retry.
* this can help with transient errors in long-lived Flux
*
* @return a {@link Flux} that retries on onError when the companion {@link Publisher} produces an
* onNext signal
*/
public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory, boolean resetOnNext) {
return onAssembly(new FluxRetryWhen<>(this, whenFactory, resetOnNext));
}

/**
* In case of error, retry this {@link Flux} up to {@code numRetries} times using a
* randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%}
Expand Down Expand Up @@ -7504,7 +7456,7 @@ public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Durati
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler, boolean resetOnNext) {
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler), resetOnNext);
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler, resetOnNext));
}

/**
Expand Down
160 changes: 87 additions & 73 deletions reactor-core/src/main/java/reactor/core/publisher/FluxRetryWhen.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,11 @@ final class FluxRetryWhen<T> extends InternalFluxOperator<T, T> {
static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);

final Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory;
final boolean resetOnNext;

FluxRetryWhen(Flux<? extends T> source,
Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory) {
this(source, whenSourceFactory, false);
}

FluxRetryWhen(Flux<? extends T> source,
Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory,
boolean resetOnNext) {
super(source);
this.whenSourceFactory = Objects.requireNonNull(whenSourceFactory, "whenSourceFactory");
this.resetOnNext = resetOnNext;
}

/**
Expand All @@ -70,9 +62,9 @@ final class FluxRetryWhen<T> extends InternalFluxOperator<T, T> {
* @param <T> the type of the main sequence
* @return true if the reset went well, false if an error was propagated to downstream
*/
static <T> boolean resetTrigger(@Nullable final RetryWhenOtherSubscriber other,
static <T> boolean tryResetTrigger(@Nullable final RetryWhenOtherSubscriber other,
@Nullable final CoreSubscriber<? super T> downstream,
@Nullable final Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory) {
@Nullable final Function<? super Flux<Throwable>, Publisher<Long>> whenSourceFactory) {
if (other == null || downstream == null || whenSourceFactory == null) {
return true;
}
Expand All @@ -91,7 +83,7 @@ static <T> boolean resetTrigger(@Nullable final RetryWhenOtherSubscriber other,

static <T> void subscribe(CoreSubscriber<? super T> s,
Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory,
CorePublisher<? extends T> source, boolean resetOnNext) {
CorePublisher<? extends T> source) {
RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
Subscriber<Throwable> signaller = Operators.serialize(other.completionSignal);

Expand All @@ -100,19 +92,27 @@ static <T> void subscribe(CoreSubscriber<? super T> s,
CoreSubscriber<T> serial = Operators.serialize(s);

RetryWhenMainSubscriber<T> main;
if (resetOnNext) {
main = new RetryWhenMainSubscriber<>(serial, signaller, source, other, whenSourceFactory);
if (whenSourceFactory instanceof TransientResubscribeWhenFunction) {
Function<? super Flux<Throwable>, Publisher<Long>> transientFunction = (Function<? super Flux<Throwable>, Publisher<Long>>) whenSourceFactory;
main = new RetryWhenMainSubscriber<>(serial, signaller, source, other,transientFunction);
}
else {
main = new RetryWhenMainSubscriber<>(serial, signaller, source, null, null);
main = new RetryWhenMainSubscriber<>(serial, signaller, source);
}
other.main = main;

other.main = main;
serial.onSubscribe(main);

if (!resetTrigger(other, s, whenSourceFactory)) {
//this behavior is copied in `tryResetTrigger` for the transient case
Publisher<?> p;
try {
p = Objects.requireNonNull(whenSourceFactory.apply(other), "The whenSourceFactory returned a null Publisher");
}
catch (Throwable e) {
s.onError(Operators.onOperatorError(e, s.currentContext()));
return;
}
p.subscribe(other);

if (!main.cancelled) {
source.subscribe(main);
Expand All @@ -121,7 +121,7 @@ static <T> void subscribe(CoreSubscriber<? super T> s,

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
subscribe(actual, whenSourceFactory, source, resetOnNext);
subscribe(actual, whenSourceFactory, source);
return null;
}

Expand All @@ -135,7 +135,7 @@ static final class RetryWhenMainSubscriber<T> extends
final CorePublisher<? extends T> source;

@Nullable
final Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory;
final Function<? super Flux<Throwable>, Publisher<Long>> transientWhenSourceFactory;
@Nullable
final RetryWhenOtherSubscriber other;

Expand All @@ -156,17 +156,23 @@ static final class RetryWhenMainSubscriber<T> extends
Subscriber<Throwable> signaller,
CorePublisher<? extends T> source,
@Nullable RetryWhenOtherSubscriber other,
@Nullable Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory) {
@Nullable Function<? super Flux<Throwable>, Publisher<Long>> transientWhenSourceFactory) {
super(actual);
this.signaller = signaller;
this.source = source;
this.otherArbiter = new Operators.SwapSubscription(true);
this.context = actual.currentContext();
this.other = other;
this.whenSourceFactory = whenSourceFactory;
this.transientWhenSourceFactory = transientWhenSourceFactory;
this.resetTriggerOnNextElement = false;
}

RetryWhenMainSubscriber(CoreSubscriber<? super T> actual,
Subscriber<Throwable> signaller,
CorePublisher<? extends T> source) {
this(actual, signaller, source, null, null);
}

@Override
public Context currentContext() {
return this.context;
Expand All @@ -193,7 +199,7 @@ void swap(Subscription w) {
public void onNext(T t) {
if (resetTriggerOnNextElement) {
resetTriggerOnNextElement = false; //we don't want to reset for subsequent onNext
if (!FluxRetryWhen.resetTrigger(other, actual, whenSourceFactory)) {
if (!FluxRetryWhen.tryResetTrigger(other, actual, transientWhenSourceFactory)) {
return;
}
}
Expand Down Expand Up @@ -316,64 +322,72 @@ public DirectProcessor<Throwable> source() {
}
}

@FunctionalInterface
interface TransientResubscribeWhenFunction<T> extends Function<Flux<T>, Publisher<Long>> {
}

static Function<Flux<Throwable>, Publisher<Long>> randomExponentialBackoffFunction(
long numRetries, Duration firstBackoff, Duration maxBackoff,
double jitterFactor, Scheduler backoffScheduler) {
double jitterFactor, Scheduler backoffScheduler,
boolean isTransient) {
if (jitterFactor < 0 || jitterFactor > 1) throw new IllegalArgumentException("jitterFactor must be between 0 and 1 (default 0.5)");
Objects.requireNonNull(firstBackoff, "firstBackoff is required");
Objects.requireNonNull(maxBackoff, "maxBackoff is required");
Objects.requireNonNull(backoffScheduler, "backoffScheduler is required");

return t -> t.index()
.flatMap(t2 -> {
long iteration = t2.getT1();

if (iteration >= numRetries) {
return Mono.<Long>error(new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries, t2.getT2()));
}

Duration nextBackoff;
try {
nextBackoff = firstBackoff.multipliedBy((long) Math.pow(2, iteration));
if (nextBackoff.compareTo(maxBackoff) > 0) {
nextBackoff = maxBackoff;
}
}
catch (ArithmeticException overflow) {
nextBackoff = maxBackoff;
}

//short-circuit delay == 0 case
if (nextBackoff.isZero()) {
return Mono.just(iteration);
}

ThreadLocalRandom random = ThreadLocalRandom.current();

long jitterOffset;
try {
jitterOffset = nextBackoff.multipliedBy((long) (100 * jitterFactor))
.dividedBy(100)
.toMillis();
}
catch (ArithmeticException ae) {
jitterOffset = Math.round(Long.MAX_VALUE * jitterFactor);
}
long lowBound = Math.max(firstBackoff.minus(nextBackoff)
.toMillis(), -jitterOffset);
long highBound = Math.min(maxBackoff.minus(nextBackoff)
.toMillis(), jitterOffset);

long jitter;
if (highBound == lowBound) {
if (highBound == 0) jitter = 0;
else jitter = random.nextLong(highBound);
}
else {
jitter = random.nextLong(lowBound, highBound);
}
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
return Mono.delay(effectiveBackoff, backoffScheduler);
});
TransientResubscribeWhenFunction<Throwable> function = t ->
t.index()
.flatMap(t2 -> {
long iteration = t2.getT1();

if (iteration >= numRetries) {
return Mono.<Long>error(new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries, t2.getT2()));
}

Duration nextBackoff;
try {
nextBackoff = firstBackoff.multipliedBy((long) Math.pow(2, iteration));
if (nextBackoff.compareTo(maxBackoff) > 0) {
nextBackoff = maxBackoff;
}
}
catch (ArithmeticException overflow) {
nextBackoff = maxBackoff;
}

//short-circuit delay == 0 case
if (nextBackoff.isZero()) {
return Mono.just(iteration);
}

ThreadLocalRandom random = ThreadLocalRandom.current();

long jitterOffset;
try {
jitterOffset = nextBackoff.multipliedBy((long) (100 * jitterFactor))
.dividedBy(100)
.toMillis();
}
catch (ArithmeticException ae) {
jitterOffset = Math.round(Long.MAX_VALUE * jitterFactor);
}
long lowBound = Math.max(firstBackoff.minus(nextBackoff)
.toMillis(), -jitterOffset);
long highBound = Math.min(maxBackoff.minus(nextBackoff)
.toMillis(), jitterOffset);

long jitter;
if (highBound == lowBound) {
if (highBound == 0) jitter = 0;
else jitter = random.nextLong(highBound);
}
else {
jitter = random.nextLong(lowBound, highBound);
}
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
return Mono.delay(effectiveBackoff, backoffScheduler);
});
if (isTransient) return function;
return function::apply; //do not replace with qualifier, so that instanceof doesn't consider this one transient
}
}
30 changes: 2 additions & 28 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -3693,32 +3693,6 @@ public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
return onAssembly(new MonoRetryWhen<>(this, whenFactory));
}

/**
* Retries this {@link Mono} when a companion sequence signals
* an item in response to this {@link Mono} error signal
* <p>If the companion sequence signals when the {@link Mono} is active, the retry
* attempt is suppressed and any terminal signal will terminate the {@link Mono} source with the same signal
* immediately.
*
* <p>
* <img class="marble" src="doc-files/marbles/retryWhenForMono.svg" alt="">
* <p>
* Note that if the companion {@link Publisher} created by the {@code whenFactory}
* emits {@link Context} as trigger objects, the content of these Context will be added
* to the operator's own {@link Context}.
*
* @param whenFactory the {@link Function} that returns the associated {@link Publisher}
* companion, given a {@link Flux} that signals each onError as a {@link Throwable}.
* @param resetOnNext true to re-apply the factory the first time an element is received <strong>after</strong> a retry.
* this can help with transient errors in long-lived Flux
*
* @return a {@link Mono} that retries on onError when the companion {@link Publisher} produces an
* onNext signal
*/
public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory, boolean resetOnNext) {
return onAssembly(new MonoRetryWhen<>(this, whenFactory, resetOnNext));
}

/**
* In case of error, retry this {@link Mono} up to {@code numRetries} times using a
* randomized exponential backoff strategy (jitter). The jitter factor is {@code 50%}
Expand Down Expand Up @@ -3903,7 +3877,7 @@ public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Durati
* @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler) {
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler));
return retryBackoff(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler, false);
}

/**
Expand Down Expand Up @@ -3947,7 +3921,7 @@ public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Durati
* @return a {@link Mono} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Mono<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler, boolean resetOnNext) {
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler), resetOnNext);
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler, resetOnNext));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,17 @@
final class MonoRetryWhen<T> extends InternalMonoOperator<T, T> {

final Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory;
final boolean resetOnNext;

MonoRetryWhen(Mono<? extends T> source,
Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory) {
this(source, whenSourceFactory, false);
}

MonoRetryWhen(Mono<? extends T> source,
Function<? super Flux<Throwable>, ? extends Publisher<?>> whenSourceFactory,
boolean resetOnNext) {
super(source);
this.whenSourceFactory =
Objects.requireNonNull(whenSourceFactory, "whenSourceFactory");
this.resetOnNext = resetOnNext;
}

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
FluxRetryWhen.subscribe(actual, whenSourceFactory, source, resetOnNext);
FluxRetryWhen.subscribe(actual, whenSourceFactory, source);
return null;
}
}
Loading

0 comments on commit fcd92c0

Please sign in to comment.