Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When metrics are enabled and responseTimeout is configured, ensure the correct order for ChannelHandlers #3090

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,20 @@ static void addStreamHandlers(
}

if (responseTimeoutMillis > -1) {
Connection.from(ch).addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
Connection conn = Connection.from(ch);
if (ch.pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (ch.pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
ch.pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
if (conn.isPersistent()) {
conn.onTerminate().subscribe(null, null, () -> conn.removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
conn.addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeoutMillis, TimeUnit.MILLISECONDS));
}
}

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,8 +639,19 @@ else if (markSentBody()) {
}
listener().onStateChange(this, HttpClientState.REQUEST_SENT);
if (responseTimeout != null) {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (channel().pipeline().get(NettyPipeline.HttpMetricsHandler) != null) {
if (channel().pipeline().get(NettyPipeline.ResponseTimeoutHandler) == null) {
channel().pipeline().addBefore(NettyPipeline.HttpMetricsHandler, NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
if (isPersistent()) {
onTerminate().subscribe(null, null, () -> removeHandler(NettyPipeline.ResponseTimeoutHandler));
}
}
}
else {
addHandlerFirst(NettyPipeline.ResponseTimeoutHandler,
new ReadTimeoutHandler(responseTimeout.toMillis(), TimeUnit.MILLISECONDS));
}
}
channel().read();
if (channel().parent() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -98,6 +99,7 @@
import static reactor.netty.Metrics.DATA_RECEIVED_TIME;
import static reactor.netty.Metrics.DATA_SENT;
import static reactor.netty.Metrics.DATA_SENT_TIME;
import static reactor.netty.Metrics.ERROR;
import static reactor.netty.Metrics.ERRORS;
import static reactor.netty.Metrics.HTTP_CLIENT_PREFIX;
import static reactor.netty.Metrics.HTTP_SERVER_PREFIX;
Expand Down Expand Up @@ -730,12 +732,16 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
}
else {
// make sure the client stream is closed on the server side before checking server metrics
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(1);
assertGauge(registry, SERVER_STREAMS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/6").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
Expand Down Expand Up @@ -828,13 +834,17 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
}
else {
assertThat(StreamCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(1);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
// https://github.com/reactor/reactor-netty/issues/3060
assertCounter(registry, CLIENT_ERRORS, REMOTE_ADDRESS, address, URI, "/7").hasCountGreaterThanOrEqualTo(1);
// in case of H2, the tearDown method will ensure client socket is closed on the server side
}
}
Expand Down Expand Up @@ -967,6 +977,27 @@ void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception {
}
}

@ParameterizedTest
@MethodSource("httpCompatibleProtocols")
void testIssue3060ConnectTimeoutException(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
customizeClientOptions(httpClient, clientCtx, clientProtocols)
.remoteAddress(() -> new InetSocketAddress("1.1.1.1", 11111))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10)
.doOnChannelInit((o, c, address) -> c.closeFuture().addListener(f -> latch.countDown()))
.post()
.uri("/1")
.send(ByteBufFlux.fromString(Mono.just("hello")))
.responseContent()
.subscribe();

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

String[] summaryTags = new String[]{REMOTE_ADDRESS, "1.1.1.1:11111", STATUS, ERROR};
assertTimer(registry, CLIENT_CONNECT_TIME, summaryTags).hasCountEqualTo(1);
}

static Stream<Arguments> combinationsIssue2956() {
return Stream.of(
// isCustomRecorder, isHttp2
Expand Down