Skip to content

Commit

Permalink
toEntityFlux methods apply error status handling
Browse files Browse the repository at this point in the history
Closes gh-26069
  • Loading branch information
rstoyanchev committed Nov 12, 2020
1 parent 94fcb37 commit 42d3bc4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -534,25 +534,29 @@ public ResponseSpec onRawStatus(IntPredicate statusCodePredicate,
@Override
public <T> Mono<T> bodyToMono(Class<T> elementClass) {
Assert.notNull(elementClass, "Class must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementClass)));
return this.responseMono.flatMap(response ->
handleBodyMono(response, response.bodyToMono(elementClass)));
}

@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef) {
Assert.notNull(elementTypeRef, "ParameterizedTypeReference must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementTypeRef)));
return this.responseMono.flatMap(response ->
handleBodyMono(response, response.bodyToMono(elementTypeRef)));
}

@Override
public <T> Flux<T> bodyToFlux(Class<T> elementClass) {
Assert.notNull(elementClass, "Class must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementClass)));
return this.responseMono.flatMapMany(response ->
handleBodyFlux(response, response.bodyToFlux(elementClass)));
}

@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef) {
Assert.notNull(elementTypeRef, "ParameterizedTypeReference must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementTypeRef)));
return this.responseMono.flatMapMany(response ->
handleBodyFlux(response, response.bodyToFlux(elementTypeRef)));
}

@Override
Expand Down Expand Up @@ -585,18 +589,14 @@ public <T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference

@Override
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(Class<T> elementType) {
return this.responseMono.map(response ->
ResponseEntity.status(response.rawStatusCode())
.headers(response.headers().asHttpHeaders())
.body(response.bodyToFlux(elementType)));
return this.responseMono.flatMap(response ->
handlerEntityFlux(response, response.bodyToFlux(elementType)));
}

@Override
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(ParameterizedTypeReference<T> elementTypeReference) {
return this.responseMono.map(response ->
ResponseEntity.status(response.rawStatusCode())
.headers(response.headers().asHttpHeaders())
.body(response.bodyToFlux(elementTypeReference)));
public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(ParameterizedTypeReference<T> elementTypeRef) {
return this.responseMono.flatMap(response ->
handlerEntityFlux(response, response.bodyToFlux(elementTypeRef)));
}

@Override
Expand All @@ -619,6 +619,16 @@ private <T> Publisher<T> handleBodyFlux(ClientResponse response, Flux<T> body) {
return (result != null ? result.flux().switchIfEmpty(body) : body);
}

private <T> Mono<? extends ResponseEntity<Flux<T>>> handlerEntityFlux(ClientResponse response, Flux<T> body) {
ResponseEntity<Flux<T>> entity = new ResponseEntity<>(
body.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response)),
response.headers().asHttpHeaders(),
response.rawStatusCode());

Mono<ResponseEntity<Flux<T>>> result = applyStatusHandlers(response);
return (result != null ? result.defaultIfEmpty(entity) : Mono.just(entity));
}

private <T> Function<Throwable, Mono<? extends T>> exceptionWrappingFunction(ClientResponse response) {
return t -> response.createException().flatMap(ex -> Mono.error(ex.initCause(t)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.core.NamedThreadLocal;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
Expand Down Expand Up @@ -443,6 +445,27 @@ public void onStatusHandlersDefaultHandlerIsLast() {
verify(predicate2).test(HttpStatus.BAD_REQUEST);
}

@Test // gh-26069
public void onStatusHandlersApplyForToEntityMethods() {

ClientResponse response = ClientResponse.create(HttpStatus.BAD_REQUEST).build();
given(exchangeFunction.exchange(any())).willReturn(Mono.just(response));

WebClient.ResponseSpec spec = this.builder.build().get().uri("/path").retrieve();

testStatusHandlerForToEntity(spec.toEntity(String.class));
testStatusHandlerForToEntity(spec.toEntity(new ParameterizedTypeReference<String>() {}));
testStatusHandlerForToEntity(spec.toEntityList(String.class));
testStatusHandlerForToEntity(spec.toEntityList(new ParameterizedTypeReference<String>() {}));
testStatusHandlerForToEntity(spec.toEntityFlux(String.class));
testStatusHandlerForToEntity(spec.toEntityFlux(new ParameterizedTypeReference<String>() {}));
}

private void testStatusHandlerForToEntity(Publisher<?> responsePublisher) {
StepVerifier.create(responsePublisher).expectError(WebClientResponseException.class).verify();
}


private ClientRequest verifyAndGetRequest() {
ClientRequest request = this.captor.getValue();
verify(this.exchangeFunction).exchange(request);
Expand Down

0 comments on commit 42d3bc4

Please sign in to comment.