This PR drafts high-performant low-overhead reconnecting rsocket #685
Conversation
Force-pushed from ea46b0d to 2ae1c0b
```java
Scheduler scheduler = Schedulers.parallel();
Predicate<Throwable> errorPredicate = DEFAULT_ERROR_PREDICATE;

public Builder withSourceRSocket(Supplier<Mono<RSocket>> sourceSupplier) {
```
Why not just `Mono` here? That could be an instance of `Mono.defer` called externally. I always assumed `Mono.defer` taking a `Supplier` was more about having a SAM and being able to call some sync block that returned e.g. `Mono.just(...)`.
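The `Supplier` vs. plain `Mono` distinction raised here can be sketched without Reactor at all. The following is a hedged JDK-only illustration (the names are hypothetical, not the builder's actual internals): a plain value captures a single connection attempt, while a `Supplier` re-invokes the factory each time it is polled, which is the same effect callers get by wrapping a `Mono` in `Mono.defer` externally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative only: counts how many times the "connection" factory runs.
AtomicInteger attempts = new AtomicInteger();
Supplier<CompletableFuture<String>> sourceSupplier =
    () -> CompletableFuture.completedFuture("rsocket-" + attempts.incrementAndGet());

// Each get() re-runs the factory, i.e. each resubscribe is a fresh attempt.
CompletableFuture<String> first = sourceSupplier.get();   // "rsocket-1"
CompletableFuture<String> second = sourceSupplier.get();  // "rsocket-2"
```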
```java
private void reconnect() {
  ThreadLocalRandom random = ThreadLocalRandom.current();
  long nextRandomDelay = random.nextLong(backoffMinInMillis, backoffMaxInMillis);
```
Could the backoff strategy be hidden behind a `Flux` without losing much? Or would that negate "high-performant low-overhead"? I note you mention "low object overhead", so I assume that is the concern.

Would the intention be to make this the default implementation (used in the code below)? Or are there any likely functional differences?
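For reference, the jittered delay computed in `reconnect()` above can be reproduced with plain JDK calls. A minimal sketch, assuming the same `[min, max)` millisecond bounds the builder would configure (the bound values here are made up):

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical bounds; in the PR these come from the builder configuration.
long backoffMinInMillis = 100;
long backoffMaxInMillis = 1_000;

// ThreadLocalRandom.nextLong(origin, bound) is uniform in [origin, bound)
// and allocation-free, which fits the "low object overhead" goal.
long nextRandomDelay =
    ThreadLocalRandom.current().nextLong(backoffMinInMillis, backoffMaxInMillis);
```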
```java
import reactor.util.context.Context;

@SuppressWarnings("unchecked")
public class ReconnectingRSocket implements CoreSubscriber<RSocket>, RSocket, Runnable {
```
Does `Runnable` need to be part of the public API? Is this to remove one allocation of an anonymous `Runnable` instance?

Are you working against some higher-level performance/allocation targets? Are those things that can be made visible? This seems cool, and I can understand in concept why you're doing it, just maybe not when it is good enough.
@OlegDokuka putting it here just in case since I noticed the mention of addons with retry/backoff: there is now a
Signed-off-by: Oleh Dokuka <[email protected]>
Force-pushed from d57e8e0 to b07d742
I can't review, so here are a few initial comments:
Overall a promising direction. Personally I'm also interested, in parallel, to see if we can achieve the same with just Reactor Core operators. I've discussed with @simonbasle a potential
I assume users will be writing something like

```java
rsocket.requestStream().retryBackoff(...).subscribe();
rsocket.requestStream().retryWhen(...).subscribe();
```

Every stream can be retried independently. If you mean that after the error we immediately try to reconnect using the specified strategy, then, in theory, I can do a lazy reconnection, but does it make any sense? I see the case where the lazy reconnection (lazy in this context meaning a reconnection which happens only when there is a need for it) happens immediately, so there is no backoff. As a consequence, one could require reconnection with backoff; that may complicate the internals, but it is possible.
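The "lazy reconnection" semantics described above can be sketched with plain JDK types. This is a hypothetical illustration of the idea, not code from the PR: the connection is (re)established only when a caller actually needs it after a failure, with no backoff in between.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Illustrative lazy reconnector: connect on demand, reuse while healthy.
class LazyReconnector<T> {
  private final Supplier<T> factory;
  private final AtomicReference<T> current = new AtomicReference<>();

  LazyReconnector(Supplier<T> factory) { this.factory = factory; }

  T get() {
    // Connect only when a caller needs a connection and none is cached.
    return current.updateAndGet(c -> c != null ? c : factory.get());
  }

  void invalidate() {
    // An error on a logical stream drops the cached connection;
    // the next get() triggers an immediate, no-backoff reconnect.
    current.set(null);
  }
}

// Usage: count how many real connects happen.
AtomicInteger connects = new AtomicInteger();
LazyReconnector<String> reconnector =
    new LazyReconnector<>(() -> "conn-" + connects.incrementAndGet());
```

A backoff variant would only change what happens inside `get()` after `invalidate()`, which is the complication mentioned above.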
Correct, this is a draft, just to make sure it is useful. Once the POC is good to go, I will provide an API identical to `Retry` from Project Reactor.
Indeed. Going to fix that.
I propagate the exception because it is up to the business logic whether it is necessary to retry or not. Imagine a flux of elements. In the middle of the stream, we observe an exception. If we swallow it internally and then reconnect, the user will never know that the stream is not working anymore and that it is necessary to call it again.
So let me see if I get the logic on request failure in

```java
if (this.parent.tryResubscribe(rSocket, t)) {
  this.actual = null;
  this.state = NONE;
} else {
  done = true;
}
actual.onError(t);
```

If the error is connection related, as per

Assuming this is correct:
Okay, so this would be required in any scenario with reconnect then. That increases the need for a consistent retry API.
In the current proposal in Reactor Core there is

Furthermore, downstream retry logic would also need to check the type of error with something like:

```java
private static final Predicate<Throwable> DEFAULT_ERROR_PREDICATE =
    throwable ->
        throwable instanceof ClosedChannelException
            || throwable instanceof ConnectionCloseException
            || throwable instanceof ConnectionErrorException;
```

This is also where a

```java
rsocket.requestStream().retry(ReconnectRetry.max(5)).subscribe();
```
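The predicate quoted above only needs `instanceof` checks. Here is a self-contained sketch: `ClosedChannelException` is a real JDK type, but the RSocket-specific `ConnectionCloseException`/`ConnectionErrorException` are stood in by a local placeholder class so the snippet compiles on its own.

```java
import java.nio.channels.ClosedChannelException;
import java.util.function.Predicate;

// Stand-in for RSocket's connection exceptions, which are not on the JDK
// classpath; purely illustrative.
class ConnectionErrorStandIn extends RuntimeException {}

// Classifies a failure as connection-related (and thus retriable by a
// reconnect-aware retry policy) versus an application-level error.
Predicate<Throwable> isConnectionError =
    throwable ->
        throwable instanceof ClosedChannelException
            || throwable instanceof ConnectionErrorStandIn;
```

A `ReconnectRetry`-style helper would bundle exactly this classification so callers do not have to repeat it per stream.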
After the discussion with @rstoyanchev, we ended up deciding to derive common as
Works great in my use case and testing. Thanks!
@OlegDokuka this can be closed now (superseded by #759)?
@rstoyanchev yes
This PR introduces a high-performant, low-object-overhead reconnecting RSocket, which could be useful for many engineering cases.
The work is in progress, but the draft is ready to review.
Plan:
- `flatMap` / `flatMapMany` implementations
- `RSocket#<logicalCall>` (e.g. `RSocket#requestResponse.retryBackoff(...)`) should be followed with a retry operator on logical streams, so it would make sense to have it built in as well (so we will get rid of 3 more objects)

Signed-off-by: Oleh Dokuka [email protected]