fix #1978 Introduce retry builder and state-Function-based retry
This introduces a retry variant based on a `Function`, a bit like
retryWhen, except the input is not merely a `Throwable` but a
`RetrySignal`. This allows the retry function to check whether there
was some success (onNext) since the last retry attempt, in which case
the current attempt can be interpreted as if it were the first ever
error.

This is especially useful in cases where exponential backoff delays
should be reset, e.g. for long-lived sequences that only see
intermittent bursts of errors.
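
As a rough sketch of what such a state-aware function could look like
(only `Retry.RetrySignal#failure()` is visible in this commit's diff;
the error type, delay and names below are purely illustrative):

```java
import java.time.Duration;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

// The companion now receives a Flux<Retry.RetrySignal> rather than a
// Flux<Throwable>, so it can look at more than the bare exception.
Function<Flux<Retry.RetrySignal>, Publisher<?>> companion = signals ->
        signals.concatMap(rs -> {
            if (rs.failure() instanceof IllegalArgumentException) {
                // non-retryable error: propagate it to terminate the retries
                return Mono.<Long>error(rs.failure());
            }
            // emitting any value allows the retry, here after a fixed delay
            return Mono.delay(Duration.ofMillis(250));
        });
```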

The Function is actually provided through a `Supplier`, and one such
supplier is the newly introduced `Retry.Builder`.

The builder is simpler than the one in addons, but covers some good
ground. It allows a predicate on either the exponential backoff
strategy or the simple retry strategy. In both cases one can also
choose to consider `transientError(boolean)` (reset on onNext). For the
simple case, this means that the remaining number of retries is reset
in case of onNext. For the exponential case, this means the retry delay
is reset to its minimum after an onNext.
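
For illustration only, wiring that up might look like this (values
arbitrary; `maxBackoff` and `jitter` appear in the diff below, and
`transientError(boolean)` is the name used in this description):

```java
// Exponential backoff that resets its delay to the minimum whenever an onNext
// is observed between errors (the "transient errors" mode described above).
someFlux.retry(Retry.backoff(5, Duration.ofMillis(100))
                    .maxBackoff(Duration.ofSeconds(10))
                    .jitter(0.5)
                    .transientError(true));
```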

The old `retryWhen` decorates the user-provided function so that it
only looks at the exception.

There is only one builder, which switches from simple to backoff as
soon as one of the backoff configuration methods is invoked. One cannot
easily switch back. Factory methods help select the backoff strategy
right away.

The API is based on a `Supplier<Function>` so that it is not constrained
to the provided `Retry.Builder`: anybody can easily write their own
builder of advanced retry functions.
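
As a sketch of that extension point, any `Supplier` of a matching
`Function` can be handed to the operator; for example a hand-rolled
fixed-delay strategy (names and values invented for the example,
imports as in the earlier sketch plus `java.util.function.Supplier`):

```java
// No Retry.Builder involved: a plain lambda acts as the Supplier<Function<...>>.
Supplier<Function<Flux<Retry.RetrySignal>, Publisher<?>>> fixedDelayRetry = () ->
        signals -> signals.concatMap(rs -> Mono.delay(Duration.ofSeconds(1)));

someFlux.retry(fixedDelayRetry);
```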
simonbasle committed Feb 7, 2020
1 parent 64d0ea2 commit e964a16
Showing 9 changed files with 834 additions and 124 deletions.
52 changes: 46 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
@@ -74,6 +74,7 @@
import reactor.util.function.Tuple7;
import reactor.util.function.Tuple8;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;

/**
* A Reactive Streams {@link Publisher} with rx operators that emits 0 to N elements, and then completes
@@ -7249,7 +7250,30 @@ public final Flux<T> retry(long numRetries, Predicate<? super Throwable> retryMa
* onNext signal
*/
public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
return onAssembly(new FluxRetryWhen<>(this, whenFactory));
Objects.requireNonNull(whenFactory, "whenFactory");
return onAssembly(new FluxRetryWhen<>(this, fluxRetryWhenState -> whenFactory.apply(fluxRetryWhenState.map(
Retry.RetrySignal::failure))));
}

/**
* Retries this {@link Flux} in case of errors, as configured by the {@link Function} supplied
* (typically a {@link Retry.Builder}, or a custom function derived from a companion flux of
* {@link Retry.RetrySignal}). The output is a {@link Publisher} that can emit an arbitrary object
* to signal a retry is allowed, and when the resubscription must occur.
* <p>
* Note that the {@link Retry.RetrySignal} state can be transient and change between each source
* {@link org.reactivestreams.Subscriber#onError(Throwable) onError} or
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext}. If processed with a delay,
* this could lead to the represented state being out of sync with the state at which the retry
* was evaluated. Map it to {@link Retry.RetrySignal#retain()} right away to mitigate this.
*
* @param strategySupplier a supplier of a retry {@link Function}, typically a {@link Retry.Builder} to configure retries
* @return a {@link Flux} that retries on onError
* @see Retry.Builder
* @see Retry.Builder#get()
*/
public final Flux<T> retry(Supplier<Function<Flux<Retry.RetrySignal>, Publisher<?>>> strategySupplier) {
return onAssembly(new FluxRetryWhen<>(this, strategySupplier.get()));
}
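
A hedged illustration of the `retain()` note in this Javadoc, assuming
`retain()` yields an immutable copy of the signal as the documentation
suggests (the asynchronous step and values are contrived):

```java
// If the companion processes signals with a delay, snapshot the transient state
// first so the evaluated signal cannot change underneath the delayed step.
someFlux.retry(() -> signals -> signals
        .map(Retry.RetrySignal::retain)         // copy the possibly mutable state
        .delayElements(Duration.ofMillis(100))  // e.g. some asynchronous handling
        .map(rs -> 1L));                        // any onNext allows a retry
```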

/**
@@ -7283,7 +7307,7 @@ public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
*/
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff) {
return retryBackoff(numRetries, firstBackoff, FluxRetryWhen.MAX_BACKOFF, 0.5d);
return retry(Retry.backoff(numRetries, firstBackoff));
}

/**
@@ -7317,9 +7341,12 @@ public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff) {
* minimum delay even taking jitter into account.
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
* @deprecated use {@link #retry(Retry.Builder)} with a {@link Retry#backoff(long, Duration)} base
*/
@Deprecated
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff) {
return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d);
return retry(Retry.backoff(numRetries, firstBackoff)
.maxBackoff(maxBackoff));
}

/**
@@ -7355,9 +7382,13 @@ public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Durati
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @param backoffScheduler the {@link Scheduler} on which the delays and subsequent attempts are executed.
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
* @deprecated use {@link #retry(Retry.Builder)} with a {@link Retry#backoff(long, Duration)} base
*/
@Deprecated
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, Scheduler backoffScheduler) {
return retryBackoff(numRetries, firstBackoff, maxBackoff, 0.5d, backoffScheduler);
return retry(Retry.backoff(numRetries, firstBackoff)
.maxBackoff(maxBackoff)
.scheduler(backoffScheduler));
}

/**
@@ -7393,9 +7424,13 @@ public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Durati
* @param maxBackoff the maximum delay to apply despite exponential growth and jitter.
* @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0).
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
* @deprecated use {@link #retry(Retry.Builder)} with a {@link Retry#backoff(long, Duration)} base
*/
@Deprecated
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor) {
return retryBackoff(numRetries, firstBackoff, maxBackoff, jitterFactor, Schedulers.parallel());
return retry(Retry.backoff(numRetries, firstBackoff)
.maxBackoff(maxBackoff)
.jitter(jitterFactor));
}

/**
@@ -7434,9 +7469,14 @@ public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Durati
* @param backoffScheduler the {@link Scheduler} on which the delays and subsequent attempts are executed.
* @param jitterFactor the jitter percentage (as a double between 0.0 and 1.0).
* @return a {@link Flux} that retries on onError with exponentially growing randomized delays between retries.
* @deprecated use {@link #retry(Retry.Builder)} with a {@link Retry#backoff(long, Duration)} base
*/
@Deprecated
public final Flux<T> retryBackoff(long numRetries, Duration firstBackoff, Duration maxBackoff, double jitterFactor, Scheduler backoffScheduler) {
return retryWhen(FluxRetryWhen.randomExponentialBackoffFunction(numRetries, firstBackoff, maxBackoff, jitterFactor, backoffScheduler));
return retry(Retry.backoff(numRetries, firstBackoff)
.maxBackoff(maxBackoff)
.jitter(jitterFactor)
.scheduler(backoffScheduler));
}
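
For callers of the deprecated variants, the migration reads directly
off the new method bodies above; for instance (durations arbitrary):

```java
// before (now deprecated)
someFlux.retryBackoff(3, Duration.ofMillis(100), Duration.ofSeconds(5));

// after
someFlux.retry(Retry.backoff(3, Duration.ofMillis(100))
                    .maxBackoff(Duration.ofSeconds(5)));
```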

/**
