Skip to content

Commit

Permalink
Improve recording of cancellation signal in WebClient
Browse files Browse the repository at this point in the history
With its initial fix in gh-18444, the `WebClient` instrumentation would
record all CANCEL signals, including:

* when a `timeout` expires and the response has not been received
* when the client partially consumes the response body

Since the second use case is arguable intentional, this commit restricts
the instrumentation and thus avoids recording two events for a single
request in that case.

Closes gh-18444
  • Loading branch information
bclozel committed Apr 11, 2020
1 parent 73ca703 commit ce65305
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.boot.actuate.metrics.web.reactive.client;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
Expand Down Expand Up @@ -77,13 +78,15 @@ public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next)
}

private Mono<ClientResponse> instrumentResponse(ClientRequest request, Mono<ClientResponse> responseMono) {
final AtomicBoolean responseReceived = new AtomicBoolean();
return Mono.deferWithContext((ctx) -> responseMono.doOnEach((signal) -> {
if (signal.isOnNext() || signal.isOnError()) {
responseReceived.set(true);
Iterable<Tag> tags = this.tagProvider.tags(request, signal.get(), signal.getThrowable());
recordTimer(tags, getStartTime(ctx));
}
}).doFinally((signalType) -> {
if (SignalType.CANCEL.equals(signalType)) {
if (!responseReceived.get() && SignalType.CANCEL.equals(signalType)) {
Iterable<Tag> tags = this.tagProvider.tags(request, null, null);
recordTimer(tags, getStartTime(ctx));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.search.MeterNotFoundException;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -40,6 +41,7 @@
import org.springframework.web.reactive.function.client.WebClient;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -124,6 +126,23 @@ void filterWhenCancelThrownShouldRecordTimer() {
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer().count())
.isEqualTo(1);
assertThatThrownBy(() -> this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "200").timer())
.isInstanceOf(MeterNotFoundException.class);
}

@Test
void filterWhenCancelAfterResponseThrownShouldNotRecordTimer() {
ClientRequest request = ClientRequest
.create(HttpMethod.GET, URI.create("https://example.com/projects/spring-boot")).build();
given(this.response.rawStatusCode()).willReturn(HttpStatus.OK.value());
Mono<ClientResponse> filter = this.filterFunction.filter(request, this.exchange);
StepVerifier.create(filter).expectNextCount(1).thenCancel().verify(Duration.ofSeconds(5));
assertThat(this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "200").timer().count()).isEqualTo(1);
assertThatThrownBy(() -> this.registry.get("http.client.requests")
.tags("method", "GET", "uri", "/projects/spring-boot", "status", "CLIENT_ERROR").timer())
.isInstanceOf(MeterNotFoundException.class);
}

@Test
Expand Down

0 comments on commit ce65305

Please sign in to comment.