Expose toEntityFlux methods in WebClient.ResponseSpec #26023

Closed
mplain opened this issue Nov 3, 2020 · 16 comments
Labels: in: web, type: enhancement

@mplain

mplain commented Nov 3, 2020

Hello!

I have a simple application that acts as a proxy between a client and a server.
It receives a client request and passes it on to the server, then passes the server response back to the client.
There is no need to process the request/response bodies or buffer them in memory.
Here is my current code:

webClient
   .method(request.method)
   .uri(request.path)
   .headers { it.addAll(request.headers().asHttpHeaders()) }
   .body(request.bodyToFlux<DataBuffer>())
   .exchangeToMono { response ->
      ServerResponse
         .status(response.statusCode())
         .headers { it.addAll(response.headers().asHttpHeaders()) }
         .body(response.bodyToFlux<DataBuffer>())
   }

After upgrading to WebFlux 5.3.0 I had to replace the .exchange() method with .exchangeToMono(). This new method in turn calls .releaseIfNotConsumed(), and apparently passing the body as a Flux<DataBuffer> is not consuming it, so my client always gets a response with an empty body.

What is the proper way to address my use case?

P.S. I created an issue here due to this request

spring-projects-issues added the "status: waiting-for-triage" label on Nov 3, 2020
@bclozel
Member

bclozel commented Nov 3, 2020

Closing in favor of (if I'm not mistaken) your question on StackOverflow.

bclozel closed this as completed on Nov 3, 2020
bclozel added the "for: stackoverflow" and "status: invalid" labels and removed the "status: waiting-for-triage" label on Nov 3, 2020
@bclozel
Member

bclozel commented Nov 3, 2020

Sorry, I didn't see Rossen's comment. Reopening.

bclozel reopened this on Nov 3, 2020
rstoyanchev added the "in: web" and "type: enhancement" labels and removed the "for: stackoverflow" and "status: invalid" labels on Nov 3, 2020
rstoyanchev self-assigned this on Nov 3, 2020
@rstoyanchev
Contributor

rstoyanchev commented Nov 3, 2020

Yes, the new exchangeToMono and exchangeToFlux methods expect the body to be decoded inside the callback. However, in this case it is only wrapped as a ServerResponse and decoded later, when that response is written.
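
To make the "decoded inside the callback" point concrete, here is a minimal toy model in plain Java. It is not Spring's implementation: `FakeResponse`, `exchangeTo`, and the method names are hypothetical stand-ins, and the only assumption carried over from the thread is that an unconsumed body is released once the callback returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class ReleaseIfNotConsumedModel {

    // Hypothetical stand-in for a client response with a read-once body.
    static final class FakeResponse {
        private List<String> chunks;
        private boolean consumed;

        FakeResponse(List<String> chunks) {
            this.chunks = new ArrayList<>(chunks);
        }

        // Lazy body: nothing is read until get() is called on the supplier.
        Supplier<List<String>> body() {
            return () -> {
                consumed = true;
                List<String> result = chunks;
                chunks = Collections.emptyList();
                return result;
            };
        }

        // Drops the body if nobody has read it yet.
        void releaseIfNotConsumed() {
            if (!consumed) {
                chunks = Collections.emptyList();
            }
        }
    }

    // Models the shape of exchangeToMono: run the callback, then release
    // whatever the callback did not consume.
    static <T> T exchangeTo(FakeResponse response, Function<FakeResponse, T> callback) {
        T result = callback.apply(response);
        response.releaseIfNotConsumed();
        return result;
    }

    // The body is read inside the callback, so the data survives.
    static List<String> decodeInsideCallback() {
        FakeResponse response = new FakeResponse(Arrays.asList("a", "b"));
        return exchangeTo(response, r -> r.body().get());
    }

    // The body is only wrapped inside the callback and read afterwards,
    // by which time it has already been released.
    static List<String> wrapAndDecodeLater() {
        FakeResponse response = new FakeResponse(Arrays.asList("a", "b"));
        Supplier<List<String>> wrapped = exchangeTo(response, FakeResponse::body);
        return wrapped.get();
    }

    public static void main(String[] args) {
        System.out.println(decodeInsideCallback()); // prints [a, b]
        System.out.println(wrapAndDecodeLater());   // prints []
    }
}
```

The second case has the same shape as the proxy code in the original report: the body is handed to a wrapper that reads it later, so the client sees an empty body.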

The above should be straightforward with .retrieve() but there is currently no option for ResponseEntity<Flux<T>>. There is only an option for ResponseEntity<List<T>> but it should be the other way around. You can always get a List<T> from a Flux while having only List<T> imposes buffering.

Now that exchange() is deprecated in 5.3 with #25751, having an option for retrieve().toEntityFlux(..) looks essential in order to allow decoding the body in any way while also having access to the response status and headers.
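
The remark that a List<T> can always be obtained from a Flux, while a List<T> alone imposes buffering, can be illustrated by analogy with `java.util.stream.Stream`. This is only a sketch: a JDK `Stream` stands in for a `Flux` here, and `lazyBody` and the two counting methods are hypothetical names introduced for the illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamingVsBuffering {

    // Hypothetical stand-in for a response body: a lazy source of `size`
    // elements that counts how many elements were actually produced.
    static Stream<Integer> lazyBody(AtomicInteger produced, int size) {
        return Stream.iterate(1, i -> i + 1)
                .limit(size)
                .peek(i -> produced.incrementAndGet());
    }

    // Streaming shape: the consumer pulls only the elements it needs.
    static int producedWhenStreaming(int size, int needed) {
        AtomicInteger produced = new AtomicInteger();
        lazyBody(produced, size).limit(needed).forEach(i -> { });
        return produced.get();
    }

    // List shape: the whole body is materialized before the consumer
    // sees a single element, even if it only needs a few.
    static int producedWhenBuffering(int size, int needed) {
        AtomicInteger produced = new AtomicInteger();
        List<Integer> all = lazyBody(produced, size).collect(Collectors.toList());
        all.subList(0, needed).forEach(i -> { });
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println("streaming: " + producedWhenStreaming(1_000, 3)); // prints streaming: 3
        System.out.println("buffering: " + producedWhenBuffering(1_000, 3)); // prints buffering: 1000
    }
}
```

Collecting the lazy source into a list is always possible, but going the other way cannot undo the buffering that already happened, which is the asymmetry described above.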

rstoyanchev added this to the 5.3.1 milestone on Nov 3, 2020
rstoyanchev changed the title from "What is the proper way to pass request/response body without processing it? (WebFlux 5.3.0)" to "Expose toEntityFlux methods in WebClient.ResponseSpec" on Nov 3, 2020
@mplain
Author

mplain commented Nov 3, 2020

@rstoyanchev Thank you for the quick response! 👍

If I may, I have a question regarding the implementation of .toEntityFlux().
There's a class, WebClientUtils, that contains the underlying logic for .toEntity() and .toEntityList(), shared by both WebClient and ClientResponse.
You did not add the logic of .toEntityFlux() to that class. Is that intentional, in order to reduce the incentive to use .exchange()?
Perhaps .exchangeToFlux(ClientResponse::toEntityFlux) could be useful...

(I noticed because I wanted to submit the corresponding Kotlin extension functions)

@rstoyanchev
Contributor

@mplain I see no need for an additional method in WebClientUtils since the toEntityFlux methods are currently only exposed from one place and the logic is trivial.

@robotmrv

@rstoyanchev
Isn't the toEntityFlux() method unsafe in the same way as exchange()? (In addition, to relay the actual response you need to override the default onStatus(Predicate, Function).)
Why not just un-deprecate exchange(), since it fits the current case?

@mplain
Author

mplain commented Nov 10, 2020

@robotmrv My guess is that when users use toEntityFlux, they intend to consume the flux.
But currently, when users use exchange, they often don't think about the body at all.
So it's more about helping people avoid accidental mistakes than about tying their hands.
Any method exposing the flux of data buffers would need to be the user's responsibility...

@rstoyanchev
Contributor

Indeed, as @mplain said, it's about providing more structure and avoiding accidental mistakes. The exchange() method is too wide open and in most cases it's used unnecessarily. With .retrieve() you have to specify how to decode the body, and at that point in most cases you would likely not use toEntityFlux. Even if you do, error responses are taken care of, and it's obvious that you need to consume the body. None of that is the case with exchange.

@mplain
Author

mplain commented Nov 10, 2020

@rstoyanchev, actually, looking at this code, where are the exceptions taken care of?

		public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(Class<T> elementType) {
			return this.responseMono.map(response ->
					ResponseEntity.status(response.rawStatusCode())
							.headers(response.headers().asHttpHeaders())
							.body(response.bodyToFlux(elementType)));
		}

Comparing to another method:

		public <T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> elementClass) {
			return this.responseMono.flatMap(response ->
					WebClientUtils.mapToEntityList(response,
							handleBodyFlux(response, response.bodyToFlux(elementClass))));
		}

Exceptions are handled in the handleBodyFlux method, which is not used in toEntityFlux...?

@rstoyanchev
Contributor

You're right @mplain and I've created #26069.

@HaloFour
Contributor

This deprecation of exchange() is also affecting my projects where I'm using a custom BodyExtractor to read a very large JSON array as a Flux<T> on which I transform the elements individually and return them as a Flux<T> to the client, to avoid having to buffer the data in memory. I also need to pass along any etag header from the upstream service back down to the client, so I end up translating the ClientResponse into a POJO that contains a Flux<T> of the body.

As far as I can tell I can't do this with exchangeToMono or exchangeToFlux. It continues to work with the deprecated exchange but I assume it's deprecated because you intend to remove it in the future.

For more info, see: #24951

@rstoyanchev
Contributor

> As far as I can tell I can't do this with exchangeToMono or exchangeToFlux

Can you clarify why?

@HaloFour
Contributor

@rstoyanchev

> Can you clarify why?

A simple attempt at refactoring is yielding an empty response body. I haven't figured out why and it's quite likely that I'm doing it wrong.

I'm trying to take the following:

.exchange()
.flatMap(response -> {
    // Turn error statuses into an exception
    if (response.statusCode().isError()) {
        return response.createException().flatMap(Mono::error);
    } else {
        var headers = response.headers().asHttpHeaders();
        var pointer = JsonPointer.compile("/response/entities");
        // Extract the nested array as a Flux<Entity> via the custom BodyExtractor
        var body = response.body(streamingJsonBodyExtractor.toFlux(Entity.class, pointer));

        return Mono.just(new EntitiesResponse(headers, body));
    }
});

and I've attempted to refactor it into the following:

.exchangeToMono(response -> {
    // Turn error statuses into an exception
    if (response.statusCode().isError()) {
        return response.createException().flatMap(Mono::error);
    } else {
        var headers = response.headers().asHttpHeaders();
        var pointer = JsonPointer.compile("/response/entities");
        // The body is only wrapped here; it is not consumed inside the callback
        var body = response.body(streamingJsonBodyExtractor.toFlux(Entity.class, pointer));

        return Mono.just(new EntitiesResponse(headers, body));
    }
});

If I instead use exchangeToFlux and flow the Flux<Entity> to the response it appears to work, but I need to capture and communicate etag and cache headers to the client as well, which is why I'm wrapping the headers and body in this Mono<EntitiesResponse>.

@rstoyanchev
Contributor

This is what the toEntityFlux methods introduced for this issue are for, if you need to wrap the decoded response.

That said, what actually consumes the body downstream? I ask because wrapping it like that and letting it be consumed later opens the possibility that it may not be consumed at all, for example in case of a cancellation signal.

@HaloFour
Contributor

@rstoyanchev

> This is what the toEntityFlux methods introduced for this issue are for, if you need to wrap the decoded response.

As far as I can tell I can't use those methods, as they expect the response being decoded to be in a very specific format. In my case the response is application/json, where a very large array of objects is embedded within a wrapper object. I would need a similar toEntity* method that could accept an arbitrary BodyExtractor<T>.

> That said what actually consumes the body downstream?

The controller returns a Flux<T> and streams the response to the client.

@HaloFour
Contributor

This is probably fodder for another issue but could I propose some combination of the following method(s) on ResponseSpec as well?

<T> Mono<ResponseEntity<T>> toEntity(BodyExtractor<T> extractor);
<T> Mono<ResponseEntity<T>> toEntity(Function<ClientResponse, Mono<T>> function);
<T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(Function<ClientResponse, Flux<T>> function);
