Refactor of retryWhen to switch to a Spec/Builder model #1979
Conversation
RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
Subscriber<Throwable> signaller = Operators.serialize(other.completionSignal);

signaller.onSubscribe(Operators.emptySubscription());

CoreSubscriber<T> serial = Operators.serialize(s);

RetryWhenMainSubscriber<T> main =
		new RetryWhenMainSubscriber<>(serial, signaller, source);
Runnable otherReset = () -> {
Allocation police (lol 😂):
WDYT about avoiding this Runnable and introducing a more specialised RetryWhenMainSubscriber, OR passing a boolean (and other necessary variables) to the existing one?
I'll look into passing the arguments directly to the ctor rather than relying on the Runnable capturing that state. That said, allocation-wise it is only one allocation per original subscription (not on the critical data path).
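For illustration, here is a minimal plain-Java sketch of the two options being discussed: capturing reset state in a `Runnable` (one extra allocation per subscription) versus passing the state directly to the subscriber's constructor. Class names are hypothetical stand-ins, not the actual reactor-core types.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Option 1 (current PR shape): an extra Runnable allocation that captures outer state.
class MainSubscriberWithRunnable {
    final Runnable reset;

    MainSubscriberWithRunnable(Runnable reset) {
        this.reset = reset;
    }

    void onRetry() {
        reset.run();
    }
}

// Option 2 (reviewer suggestion): the state is handed to the ctor directly,
// avoiding the lambda allocation on the subscription path.
class SpecializedMainSubscriber {
    final AtomicInteger attempts;

    SpecializedMainSubscriber(AtomicInteger attempts) {
        this.attempts = attempts;
    }

    void onRetry() {
        attempts.set(0);
    }
}
```

Both variants reset the same state on retry; the difference is purely in how the state reaches the subscriber.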
I initially started with passing these elements, but it was cumbersome.

@smaldini I will need to clean up commits in preparation for the rebase-merge once this is approved.
Interesting, could you elaborate on why you wait for the first onNext and not just onSubscribe?

@smaldini if you're referring to […]: one typical scenario we want to cover is roughly the following.
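The original code snippet was lost in extraction. As a hedged illustration of the transient-error pattern this PR targets (not the actual reactor-core code), a plain-Java model of an exponential backoff whose attempt counter resets once an onNext is observed, so that in a long-lived sequence each burst of errors restarts from the minimum delay:

```java
import java.time.Duration;

// Hypothetical model of a backoff that treats errors after a success
// as a fresh burst, as described for transientErrors(true).
class TransientBackoff {
    final Duration minBackoff;
    int attemptsInCurrentBurst = 0;

    TransientBackoff(Duration minBackoff) {
        this.minBackoff = minBackoff;
    }

    // An onNext ends the current burst of errors: the delay resets to minimum.
    void onNext() {
        attemptsInCurrentBurst = 0;
    }

    // Each consecutive error doubles the delay, starting from minBackoff.
    Duration onError() {
        Duration delay = minBackoff.multipliedBy(1L << attemptsInCurrentBurst);
        attemptsInCurrentBurst++;
        return delay;
    }
}
```

Waiting for the first onNext matters here: onSubscribe alone says nothing about whether the source actually recovered, while an onNext proves the sequence made progress since the last error.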
(force-pushed from 2126aca to fcd92c0)
Codecov Report

@@            Coverage Diff             @@
##             master    #1979   +/-   ##
=========================================
  Coverage          ?   81.93%
  Complexity        ?     4086
=========================================
  Files             ?      380
  Lines             ?    31214
  Branches          ?     5796
=========================================
  Hits              ?    25575
  Misses            ?     4060
  Partials          ?     1579

Continue to review full report at Codecov.
@smaldini I tried a quite different approach. Also relates to #1905.

Big refactor: introduce builder and change retryWhen companion
reactor-core/src/test/java/reactor/core/publisher/FluxRetryPredicateTest.java (resolved)
(force-pushed from ba3ea73 to b030fe5)
Made it depend on #2039 (rebased) and squashed the rest of the commits, with a detailed explanation in the commit body.
@rstoyanchev I'd be interested in your opinion about the […]
(force-pushed from b030fe5 to e964a16)
(force-pushed from e964a16 to 6965bdf)
(updated the PR today with removal of the need for a new […])
reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java (outdated, resolved)
reactor-core/src/main/java/reactor/util/retry/SimpleRetryFunction.java (outdated, resolved)
reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java (outdated, resolved)
@simonbasle too bad, but good you found out at this stage. I'm personally still not in favor of […]

One other thought: what if the […] were:

public final Flux<T> retryMax(long max, Consumer<RetrySpec> consumer) {}
public final Flux<T> retryBackOff(long max, Duration min, Consumer<RetryBackoffSpec> consumer) {}

If a custom […]
(force-pushed from 0d98781 to 4caf692)
(force-pushed from 4caf692 to d5caf4e)
@rstoyanchev @bsideup updated the PR with one last commit getting rid of the […]
Note that my intent is now to squash the commits in this PR. The resulting commit will be quite big, but at this point I doubt it is worth the trouble to try and re-split it into meaningful commits per issue that this PR fixes.
@simonbasle aren't we breaking the source compatibility here?
Done, thanks for the suggestion.
@bsideup not really, no. As discussed off-band, that change may raise complaints from the compiler if a lambda is used with […]; japicmp should catch source compatibility breakage.
/**
 * Set the generator for the {@link Exception} to be propagated when the maximum amount of retries
 * is exhausted. By default, throws an {@link Exceptions#retryExhausted(String, Throwable)} with the
 * message reflecting the total attempt index, transien attempt index and maximum retry count.
Typo: "transien" -> "transient".
(force-pushed from 2c11515 to 85c049c)
This big commit is a large refactor of the `retryWhen` operator in order to add several features.

Fixes #1978
Fixes #1905
Fixes #2063
Fixes #2052
Fixes #2064

* Expose more state to `retryWhen` companion (#1978)

This introduces a retryWhen variant based on a `Retry` functional interface. This "function" deals not with a Flux of `Throwable` but of `RetrySignal`. This allows the retry function to check whether there was some success (onNext) since the last retry attempt, in which case the current attempt can be interpreted as if this was the first ever error. This is especially useful for cases where exponential backoff delays should be reset, for long-lived sequences that only see intermittent bursts of errors (transient errors).

We take that opportunity to offer a builder for such a function that can take transient errors into account.

* The `Retry` builders

Inspired by the `Retry` builder in addons, we introduce two classes: `RetrySpec` and `RetryBackoffSpec`. We name them Spec and not Builder because they don't require a call to a `build()` method. Rather, each configuration step produces A) a new instance (copy on write) that B) is by itself already a `Retry`.

The `Retry` + `xxxSpec` approach allows us to offer 2 standard strategies that both support transient error handling, while letting users write their own strategy (either as a standalone `Retry` concrete implementation, or as a builder/spec that builds one).

Both specs allow handling `transientErrors(boolean)`, which when true relies on the extra state exposed by the `RetrySignal`. For the simple case, this means that the remaining number of retries is reset in case of onNext. For the exponential case, this means the retry delay is reset to minimum after an onNext (#1978).

Additionally, the introduction of the specs allows us to add more features and support some features in more combinations, see below.

* `filter` exceptions (#1905)

Previously we could only filter exceptions to be retried on the simple long-based `retry` methods. With the specs we can `filter` in both immediate and exponential backoff retry strategies.

* Add pre/post attempt hooks (#2063)

The specs let the user configure two types of pre/post hooks. Note that if the retry attempt is denied (e.g. we've reached the maximum number of attempts), these hooks are NOT executed.

Synchronous hooks (`doBeforeRetry` and `doAfterRetry`) are side effects that should not block for too long and are executed right before and right after the retry trigger is sent by the companion publisher.

Asynchronous hooks (`doBeforeRetryAsync` and `doAfterRetryAsync`) are composed into the companion publisher which generates the triggers, and they both delay the emission of said trigger in a non-blocking and asynchronous fashion. Having pre and post hooks allows a user to better manage the order in which these asynchronous side effects should be performed.

* Retry exhausted meaningful exception (#2052)

The `Retry` function implemented by both specs throws a `RuntimeException` with a meaningful message when the configured maximum number of attempts is reached. That exception can be pinpointed by calling the utility `Exceptions.isRetryExhausted` method.

For further customization, users can replace that default with their own custom exception via `onRetryExhaustedThrow`. The BiFunction lets the user access the Spec, which has public final fields that can be used to produce a meaningful message.

* Ensure retry hooks completion is taken into account (#2064)

The old `retryBackoff` would internally use a `flatMap`, which can cause issues. The Spec functions use `concatMap`.

/!\ CAVEAT

This commit deprecates all of the retryBackoff methods as well as the original `retryWhen` (based on a Throwable companion publisher) in order to introduce the new `RetrySignal` based signature.

The use of the explicit `Retry` type lifts any ambiguity when using the Spec, but using a lambda instead will raise some ambiguity at call sites of `retryWhen`. We deem that acceptable given that the migration is quite easy (turn `e -> whatever(e)` into `(Retry) rs -> whatever(rs.failure())`). Furthermore, `retryWhen` is an advanced operator, and we expect most uses to be combined with the retry builder in reactor-extra, which lifts the ambiguity itself.
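The "Spec, not Builder" idea described above (each configuration step is copy-on-write and its result is already usable, with no terminal `build()` call) can be sketched in plain Java. Names here are illustrative stand-ins, not the actual reactor-core `RetrySpec`:

```java
import java.util.function.Predicate;

// Minimal copy-on-write sketch of the Spec idea: every step returns a new,
// fully usable instance; the spec itself already acts as the retry "function".
final class MiniRetrySpec {
    final long maxAttempts;
    final Predicate<Throwable> errorFilter;

    private MiniRetrySpec(long maxAttempts, Predicate<Throwable> errorFilter) {
        this.maxAttempts = maxAttempts;
        this.errorFilter = errorFilter;
    }

    static MiniRetrySpec max(long maxAttempts) {
        return new MiniRetrySpec(maxAttempts, t -> true);
    }

    // Copy on write: the receiver is left untouched.
    MiniRetrySpec filter(Predicate<Throwable> errorFilter) {
        return new MiniRetrySpec(this.maxAttempts, errorFilter);
    }

    // Usable directly, without any build() step.
    boolean shouldRetry(Throwable error, long attemptsSoFar) {
        return attemptsSoFar < maxAttempts && errorFilter.test(error);
    }
}
```

Because each step returns a fresh instance, a base spec can be shared and specialized without one call site mutating another's configuration.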
(force-pushed from 85c049c to 9e68bca)
I have now squashed the changes into a single commit with a detailed body.
👍
@simonbasle don't forget to update the license headers
(force-pushed from 72acb9f to 754b5e9)
Looks good, with a few minor comments/suggestions.
 * @see RetryBackoffSpec#maxAttempts(long)
 * @see RetryBackoffSpec#minBackoff(Duration)
 */
static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) {
Could this have a marble diagram like the retryBackOff methods that are now deprecated and will go away? This will become the main entry point for the backoff strategy.
 * @return the builder for further configuration
 * @see RetrySpec#maxAttempts(long)
 */
static RetrySpec max(long max) {
Likewise here, a marble diagram like on retry(long)?
Retry is now an abstract class, which lifts the lambda ambiguity (since this disallows lambda usage). To help express simple Retry strategies as lambdas, a Retry.from(Function) is provided.

A couple of factories have been added:
- indefinitely() is a RetrySpec configured to always immediately retry (but can be further configured)
- fixedDelay(maxAttempts, duration) is a RetryBackoffSpec configured with a minBackoff equal to maxBackoff and no jitter, effectively emulating a fixed-delay retry

Additionally, a few polishes are applied to the original PR:
- javadocs have been improved
- marble diagrams have been added to the Retry factories
- for RetryBackoffSpec, one can now reset the default Scheduler by passing null. That default is also lazily resolved, making it compatible with StepVerifier#withVirtualTime
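The fixedDelay factory above relies on a simple property: an exponential backoff clamped to maxBackoff degenerates into a constant delay when minBackoff == maxBackoff and jitter is zero. A hedged plain-Java illustration of that delay computation (illustrative names, not the reactor-core implementation):

```java
import java.time.Duration;

// Illustrative sketch: exponential backoff clamped to maxBackoff.
// With min == max and no jitter, every attempt gets the same delay,
// i.e. a fixed-delay retry.
final class BackoffDelays {
    static Duration delay(Duration min, Duration max, long attempt) {
        // Cap the shift to avoid overflow for large attempt counts.
        Duration exponential = min.multipliedBy(1L << Math.min(attempt, 30));
        return exponential.compareTo(max) > 0 ? max : exponential;
    }

    // fixedDelay(maxAttempts, d) is emulated as backoff with min == max.
    static Duration fixedDelay(Duration d, long attempt) {
        return delay(d, d, attempt);
    }
}
```

This is why fixedDelay can be exposed as a pre-configured RetryBackoffSpec rather than a third spec class.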