Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provides implementation for reconnect mono #229

Closed

Conversation

OlegDokuka
Copy link
Contributor

@OlegDokuka OlegDokuka commented Mar 16, 2020

This PR provides derived implementation of Reaconnectable MonoProcessor which allows reobtaining the value from upstream + doing reconnection in case of error.

Behaviour

  1. Subscribe to Mono and notify every Downstream subscriber about the value (the basic MonoProcessor behavior)
  2. Subscribe to Mono;
    • In case of complete - reconnect;
    • In case of error - reconnect;
    • Notify Subscribers only if value or Reconnect Exhausted
  3. Once the value is obtained there is a possibility to reset (or basically expire) cached value using
.build(/*applyResetOnValueHandler*/(value, callMeWhenResetRunnable) -> ...)

APi via given callMeWhenResetRunnable runnable.
4) Up on reset is called, the first subscription will be delayed depends on the time of the first subscription:
* Subscription happens immeditelly -> delay is defined by the min delay of retry strategy
* Subscription happens after some periode of time -> calculated delay is equal to subscriptionTime - resetTime (if negative -> subscribe immediatelly)

API

final ReconnectMono<RSocket> reconnectableSource =
					Mono.defer(monos::poll)
					    .as(Reconnect::fromSource)
					    .reconnectMax(Long.MAX_VALUE)
					    .exponentialBackoff(Duration.ofSeconds(minBackoff), Duration.ofSeconds(maxBackoff))
					    .timeout(Duration.ofSeconds(timeout))
					    .doOnReconnect(__ -> ...)
					    .build((rsocket, resetRunner) -> {
					        ...
					        rsocket.onClose().subscribe(null, null, resetRunner);
					    });

...
reconnectableSource.dispose()

API is derived from Retry + renaming and extra builder methods such as build
and Reconnect<T, C> doOnDispose(Consumer<? super T> onDispose); which allows disposing value in case of reconnectableSource disposure.

Why not Retry API

I tried to use Retry interface but then I figured out that the functionality that I need (and this functionality is basically calculateBackoff method is not coupled with Retry itself so I can not be sure that the given interface has a proper method and not just custom implementation. In addition, Reconnect does not require that Function<Flux, Flux> logic so it can be a standalone implementation which is builder only and extends a proper method from AbstractRetry

@OlegDokuka
Copy link
Contributor Author

@rstoyanchev

@OlegDokuka OlegDokuka force-pushed the feature/reconnecting-mono branch from 6d93e0c to 69308dc Compare March 19, 2020 08:21
@OlegDokuka
Copy link
Contributor Author

We just discussed with @simonbasle that it makes sense to get rid of Retry at all and let the user configure the retry logic for the upstream.

WDYT on that @rstoyanchev?

@rstoyanchev
Copy link

It's looking more and more like a candidate for an overloaded cache in core.

@OlegDokuka
Copy link
Contributor Author

Closing here and moves back to rsocket again :D

@OlegDokuka OlegDokuka closed this Mar 20, 2020
Comment on lines +110 to +111
actual.onNext(this.value);
actual.onComplete();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these two lines don't account for the possibility that the actual Subscriber hasn't necessarily immediately requested upon onSubscribe (even though this is usually the case for Monos).

this should be toAdd.complete(this.value);, as you correctly do in the complete() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Missed that

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants