From 68499e45c767a2438a0d418a8396f064052ca597 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 2 Nov 2023 18:10:57 +0200 Subject: [PATCH] Ensure HttpServer connections.total metric is correct when connection close happens before connection open (#2958) Fixes #2956 --- .../AbstractChannelMetricsHandler.java | 10 +++- .../AbstractHttpServerMetricsHandler.java | 7 ++- .../netty/http/server/HttpServerConfig.java | 10 +++- .../netty/http/HttpMetricsHandlerTests.java | 49 +++++++++++++++++++ 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java index 0afad61c88..a872ae0cd0 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java +++ b/reactor-netty-core/src/main/java/reactor/netty/channel/AbstractChannelMetricsHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,8 @@ public abstract class AbstractChannelMetricsHandler extends ChannelDuplexHandler final boolean onServer; + boolean channelOpened; + protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, boolean onServer) { this.remoteAddress = remoteAddress; this.onServer = onServer; @@ -53,6 +55,7 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b public void channelActive(ChannelHandlerContext ctx) { if (onServer) { try { + channelOpened = true; recorder().recordServerConnectionOpened(ctx.channel().localAddress()); } catch (RuntimeException e) { @@ -69,7 +72,10 @@ public void channelActive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) { if (onServer) { try { - recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + if (channelOpened) { + channelOpened = false; + recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + } } catch (RuntimeException e) { if (log.isWarnEnabled()) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java index 81de6f2570..66a6854e17 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/AbstractHttpServerMetricsHandler.java @@ -46,6 +46,7 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler { private static final Logger log = Loggers.getLogger(AbstractHttpServerMetricsHandler.class); boolean channelActivated; + boolean channelOpened; long dataReceived; @@ -78,6 +79,7 @@ public void channelActive(ChannelHandlerContext ctx) { if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) { try { // Always use the real connection local address without any proxy information + channelOpened = true; recorder().recordServerConnectionOpened(ctx.channel().localAddress()); } catch (RuntimeException e) { @@ -95,7 +97,10 @@ public void channelInactive(ChannelHandlerContext ctx) { if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) { try { // Always use the real connection local address without any proxy information - recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + if (channelOpened) { + channelOpened = false; + recorder().recordServerConnectionClosed(ctx.channel().localAddress()); + } } catch (RuntimeException e) { if (log.isWarnEnabled()) { diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java index 8936466f8b..704c73be3f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/server/HttpServerConfig.java @@ -637,6 +637,7 @@ static void configureHttp11Pipeline(ChannelPipeline p, @Nullable BiPredicate compressPredicate, ServerCookieDecoder cookieDecoder, ServerCookieEncoder cookieEncoder, + boolean channelOpened, HttpRequestDecoderSpec decoder, HttpServerFormDecoderProvider formDecoderProvider, @Nullable BiFunction forwardedHeaderHandler, @@ -670,9 +671,12 @@ static void configureHttp11Pipeline(ChannelPipeline p, if (metricsRecorder != null) { if (metricsRecorder instanceof HttpServerMetricsRecorder) { - ChannelHandler handler = metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder ? + AbstractHttpServerMetricsHandler handler = metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder ? new ContextAwareHttpServerMetricsHandler((ContextAwareHttpServerMetricsRecorder) metricsRecorder, uriTagValue) : new HttpServerMetricsHandler((HttpServerMetricsRecorder) metricsRecorder, uriTagValue); + if (channelOpened) { + handler.channelOpened = true; + } p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, handler); if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) { // For sake of performance, we can remove the ChannelMetricsHandler because the MicrometerHttpServerMetricsRecorder @@ -1064,7 +1068,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { } if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) { - configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, + configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, cookieDecoder, cookieEncoder, true, decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle, maxKeepAliveRequests, metricsRecorder, minCompressionSize, uriTagValue); @@ -1160,6 +1164,7 @@ else if ((protocols & h11) == h11) { compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, + false, decoder, formDecoderProvider, forwardedHeaderHandler, @@ -1227,6 +1232,7 @@ else if ((protocols & h11) == h11) { compressPredicate(compressPredicate, minCompressionSize), cookieDecoder, cookieEncoder, + false, decoder, formDecoderProvider, forwardedHeaderHandler, diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java index 6e72b9b551..5bf898a395 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java @@ -926,6 +926,55 @@ void testServerConnectionsRecorderBadUriForwarded(HttpProtocol[] serverProtocols Function.identity()); } + + @ParameterizedTest + @MethodSource("combinationsIssue2956") + void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception { + HttpServer server = + httpServer.secure(spec -> spec.sslContext(isHttp2 ? serverCtx2 : serverCtx11)) + .protocol(isHttp2 ? HttpProtocol.H2 : HttpProtocol.HTTP11) + .doOnConnection(conn -> ServerCloseHandler.INSTANCE.register(conn.channel())); + + if (isCustomRecorder) { + server = server.metrics(true, ServerRecorder.supplier()); + } + + disposableServer = server.bindNow(); + + httpClient.protocol(isHttp2 ? HttpProtocol.H2C : HttpProtocol.HTTP11) + .post() + .uri("/1") + .send(ByteBufFlux.fromString(Mono.just("hello"))) + .responseContent() + .aggregate() + .asString() + .as(StepVerifier::create) + .expectError() + .verify(Duration.ofSeconds(5)); + + assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue(); + + if (isCustomRecorder) { + assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0); + } + else { + InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress(); + String serverAddress = sa.getHostString() + ":" + sa.getPort(); + String[] tags = new String[]{URI, HTTP, LOCAL_ADDRESS, serverAddress}; + checkGauge(SERVER_CONNECTIONS_TOTAL, false, 0, tags); + } + } + + static Stream combinationsIssue2956() { + return Stream.of( + // isCustomRecorder, isHttp2 + Arguments.of(false, false), + Arguments.of(false, true), + Arguments.of(true, false), + Arguments.of(true, true) + ); + } + private void testServerConnectionsRecorderBadUri(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols, @Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx, @Nullable String xForwardedFor, int xForwardedPort,