Skip to content

Commit

Permalink
Merge #3090 into 2.0.0-M4
Browse files Browse the repository at this point in the history
violetagg committed Mar 11, 2024
2 parents 7f35f88 + 21d7af7 commit feac1f7
Showing 3 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -562,8 +562,20 @@ else if (metricsRecorder instanceof ContextAwareHttpClientMetricsRecorder contex
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
Connection conn = Connection.from(ch);
if (ch.pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (ch.pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
ch.pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
if (conn.isPersistent()) {
conn.onTerminate().subscribe(null, null, () -> conn.removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
conn.addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
}
}

if (log.isDebugEnabled()) {
Original file line number Diff line number Diff line change
@@ -616,8 +616,19 @@ else if (markSentBody()) {
}
listener().onStateChange(this, HttpClientState.REQUEST_SENT);
if (responseTimeout != null) {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (channel().pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (channel().pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
channel().pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (isPersistent()) {
onTerminate().subscribe(null, null, () -> removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
}
}
channel().read();
if (channel().parent() != null) {
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.ChannelOption;
import io.netty5.channel.ChannelPipeline;
import io.netty5.channel.group.ChannelGroup;
import io.netty5.channel.group.DefaultChannelGroup;
@@ -90,25 +91,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static reactor.netty5.Metrics.CONNECTIONS_ACTIVE;
import static reactor.netty5.Metrics.CONNECTIONS_TOTAL;
import static reactor.netty5.Metrics.CONNECT_TIME;
import static reactor.netty5.Metrics.DATA_RECEIVED;
import static reactor.netty5.Metrics.DATA_RECEIVED_TIME;
import static reactor.netty5.Metrics.DATA_SENT;
import static reactor.netty5.Metrics.DATA_SENT_TIME;
import static reactor.netty5.Metrics.ERRORS;
import static reactor.netty5.Metrics.HTTP_CLIENT_PREFIX;
import static reactor.netty5.Metrics.HTTP_SERVER_PREFIX;
import static reactor.netty5.Metrics.LOCAL_ADDRESS;
import static reactor.netty5.Metrics.METHOD;
import static reactor.netty5.Metrics.REMOTE_ADDRESS;
import static reactor.netty5.Metrics.RESPONSE_TIME;
import static reactor.netty5.Metrics.STATUS;
import static reactor.netty5.Metrics.STREAMS_ACTIVE;
import static reactor.netty5.Metrics.TLS_HANDSHAKE_TIME;
import static reactor.netty5.Metrics.URI;
import static reactor.netty5.Metrics.formatSocketAddress;
import static reactor.netty5.Metrics.*;
import static reactor.netty5.micrometer.CounterAssert.assertCounter;
import static reactor.netty5.micrometer.DistributionSummaryAssert.assertDistributionSummary;
import static reactor.netty5.micrometer.GaugeAssert.assertGauge;
@@ -729,12 +712,16 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
}
else {
// make sure the client stream is closed on the server side before checking server metrics
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(1);
assertGauge(registry, SERVER_STREAMS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
@@ -827,13 +814,17 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
}
else {
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
@@ -972,6 +963,27 @@ void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception {
}
}

@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testIssue3060ConnectTimeoutException(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
customizeClientOptions(httpClient, clientCtx, clientProtocols)
.remoteAddress(() -> new InetSocketAddress("1.1.1.1", 11111))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10)
.doOnChannelInit((o, c, address) -> c.closeFuture().addListener(f -> latch.countDown()))
.post()
.uri("/1")
.send(BufferFlux.fromString(Mono.just("hello")))
.responseContent()
.subscribe();

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

String[] summaryTags = new String[]{REMOTE_ADDRESS, "1.1.1.1:11111", STATUS, ERROR};
assertTimer(registry, CLIENT_CONNECT_TIME, summaryTags).hasCountEqualTo(1);
}

static Stream<Arguments> combinationsIssue2956() {
return Stream.of(
// isCustomRecorder, isHttp2

0 comments on commit feac1f7

Please sign in to comment.