Skip to content

Commit

Permalink
When metrics are enabled and responseTimeout is configured, ensure th…
Browse files Browse the repository at this point in the history
…e correct order for ChannelHandlers (#3090)

- Place ResponseTimeoutHandler before HttpMetricsHandler so that
is an error happened it can be recorded
- Add test to verify that errors with establishing a connection are recorded

Fixes #3060
  • Loading branch information
violetagg authored Mar 11, 2024
1 parent 7f47b89 commit 3caade4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,20 @@ static void addStreamHandlers(
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
Connection conn = Connection.from(ch);
if (ch.pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (ch.pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
ch.pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
if (conn.isPersistent()) {
conn.onTerminate().subscribe(null, null, () -> conn.removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
conn.addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
}
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,19 @@ else if (markSentBody()) {
}
listener().onStateChange(this, HttpClientState.REQUEST_SENT);
if (responseTimeout != null) {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (channel().pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (channel().pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
channel().pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (isPersistent()) {
onTerminate().subscribe(null, null, () -> removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
}
}
channel().read();
if (channel().parent() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -98,6 +99,7 @@
import static reactor.netty.Metrics.DATA_RECEIVED_TIME;
import static reactor.netty.Metrics.DATA_SENT;
import static reactor.netty.Metrics.DATA_SENT_TIME;
import static reactor.netty.Metrics.ERROR;
import static reactor.netty.Metrics.ERRORS;
import static reactor.netty.Metrics.HTTP_CLIENT_PREFIX;
import static reactor.netty.Metrics.HTTP_SERVER_PREFIX;
Expand Down Expand Up @@ -730,12 +732,16 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
}
else {
// make sure the client stream is closed on the server side before checking server metrics
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(1);
assertGauge(registry, SERVER_STREAMS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
Expand Down Expand Up @@ -828,13 +834,17 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
}
else {
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
Expand Down Expand Up @@ -967,6 +977,27 @@ void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception {
}
}

@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testIssue3060ConnectTimeoutException(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
customizeClientOptions(httpClient, clientCtx, clientProtocols)
.remoteAddress(() -> new InetSocketAddress("1.1.1.1", 11111))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10)
.doOnChannelInit((o, c, address) -> c.closeFuture().addListener(f -> latch.countDown()))
.post()
.uri("/1")
.send(ByteBufFlux.fromString(Mono.just("hello")))
.responseContent()
.subscribe();

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

String[] summaryTags = new String[]{REMOTE_ADDRESS, "1.1.1.1:11111", STATUS, ERROR};
assertTimer(registry, CLIENT_CONNECT_TIME, summaryTags).hasCountEqualTo(1);
}

static Stream<Arguments> combinationsIssue2956() {
return Stream.of(
// isCustomRecorder, isHttp2
Expand Down

0 comments on commit 3caade4

Please sign in to comment.