diff --git a/reactor-netty5-core/src/main/java/reactor/netty5/channel/AbstractChannelMetricsHandler.java b/reactor-netty5-core/src/main/java/reactor/netty5/channel/AbstractChannelMetricsHandler.java index c82a5e6a69..88afce6143 100644 --- a/reactor-netty5-core/src/main/java/reactor/netty5/channel/AbstractChannelMetricsHandler.java +++ b/reactor-netty5-core/src/main/java/reactor/netty5/channel/AbstractChannelMetricsHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,8 @@ public abstract class AbstractChannelMetricsHandler extends ChannelHandlerAdapte final boolean onServer; + boolean channelOpened; + protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, boolean onServer) { this.remoteAddress = remoteAddress; this.onServer = onServer; @@ -53,6 +55,7 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b public void channelActive(ChannelHandlerContext ctx) { if (onServer) { try { + channelOpened = true; recorder().recordServerConnectionOpened(ctx.channel().localAddress()); } catch (RuntimeException e) { @@ -69,7 +72,10 @@ public void channelActive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) { if (onServer) { try { - recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + if (channelOpened) { + channelOpened = false; + recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + } } catch (RuntimeException e) { if (log.isWarnEnabled()) { diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/AbstractHttpServerMetricsHandler.java index 106ba69323..ddfbdf0e0b 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/AbstractHttpServerMetricsHandler.java @@ -46,6 +46,7 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelHandlerAdapter { private static final Logger log = Loggers.getLogger(AbstractHttpServerMetricsHandler.class); boolean channelActivated; + boolean channelOpened; long dataReceived; @@ -78,6 +79,7 @@ public void channelActive(ChannelHandlerContext ctx) { if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) { try { // Always use the real connection local address without any proxy information + channelOpened = true; recorder().recordServerConnectionOpened(ctx.channel().localAddress()); } catch (RuntimeException e) { @@ -95,7 +97,10 @@ public void channelInactive(ChannelHandlerContext ctx) { if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) { try { // Always use the real connection local address without any proxy information - recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + if (channelOpened) { + channelOpened = false; + recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + } } catch (RuntimeException e) { if (log.isWarnEnabled()) { diff --git a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java index adba1d7c95..9639cee440 100644 --- a/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java +++ b/reactor-netty5-http/src/main/java/reactor/netty5/http/server/HttpServerConfig.java @@ -664,6 +664,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, boolean accessLogEnabled, @Nullable Function accessLog, @Nullable BiPredicate compressPredicate, + boolean channelOpened, HttpRequestDecoderSpec decoder, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, @@ -700,7 +701,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, if (metricsRecorder != null) { if (metricsRecorder instanceof HttpServerMetricsRecorder) { - ChannelHandler handler; + AbstractHttpServerMetricsHandler handler; if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder micrometerHttpServerMetricsRecorder) { handler = new MicrometerHttpServerMetricsHandler(micrometerHttpServerMetricsRecorder, uriTagValue); } @@ -710,6 +711,9 @@ else if (metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder contex else { handler = new HttpServerMetricsHandler((HttpServerMetricsRecorder) metricsRecorder, uriTagValue); } + if (channelOpened) { + handler.channelOpened = true; + } p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, handler); if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) { // For sake of performance, we can remove the ChannelMetricsHandler because the MicrometerHttpServerMetricsRecorder @@ -1118,7 +1122,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { } if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, + configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, true, decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, metricsRecorder, minCompressionSize, readTimeout, requestTimeout, uriTagValue); @@ -1210,6 +1214,7 @@ else if ((protocols & h11) == h11) { accessLogEnabled, accessLog, compressPredicate(compressPredicate, minCompressionSize), + false, decoder, formDecoderProvider, forwardedHeaderHandler, @@ -1277,6 +1282,7 @@ else if ((protocols & h11) == h11) { accessLogEnabled, accessLog, compressPredicate(compressPredicate, minCompressionSize), + false, decoder, formDecoderProvider, forwardedHeaderHandler, diff --git a/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java b/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java index ff22de6f55..4879447db5 100644 --- a/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java +++ b/reactor-netty5-http/src/test/java/reactor/netty5/http/HttpMetricsHandlerTests.java @@ -933,6 +933,55 @@ void testServerConnectionsRecorderBadUriForwarded(HttpProtocol[] serverProtocols Function.identity()); } + + @ParameterizedTest + @MethodSource("combinationsIssue2956") + void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception { + HttpServer server = + httpServer.secure(spec -> spec.sslContext(isHttp2 ? serverCtx2 : serverCtx11)) + .protocol(isHttp2 ? HttpProtocol.H2 : HttpProtocol.HTTP11) + .doOnConnection(conn -> ServerCloseHandler.INSTANCE.register(conn.channel())); + + if (isCustomRecorder) { + server = server.metrics(true, ServerRecorder.supplier()); + } + + disposableServer = server.bindNow(); + + httpClient.protocol(isHttp2 ? HttpProtocol.H2C : HttpProtocol.HTTP11) + .post() + .uri("/1") + .send(BufferFlux.fromString(Mono.just("hello"))) + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(5)); + + assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue(); + + if (isCustomRecorder) { + assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); + } + else { + InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress(); + String serverAddress = sa.getHostString() + ":" + sa.getPort(); + String[] tags = new String[]{URI, HTTP, LOCAL_ADDRESS, serverAddress}; + checkGauge(SERVER_CONNECTIONS_TOTAL, false, 0, tags); + } + } + + static Stream combinationsIssue2956() { + return Stream.of( + // isCustomRecorder, isHttp2 + Arguments.of(false, false), + Arguments.of(false, true), + Arguments.of(true, false), + Arguments.of(true, true) + ); + } + private void testServerConnectionsRecorderBadUri(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @Nullable String xForwardedFor, int xForwardedPort,