add auto reconnection option #987

Open
sergey-mesh opened this issue Feb 22, 2021 · 4 comments

@sergey-mesh

sergey-mesh commented Feb 22, 2021

It would be convenient to have two options for establishing a connection: lazily, on the first call, or automatically, as soon as the server is available. Take a chat application as an example: it is desirable that the running client connects to the server automatically and shows the server's availability as soon as possible.
The first option works great, but the second currently requires an additional workaround.
Thanks to Oleg (@OlegDokuka),
who helped me find a good solution to this:

requester
                .rsocketClient()
                .source()
                .flatMap(rsocket -> rsocket.onClose())
                .repeat()
                .retryWhen(Retry.backoff(...))
                .subscribe();

This works great.
But it would be more convenient to have this option out of the box, so that one of the two modes (lazy or auto-connect) could be chosen in the connection settings. It could even be possible to specify a condition for switching from one mode to the other at runtime.
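To make the two requested modes concrete, here is a minimal plain-Java model. It deliberately does not use the RSocket or Reactor API: ModelConnection, ModelConnector, ConnectMode, and ModeClient are hypothetical names. In LAZY mode nothing connects until the first request; in AUTO mode the client connects at startup and connects again as soon as the current connection closes, which is roughly what the source().flatMap(onClose).repeat() chain above achieves. Back-off between failed attempts (the retryWhen(Retry.backoff(...)) part) is left out of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transport connection (not the RSocket API).
final class ModelConnection {
    private final List<Runnable> closeListeners = new ArrayList<>();
    private boolean open = true;

    boolean isOpen() {
        return open;
    }

    // Registers a callback that runs when the connection closes (cf. RSocket#onClose()).
    void onClose(Runnable listener) {
        closeListeners.add(listener);
    }

    // Simulates the server going away: marks the connection closed and notifies listeners.
    void close() {
        if (!open) {
            return;
        }
        open = false;
        for (Runnable listener : new ArrayList<>(closeListeners)) {
            listener.run();
        }
    }
}

@FunctionalInterface
interface ModelConnector {
    // Opens a new connection (a real connector could fail here if the server is down).
    ModelConnection connect();
}

enum ConnectMode { LAZY, AUTO }

final class ModeClient {
    private final ModelConnector connector;
    private final ConnectMode mode;
    private ModelConnection current;

    ModeClient(ModelConnector connector, ConnectMode mode) {
        this.connector = connector;
        this.mode = mode;
        if (mode == ConnectMode.AUTO) {
            connect(); // AUTO: connect as soon as the client starts
        }
    }

    // LAZY: the first request (or the first one after a close) opens the connection.
    String request(String payload) {
        if (!isConnected()) {
            connect();
        }
        return "sent:" + payload;
    }

    boolean isConnected() {
        return current != null && current.isOpen();
    }

    private void connect() {
        ModelConnection connection = connector.connect();
        current = connection;
        if (mode == ConnectMode.AUTO) {
            // AUTO: when this connection closes, connect again right away (the "repeat" step).
            connection.onClose(this::connect);
        }
    }
}
```

With a switch like this, choosing between the two behaviours becomes a single setting rather than an extra subscription chain.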

@ivangfr

ivangfr commented Mar 9, 2021

Hey, I have a similar problem...

I've been working on this project https://github.com/ivangfr/springboot-rsocket-webflux-aop where I use rsocket.

The project consists of movie-client-shell, movie-client-ui, and movie-server. The Project Architecture diagram explains how the services communicate with each other.

One problem I see with my solution is that, once movie-server restarts, movie-client-ui no longer connects to it.

I have changed the MovieServerRSocketConfig of the movie-client-ui service as @OlegDokuka suggested in issue #984.

I am now creating the RSocketRequester this way:

...
SocketAcceptor socketAcceptor = RSocketMessageHandler.responder(rSocketStrategies, movieClientUiController);

RSocketRequester rSocketRequester = rSocketRequesterBuilder
        .setupRoute("client.registration")
        .setupData(clientId)
        .rsocketStrategies(rSocketStrategies)
        .rsocketConnector(connector -> connector.acceptor(socketAcceptor))
        .transport(clientTransport);

rSocketRequester.rsocketClient()
        .source()
        .flatMap(RSocket::onClose)
        .repeat()
        .retryWhen(Retry.fixedDelay(120, Duration.ofSeconds(1)))
        .doOnError(error -> log.warn("Connection CLOSED"))
        .doFinally(consumer -> log.info("DISCONNECTED"))
        .subscribe();
...

However, it's not working.

Basically,

  1. I am starting movie-server and movie-client-ui.
  2. Once both are up and running, I restart movie-server. It takes around 10 seconds to shut down and start again.
  3. Now both movie-server and movie-client-ui are up and running; however, movie-client-ui is not connected to movie-server.
  4. Two minutes later, I see the following exception in movie-client-ui. It looks like it's trying to connect without success.
    ERROR 17499 --- [     parallel-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped
    
    reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
    Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
    	at reactor.core.Exceptions.retryExhausted(Exceptions.java:290) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:119) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.3.jar:3.4.3]
    	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.3.jar:3.4.3]
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8080
    Caused by: java.net.ConnectException: Connection refused
    	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
    	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na]
    	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)    ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
    	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    
    INFO 17499 --- [     parallel-1] c.m.m.config.MovieServerRSocketConfig    : DISCONNECTED
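The two-minute gap in step 4 matches the retry policy. Assuming Retry.fixedDelay(maxAttempts, delay) performs up to maxAttempts retries after the first failed attempt, waiting delay before each one, a connection that is refused every time gives 120 retries × 1 s, about 120 s of waiting, before the RetryExhaustedException with "Retries exhausted: 120/120". The plain-Java model below (FixedDelayModel is a hypothetical name, not Reactor's API) only counts attempts and simulated waiting time; it does not actually sleep.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Plain-Java model of a fixed-delay retry policy (not Reactor's Retry API):
// one initial attempt, then up to maxRetries retries, each preceded by `delay`.
final class FixedDelayModel {
    final int attempts;        // total connection attempts made
    final Duration totalDelay; // accumulated (simulated) waiting time
    final boolean succeeded;

    private FixedDelayModel(int attempts, Duration totalDelay, boolean succeeded) {
        this.attempts = attempts;
        this.totalDelay = totalDelay;
        this.succeeded = succeeded;
    }

    static FixedDelayModel run(int maxRetries, Duration delay, BooleanSupplier tryConnect) {
        int attempts = 0;
        Duration waited = Duration.ZERO;
        for (int retry = 0; ; retry++) {
            attempts++;
            if (tryConnect.getAsBoolean()) {
                return new FixedDelayModel(attempts, waited, true);
            }
            if (retry == maxRetries) {
                // This is the point where Reactor reports "Retries exhausted: maxRetries/maxRetries".
                return new FixedDelayModel(attempts, waited, false);
            }
            waited = waited.plus(delay); // wait before the next retry
        }
    }
}
```

For example, FixedDelayModel.run(120, Duration.ofSeconds(1), () -> false) ends after 121 attempts and 120 s of waiting.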
    

Update

By the way, all services can be started with either the rsocket-websocket or the rsocket-tcp profile.

The previous run used the rsocket-websocket profile, which is why it was trying to connect to movie-server on port 8080.

Running the simulation again with the rsocket-tcp profile, the retry exception is:

ERROR 18727 --- [     parallel-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 120/120
	at reactor.core.Exceptions.retryExhausted(Exceptions.java:290) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:305) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:119) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.4.3.jar:3.4.3]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.4.3.jar:3.4.3]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:7000
Caused by: java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na]
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.59.Final.jar:4.1.59.Final]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

INFO 18727 --- [     parallel-1] c.m.m.config.MovieServerRSocketConfig    : DISCONNECTED

Now it is trying to connect to movie-server on port 7000 (which seems correct):

Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:7000

@OlegDokuka
Member

@ivangfr, in your case you have not added the .reconnect(Retry...) part to your setup; that is why the client was not able to reconnect.
Please modify your setup as follows:

RSocketRequester rSocketRequester = rSocketRequesterBuilder
        .setupRoute("client.registration")
        .setupData(clientId)
        .rsocketStrategies(rSocketStrategies)
        .rsocketConnector(connector -> connector.acceptor(socketAcceptor).reconnect(Retry...))
        .transport(clientTransport);

Also, your setup works only for a specific transport, so switching between websocket and tcp on the fly will not work at all: in that case, the client will still be trying to reconnect via the websocket transport.
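Roughly, per the Javadoc of RSocketConnector#reconnect, enabling reconnect makes the connector hand out a reconnectable, shared Mono<RSocket>: every subscriber observes the same RSocket until it is disposed, the next subscriber after that triggers a new connection, and connection attempts are retried according to the given Retry. The following plain-Java model sketches that behaviour under those assumptions; ModelSocket and ReconnectingSource are hypothetical names, and back-off delays are omitted.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a connection whose lifetime can end (not the RSocket API).
final class ModelSocket {
    private boolean disposed;

    void dispose() {
        disposed = true;
    }

    boolean isDisposed() {
        return disposed;
    }
}

// Models a "reconnectable, shared" connection source: callers share one cached
// connection until it is disposed; the next caller after that triggers a new
// connect, retried up to maxRetries times (delays between retries omitted).
final class ReconnectingSource {
    private final Supplier<ModelSocket> connect;
    private final int maxRetries;
    private ModelSocket cached;

    ReconnectingSource(Supplier<ModelSocket> connect, int maxRetries) {
        this.connect = connect;
        this.maxRetries = maxRetries;
    }

    synchronized ModelSocket get() {
        if (cached != null && !cached.isDisposed()) {
            return cached; // still open: share the same instance
        }
        cached = null; // reset after dispose, so this caller reconnects
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                cached = connect.get();
                return cached;
            } catch (RuntimeException e) {
                last = e; // e.g. "Connection refused" while the server restarts
            }
        }
        throw new IllegalStateException("Retries exhausted: " + maxRetries + "/" + maxRetries, last);
    }
}
```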

@ivangfr

ivangfr commented Mar 10, 2021

Thanks @OlegDokuka

It works!

So, this is the configuration I needed:

        SocketAcceptor socketAcceptor = RSocketMessageHandler.responder(rSocketStrategies, movieClientUiController);
        RetryBackoffSpec retryBackoffSpec = Retry.fixedDelay(120, Duration.ofSeconds(1));

        RSocketRequester rSocketRequester = rSocketRequesterBuilder
                .setupRoute("client.registration")
                .setupData(clientId)
                .rsocketStrategies(rSocketStrategies)
                .rsocketConnector(connector -> connector.acceptor(socketAcceptor).reconnect(retryBackoffSpec))
                .transport(clientTransport);

        rSocketRequester.rsocketClient()
                .source()
                .flatMap(RSocket::onClose)
                .repeat()
                .retryWhen(retryBackoffSpec)
                .doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("DISCONNECTED"))
                .subscribe();

In fact, I don't change the transport on the fly. As described in the README, the user picks the profile at startup: rsocket-tcp, rsocket-websocket, or default.

So, if movie-server is started with rsocket-tcp, movie-client-ui must also be started with rsocket-tcp, as I note in this comment:

...
- movie-client-ui

Open a new terminal and, inside the springboot-rsocket-webflux-aop root folder, run one of the following commands (it should match the one picked to run movie-server)
...
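To illustrate why client and server must be started with the same profile: the transport, and therefore the address the reconnect logic keeps dialing, is chosen once at startup and never changes afterwards. The sketch below is a hypothetical plain-Java model (Endpoint and forProfile are illustrative names, not part of the project). The ports come from the "Connection refused" log lines above; the default profile is left out because the thread does not say which transport it uses.

```java
import java.util.Map;

// Hypothetical model: the endpoint is chosen once, from the startup profile,
// and every (re)connection attempt afterwards reuses that same endpoint.
final class Endpoint {
    final String transport;
    final String host;
    final int port;

    Endpoint(String transport, String host, int port) {
        this.transport = transport;
        this.host = host;
        this.port = port;
    }

    // Ports taken from the "Connection refused: localhost/127.0.0.1:..." logs in this thread.
    private static final Map<String, Endpoint> BY_PROFILE = Map.of(
            "rsocket-tcp", new Endpoint("tcp", "localhost", 7000),
            "rsocket-websocket", new Endpoint("websocket", "localhost", 8080));

    static Endpoint forProfile(String profile) {
        Endpoint endpoint = BY_PROFILE.get(profile);
        if (endpoint == null) {
            throw new IllegalArgumentException("No endpoint modelled for profile: " + profile);
        }
        return endpoint;
    }

    @Override
    public String toString() {
        return transport + " " + host + ":" + port;
    }
}
```

A client started with rsocket-tcp keeps dialing "tcp localhost:7000" on every retry, so it can only reach a server that was also started with rsocket-tcp.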

Thanks again for the help!

@OlegDokuka
Member

Just a link to another example that shows the need for this feature (see #1039).
