Skip to content

Commit

Permalink
add javadoc and polish
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Dec 5, 2019
1 parent fb0de93 commit d01f34d
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 16 deletions.
7 changes: 7 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7231,6 +7231,13 @@ public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
Retry.State::failure))));
}

/**
* Retries this {@link Flux} in case of errors, as configured by the {@link Retry.Builder} passed.
*
* @param builder the {@link Retry.Builder} to configure retries
* @return a {@link Flux} that retries on onError
* @see Retry.Builder
*/
public final Flux<T> retry(Retry.Builder builder) {
return onAssembly(new FluxRetryWhen<>(this, builder.build()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public long failureSubsequentIndex() {

@Override
public Throwable failure() {
assert this.lastFailure != null;
return this.lastFailure;
}

Expand Down
11 changes: 9 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -3695,8 +3695,15 @@ public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
return onAssembly(new MonoRetryWhen<>(this, (Flux<Retry.State> rws) -> whenFactory.apply(rws.map(Retry.State::failure))));
}

public final Mono<T> retry(Retry.Builder retryBuilder) {
return onAssembly(new MonoRetryWhen<>(this, retryBuilder.build()));
/**
* Retries this {@link Mono} in case of errors, as configured by the {@link Retry.Builder} passed.
*
* @param builder the {@link Retry.Builder} to configure retries
* @return a {@link Mono} that retries on onError
* @see Retry.Builder
*/
public final Mono<T> retry(Retry.Builder builder) {
return onAssembly(new MonoRetryWhen<>(this, builder.build()));
}

/**
Expand Down
163 changes: 149 additions & 14 deletions reactor-core/src/main/java/reactor/util/retry/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package reactor.util.retry;

import java.time.Duration;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -28,24 +29,71 @@
import reactor.util.annotation.Nullable;

/**
* Utilities around {@link Flux#retry(Builder) retries} (builder to configure retries,
* retry {@link State}, etc...
*
* @author Simon Baslé
*/
public class Retry {

static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);

/**
* State for a {@link Flux#retry(Builder) Flux retry} or {@link reactor.core.publisher.Mono#retry(Builder) Mono retry}.
* The state is passed to the retry function inside a publisher and gives information about the
* {@link #failure()} that potentially triggers a retry, as well as two indexes: the number of
* errors that happened so far (and were retried) and the same number, but only taking into account
* <strong>subsequent</strong> errors (see {@link #failureSubsequentIndex()}).
*/
public interface State {

/**
* The ZERO BASED index number of this error (can also be read as how many retries have occurred
* so far), since the source was first subscribed to.
*
* @return a 0-index for the error, since original subscription
*/
long failureTotalIndex();

/**
* The ZERO BASED index number of this error since the beginning of the current burst of errors.
* This is reset to zero whenever a retry is made that is followed by at least one
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext}.
*
* @return a 0-index for the error in the current burst of subsequent errors
*/
long failureSubsequentIndex();
@Nullable

/**
* The current {@link Throwable} that needs to be evaluated for retry.
*
* @return the current failure {@link Throwable}
*/
Throwable failure();
}

public static Builder backoff(long max, Duration minBackoff) {
return new Builder(max, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel());
/**
* A {@link Builder} preconfigured for exponential backoff strategy with jitter, given a maximum number of retry attempts
* and a minimum {@link Duration} for the backoff.
*
* @param maxAttempts the maximum number of retry attempts to allow
* @param minBackoff the minimum {@link Duration} for the first backoff
* @return the builder for further configuration
* @see Builder#maxAttempts(long)
* @see Builder#minBackoff(Duration)
*/
public static Builder backoff(long maxAttempts, Duration minBackoff) {
return new Builder(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel());
}

public static Builder max(int max) {
/**
* A {@link Builder} preconfigured for a simple strategy with maximum number of retry attempts.
*
* @param max the maximum number of retry attempts to allow
* @return the builder for further configuration
* @see Builder#maxAttempts(long)
*/
public static Builder max(long max) {
return new Builder(max, t -> true, false, Duration.ZERO, MAX_BACKOFF, 0d, null);
}

Expand All @@ -67,6 +115,9 @@ public static Builder max(int max) {
* had its own backoff, one can choose to set {@link #transientErrors(boolean)} to {@code true}.
* The comparison to {@link #maxAttempts(long)} will then be done with the number of subsequent attempts
* that failed without an {@link org.reactivestreams.Subscriber#onNext(Object) onNext} in between.
* <p>
* The {@link Builder} is copy-on-write and as such can be stored as a "template" and further configured
* by different components without a risk of modifying the original configuration.
*/
public static class Builder {

Expand All @@ -82,12 +133,6 @@ public static class Builder {

/**
* Copy constructor.
* @param max
* @param minBackoff
* @param maxBackoff
* @param jitterFactor
* @param throwablePredicate
* @param backoffScheduler
*/
Builder(long max,
Predicate<? super Throwable> throwablePredicate,
Expand All @@ -103,6 +148,14 @@ public static class Builder {
this.backoffScheduler = backoffScheduler;
}

/**
* Set the maximum number of retry attempts allowed. 1 meaning "1 retry attempt":
* the original subscription plus an extra re-subscription in case of an error, but
* no more.
*
* @param maxAttempts the new retry attempt limit
* @return the builder for further configuration
*/
public Builder maxAttempts(long maxAttempts) {
return new Builder(
maxAttempts,
Expand All @@ -114,19 +167,47 @@ public Builder maxAttempts(long maxAttempts) {
this.backoffScheduler);
}

/**
* Set the {@link Predicate} that will filter which errors can be retried. Exceptions
* that don't pass the predicate will be propagated downstream and terminate the retry
* sequence. Defaults to allowing retries for all exceptions.
*
* @param predicate the predicate to filter which exceptions can be retried
* @return the builder for further configuration
*/
public Builder throwablePredicate(Predicate<? super Throwable> predicate) {
return new Builder(
this.maxAttempts,
predicate,
Objects.requireNonNull(predicate, "predicate"),
this.isTransientErrors,
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler);
}

/**
* Allows to augment a previously {@link #throwablePredicate(Predicate) set} {@link Predicate} with
* a new condition to allow retries of some exception or not. This can typically be used with
* {@link Predicate#and(Predicate)} to combine existing predicate(s) with a new one.
* <p>
* For example:
* <pre><code>
* //given
* Builder retryTwiceIllegalArgument = Retry.max(2)
* .throwablePredicate(e -> e instanceof IllegalArgumentException);
*
* Builder retryTwiceIllegalArgWithCause = retryTwiceIllegalArgument.throwablePredicate(old ->
* old.and(e -> e.getCause() != null));
* </code></pre>
*
* @param predicateAdjuster a {@link Function} that returns a new {@link Predicate} given the
* currently in place {@link Predicate} (usually deriving from the old predicate).
* @return the builder for further configuration
*/
public Builder throwablePredicate(
Function<Predicate<? super Throwable>, Predicate<? super Throwable>> predicateAdjuster) {
Objects.requireNonNull(predicateAdjuster, "predicateAdjuster");
return new Builder(
this.maxAttempts,
predicateAdjuster.apply(throwablePredicate),
Expand All @@ -137,6 +218,20 @@ public Builder throwablePredicate(
this.backoffScheduler);
}

/**
* Set the transient error mode, indicating that the strategy being built should use
* {@link State#failureSubsequentIndex()} rather than {@link State#failureTotalIndex()}.
* Transient errors are errors that could occur in bursts but are then recovered from by
* a retry (with one or more onNext signals) before another error occurs.
* <p>
* In simplified mode, this means that the {@link #maxAttempts(long)} is applied
* to each burst individually. In exponential backoff, the backoff is also computed
* based on the index within the burst, meaning the next error after a recovery will
* be retried with a {@link #minBackoff(Duration)} delay.
*
* @param isTransientErrors {@code true} to activate transient mode
* @return the builder for further configuration
*/
public Builder transientErrors(boolean isTransientErrors) {
return new Builder(
this.maxAttempts,
Expand All @@ -150,28 +245,53 @@ public Builder transientErrors(boolean isTransientErrors) {

//all backoff specific methods should set the default scheduler if needed

/**
* Set the minimum {@link Duration} for the first backoff. This method switches to an
* exponential backoff strategy if not already done so. Defaults to {@link Duration#ZERO}
* when the strategy was initially not a backoff one.
*
* @param minBackoff the minimum backoff {@link Duration}
* @return the builder for further configuration
*/
public Builder minBackoff(Duration minBackoff) {
return new Builder(
this.maxAttempts,
this.throwablePredicate,
this.isTransientErrors,
minBackoff,
Objects.requireNonNull(minBackoff, "minBackoff"),
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler == null ? Schedulers.parallel() : this.backoffScheduler);
}

/**
* Set a hard maximum {@link Duration} for exponential backoffs. This method switches
* to an exponential backoff strategy with a zero minimum backoff if not already a backoff
* strategy. Defaults to {@code Duration.ofMillis(Long.MAX_VALUE)}.
*
* @param maxBackoff the maximum backoff {@link Duration}
* @return the builder for further configuration
*/
public Builder maxBackoff(Duration maxBackoff) {
return new Builder(
this.maxAttempts,
this.throwablePredicate,
this.isTransientErrors,
this.minBackoff,
maxBackoff,
Objects.requireNonNull(maxBackoff, "maxBackoff"),
this.jitterFactor,
this.backoffScheduler == null ? Schedulers.parallel() : this.backoffScheduler);
}

/**
* Set a jitter factor for exponential backoffs that adds randomness to each backoff. This can
* be helpful in reducing cascading failure due to retry-storms. This method switches to an
* exponential backoff strategy with a zero minimum backoff if not already a backoff strategy.
* Defaults to {@code 0.5} (a jitter of at most 50% of the computed delay).
*
* @param jitterFactor the new jitter factor as a {@code double} between {@code 0d} and {@code 1d}
* @return the builder for further configuration
*/
public Builder jitter(double jitterFactor) {
return new Builder(
this.maxAttempts,
Expand All @@ -183,6 +303,15 @@ public Builder jitter(double jitterFactor) {
this.backoffScheduler == null ? Schedulers.parallel() : this.backoffScheduler);
}

/**
* Set a {@link Scheduler} on which to execute the delays computed by the exponential backoff
* strategy. This method switches to an exponential backoff strategy with a zero minimum backoff
* if not already a backoff strategy. Defaults to {@link Schedulers#parallel()} in the backoff
* strategy.
*
* @param backoffScheduler the {@link Scheduler} to use
* @return the builder for further configuration
*/
public Builder scheduler(Scheduler backoffScheduler) {
return new Builder(
this.maxAttempts,
Expand All @@ -191,9 +320,15 @@ public Builder scheduler(Scheduler backoffScheduler) {
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
backoffScheduler);
Objects.requireNonNull(backoffScheduler, "backoffScheduler"));
}

/**
* Build the configured retry strategy as a {@link Function} taking a companion {@link Flux} of
* {@link State retry state} and outputting a {@link Publisher} that emits to signal a retry is allowed.
*
* @return the retry {@link Function} based on a companion flux of {@link State}
*/
public Function<Flux<State>, Publisher<?>> build() {
if (minBackoff == Duration.ZERO && maxBackoff == MAX_BACKOFF && jitterFactor == 0d && backoffScheduler == null) {
return new SimpleRetryFunction(this);
Expand Down

0 comments on commit d01f34d

Please sign in to comment.