Skip to content

Commit

Permalink
workaround reactor-netty bug when connections are not closed on Dispo…
Browse files Browse the repository at this point in the history
…sableChannel close

Signed-off-by: Maksym Ostroverkhov <[email protected]>
  • Loading branch information
mostroverkhov committed May 20, 2019
1 parent 762fc26 commit 67bea79
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package io.rsocket.transport.netty.server;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.rsocket.DuplexConnection;
import io.rsocket.fragmentation.FragmentationDuplexConnection;
import io.rsocket.transport.ClientTransport;
Expand Down Expand Up @@ -94,10 +97,11 @@ public static TcpServerTransport create(TcpServer server) {
@Override
public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
Objects.requireNonNull(acceptor, "acceptor must not be null");

ChannelGroup group = new DefaultChannelGroup(new DefaultEventExecutor());
return server
.doOnConnection(
c -> {
group.add(c.channel());
c.addHandlerLast(new RSocketLengthCodec());
DuplexConnection connection;
if (mtu > 0) {
Expand All @@ -114,6 +118,9 @@ public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
acceptor.apply(connection).then(Mono.<Void>never()).subscribe(c.disposeSubscriber());
})
.bind()
.doOnNext(
disposableChannel ->
disposableChannel.channel().closeFuture().addListener(f -> group.close()))
.map(CloseableChannel::new);
}
}

0 comments on commit 67bea79

Please sign in to comment.