From 67bea790b6a0508e8829167ae31133eddc96f0d1 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sun, 19 May 2019 14:38:01 +0300 Subject: [PATCH] workaround reactor-netty bug when connections are not closed on DisposableChannel close Signed-off-by: Maksym Ostroverkhov --- .../transport/netty/server/TcpServerTransport.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java index b7f60aa6c..427c76db2 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java @@ -17,6 +17,9 @@ package io.rsocket.transport.netty.server; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.DefaultEventExecutor; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -94,10 +97,11 @@ public static TcpServerTransport create(TcpServer server) { @Override public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - + ChannelGroup group = new DefaultChannelGroup(new DefaultEventExecutor()); return server .doOnConnection( c -> { + group.add(c.channel()); c.addHandlerLast(new RSocketLengthCodec()); DuplexConnection connection; if (mtu > 0) { @@ -114,6 +118,9 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { acceptor.apply(connection).then(Mono.never()).subscribe(c.disposeSubscriber()); }) .bind() + .doOnNext( + disposableChannel -> + disposableChannel.channel().closeFuture().addListener(f -> group.close())) .map(CloseableChannel::new); } }