Skip to content

Commit

Permalink
polish: rename State to RetrySignal, turn Builder into a Supplier, ...
Browse files Browse the repository at this point in the history
 - warn transient nature of state in javadoc
 - builder being replaced with a Supplier<Function> in the Flux API
 means anybody can easily write their own, a bit like retryWhen, but
 with the flexibility of a builder pattern as the "default"
  - added RetrySignal#retain()
  - polished the functions
  • Loading branch information
simonbasle committed Dec 9, 2019
1 parent d01f34d commit ba3ea73
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 53 deletions.
20 changes: 15 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7228,18 +7228,28 @@ public final Flux<T> retry(long numRetries, Predicate<? super Throwable> retryMa
public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
Objects.requireNonNull(whenFactory, "whenFactory");
return onAssembly(new FluxRetryWhen<>(this, fluxRetryWhenState -> whenFactory.apply(fluxRetryWhenState.map(
Retry.State::failure))));
Retry.RetrySignal::failure))));
}

/**
* Retries this {@link Flux} in case of errors, as configured by the {@link Retry.Builder} passed.
* Retries this {@link Flux} in case of errors, as configured by the {@link Function} supplied
* (typically a {@link Retry.Builder}, or a custom function derived from a companion flux of
* {@link Retry.RetrySignal}). The output is a {@link Publisher} that can emit an arbitrary object
* to signal a retry is allowed, and when the resubscription must occur.
* <p>
* Note that the {@link Retry.RetrySignal} state can be transient and change between each source
* {@link org.reactivestreams.Subscriber#onError(Throwable) onError} or
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext}. If processed with a delay,
* this could lead to the represented state being out of sync with the state at which the retry
* was evaluated. Map it to {@link Retry.RetrySignal#retain()} right away to mediate this.
*
* @param builder the {@link Retry.Builder} to configure retries
* @param strategySupplier a supplier of a retry {@link Function}, typically a {@link Retry.Builder} to configure retries
* @return a {@link Flux} that retries on onError
* @see Retry.Builder
* @see Retry.Builder#get()
*/
public final Flux<T> retry(Retry.Builder builder) {
return onAssembly(new FluxRetryWhen<>(this, builder.build()));
public final Flux<T> retry(Supplier<Function<Flux<Retry.RetrySignal>, Publisher<?>>> strategySupplier) {
return onAssembly(new FluxRetryWhen<>(this, strategySupplier.get()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@
*/
final class FluxRetryWhen<T> extends InternalFluxOperator<T, T> {

final Function<? super Flux<Retry.State>, ? extends Publisher<?>> whenSourceFactory;
final Function<? super Flux<Retry.RetrySignal>, ? extends Publisher<?>> whenSourceFactory;

FluxRetryWhen(Flux<? extends T> source, Function<? super Flux<Retry.State>, ? extends Publisher<?>> whenSourceFactory) {
FluxRetryWhen(Flux<? extends T> source, Function<? super Flux<Retry.RetrySignal>, ? extends Publisher<?>> whenSourceFactory) {
super(source);
this.whenSourceFactory = Objects.requireNonNull(whenSourceFactory, "whenSourceFactory");
}

static <T> void subscribe(CoreSubscriber<? super T> s,
Function<? super Flux<Retry.State>, ? extends Publisher<?>> whenSourceFactory,
Function<? super Flux<Retry.RetrySignal>, ? extends Publisher<?>> whenSourceFactory,
CorePublisher<? extends T> source) {
RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
Subscriber<Retry.State> signaller = Operators.serialize(other.completionSignal);
Subscriber<Retry.RetrySignal> signaller = Operators.serialize(other.completionSignal);

signaller.onSubscribe(Operators.emptySubscription());

Expand Down Expand Up @@ -88,11 +88,11 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
}

static final class RetryWhenMainSubscriber<T> extends Operators.MultiSubscriptionSubscriber<T, T>
implements Retry.State {
implements Retry.RetrySignal {

final Operators.SwapSubscription otherArbiter;

final Subscriber<Retry.State> signaller;
final Subscriber<Retry.RetrySignal> signaller;

final CorePublisher<? extends T> source;

Expand All @@ -110,7 +110,7 @@ static final class RetryWhenMainSubscriber<T> extends Operators.MultiSubscriptio
long produced;

RetryWhenMainSubscriber(CoreSubscriber<? super T> actual,
Subscriber<Retry.State> signaller,
Subscriber<Retry.RetrySignal> signaller,
CorePublisher<? extends T> source) {
super(actual);
this.signaller = signaller;
Expand Down Expand Up @@ -221,11 +221,11 @@ void whenComplete() {
}
}

static final class RetryWhenOtherSubscriber extends Flux<Retry.State>
implements InnerConsumer<Object>, OptimizableOperator<Retry.State, Retry.State> {
static final class RetryWhenOtherSubscriber extends Flux<Retry.RetrySignal>
implements InnerConsumer<Object>, OptimizableOperator<Retry.RetrySignal, Retry.RetrySignal> {
RetryWhenMainSubscriber<?> main;

final DirectProcessor<Retry.State> completionSignal = new DirectProcessor<>();
final DirectProcessor<Retry.RetrySignal> completionSignal = new DirectProcessor<>();

@Override
public Context currentContext() {
Expand Down Expand Up @@ -262,22 +262,22 @@ public void onComplete() {
}

@Override
public void subscribe(CoreSubscriber<? super Retry.State> actual) {
public void subscribe(CoreSubscriber<? super Retry.RetrySignal> actual) {
completionSignal.subscribe(actual);
}

@Override
public CoreSubscriber<? super Retry.State> subscribeOrReturn(CoreSubscriber<? super Retry.State> actual) {
public CoreSubscriber<? super Retry.RetrySignal> subscribeOrReturn(CoreSubscriber<? super Retry.RetrySignal> actual) {
return actual;
}

@Override
public DirectProcessor<Retry.State> source() {
public DirectProcessor<Retry.RetrySignal> source() {
return completionSignal;
}

@Override
public OptimizableOperator<?, ? extends Retry.State> nextOptimizableSource() {
public OptimizableOperator<?, ? extends Retry.RetrySignal> nextOptimizableSource() {
return null;
}
}
Expand Down
23 changes: 17 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -3692,18 +3692,29 @@ public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMa
*/
public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
Objects.requireNonNull(whenFactory, "whenFactory");
return onAssembly(new MonoRetryWhen<>(this, (Flux<Retry.State> rws) -> whenFactory.apply(rws.map(Retry.State::failure))));
return onAssembly(new MonoRetryWhen<>(this, (Flux<Retry.RetrySignal> rws) -> whenFactory.apply(rws.map(
Retry.RetrySignal::failure))));
}

/**
* Retries this {@link Mono} in case of errors, as configured by the {@link Retry.Builder} passed.
* Retries this {@link Mono} in case of errors, as configured by the {@link Function} supplied
* (typically a {@link Retry.Builder}, or a custom function derived from a companion flux of
* {@link Retry.RetrySignal}). The output is a {@link Publisher} that can emit an arbitrary object
* to signal a retry is allowed, and when the resubscription must occur.
* <p>
* Note that the {@link Retry.RetrySignal} state can be transient and change between each source
* {@link org.reactivestreams.Subscriber#onError(Throwable) onError} or
* {@link org.reactivestreams.Subscriber#onNext(Object) onNext}. If processed with a delay,
* this could lead to the represented state being out of sync with the state at which the retry
* was evaluated. Map it to {@link Retry.RetrySignal#retain()} right away to mediate this.
*
* @param builder the {@link Retry.Builder} to configure retries
* @return a {@link Mono} that retries on onError
* @param strategySupplier a supplier of a retry {@link Function}, typically a {@link Retry.Builder} to configure retries
* @return a {@link Flux} that retries on onError
* @see Retry.Builder
* @see Retry.Builder#get()
*/
public final Mono<T> retry(Retry.Builder builder) {
return onAssembly(new MonoRetryWhen<>(this, builder.build()));
public final Mono<T> retry(Supplier<Function<Flux<Retry.RetrySignal>, Publisher<?>>> strategySupplier) {
return onAssembly(new MonoRetryWhen<>(this, strategySupplier.get()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
*/
final class MonoRetryWhen<T> extends InternalMonoOperator<T, T> {

final Function<? super Flux<Retry.State>, ? extends Publisher<?>> whenSourceFactory;
final Function<? super Flux<Retry.RetrySignal>, ? extends Publisher<?>> whenSourceFactory;

MonoRetryWhen(Mono<? extends T> source,
Function<? super Flux<Retry.State>, ? extends Publisher<?>> whenSourceFactory) {
Function<? super Flux<Retry.RetrySignal>, ? extends Publisher<?>> whenSourceFactory) {
super(source);
this.whenSourceFactory = Objects.requireNonNull(whenSourceFactory, "whenSourceFactory");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.util.retry.Retry.State;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry.RetrySignal;

class ExponentialBackoffFunction extends SimpleRetryFunction {

Expand All @@ -39,23 +40,24 @@ class ExponentialBackoffFunction extends SimpleRetryFunction {
if (jitterFactor < 0 || jitterFactor > 1) throw new IllegalArgumentException("jitterFactor must be between 0 and 1 (default 0.5)");
this.firstBackoff = builder.minBackoff;
this.maxBackoff = builder.maxBackoff;
this.backoffScheduler = builder.backoffScheduler;
this.backoffScheduler = builder.backoffScheduler == null ? Schedulers.parallel() : builder.backoffScheduler;
}

@Override
public Publisher<?> apply(Flux<State> t) {
public Publisher<?> apply(Flux<RetrySignal> t) {
return t.flatMap(retryWhenState -> {
//capture the state immediately
Throwable currentFailure = retryWhenState.failure();
long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex();

if (currentFailure == null) {
return Mono.error(new IllegalStateException("Retry.State#failure() not expected to be null"));
return Mono.error(new IllegalStateException("Retry.RetrySignal#failure() not expected to be null"));
}

if (!throwablePredicate.test(currentFailure)) {
return Mono.error(currentFailure);
}

long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex();

if (iteration >= maxAttempts) {
return Mono.error(new IllegalStateException("Retries exhausted: " + iteration + "/" + maxAttempts, currentFailure));
}
Expand Down
61 changes: 53 additions & 8 deletions reactor-core/src/main/java/reactor/util/retry/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;

Expand All @@ -30,7 +31,7 @@

/**
* Utilities around {@link Flux#retry(Builder) retries} (builder to configure retries,
* retry {@link State}, etc...
* retry {@link RetrySignal signal}, etc...)
*
* @author Simon Baslé
*/
Expand All @@ -45,7 +46,7 @@ public class Retry {
* errors that happened so far (and were retried) and the same number, but only taking into account
* <strong>subsequent</strong> errors (see {@link #failureSubsequentIndex()}).
*/
public interface State {
public interface RetrySignal {

/**
* The ZERO BASED index number of this error (can also be read as how many retries have occurred
Expand All @@ -70,6 +71,17 @@ public interface State {
* @return the current failure {@link Throwable}
*/
Throwable failure();

/**
* If this {@link RetrySignal} is a transient view of the state of the underlying retry subscriber,
* return an immutable copy of that view that is guaranteed to be consistent with the time at which
* this method is invoked.
*
* @return an immutable copy of the current {@link RetrySignal}, always safe to use
*/
default RetrySignal retain() {
return new ImmutableRetrySignal(failureTotalIndex(), failureSubsequentIndex(), failure());
}
}

/**
Expand Down Expand Up @@ -119,7 +131,7 @@ public static Builder max(long max) {
* The {@link Builder} is copy-on-write and as such can be stored as a "template" and further configured
* by different components without a risk of modifying the original configuration.
*/
public static class Builder {
public static class Builder implements Supplier<Function<Flux<RetrySignal>, Publisher<?>>> {

final Duration minBackoff;
final Duration maxBackoff;
Expand Down Expand Up @@ -220,7 +232,7 @@ public Builder throwablePredicate(

/**
* Set the transient error mode, indicating that the strategy being built should use
* {@link State#failureSubsequentIndex()} rather than {@link State#failureTotalIndex()}.
* {@link RetrySignal#failureSubsequentIndex()} rather than {@link RetrySignal#failureTotalIndex()}.
* Transient errors are errors that could occur in bursts but are then recovered from by
* a retry (with one or more onNext signals) before another error occurs.
* <p>
Expand Down Expand Up @@ -324,16 +336,49 @@ public Builder scheduler(Scheduler backoffScheduler) {
}

/**
* Build the configured retry strategy as a {@link Function} taking a companion {@link Flux} of
* {@link State retry state} and outputting a {@link Publisher} that emits to signal a retry is allowed.
* Build the configured retry strategy.
*
* @return the retry {@link Function} based on a companion flux of {@link State}
* @return the retry {@link Function} based on a companion flux of {@link RetrySignal}
*/
public Function<Flux<State>, Publisher<?>> build() {
public Function<Flux<RetrySignal>, Publisher<?>> get() {
if (minBackoff == Duration.ZERO && maxBackoff == MAX_BACKOFF && jitterFactor == 0d && backoffScheduler == null) {
return new SimpleRetryFunction(this);
}
return new ExponentialBackoffFunction(this);
}
}

static class ImmutableRetrySignal implements RetrySignal {

final long failureTotalIndex;
final long failureSubsequentIndex;
final Throwable failure;

ImmutableRetrySignal(long failureTotalIndex, long failureSubsequentIndex,
Throwable failure) {
this.failureTotalIndex = failureTotalIndex;
this.failureSubsequentIndex = failureSubsequentIndex;
this.failure = failure;
}

@Override
public long failureTotalIndex() {
return this.failureTotalIndex;
}

@Override
public long failureSubsequentIndex() {
return this.failureSubsequentIndex;
}

@Override
public Throwable failure() {
return this.failure;
}

@Override
public RetrySignal retain() {
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry.State;

class SimpleRetryFunction implements Function<Flux<State>, Publisher<?>> {
class SimpleRetryFunction implements Function<Flux<Retry.RetrySignal>, Publisher<?>> {

final long maxAttempts;
final Predicate<? super Throwable> throwablePredicate;
Expand All @@ -38,9 +37,12 @@ class SimpleRetryFunction implements Function<Flux<State>, Publisher<?>> {
}

@Override
public Publisher<?> apply(Flux<Retry.State> flux) {
public Publisher<?> apply(Flux<Retry.RetrySignal> flux) {
return flux.flatMap(retryWhenState -> {
//capture the state immediately
Throwable currentFailure = retryWhenState.failure();
long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex();

if (currentFailure == null) {
return Mono.error(new IllegalStateException("RetryWhenState#failure() not expected to be null"));
}
Expand All @@ -49,8 +51,6 @@ public Publisher<?> apply(Flux<Retry.State> flux) {
return Mono.error(currentFailure);
}

long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex();

if (iteration >= maxAttempts) {
return Mono.error(new IllegalStateException("Retries exhausted: " + iteration + "/" + maxAttempts, currentFailure));
}
Expand Down
Loading

0 comments on commit ba3ea73

Please sign in to comment.