diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java index 6003d5f3b0..3a5968f681 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientConfig.java @@ -574,8 +574,20 @@ static void addStreamHandlers( } if (responseTimeoutMillis > -1) { - Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, - new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); + Connection conn = Connection.from(ch); + if (ch.pipeline().get(NettyPipeline.HttpMetricsHandler) != null) { + if (ch.pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) { + ch.pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler, + new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); + if (conn.isPersistent()) { + conn.onTerminate().subscribe(null, null, () -> conn.removeHandler(NettyPipeline.ResponseTimeoutHandler)); + } + } + } + else { + conn.addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, + new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS)); + } } if (log.isDebugEnabled()) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java index dc98049c22..51a14ff49e 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/HttpClientOperations.java @@ -639,8 +639,19 @@ else if (markSentBody()) { } listener().onStateChange(this, HttpClientState.REQUEST_SENT); if (responseTimeout != null) { - addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, - new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS)); + if (channel().pipeline().get(NettyPipeline.HttpMetricsHandler) != null) { + if (channel().pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) { + channel().pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler, + new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS)); + if (isPersistent()) { + onTerminate().subscribe(null, null, () -> removeHandler(NettyPipeline.ResponseTimeoutHandler)); + } + } + } + else { + addHandlerFirst(NettyPipeline.ResponseTimeoutHandler, + new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS)); + } } channel().read(); if (channel().parent() != null) { diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index c16beb01f4..36108b808f 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; @@ -98,6 +99,7 @@ import static reactor.netty.Metrics.DATA_RECEIVED_TIME; import static reactor.netty.Metrics.DATA_SENT; import static reactor.netty.Metrics.DATA_SENT_TIME; +import static reactor.netty.Metrics.ERROR; import static reactor.netty.Metrics.ERRORS; import static reactor.netty.Metrics.HTTP_CLIENT_PREFIX; import static reactor.netty.Metrics.HTTP_SERVER_PREFIX; @@ -730,12 +732,16 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue(); assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0); assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0); + // https://github.com/reactor/reactor-netty/issues/3060 + assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1); } else { // make sure the client stream is closed on the server side before checking server metrics assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue(); assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(1); assertGauge(registry, SERVER_STREAMS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0); + // https://github.com/reactor/reactor-netty/issues/3060 + assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1); // in case of H2, the tearDown method will ensure client socket is closed on the server side } } @@ -828,6 +834,8 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0); assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address); assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address); + // https://github.com/reactor/reactor-netty/issues/3060 + assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1); } else { assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue(); @@ -835,6 +843,8 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0); assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address); assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address); + // https://github.com/reactor/reactor-netty/issues/3060 + assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1); // in case of H2, the tearDown method will ensure client socket is closed on the server side } } @@ -967,6 +977,27 @@ void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception { } } + @ParameterizedTest + @MethodSource("httpCompatibleProtocols") + void testIssue3060ConnectTimeoutException(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, + @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + customizeClientOptions(httpClient, clientCtx, clientProtocols) + .remoteAddress(() -> new InetSocketAddress("1.1.1.1", 11111)) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10) + .doOnChannelInit((o, c, address) -> c.closeFuture().addListener(f -> latch.countDown())) + .post() + .uri("/1") + .send(ByteBufFlux.fromString(Mono.just("hello"))) + .responseContent() + .subscribe(); + + assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue(); + + String[] summaryTags = new String[]{REMOTE_ADDRESS, "1.1.1.1:11111", STATUS, ERROR}; + assertTimer(registry, CLIENT_CONNECT_TIME, summaryTags).hasCountEqualTo(1); + } + static Stream combinationsIssue2956() { return Stream.of( // isCustomRecorder, isHttp2