Introduce token-based consumption of multipart requests in WebFlux #28006

Closed
poutsma opened this issue Feb 4, 2022 · 8 comments
Labels: in: web (Issues in web modules: web, webmvc, webflux, websocket), type: enhancement (A general enhancement)
poutsma (Contributor) commented Feb 4, 2022

In version 5.3, Spring Framework introduced the DefaultPartHttpMessageReader as a fully reactive way to handle multipart upload requests. One of the features of this message reader is streaming mode, where the contents of the uploaded parts are not stored in memory or on disk, but passed directly on to the subscriber (in the form of DataBuffers). This feature is particularly useful for server proxies, where the controller passes the data buffers on to another service, for instance by using WebClient.

However, there is a problem in the way streaming mode deals with back pressure. In streams produced by other HttpMessageReaders and codecs, there is a clear relationship between the requests made by the subscriber and the requests made against the incoming buffer stream (for instance, the ByteBufferDecoder creates one ByteBuffer for each incoming DataBuffer). When DefaultPartHttpMessageReader is used in streaming mode, there is no such relationship, because each Part produced can consist of multiple data buffers. Effectively, there are two kinds of back pressure in streaming mode:

  1. the back pressure of the Part elements, i.e. the 'outer' flux
  2. the back pressure of the DataBuffer contents of each part, i.e. the 'inner' flux.

There are several scenarios to consider:

  • What should happen when a request for a second part comes in, while the contents of the first have not been consumed yet?
  • What should happen when the inner flux is canceled, while the outer flux is not?
  • What should happen when flatMap is used on the Part stream?
  • How should prefetch be used?
  • etc. etc.

Though I am sure we can come up with answers to these questions, the fact remains that in streaming scenarios, representing multipart data as a Flux<Part> where each part contains a Flux<DataBuffer> has inherent difficulties with back pressure. Instead, we should introduce an alternative way to consume multipart data that is a better fit for streaming scenarios.

Part tokens

Instead of Part elements, the multipart upload is represented by a stream of part tokens. Each part in the multipart request is represented by a header token that contains the part headers, followed by one or more body tokens with data buffers containing the part body. Subsequent parts will result in another header token, followed by more body tokens, and so on.

For instance, a multipart message with a form field and a file will produce the following tokens:

  1. a header token containing the headers of the form field part
  2. a body token containing the form field value
  3. a header token containing the headers of the file part
  4. multiple body tokens containing buffers with the contents of the file
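To make the token sequence above concrete, here is a minimal, self-contained sketch of what such a stream of tokens might look like. Plain Java records with Map<String, String> and byte[] are used as stand-ins for Spring's actual HttpHeaders and DataBuffer types; apart from the PartToken.Headers and PartToken.Body names used in the controller example in this issue, everything here is hypothetical:

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: Map<String, String> for HttpHeaders,
// byte[] for DataBuffer. Only the token shape is illustrated here.
sealed interface PartToken {

	// Emitted once at the start of each part.
	record Headers(Map<String, String> headers) implements PartToken {
	}

	// Emitted once per incoming data buffer of the part body.
	record Body(byte[] buffer) implements PartToken {
	}
}

class TokenSequenceExample {

	// The token sequence for a multipart message with a form field and a file.
	static List<PartToken> tokens() {
		return List.of(
				new PartToken.Headers(Map.of("Content-Disposition", "form-data; name=\"field\"")),
				new PartToken.Body("value".getBytes()),
				new PartToken.Headers(Map.of("Content-Disposition", "form-data; name=\"file\"; filename=\"a.txt\"")),
				new PartToken.Body(new byte[8192]),
				new PartToken.Body(new byte[8192]));
	}
}
```

Note how the part boundaries are implicit: a new part begins wherever a Headers token appears in the flat stream.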

Using part tokens, there is a direct relationship between the back pressure of the token subscriber and that of the buffer input. In the case of body tokens, this is even a one-to-one relationship.

For instance, here is a typical controller method that splits the tokens into multiple fluxes, each starting with a header token, so that each inner flux contains the tokens of one part. The headers from that token can then be used if necessary, and the remaining body tokens can be consumed as well.

@PostMapping("/tokens")
Flux<String> tokens(@RequestBody Flux<PartToken> tokens) {
	return tokens
			// Start a new window at each header token: Flux<Flux<PartToken>>,
			// where each inner flux contains the tokens of a single part
			.windowUntil(token -> token instanceof PartToken.Headers, true)
			.concatMap(partTokens -> partTokens.switchOnFirst((signal, tokenFlux) -> {
				if (signal.hasValue()) {
					PartToken.Headers headersToken = (PartToken.Headers) signal.get();
					HttpHeaders headers = headersToken.headers();
					// Use info in headers if necessary
					Flux<DataBuffer> bodyBuffers = tokenFlux
							.filter(token -> token instanceof PartToken.Body)
							.cast(PartToken.Body.class)
							.map(PartToken.Body::buffer);
					// Send body buffers to other service
					return Mono.empty();
				}
				else {
					// No header token: release any remaining body buffers
					return releaseBody(tokenFlux).then(Mono.empty());
				}
			}));
}
poutsma self-assigned this Feb 4, 2022
poutsma added the "in: web" and "type: enhancement" labels Feb 4, 2022
poutsma (Contributor, Author) commented Feb 4, 2022

As part of this effort, we would also deprecate the current streaming support in DefaultPartHttpMessageReader.

poutsma (Contributor, Author) commented Feb 9, 2022

After considering the way (Reactor) Netty deals with multipart streaming, we have come up with an alternative to the token-based approach described above, in which headers and body contents are merged into one PartData object.

Part Data objects

In this design, each part in a multipart HTTP message produces at least one PartData object, containing both the headers and a DataBuffer with the data of the part. If a part is large enough to be split across multiple buffers (i.e. a file upload), the first PartData will be followed by subsequent objects. The final PartData for a particular part will have its isLast property set to true.

For instance, a multipart message with a form field and a file will produce the following PartData objects:

  1. a data object containing the headers and data of the form field part; isLast is true
  2. a data object containing the headers and the first buffer of data of the file part; isLast is false
  3. multiple data objects containing the headers and buffers with the subsequent contents of the file; isLast is false
  4. a data object containing the headers and the final buffer of data of the file part; isLast is true
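A minimal sketch of how a consumer could regroup such a flat stream back into parts, by cutting after each element whose isLast flag is true (in a reactive pipeline this would correspond to something like Reactor's windowUntil). Plain Java records stand in for the actual Spring types; all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed PartData type:
// headers plus one buffer of content, with an isLast marker.
record PartData(Map<String, String> headers, byte[] buffer, boolean isLast) {
}

class PartDataGrouping {

	// Cuts the flat stream of PartData objects into one list per part,
	// closing each group after an element with isLast == true.
	static List<List<PartData>> groupByPart(List<PartData> events) {
		List<List<PartData>> parts = new ArrayList<>();
		List<PartData> current = new ArrayList<>();
		for (PartData event : events) {
			current.add(event);
			if (event.isLast()) {
				parts.add(current);
				current = new ArrayList<>();
			}
		}
		return parts;
	}
}
```

Because every element carries its own headers and an explicit end marker, the consumer never has to correlate separate header and body elements, which is what makes this design simpler than the token-based one.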

djouvin commented Feb 12, 2022

The second approach with a unified PartData (we could alternatively call it PartFragment) indeed seems more appealing and versatile. It is also simpler in design, and thus probably more robust.

However, I think there is a way to keep a composite approach with an outer Flux<Part> (producing a Flux<DataBuffer> for each part's content), while still maintaining a correct relationship between demand and the incoming data buffers, at least for the inner flux (the part content).
For the outer flux, quantitative demand is not really useful anyway, since parts may have completely different sizes. The Flux<Part> should work with concatMap but also with flatMap: the difference is that with concatMap, consumption of the next part would wait for the whole previous part's content pipeline to complete, whereas with flatMap it is sufficient that the part content has been fully produced to the next operator; the content can still be "in transit / in processing" in the content-processing pipeline while the second part is produced (thus parallelizing the processing a bit more).

For this composite approach to work, the following predicates must hold:

  • part demand and part content demand must not be mixed, and must be processed differently
  • a part's content flux has to be subscribed to before any other part is processed
    • the corollary is that the next part must not be delivered by the producer before the consumer has fully consumed, or explicitly cancelled, the previous part's content flux: the part should thus expose an explicit cancel or dispose method (for example, by implementing the Disposable interface). We are bending the reactive streams contract semantics a little here (part contents are in a way "pre-subscribed"), but there is no other way to ensure that parts are not skipped unintentionally by a prefetching operator
    • prefetch should not be used for part consumption on the outer flux, as it will never be honored by the producer, but it can be used for part content consumption on the inner flux (which behaves as a regular Flux<DataBuffer>)
    • the next part should be delivered to the outer flux consumer:
      • when the previous part's content delivery is complete,
      • or when all subscriptions to that part's content are cancelled,
      • or when the part itself is cancelled or disposed,
    • and of course only while part demand is still there and the outer flux itself is not cancelled
  • part content delivery should honor content demand as any data buffer flux would, until the end of the part is encountered (then the content flux is completed and any remaining prefetched data buffers are retained by the producer)
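The delivery rules above can be sketched as a small producer-side state machine. This is purely illustrative plain Java (all names are hypothetical, no Reactor types are used), showing the core invariant: the next part is only emitted once the previous part's content has terminated and outer demand is present:

```java
// Illustrative sketch of the producer-side gating described above.
// All names are hypothetical; this is not an actual Spring or Reactor API.
class PartProducerState {

	private long outerDemand;          // outstanding demand for Part elements
	private boolean contentInFlight;   // previous part's content not yet terminated

	// Called when the outer subscriber requests more parts.
	void requestParts(long n) {
		this.outerDemand += n;
	}

	// Called when the current part's content completes, is fully
	// cancelled, or the part itself is disposed.
	void contentTerminated() {
		this.contentInFlight = false;
	}

	// The next part may only be delivered when there is outer demand
	// AND the previous part's content flux has terminated.
	boolean canEmitNextPart() {
		return this.outerDemand > 0 && !this.contentInFlight;
	}

	// Called when the producer actually emits the next part.
	void emitNextPart() {
		if (!canEmitNextPart()) {
			throw new IllegalStateException("part emitted without demand or while content in flight");
		}
		this.outerDemand--;
		this.contentInFlight = true;
	}
}
```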

I agree that the composite approach implementation is more complex, and I am not sure it is always worthwhile to have an outer Flux<Part>, because most of the time an HTTP stream has only one, or just a few, parts (so viewing parts as a Flux is not a must). It does, however, fit well with the flatMap and concatMap operators.
And it can be built on the PartData approach too (as it is now with the PartToken generator): consumers would have the choice of consuming either a Flux<PartData> directly, or a Flux<Part> wrapping that Flux<PartData>.

jomach commented Mar 15, 2022

This is related to #27743, right?

poutsma (Contributor, Author) commented Apr 20, 2022

This feature is now in main and will be part of Spring Framework 6.0 M4 when it comes out on May 11th. I would really appreciate feedback before the 6.0 RC is released.

There is no reference documentation as of yet; it will be written as the RC approaches. For now, there is a substantial amount of Javadoc on the main type, PartEvent.

Note that I changed the name of this type from PartData to PartEvent, as I think that more clearly describes the intent of the type.

poutsma (Contributor, Author) commented Apr 20, 2022

@djouvin

For this composite approach to work, the following predicates must hold:

  • part demand and part content demand must not be mixed, and must be processed differently

  • a part's content flux has to be subscribed to before any other part is processed

    • the corollary is that the next part must not be delivered by the producer before the consumer has fully consumed, or explicitly cancelled, the previous part's content flux: the part should thus expose an explicit cancel or dispose method (for example, by implementing the Disposable interface). We are bending the reactive streams contract semantics a little here (part contents are in a way "pre-subscribed"), but there is no other way to ensure that parts are not skipped unintentionally by a prefetching operator

    • prefetch should not be used for part consumption on the outer flux, as it will never be honored by the producer, but it can be used for part content consumption on the inner flux (which behaves as a regular Flux<DataBuffer>)

    • the next part should be delivered to the outer flux consumer:

      • when the previous part's content delivery is complete,
      • or when all subscriptions to that part's content are cancelled,
      • or when the part itself is cancelled or disposed,
    • and of course only while part demand is still there and the outer flux itself is not cancelled

  • part content delivery should honor content demand as any data buffer flux would, until the end of the part is encountered (then the content flux is completed and any remaining prefetched data buffers are retained by the producer)

While the user might be able to limit their usage of operators on the Flux<Part> we provide, it is impossible to make the same guarantee when that flux is passed on to another library or framework. As a consequence, things can unexpectedly break when they worked perfectly fine before.

I agree that the composite approach implementation is more complex, and I am not sure it is always worthwhile to have an outer Flux<Part>, because most of the time an HTTP stream has only one, or just a few, parts (so viewing parts as a Flux is not a must). It does, however, fit well with the flatMap and concatMap operators. And it can be built on the PartData approach too (as it is now with the PartToken generator): consumers would have the choice of consuming either a Flux<PartData> directly, or a Flux<Part> wrapping that Flux<PartData>.

I will try to refactor the PartGenerator to use the functionality for this issue, and will let you know how that proceeds.

jomach commented Apr 20, 2022

Would be great if you could provide a working example of this :) thx and great work!

poutsma added a commit that referenced this issue May 10, 2022
This commit refactors the PartGenerator to use the newly introduced
Token::isLast property.

See gh-28006
poutsma (Contributor, Author) commented May 10, 2022

Would be great if you could provide a working example of this :) thx and great work!

There is some sample code available on the PartEvent Javadoc.
