From ec4ab8018e4a7d578d12624dd518eed6eef6eab8 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sun, 19 May 2019 14:36:18 +0300 Subject: [PATCH 1/9] remove stream subscription on cancel as hookFinally() callback is executed only on BaseSubscriber.cancel() Signed-off-by: Maksym Ostroverkhov --- gradle.properties | 2 +- rsocket-core/src/main/java/io/rsocket/RSocketServer.java | 9 +++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/gradle.properties b/gradle.properties index d5b97cda1..3c59b8f4b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.12.2-RC3-SNAPSHOT +version=0.12.2-RC3-cleanup-on-cancel-SNAPSHOT diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 451ed7b42..d6505b3cb 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -99,11 +99,8 @@ class RSocketServer implements ResponderRSocket { sendProcessor .doOnRequest( - r -> { - for (LimitableRequestPublisher lrp : sendingLimitableSubscriptions.values()) { - lrp.increaseInternalLimit(r); - } - }) + r -> + sendingLimitableSubscriptions.values().forEach(lrp -> lrp.increaseInternalLimit(r))) .transform(connection::send) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); @@ -535,7 +532,7 @@ private void handleCancelFrame(int streamId) { Subscription subscription = sendingSubscriptions.remove(streamId); if (subscription == null) { - subscription = sendingLimitableSubscriptions.get(streamId); + subscription = sendingLimitableSubscriptions.remove(streamId); } if (subscription != null) { From eaeee41086ea52746d5ba3e9a0f15cef346c8d96 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sun, 19 May 2019 14:38:01 +0300 Subject: [PATCH 2/9] workaround reactor-netty bug when connections are not closed on DisposableChannel close Signed-off-by: Maksym Ostroverkhov --- .../transport/netty/server/TcpServerTransport.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java index b7f60aa6c..427c76db2 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java @@ -17,6 +17,9 @@ package io.rsocket.transport.netty.server; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.DefaultEventExecutor; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -94,10 +97,11 @@ public static TcpServerTransport create(TcpServer server) { @Override public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - + ChannelGroup group = new DefaultChannelGroup(new DefaultEventExecutor()); return server .doOnConnection( c -> { + group.add(c.channel()); c.addHandlerLast(new RSocketLengthCodec()); DuplexConnection connection; if (mtu > 0) { @@ -114,6 +118,9 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { acceptor.apply(connection).then(Mono.never()).subscribe(c.disposeSubscriber()); }) .bind() + .doOnNext( + disposableChannel -> + disposableChannel.channel().closeFuture().addListener(f -> group.close())) .map(CloseableChannel::new); } } From 1b1efa9a6a013da19993e81d6ebd976252a8c802 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 20 May 2019 07:57:31 +0300 Subject: [PATCH 3/9] responder stream/channel: calculate initialRequestN before applying handler Signed-off-by: Maksym Ostroverkhov --- .../src/main/java/io/rsocket/RSocketServer.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index d6505b3cb..7606e5621 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -319,16 +319,14 @@ private void handleFrame(ByteBuf frame) { handleRequestN(streamId, frame); break; case REQUEST_STREAM: - handleStream( - streamId, - requestStream(payloadDecoder.apply(frame)), - RequestStreamFrameFlyweight.initialRequestN(frame)); + int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame); + Payload streamPayload = payloadDecoder.apply(frame); + handleStream(streamId, requestStream(streamPayload), streamInitialRequestN); break; case REQUEST_CHANNEL: - handleChannel( - streamId, - payloadDecoder.apply(frame), - RequestChannelFrameFlyweight.initialRequestN(frame)); + int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame); + Payload channelPayload = payloadDecoder.apply(frame); + handleChannel(streamId, channelPayload, channelInitialRequestN); break; case METADATA_PUSH: metadataPush(payloadDecoder.apply(frame)); From b061dcc01f8c16b3bbdf4e32c2ec3b0bc8a619d1 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 20 May 2019 08:32:56 +0300 Subject: [PATCH 4/9] disable LimitableRequestPublisher transport request coordination Signed-off-by: Maksym Ostroverkhov --- .../src/main/java/io/rsocket/RSocketClient.java | 13 +++---------- .../src/main/java/io/rsocket/RSocketServer.java | 9 +++------ 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index b4c9dbb20..9ffe61334 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -81,15 +81,8 @@ class RSocketClient implements RSocket { this.sendProcessor = new UnboundedProcessor<>(); connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer); - - sendProcessor - .doOnRequest( - r -> { - for (LimitableRequestPublisher lrp : senders.values()) { - lrp.increaseInternalLimit(r); - } - }) - .transform(connection::send) + connection + .send(sendProcessor) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); @@ -329,7 +322,7 @@ public void accept(long n) { .transform( f -> { LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f, sendProcessor.available()); + LimitableRequestPublisher.wrap(f, Long.MAX_VALUE); // Need to set this to one for first the frame wrapped.request(1); senders.put(streamId, wrapped); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 7606e5621..8fb887418 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -97,11 +97,8 @@ class RSocketServer implements ResponderRSocket { // connections this.sendProcessor = new UnboundedProcessor<>(); - sendProcessor - .doOnRequest( - r -> - sendingLimitableSubscriptions.values().forEach(lrp -> lrp.increaseInternalLimit(r))) - .transform(connection::send) + connection + .send(sendProcessor) .doFinally(this::handleSendProcessorCancel) .subscribe(null, this::handleSendProcessorError); @@ -454,7 +451,7 @@ private void handleStream(int streamId, Flux response, int initialReque .transform( frameFlux -> { LimitableRequestPublisher payloads = - LimitableRequestPublisher.wrap(frameFlux, sendProcessor.available()); + LimitableRequestPublisher.wrap(frameFlux, Long.MAX_VALUE); sendingLimitableSubscriptions.put(streamId, payloads); payloads.request( initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); From bc6308354558ff5bdba008dbda897d5d117f63dc Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 20 May 2019 08:36:33 +0300 Subject: [PATCH 5/9] update version Signed-off-by: Maksym Ostroverkhov --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 3c59b8f4b..6f53c60b6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.12.2-RC3-cleanup-on-cancel-SNAPSHOT +version=0.12.2-RC3-cleanup-on-cancel-rm-coordination-SNAPSHOT From 8e47f9fff33d643718162b83ff6b604fab6d0496 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 20 May 2019 09:02:48 +0300 Subject: [PATCH 6/9] disable rsocket-transport request coordination tests Signed-off-by: Maksym Ostroverkhov --- rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java | 2 ++ rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 2494cbbca..6c849ab36 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -214,6 +215,7 @@ public void testChannelRequestServerSideCancellation() { Assertions.assertThat(request.isDisposed()).isTrue(); } + @Ignore @Test(timeout = 2_000) @SuppressWarnings("unchecked") public void diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java index 9f2541975..db73fb3af 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java @@ -111,6 +111,7 @@ public Mono requestResponse(Payload payload) { assertThat("Subscription not cancelled.", cancelled.get(), is(true)); } + @Ignore @Test(timeout = 2_000) @SuppressWarnings("unchecked") public void @@ -152,6 +153,7 @@ public Flux requestStream(Payload payload) { Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); } + @Ignore @Test(timeout = 2_000) @SuppressWarnings("unchecked") public void From 69eb69c63071bee923efc6faa10ccd50b232ec1c Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 20 May 2019 12:45:24 +0300 Subject: [PATCH 7/9] add LimitableRequestPublisher.wrap(Flux) remove rsocket-transport request coordination tests Signed-off-by: Maksym Ostroverkhov --- .../main/java/io/rsocket/RSocketClient.java | 2 +- .../main/java/io/rsocket/RSocketServer.java | 2 +- .../internal/LimitableRequestPublisher.java | 4 + .../java/io/rsocket/RSocketClientTest.java | 31 ------- .../java/io/rsocket/RSocketServerTest.java | 90 ------------------- 5 files changed, 6 insertions(+), 123 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 9ffe61334..539e104b4 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -322,7 +322,7 @@ public void accept(long n) { .transform( f -> { LimitableRequestPublisher wrapped = - LimitableRequestPublisher.wrap(f, Long.MAX_VALUE); + LimitableRequestPublisher.wrap(f); // Need to set this to one for first the frame wrapped.request(1); senders.put(streamId, wrapped); diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java index 8fb887418..6ce5ef88d 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketServer.java @@ -451,7 +451,7 @@ private void handleStream(int streamId, Flux response, int initialReque .transform( frameFlux -> { LimitableRequestPublisher payloads = - LimitableRequestPublisher.wrap(frameFlux, Long.MAX_VALUE); + LimitableRequestPublisher.wrap(frameFlux); sendingLimitableSubscriptions.put(streamId, payloads); payloads.request( initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : initialRequestN); diff --git a/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java b/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java index 2eafd3d61..8adb7542a 100755 --- a/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/LimitableRequestPublisher.java @@ -56,6 +56,10 @@ public static LimitableRequestPublisher wrap(Publisher source, long pr return new LimitableRequestPublisher<>(source, prefetch); } + public static LimitableRequestPublisher wrap(Publisher source) { + return wrap(source, Long.MAX_VALUE); + } + @Override public void subscribe(CoreSubscriber destination) { synchronized (this) { diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 6c849ab36..c60dba312 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -28,18 +28,14 @@ import io.rsocket.exceptions.ApplicationErrorException; import io.rsocket.exceptions.RejectedSetupException; import io.rsocket.frame.*; -import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -215,33 +211,6 @@ public void testChannelRequestServerSideCancellation() { Assertions.assertThat(request.isDisposed()).isTrue(); } - @Ignore - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testClientSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final Queue requests = new ConcurrentLinkedQueue<>(); - rule.connection.dispose(); - rule.connection = new TestDuplexConnection(); - rule.connection.setInitialSendRequestN(256); - rule.init(); - - rule.socket - .requestChannel( - Flux.generate(s -> s.next(EmptyPayload.INSTANCE)).doOnRequest(requests::add)) - .subscribe(); - - int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, 2)); - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub); diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java index db73fb3af..32c0406b9 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketServerTest.java @@ -29,17 +29,12 @@ import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.util.Collection; -import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.assertj.core.api.Assertions; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; -import org.mockito.Mockito; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class RSocketServerTest { @@ -111,91 +106,6 @@ public Mono requestResponse(Payload payload) { assertThat("Subscription not cancelled.", cancelled.get(), is(true)); } - @Ignore - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testServerSideRequestStreamShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final int streamId = 5; - final Queue received = new ConcurrentLinkedQueue<>(); - final Queue requests = new ConcurrentLinkedQueue<>(); - - rule.setAcceptingSocket( - new AbstractRSocket() { - @Override - public Flux requestStream(Payload payload) { - return Flux.generate(s -> s.next(payload.retain())).doOnRequest(requests::add); - } - }, - 256); - - rule.sendRequest(streamId, FrameType.REQUEST_STREAM); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - Subscriber next = rule.connection.getSendSubscribers().iterator().next(); - - Mockito.doAnswer( - invocation -> { - received.add(invocation.getArgument(0)); - - if (received.size() == 256) { - throw new RuntimeException(); - } - - return null; - }) - .when(next) - .onNext(Mockito.any()); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - - @Ignore - @Test(timeout = 2_000) - @SuppressWarnings("unchecked") - public void - testServerSideRequestChannelShouldNotHangInfinitelySendingElementsAndShouldProduceDataValuingConnectionBackpressure() { - final int streamId = 5; - final Queue received = new ConcurrentLinkedQueue<>(); - final Queue requests = new ConcurrentLinkedQueue<>(); - - rule.setAcceptingSocket( - new AbstractRSocket() { - @Override - public Flux requestChannel(Publisher payload) { - return Flux.generate(s -> s.next(EmptyPayload.INSTANCE)) - .doOnRequest(requests::add); - } - }, - 256); - - rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL); - - assertThat("Unexpected error.", rule.errors, is(empty())); - - Subscriber next = rule.connection.getSendSubscribers().iterator().next(); - - Mockito.doAnswer( - invocation -> { - received.add(invocation.getArgument(0)); - - if (received.size() == 256) { - throw new RuntimeException(); - } - - return null; - }) - .when(next) - .onNext(Mockito.any()); - - rule.connection.addToReceivedBuffer( - RequestNFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId, Integer.MAX_VALUE)); - Assertions.assertThat(requests).containsOnly(1L, 2L, 253L); - } - public static class ServerSocketRule extends AbstractSocketRule { private RSocket acceptingSocket; From f056c188f5a28a8f0c91ee8bb222b3bbb2600c2b Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Mon, 20 May 2019 13:52:05 +0300 Subject: [PATCH 8/9] use correct RSocket version Signed-off-by: Maksym Ostroverkhov --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 6f53c60b6..d5b97cda1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,4 +11,4 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=0.12.2-RC3-cleanup-on-cancel-rm-coordination-SNAPSHOT +version=0.12.2-RC3-SNAPSHOT From ea266c1950bfbad5bc6be2de50d25d27b35e8296 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Tue, 21 May 2019 07:54:46 +0300 Subject: [PATCH 9/9] Revert "workaround reactor-netty bug when connections are not closed on DisposableChannel close" This reverts commit 67bea790 Signed-off-by: Maksym Ostroverkhov --- .../transport/netty/server/TcpServerTransport.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java index 427c76db2..b7f60aa6c 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java @@ -17,9 +17,6 @@ package io.rsocket.transport.netty.server; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.util.concurrent.DefaultEventExecutor; import io.rsocket.DuplexConnection; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.transport.ClientTransport; @@ -97,11 +94,10 @@ public static TcpServerTransport create(TcpServer server) { @Override public Mono start(ConnectionAcceptor acceptor, int mtu) { Objects.requireNonNull(acceptor, "acceptor must not be null"); - ChannelGroup group = new DefaultChannelGroup(new DefaultEventExecutor()); + return server .doOnConnection( c -> { - group.add(c.channel()); c.addHandlerLast(new RSocketLengthCodec()); DuplexConnection connection; if (mtu > 0) { @@ -118,9 +114,6 @@ public Mono start(ConnectionAcceptor acceptor, int mtu) { acceptor.apply(connection).then(Mono.never()).subscribe(c.disposeSubscriber()); }) .bind() - .doOnNext( - disposableChannel -> - disposableChannel.channel().closeFuture().addListener(f -> group.close())) .map(CloseableChannel::new); } }