Skip to content

Commit

Permalink
Record cancelled client requests in WebClient
Browse files Browse the repository at this point in the history
Prior to this commit, cancelled client requests (for example as a result
of a `timeout()` reactor operator would not be recorded by Micrometer.

This commit instruments the cancelled signal for outgoing client
requests and assigns a status `CLIENT_ERROR`.
The cancellation can be intentional (triggering a timeout and falling
back on a faster alternative) or considered as an error. The intent
cannot be derived from the signal itself so we're considering it as a
client error.

Closes gh-18444
  • Loading branch information
bclozel committed Apr 10, 2020
1 parent a6d1f1c commit 3879a75
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,9 +37,9 @@ public Iterable<Tag> tags(ClientRequest request, ClientResponse response, Throwa
Tag method = WebClientExchangeTags.method(request);
Tag uri = WebClientExchangeTags.uri(request);
Tag clientName = WebClientExchangeTags.clientName(request);
return Arrays.asList(method, uri, clientName,
(response != null) ? WebClientExchangeTags.status(response) : WebClientExchangeTags.status(throwable),
WebClientExchangeTags.outcome(response));
Tag status = WebClientExchangeTags.status(response, throwable);
Tag outcome = WebClientExchangeTags.outcome(response);
return Arrays.asList(method, uri, clientName, status, outcome);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;

import org.springframework.boot.actuate.metrics.AutoTimer;
Expand Down Expand Up @@ -71,16 +72,27 @@ public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next)
if (!this.autoTimer.isEnabled()) {
return next.exchange(request);
}
return next.exchange(request).doOnEach((signal) -> {
if (!signal.isOnComplete()) {
Long startTime = getStartTime(signal.getContext());
ClientResponse response = signal.get();
Throwable throwable = signal.getThrowable();
Iterable<Tag> tags = this.tagProvider.tags(request, response, throwable);
this.autoTimer.builder(this.metricName).tags(tags).description("Timer of WebClient operation")
.register(this.meterRegistry).record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
return next.exchange(request).as((responseMono) -> instrumentResponse(request, responseMono))
.subscriberContext(this::putStartTime);
}

private Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) {
return Mono.deferWithContext((ctx) -> responseMono.doOnEach((signal) -> {
if (signal.isOnNext() || signal.isOnError()) {
Iterable<Tag> tags = this.tagProvider.tags(request, signal.get(), signal.getThrowable());
recordTimer(tags, getStartTime(ctx));
}
}).doFinally((signalType) -> {
if (SignalType.CANCEL.equals(signalType)) {
Iterable<Tag> tags = this.tagProvider.tags(request, null, null);
recordTimer(tags, getStartTime(ctx));
}
}).subscriberContext(this::putStartTime);
}));
}

private void recordTimer(Iterable<Tag> tags, Long startTime) {
this.autoTimer.builder(this.metricName).tags(tags).description("Timer of WebClient operation")
.register(this.meterRegistry).record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
}

