Skip to content

Commit

Permalink
Improve WebClient observations handling of CANCEL signal
Browse files Browse the repository at this point in the history
Prior to this commit, `WebClient` observations would be recorded as
aborted (with tags "outcome":"UNKNOWN", "status":"CLIENT_ERROR")
for use cases like this:

```
Flux<String> result = client.get()
    .uri("/path")
    .retrieve()
    .bodyToFlux(String.class)
    .take(1);
```

This is due to operators like `take` or `next` that consume *some*
`onNext` signals and then cancels the subscription before completion.
This means the subscriber is only partially interested in the response
and we should not count this as a client error.

This commit ensures that observations are only recorded as aborted if
the response was not published at the time the CANCEL signal was
received.

The code snippet above will now publish observations with
"outcome":"SUCCESS" and "status":"200" tags, for example.

Closes gh-30070
  • Loading branch information
bclozel committed Apr 7, 2023
1 parent cef597b commit 01f9788
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntPredicate;
Expand All @@ -37,6 +38,7 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.util.context.Context;

import org.springframework.core.ParameterizedTypeReference;
Expand Down Expand Up @@ -455,13 +457,16 @@ public Mono<ClientResponse> exchange() {
if (this.contextModifier != null) {
responseMono = responseMono.contextWrite(this.contextModifier);
}
final AtomicBoolean responseReceived = new AtomicBoolean();
return responseMono
.doOnNext(response -> responseReceived.set(true))
.doOnError(observationContext::setError)
.doOnCancel(() -> {
observationContext.setAborted(true);
.doFinally(signalType -> {
if (signalType == SignalType.CANCEL && !responseReceived.get()) {
observationContext.setAborted(true);
}
observation.stop();
})
.doOnTerminate(observation::stop)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, observation));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
Expand All @@ -27,10 +30,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -59,7 +65,10 @@ class WebClientObservationTests {
void setup() {
ClientResponse mockResponse = mock();
when(mockResponse.statusCode()).thenReturn(HttpStatus.OK);
when(mockResponse.headers()).thenReturn(new MockClientHeaders());
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
when(mockResponse.bodyToFlux(String.class)).thenReturn(Flux.just("first", "second"));
when(mockResponse.releaseBody()).thenReturn(Mono.empty());
given(this.exchangeFunction.exchange(this.request.capture())).willReturn(Mono.just(mockResponse));
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction).observationRegistry(this.observationRegistry);
this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler());
Expand Down Expand Up @@ -114,6 +123,16 @@ void recordsObservationForCancelledExchange() {
.hasLowCardinalityKeyValue("status", "CLIENT_ERROR");
}

@Test
void recordsObservationForCancelledExchangeDuringResponse() {
StepVerifier.create(this.builder.build().get().uri("/path").retrieve().bodyToFlux(String.class).take(1))
.expectNextCount(1)
.expectComplete()
.verify(Duration.ofSeconds(5));
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasLowCardinalityKeyValue("status", "200");
}

@Test
void setsCurrentObservationInReactorContext() {
ExchangeFilterFunction assertionFilter = new ExchangeFilterFunction() {
Expand All @@ -130,7 +149,7 @@ public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction chain
this.builder.filter(assertionFilter).build().get().uri("/resource/{id}", 42)
.retrieve().bodyToMono(Void.class)
.block(Duration.ofSeconds(10));
verifyAndGetRequest();
verifyAndGetRequest();
}

@Test
Expand Down Expand Up @@ -170,4 +189,29 @@ public boolean supportsContext(Observation.Context context) {
}
}

static class MockClientHeaders implements ClientResponse.Headers {

private HttpHeaders headers = new HttpHeaders();

@Override
public OptionalLong contentLength() {
return OptionalLong.empty();
}

@Override
public Optional<MediaType> contentType() {
return Optional.empty();
}

@Override
public List<String> header(String headerName) {
return Collections.emptyList();
}

@Override
public HttpHeaders asHttpHeaders() {
return this.headers;
}
}

}

0 comments on commit 01f9788

Please sign in to comment.