Refactor of retryWhen to switch to a Spec/Builder model #1979

Merged · 3 commits · Mar 18, 2020
5 changes: 3 additions & 2 deletions docs/asciidoc/apdx-operatorChoice.adoc
@@ -227,9 +227,10 @@ I want to deal with:
** by falling back:
*** to a value: `onErrorReturn`
*** to a `Publisher` or `Mono`, possibly different ones depending on the error: `Flux#onErrorResume` and `Mono#onErrorResume`
** by retrying: `retry`
** by retrying...
*** ...with a simple policy (max number of attempts): `retry()`, `retry(long)`
*** ...triggered by a companion control Flux: `retryWhen`
*** ... using a standard backoff strategy (exponential backoff with jitter): `retryBackoff`
*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWhen(Retry.backoff(...))`

* I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)...
** by throwing a special `IllegalStateException`: `Flux#onBackpressureError`
3 changes: 3 additions & 0 deletions docs/asciidoc/apdx-reactorExtra.adoc
@@ -73,6 +73,9 @@ Since 3.2.0, one of the most advanced retry strategies offered by these utilities is
also part of the `reactor-core` main artifact directly. Exponential backoff is
available as the `Flux#retryBackoff` operator.

Since 3.3.4, the `Retry` builder is offered directly in core and has a few more possible
customizations, being based on a `RetrySignal` that encapsulates more state than just the
error.
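
For instance, a minimal sketch of switching from the `retryBackoff` operator to the core builder
might look as follows (the `someRemoteCall()` source and the backoff values are illustrative
assumptions, not part of this change):

====
[source,java]
----
// previously, via the dedicated operator:
// flux.retryBackoff(3, Duration.ofMillis(100));

// with the core Retry builder, the same intent goes through retryWhen:
Flux<String> flux = someRemoteCall(); // hypothetical source that may fail transiently
flux.retryWhen(Retry.backoff(3, Duration.ofMillis(100))); // at most 3 attempts, exponential backoff from 100ms
----
====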

[[extra-schedulers]]
== Schedulers
62 changes: 58 additions & 4 deletions docs/asciidoc/coreFeatures.adoc
@@ -879,8 +879,8 @@ There is a more advanced version of `retry` (called `retryWhen`) that uses a "`companion`" `Flux`,
created by the operator but decorated by the user, in order to customize the retry
condition.

The companion `Flux` is a `Flux<Throwable>` that gets passed to a `Function`, the sole
parameter of `retryWhen`. As the user, you define that function and make it return a new
The companion `Flux` is a `Flux<RetrySignal>` that gets passed to a `Retry` strategy/function,
supplied as the sole parameter of `retryWhen`. As the user, you define that function and make it return a new
`Publisher<?>`. Retry cycles go as follows:

. Each time an error happens (giving potential for a retry), the error is emitted into the
@@ -902,11 +902,13 @@ companion would effectively swallow an error. Consider the following way of emulating `retry(3)`:
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException()) // <1>
.doOnError(System.out::println) // <2>
.retryWhen(companion -> companion.take(3)); // <3>
.retryWhen(() -> // <3>
companion -> companion.take(3)); // <4>
----
<1> This continuously produces errors, calling for retry attempts.
<2> `doOnError` before the retry lets us log and see all failures.
<3> Here, we consider the first three errors as retry-able (`take(3)`) and then give up.
<3> The `Retry` function is passed as a `Supplier`.
<4> Here, we consider the first three errors as retry-able (`take(3)`) and then give up.
====

In effect, the preceding example results in an empty `Flux`, but it completes successfully. Since
@@ -916,9 +918,61 @@
Getting to the same behavior involves a few additional tricks:
include::snippetRetryWhenRetry.adoc[]

TIP: One can use the builders exposed in `Retry` to achieve the same thing in a more fluent manner,
as well as more finely tuned retry strategies, for example: `errorFlux.retryWhen(Retry.max(3));`.

TIP: You can use similar code to implement an "`exponential backoff and retry`" pattern,
as shown in the <<faq.exponentialBackoff,FAQ>>.

The core-provided `Retry` builders, `RetrySpec` and `RetryBackoffSpec`, both allow advanced customizations like the following (a combined sketch appears after this list):

- setting the `filter(Predicate)` for the exceptions that can trigger a retry
- modifying such a previously set filter through `modifyErrorFilter(Function)`
- triggering a side effect like logging around the retry trigger (i.e. for backoff, before and after the delay), provided the retry is validated (`doBeforeRetry()` and `doAfterRetry()` are additive)
- triggering an asynchronous `Mono<Void>` around the retry trigger, which lets you add asynchronous behavior on top of the base delay but thus further delays the trigger (`doBeforeRetryAsync` and `doAfterRetryAsync` are additive)
- customizing the exception in case the maximum number of attempts has been reached, through `onRetryExhaustedThrow(BiFunction)`.
By default, `Exceptions.retryExhausted(...)` is used, which can be distinguished with `Exceptions.isRetryExhausted(Throwable)`
- activating the handling of _transient errors_ (see below)
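
As a rough, combined sketch of these customizations (reusing the `errorFlux` placeholder from the
tip above; the chosen values and the `IOException` filter are illustrative only, not prescribed by
the API):

====
[source,java]
----
errorFlux.retryWhen(Retry.backoff(5, Duration.ofMillis(100))
        .maxBackoff(Duration.ofSeconds(5))                  // cap the exponentially growing delay
        .filter(e -> e instanceof IOException)              // only IO-related failures trigger a retry
        .doBeforeRetry(rs -> System.out.println("retrying after " + rs.failure()))
        .onRetryExhaustedThrow((spec, rs) -> rs.failure())); // propagate the last failure instead of retryExhausted

// had the default exhausted exception been kept, it could be detected downstream:
// errorFlux.retryWhen(Retry.max(3))
//          .onErrorResume(Exceptions::isRetryExhausted, e -> Flux.empty());
----
====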

Transient error handling in the `Retry` specs makes use of `RetrySignal#totalRetriesInARow()`: to decide whether to retry and to compute the retry delays, an alternative index is used, one that is reset to 0 each time an `onNext` is emitted.
This has the consequence that if a re-subscribed source generates some data before failing again, previous failures don't count toward the maximum number of retry attempts.
In the case of an exponential backoff strategy, this also means that the next attempt will be back to using the minimum `Duration` backoff instead of a longer one.
This can be especially useful for long-lived sources that see sporadic bursts of errors (or _transient_ errors), where each burst should be retried with its own backoff.

====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger(); // <1>
AtomicInteger transientHelper = new AtomicInteger();
Flux<Integer> transientFlux = Flux.<Integer>generate(sink -> {
    int i = transientHelper.getAndIncrement();
    if (i == 10) { // <2>
        sink.next(i);
        sink.complete();
    }
    else if (i % 3 == 0) { // <3>
        sink.next(i);
    }
    else {
        sink.error(new IllegalStateException("Transient error at " + i)); // <4>
    }
})
.doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true)) // <5>
             .blockLast();
assertThat(errorCount).hasValue(6); // <6>
----
<1> We will count the number of errors in the retried sequence.
<2> We `generate` a source that has bursts of errors. It will successfully complete when the counter reaches 10.
<3> If the `transientHelper` atomic is at a multiple of `3`, we emit `onNext` and thus end the current burst.
<4> In other cases, we emit an `onError`. That happens 2 out of 3 times, giving bursts of 2 `onError` interrupted by 1 `onNext`.
<5> We use `retryWhen` on that source, configured for at most 2 retry attempts, but in `transientErrors` mode.
<6> At the end, the sequence reaches `onNext(10)` and completes, after `6` errors have been registered in `errorCount`.
====

Without the `transientErrors(true)`, the configured maximum of `2` attempts would be reached during the second burst and the sequence would fail after having emitted `onNext(3)`, as sketched below.
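
A minimal sketch of that comparison (assuming the `transientFlux`, `errorCount`, and
`transientHelper` from the example above are re-created fresh, and that `reactor-test`'s
`StepVerifier` is on the classpath):

====
[source,java]
----
transientFlux.retryWhen(Retry.max(2)) // same source, transientErrors left at its default (false)
             .as(StepVerifier::create)
             .expectNext(0, 3)                                  // the second burst starts right after onNext(3)
             .expectErrorMatches(Exceptions::isRetryExhausted)  // the 3rd error exceeds the 2 allowed attempts
             .verify();
----
====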

=== Handling Exceptions in Operators or Functions

In general, all operators can themselves contain code that potentially trigger an
36 changes: 19 additions & 17 deletions docs/asciidoc/faq.adoc
@@ -176,32 +176,34 @@ an unstable state and is not likely to immediately recover from it. So blindly
retrying immediately is likely to produce yet another error and add to the
instability.

Since `3.2.0.RELEASE`, Reactor comes with such a retry baked in: `Flux.retryBackoff`.
Since `3.3.4.RELEASE`, Reactor comes with a builder for such a retry baked in: `Retry.backoff`.

The following example shows how to implement an exponential backoff with `retryWhen`.
The following example showcases a simple use of the builder, with hooks logging a message right
before and after the retry attempt delays.
It delays retries and increases the delay between each attempt (pseudocode:
delay = attempt number * 100 milliseconds):

====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.doOnNext(s -> System.out.println(s + " at " + LocalTime.now())) // <1>
.zipWith(Flux.range(1, 4), (error, index) -> { // <2>
if (index < 4) return index;
else throw Exceptions.propagate(error);
})
.flatMap(index -> Mono.delay(Duration.ofMillis(index * 100))) // <3>
.doOnNext(s -> System.out.println("retried at " + LocalTime.now())) // <4>
);
Flux.<String>error(new IllegalStateException("boom"))
.doOnError(e -> { // <1>
errorCount.incrementAndGet();
System.out.println(e + " at " + LocalTime.now());
})
.retryWhen(Retry
.backoff(3, Duration.ofMillis(100)).jitter(0d) // <2>
.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now())) // <3>
.onRetryExhaustedThrow((spec, rs) -> rs.failure()) // <4>
);
----
<1> We log the time of errors.
<2> We use the `retryWhen` + `zipWith` trick to propagate the error after three
retries.
<3> Through `flatMap`, we cause a delay that depends on the attempt's index.
<4> We also log the time at which the retry happens.
<1> We will log the time of errors emitted by the source and count them.
<2> We configure an exponential backoff retry with at most 3 attempts and no jitter.
<3> We also log the time at which the retry happens.
<4> By default an `Exceptions.retryExhausted` exception would be thrown, with the last `failure()` as a cause.
Here we customize that to directly emit the cause as `onError`.
====

When subscribed to, this fails and terminates after printing out the following:
24 changes: 12 additions & 12 deletions docs/asciidoc/snippetRetryWhenRetry.adoc
@@ -1,20 +1,20 @@
====
[source,java]
----
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.retryWhen(companion -> companion
.zipWith(Flux.range(1, 4), // <1>
(error, index) -> { // <2>
if (index < 4) return index; // <3>
else throw Exceptions.propagate(error); // <4>
})
);
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(() -> companion -> // <1>
companion.map(rs -> { // <2>
if (rs.totalRetries() < 3) return rs.totalRetries(); // <3>
else throw Exceptions.propagate(rs.failure()); // <4>
})
);
----
<1> Trick one: use `zip` and a `range` of "number of acceptable retries + 1".
<2> The `zip` function lets you count the retries while keeping track of the original
error.
<3> To allow for three retries, indexes before 4 return a value to emit.
<1> `retryWhen` expects a `Supplier<Retry>`.
<2> The companion emits `RetrySignal` objects, which bear the number of retries so far and the last failure.
<3> To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
<4> In order to terminate the sequence in error, we throw the original exception after
these three retries.
====
45 changes: 45 additions & 0 deletions reactor-core/src/main/java/reactor/core/Exceptions.java
@@ -16,6 +16,7 @@

package reactor.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -24,7 +25,9 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import reactor.core.publisher.Flux;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

/**
* Global Reactor Core Exception handling and utils to operate on.
@@ -275,6 +278,19 @@ public static RejectedExecutionException failWithRejected(String message) {
return new ReactorRejectedExecutionException(message);
}

/**
* Return a new {@link RuntimeException} that represents too many failures on retry.
* This nature can be detected via {@link #isRetryExhausted(Throwable)}.
* The cause of the last retry attempt is passed and stored as this exception's {@link Throwable#getCause() cause}.
*
* @param message the message
* @param cause the cause of the last retry attempt that failed (or null if irrelevant)
* @return a new {@link RuntimeException} representing retry exhaustion due to too many attempts
*/
public static RuntimeException retryExhausted(String message, @Nullable Throwable cause) {
return cause == null ? new RetryExhaustedException(message) : new RetryExhaustedException(message, cause);
}

/**
* Check if the given exception represents an {@link #failWithOverflow() overflow}.
* @param t the {@link Throwable} error to check
@@ -324,6 +340,17 @@ public static boolean isMultiple(@Nullable Throwable t) {
return t instanceof CompositeException;
}

/**
* Check a {@link Throwable} to see if it indicates too many retry attempts have failed.
* Such an exception can be created via {@link #retryExhausted(String, Throwable)}.
*
* @param t the {@link Throwable} to check, {@literal null} always yields {@literal false}
* @return true if the Throwable is an instance representing retry exhaustion, false otherwise
*/
public static boolean isRetryExhausted(@Nullable Throwable t) {
return t instanceof RetryExhaustedException;
}

/**
* Check a {@link Throwable} to see if it is a traceback, as created by the checkpoint operator or debug utilities.
*
@@ -667,6 +694,24 @@ static final class OverflowException extends IllegalStateException {
}
}

/**
* A specialized {@link IllegalStateException} to signify a {@link Flux#retryWhen(Retry) retry}
* has failed (e.g. after N attempts, or a timeout).
*
* @see #retryExhausted(String, Throwable)
* @see #isRetryExhausted(Throwable)
*/
static final class RetryExhaustedException extends IllegalStateException {

RetryExhaustedException(String message) {
super(message);
}

RetryExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}

static class ReactorRejectedExecutionException extends RejectedExecutionException {

ReactorRejectedExecutionException(String message, Throwable cause) {