private Long getStartTime(Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,33 @@ private static String extractPath(String url) {
return (path.startsWith("/") ? path : "/" + path);
}

/**
* Creates a {@code status} {@code Tag} derived from the
* {@link ClientResponse#statusCode()} of the given {@code response} if available, the
* thrown exception otherwise, or considers the request as Cancelled as a last resort.
* @param response the response
* @param throwable the exception
* @return the status tag
* @since 2.3.0
*/
public static Tag status(ClientResponse response, Throwable throwable) {
if (response != null) {
return Tag.of("status", String.valueOf(response.rawStatusCode()));
}
else if (throwable != null) {
return (throwable instanceof IOException) ? IO_ERROR : CLIENT_ERROR;
}
return CLIENT_ERROR;
}

/**
* Creates a {@code status} {@code Tag} derived from the
* {@link ClientResponse#statusCode()} of the given {@code response}.
* @param response the response
* @return the status tag
* @deprecated since 2.3.0 in favor of {@link #status(ClientResponse, Throwable)}
*/
@Deprecated
public static Tag status(ClientResponse response) {
return Tag.of("status", String.valueOf(response.rawStatusCode()));
}
Expand All @@ -90,7 +111,9 @@ public static Tag status(ClientResponse response) {
* client.
* @param throwable the exception
* @return the status tag
* @deprecated since 2.3.0 in favor of {@link #status(ClientResponse, Throwable)}
*/
@Deprecated
public static Tag status(Throwable throwable) {
return (throwable instanceof IOException) ? IO_ERROR : CLIENT_ERROR;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -87,4 +87,11 @@ void tagsWhenExceptionShouldReturnClientErrorStatus() {
Tag.of("clientName", "example.org"), Tag.of("status", "CLIENT_ERROR"), Tag.of("outcome", "UNKNOWN"));
}

@Test
void tagsWhenCancelledRequestShouldReturnClientErrorStatus() {
Iterable<Tag> tags = this.tagsProvider.tags(this.request, null, null);
assertThat(tags).containsExactlyInAnyOrder(Tag.of("method", "GET"), Tag.of("uri", "/projects/{project}"),
Tag.of("clientName", "example.org"), Tag.of("status", "CLIENT_ERROR"), Tag.of("outcome", "UNKNOWN"));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.boot.actuate.metrics.AutoTimer;
import org.springframework.http.HttpMethod;
Expand Down Expand Up @@ -73,7 +74,7 @@ void filterShouldRecordTimer() {
ClientRequest request = ClientRequest
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(30));
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "200").timer().count()).isEqualTo(1);
}
Expand All @@ -84,7 +85,7 @@ void filterWhenUriTemplatePresentShouldRecordTimer() {
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot"))
.attribute(URI_TEMPLATE_ATTRIBUTE, "/projects/{project}").build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(30));
this.filterFunction.filter(request, this.exchange).block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/{project}", "status", "200").timer().count()).isEqualTo(1);
}
Expand All @@ -95,7 +96,7 @@ void filterWhenIoExceptionThrownShouldRecordTimer() {
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
ExchangeFunction errorExchange = (r) -> Mono.error(new IOException());
this.filterFunction.filter(request, errorExchange).onErrorResume(IOException.class, (t) -> Mono.empty())
.block(Duration.ofSeconds(30));
.block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "IO_ERROR").timer().count())
.isEqualTo(1);
Expand All @@ -107,7 +108,19 @@ void filterWhenExceptionThrownShouldRecordTimer() {
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
ExchangeFunction exchange = (r) -> Mono.error(new IllegalArgumentException());
this.filterFunction.filter(request, exchange).onErrorResume(IllegalArgumentException.class, (t) -> Mono.empty())
.block(Duration.ofSeconds(30));
.block(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer().count())
.isEqualTo(1);
}

@Test
void filterWhenCancelThrownShouldRecordTimer() {
ClientRequest request = ClientRequest
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
Mono<ClientResponse> filter = this.filterFunction.filter(request, this.exchange);
StepVerifier.create(filter).thenCancel().verify(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer().count())
.isEqualTo(1);
Expand All @@ -120,7 +133,7 @@ void filterWhenExceptionAndRetryShouldNotCumulateRecordTime() {
ExchangeFunction exchange = (r) -> Mono.error(new IllegalArgumentException())
.delaySubscription(Duration.ofMillis(300)).cast(ClientResponse.class);
this.filterFunction.filter(request, exchange).retry(1)
.onErrorResume(IllegalArgumentException.class, (t) -> Mono.empty()).block(Duration.ofSeconds(30));
.onErrorResume(IllegalArgumentException.class, (t) -> Mono.empty()).block(Duration.ofSeconds(5));
Timer timer = this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer();
assertThat(timer.count()).isEqualTo(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -86,24 +86,29 @@ void clientName() {
@Test
void status() {
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
assertThat(WebClientExchangeTags.status(this.response)).isEqualTo(Tag.of("status", "200"));
assertThat(WebClientExchangeTags.status(this.response, null)).isEqualTo(Tag.of("status", "200"));
}

@Test
void statusWhenIOException() {
assertThat(WebClientExchangeTags.status(new IOException())).isEqualTo(Tag.of("status", "IO_ERROR"));
assertThat(WebClientExchangeTags.status(null, new IOException())).isEqualTo(Tag.of("status", "IO_ERROR"));
}

@Test
void statusWhenClientException() {
assertThat(WebClientExchangeTags.status(new IllegalArgumentException()))
assertThat(WebClientExchangeTags.status(null, new IllegalArgumentException()))
.isEqualTo(Tag.of("status", "CLIENT_ERROR"));
}

@Test
void statusWhenNonStandard() {
given(this.response.rawStatusCode()).willReturn(490);
assertThat(WebClientExchangeTags.status(this.response)).isEqualTo(Tag.of("status", "490"));
assertThat(WebClientExchangeTags.status(this.response, null)).isEqualTo(Tag.of("status", "490"));
}

@Test
void statusWhenCancelled() {
assertThat(WebClientExchangeTags.status(null, null)).isEqualTo(Tag.of("status", "CLIENT_ERROR"));
}

@Test
Expand Down

0 comments on commit 3879a75

Please sign in to comment.