diff --git a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java index 1b4897b307..9761b3c785 100644 --- a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java +++ b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java @@ -16,6 +16,7 @@ package reactor.ipc.netty.http.client; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -132,11 +133,6 @@ static HttpOperations bindHttp(Channel channel, chunkedTransfer(true); } - @Override - protected boolean shouldEmitEmptyContext() { - return true; - } - @Override public HttpClientRequest addCookie(Cookie cookie) { if (!hasSentHeaders()) { @@ -260,6 +256,15 @@ protected void onInboundCancel() { channel().close(); } + @Override + protected void onInboundComplete() { + if (responseState == null) { + parentContext().fireContextError(new IOException("Connection closed prematurely")); + return; + } + super.onInboundComplete(); + } + @Override public HttpClientRequest header(CharSequence name, CharSequence value) { if (!hasSentHeaders()) { diff --git a/src/test/java/reactor/ipc/netty/channel/ChannelOperationsHandlerTest.java b/src/test/java/reactor/ipc/netty/channel/ChannelOperationsHandlerTest.java index 693ef80cd2..7ffa958554 100644 --- a/src/test/java/reactor/ipc/netty/channel/ChannelOperationsHandlerTest.java +++ b/src/test/java/reactor/ipc/netty/channel/ChannelOperationsHandlerTest.java @@ -18,7 +18,16 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -27,9 +36,11 @@ import reactor.core.publisher.Mono; import reactor.ipc.netty.FutureMono; import reactor.ipc.netty.NettyContext; +import reactor.ipc.netty.SocketUtils; import reactor.ipc.netty.http.client.HttpClient; import reactor.ipc.netty.http.client.HttpClientResponse; import reactor.ipc.netty.http.server.HttpServer; +import reactor.ipc.netty.resources.PoolResources; import reactor.test.StepVerifier; public class ChannelOperationsHandlerTest { @@ -81,4 +92,89 @@ private void doTestPrefetchSize(int writeBufferLowWaterMark, int writeBufferHigh assertThat(handler.prefetch == (handler.inner.requested - handler.inner.produced)).isTrue(); } + + @Test + public void testChannelInactiveThrowsAbortedException() throws Exception { + ExecutorService threadPool = Executors.newCachedThreadPool(); + + int abortServerPort = SocketUtils.findAvailableTcpPort(); + ConnectionAbortServer abortServer = new ConnectionAbortServer(abortServerPort); + + threadPool.submit(abortServer); + + if(!abortServer.await(10, TimeUnit.SECONDS)){ + throw new IOException("Fail to start test server"); + } + + Mono response = + HttpClient.create(ops -> ops.host("localhost") + .port(abortServerPort) + .poolResources(PoolResources.fixed("http", 1))) + .get("/", req -> req.sendString(Flux.just("a", "b", "c"))); + + StepVerifier.create(response) + .expectError() + .verify(Duration.ofSeconds(1)); + + abortServer.close(); + } + + private static final class ConnectionAbortServer extends CountDownLatch implements Runnable { + + private final int port; + private final ServerSocketChannel server; + private volatile boolean read = false; + private volatile Thread thread; + + private ConnectionAbortServer(int port) { + super(1); + this.port = port; + try { + server = ServerSocketChannel.open(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void run() { + try { + server.configureBlocking(true); + server.socket() + .bind(new InetSocketAddress(port)); + countDown(); + thread = Thread.currentThread(); + while (true) { + SocketChannel ch = server.accept(); + + while (true) { + int bytes = ch.read(ByteBuffer.allocate(256)); + if (bytes > 0) { + if (!read) { + read = true; + } + else { + ch.close(); + return; + } + } + } + } + } + catch (IOException e) { + } + } + + public void close() throws IOException { + Thread thread = this.thread; + if (thread != null) { + thread.interrupt(); + } + ServerSocketChannel server = this.server; + if (server != null) { + server.close(); + } + } + } }