Skip to content

Commit

Permalink
Ensure 4xx responses are reported by HttpServer metrics (#2149)
Browse files Browse the repository at this point in the history
Fixes #2145
  • Loading branch information
violetagg authored Apr 8, 2022
1 parent e521d61 commit fa5f2d1
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ else if (msg instanceof ByteBuf) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
ops.method().name(), ops.status().codeAsText().toString());
if (!ops.isHttp2()) {
if (!ops.isHttp2() && ops.hostAddress() != null) {
// This metric is not applicable for HTTP/2
// ops.hostAddress() == null when request decoding failed, in this case
// we do not report active connection, so we do not report inactive connection
recordInactiveConnection(ops);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle;
final HttpRequest nettyRequest;
final HttpResponse nettyResponse;
final String path;
final HttpHeaders responseHeaders;
final String scheme;

Function<? super String, Map<String, String>> paramsResolver;
String path;
Consumer<? super HttpHeaders> trailerHeadersConsumer;

volatile Context currentContext;
Expand Down Expand Up @@ -730,15 +730,14 @@ static long requestsCounter(Channel channel) {
return ((AtomicLong) ops.connection()).get();
}

@SuppressWarnings("FutureReturnValueIgnored")
static void sendDecodingFailures(
ChannelHandlerContext ctx,
ConnectionObserver listener,
boolean secure,
Throwable t,
Object msg) {

Connection conn = Connection.from(ctx.channel());

Throwable cause = t.getCause() != null ? t.getCause() : t;

if (log.isWarnEnabled()) {
Expand All @@ -747,26 +746,29 @@ static void sendDecodingFailures(

ReferenceCountUtil.release(msg);

HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0,
HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
cause instanceof TooLongFrameException ? HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE :
HttpResponseStatus.BAD_REQUEST);
response.headers()
.setInt(HttpHeaderNames.CONTENT_LENGTH, 0)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
ctx.writeAndFlush(response)
.addListener(ChannelFutureListener.CLOSE);

HttpRequest request = null;
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
}
else {
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel());
if (ops instanceof HttpServerOperations) {
request = ((HttpServerOperations) ops).nettyRequest;
Connection ops = ChannelOperations.get(ctx.channel());
if (ops == null) {
Connection conn = Connection.from(ctx.channel());
if (msg instanceof HttpRequest) {
ops = new FailedHttpServerRequest(conn, listener, (HttpRequest) msg, response, secure);
ops.bind();
}
else {
ops = conn;
}
}
listener.onStateChange(new FailedHttpServerRequest(conn, listener, request, response, secure), REQUEST_DECODING_FAILED);

//"FutureReturnValueIgnored" this is deliberate
ctx.channel().writeAndFlush(response);

listener.onStateChange(ops, REQUEST_DECODING_FAILED);
}

/**
Expand Down Expand Up @@ -919,12 +921,22 @@ static final class FailedHttpServerRequest extends HttpServerOperations {
FailedHttpServerRequest(
Connection c,
ConnectionObserver listener,
@Nullable HttpRequest nettyRequest,
HttpRequest nettyRequest,
HttpResponse nettyResponse,
boolean secure) {
super(c, listener, nettyRequest, null, null, ServerCookieDecoder.STRICT, ServerCookieEncoder.STRICT,
DEFAULT_FORM_DECODER_SPEC, null, false, secure);
this.customResponse = nettyResponse;
String tempPath = "";
try {
tempPath = resolvePath(nettyRequest.uri());
}
catch (RuntimeException e) {
tempPath = "/bad-request";
}
finally {
this.path = tempPath;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,40 @@ void testIssue896() throws Exception {
checkCounter(CLIENT_ERRORS, summaryTags, true, 2);
}

// https://github.com/reactor/reactor-netty/issues/2145
@ParameterizedTest
@MethodSource("http11CompatibleProtocols")
void testBadRequest(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx) throws Exception {
CountDownLatch latch1 = new CountDownLatch(4); // expect to observe 2 server disconnect + 2 client disconnect events
AtomicReference<CountDownLatch> latchRef = new AtomicReference<>(latch1);
ConnectionObserver observerDisconnect = observeDisconnect(latchRef);

disposableServer = customizeServerOptions(httpServer, serverCtx, serverProtocols)
.httpRequestDecoder(spec -> spec.maxHeaderSize(32))
.childObserve(observerDisconnect)
.bindNow();

AtomicReference<SocketAddress> serverAddress = new AtomicReference<>();
httpClient = customizeClientOptions(httpClient, clientCtx, clientProtocols).doAfterRequest((req, conn) ->
serverAddress.set(conn.channel().remoteAddress())
).observe(observerDisconnect);

httpClient.get()
.uri("/max_header_size")
.responseSingle((res, byteBufMono) -> Mono.just(res.status().code()))
.as(StepVerifier::create)
.expectNext(413)
.expectComplete()
.verify(Duration.ofSeconds(30));

assertThat(latch1.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

InetSocketAddress sa = (InetSocketAddress) serverAddress.get();

checkExpectationsBadRequest(sa.getHostString() + ":" + sa.getPort(), serverCtx != null);
}

private ConnectionObserver observeDisconnect(AtomicReference<CountDownLatch> latchRef) {
return (connection, state) -> {
if (state == ConnectionObserver.State.DISCONNECTING) {
Expand Down Expand Up @@ -741,6 +775,35 @@ private void checkExpectationsNonExisting(String serverAddress, int connIndex, i
}


private void checkExpectationsBadRequest(String serverAddress, boolean checkTls) {
String uri = "/max_header_size";
String[] timerTags1 = new String[] {URI, uri, METHOD, "GET", STATUS, "413"};
String[] summaryTags1 = new String[] {URI, uri};

checkTimer(SERVER_RESPONSE_TIME, timerTags1, 1);
checkTimer(SERVER_DATA_SENT_TIME, timerTags1, 1);
checkDistributionSummary(SERVER_DATA_SENT, summaryTags1, 1, 0);
checkCounter(SERVER_ERRORS, summaryTags1, false, 0);

timerTags1 = new String[] {REMOTE_ADDRESS, serverAddress, URI, uri, METHOD, "GET", STATUS, "413"};
String[] timerTags2 = new String[] {REMOTE_ADDRESS, serverAddress, URI, uri, METHOD, "GET"};
String[] timerTags3 = new String[] {REMOTE_ADDRESS, serverAddress, STATUS, "SUCCESS"};
summaryTags1 = new String[] {REMOTE_ADDRESS, serverAddress, URI, uri};
String[] summaryTags2 = new String[] {REMOTE_ADDRESS, serverAddress, URI, "http"};

checkTimer(CLIENT_RESPONSE_TIME, timerTags1, 1);
checkTimer(CLIENT_DATA_SENT_TIME, timerTags2, 1);
checkTimer(CLIENT_DATA_RECEIVED_TIME, timerTags1, 1);
checkTimer(CLIENT_CONNECT_TIME, timerTags3, 1);
if (checkTls) {
checkTlsTimer(CLIENT_TLS_HANDSHAKE_TIME, timerTags3, 1);
}
checkDistributionSummary(CLIENT_DATA_RECEIVED, summaryTags1, 1, 0);
checkCounter(CLIENT_ERRORS, summaryTags1, false, 0);
checkDistributionSummary(CLIENT_DATA_SENT, summaryTags2, 1, 118);
checkCounter(CLIENT_ERRORS, summaryTags2, false, 0);
}

HttpServer customizeServerOptions(HttpServer httpServer, @Nullable ProtocolSslContextSpec ctx, HttpProtocol[] protocols) {
return ctx == null ? httpServer.protocol(protocols) : httpServer.protocol(protocols).secure(spec -> spec.sslContext(ctx));
}
Expand Down Expand Up @@ -789,6 +852,17 @@ void checkGauge(String name, boolean exists, double expectedCount, String... tag
}
}

static Stream<Arguments> http11CompatibleProtocols() {
return Stream.of(
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null),
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11},
Named.of("Http11SslContextSpec", serverCtx11), Named.of("Http11SslContextSpec", clientCtx11)),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11},
Named.of("Http2SslContextSpec", serverCtx2), Named.of("Http11SslContextSpec", clientCtx11)),
Arguments.of(new HttpProtocol[]{HttpProtocol.H2C, HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null)
);
}

static Stream<Arguments> httpCompatibleProtocols() {
return Stream.of(
Arguments.of(new HttpProtocol[]{HttpProtocol.HTTP11}, new HttpProtocol[]{HttpProtocol.HTTP11}, null, null),
Expand Down

0 comments on commit fa5f2d1

Please sign in to comment.