From 73edc10f114ad7e27eb48ba0fc0be044cb2350ac Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 6 Nov 2018 16:17:56 +0200 Subject: [PATCH] fix #495 Ensure all children channels are closed when invoking DisposableServer#dispose --- .../java/reactor/netty/tcp/TcpServerBind.java | 15 ++++++++-- .../netty/http/server/HttpServerTests.java | 30 +++++++++++++++++++ .../reactor/netty/tcp/TcpServerTests.java | 25 ++++++++++++++++ 3 files changed, 67 insertions(+), 3 deletions(-) diff --git a/src/main/java/reactor/netty/tcp/TcpServerBind.java b/src/main/java/reactor/netty/tcp/TcpServerBind.java index 9a11f5bbde..724b4a3787 100644 --- a/src/main/java/reactor/netty/tcp/TcpServerBind.java +++ b/src/main/java/reactor/netty/tcp/TcpServerBind.java @@ -216,9 +216,18 @@ public void onUncaughtException(Connection connection, Throwable error) { @Override public void onStateChange(Connection connection, State newState) { - if (newState == State.DISCONNECTING) { - if (connection.channel() - .isActive() && !connection.isPersistent()) { + Channel channel = connection.channel(); + if (newState == State.CONNECTED) { + channel.parent() + .closeFuture() + .addListener(future -> { + if (channel.isActive()) { + connection.dispose(); + } + }); + } + else if (newState == State.DISCONNECTING) { + if (channel.isActive() && !connection.isPersistent()) { connection.dispose(); } } diff --git a/src/test/java/reactor/netty/http/server/HttpServerTests.java b/src/test/java/reactor/netty/http/server/HttpServerTests.java index d037655718..862ae4f8d5 100644 --- a/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -52,6 +52,7 @@ import org.testng.Assert; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.netty.ByteBufFlux; import reactor.netty.ChannelBindException; import reactor.netty.Connection; @@ -869,6 +870,35 @@ public void httpServerRequestConfigInjectAttributes() { assertThat(validate.get()).as("validate headers").isFalse(); } + @Test + public void testIssue495() { + MonoProcessor serverConnDisposed = MonoProcessor.create(); + + DisposableServer boundServer = + HttpServer.create() + .port(0) + .tcpConfiguration(tcpServer -> tcpServer.doOnConnection(serverConnection -> + serverConnection.onDispose() + .subscribe(serverConnDisposed))) + .handle((request, res) -> res.sendString(Mono.just("test"))) + .wiretap(true) + .bindNow(); + + String response = + HttpClient.create() + .addressSupplier(boundServer::address) + .wiretap(true) + .get() + .responseContent() + .aggregate() + .asString() + .block(); + + assertThat(response).isEqualTo("test"); + boundServer.disposeNow(); + serverConnDisposed.block(Duration.ofSeconds(30)); + } + private Object getValueReflection(Object obj, String fieldName, int superLevel) { try { Field field; diff --git a/src/test/java/reactor/netty/tcp/TcpServerTests.java b/src/test/java/reactor/netty/tcp/TcpServerTests.java index 0a9175eaf4..72e179054a 100644 --- a/src/test/java/reactor/netty/tcp/TcpServerTests.java +++ b/src/test/java/reactor/netty/tcp/TcpServerTests.java @@ -849,6 +849,31 @@ public void testEchoWithLineBasedFrameDecoder() throws Exception { latch.await(30, TimeUnit.SECONDS); } + @Test + public void testIssue495() { + MonoProcessor serverConnDisposed = MonoProcessor.create(); + + DisposableServer boundServer = + TcpServer.create() + .port(0) + .doOnConnection(serverConnection -> + serverConnection.onDispose() + .subscribe(serverConnDisposed)) + .wiretap(true) + .bindNow(); + + Connection clientConnection = + TcpClient.create() + .addressSupplier(boundServer::address) + .wiretap(true) + .connectNow(); + + boundServer.disposeNow(); + serverConnDisposed.block(Duration.ofSeconds(5)); + + clientConnection.disposeNow(); + } + private static class SimpleClient extends Thread { private final int port; private final CountDownLatch latch;