Skip to content

Commit

Permalink
improves ClientRSocketSession
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Nov 12, 2021
1 parent 9460cfc commit d7ff043
Showing 1 changed file with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.function.Tuple2;
Expand All @@ -58,6 +59,7 @@ public class ClientRSocketSession
final boolean cleanupStoreOnKeepAlive;
final ByteBuf resumeToken;
final String session;
final Disposable reconnectDisposable;

volatile Subscription s;
static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S =
Expand Down Expand Up @@ -110,10 +112,22 @@ public ClientRSocketSession(
this.resumableConnection = resumableDuplexConnection;

resumableDuplexConnection.onClose().doFinally(__ -> dispose()).subscribe();
resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);

this.reconnectDisposable =
resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
}

void reconnect(int index) {
if (this.s == Operators.cancelledSubscription()) {
if (logger.isDebugEnabled()) {
logger.debug(
"Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting rejected since session is closed",
session,
index);
}
return;
}

keepAliveSupport.stop();
if (logger.isDebugEnabled()) {
logger.debug(
Expand Down Expand Up @@ -147,6 +161,8 @@ public void onImpliedPosition(long remoteImpliedPos) {
@Override
public void dispose() {
Operators.terminate(S, this);

reconnectDisposable.dispose();
resumableConnection.dispose();
resumableFramesStore.dispose();

Expand Down

0 comments on commit d7ff043

Please sign in to comment.