
Spring Boot 2.3.0.RC1 - RSocket over Websocket transport - requestStream - produces additional request(255) #825

Closed
maxim-bandurko-lsvt opened this issue May 8, 2020 · 22 comments · Fixed by #829

@maxim-bandurko-lsvt

I originally created this issue against Spring Boot, but maybe it should be posted here:
spring-projects/spring-boot#21360

With the latest 2.3.0.RC1, RSocket "requestStream" started producing an additional request with quantity 255 to the Flux inside the controller.

Configuration to reproduce the issue:

Spring Boot 2.3.0.RC1
RSocket plugged to server
Transport - Websockets
Using @MessageMapping controller
RSocket connection with Composite Metadata ('message/x.rsocket.composite-metadata.v0'), Binary MimeType ('application/octet-stream') and Buffer encoders
Tested with the JavaScript RSocket client v0.0.19.

The previous version, Spring Boot 2.3.0.M4, doesn't have this issue.

@OlegDokuka
Member

Hey @maxim-bandurko-lsvt.

This is expected.
The responder does not send the extra values over the network; it produces them and keeps them in a local queue. We use the limitRate operator to ensure that a responder stream does not overflow the connection bandwidth when a requester sends request(Long.MAX_VALUE).

Does such behavior break your business logic?

Cheers,
Oleh

@maxim-bandurko-lsvt
Author

Hello Oleg @OlegDokuka

The thing is that the client is not sending request(Long.MAX_VALUE). It is sending request(1) or request(10), etc. The Java server receives that amount of demand but additionally triggers one more request(255). The server does not deliver 256 items to the client, though; it delivers only the 1 that was requested, so the other 255 get prefetched.

Here is a basic example on server side:

@MessageMapping("test")
public Flux<DataBuffer> test(DataBuffer request) {
    return Flux.just("a", "b", "c", "d", "e", "f")
        .map(data -> {
            log.info("next: " + data);
            return new DefaultDataBufferFactory().wrap(data.getBytes());
        })
        .doOnRequest(n -> log.info("doOnRequest: " + n));
}

And here is a log on server side:

doOnRequest: 1
next: a
doOnRequest: 255
next: b
next: c
next: d
next: e
next: f

The JavaScript client requested only 1 element and received only 1, but because on the Java side requests totaling 256 were originated, all of them got processed and stored in a cache.

In the previous Spring Boot version (2.3.0.M4), which had rsocket-core-1.0.0-RC6 with rsocket-transport-netty-1.0.0-RC6, everything worked as it was supposed to: there was no additional request(255) on the server side, so backpressure propagated correctly through the whole inner stream flow.

With the new Spring Boot version (2.3.0.RC1), which has rsocket-core-1.0.0-RC7 with rsocket-transport-netty-1.0.0-RC7, this changed: the stream receives an additional internal request(255) that is propagated through the whole inner stream flow.

Because of this, the server prefetches 255 items internally that may hold stale data: the client can do, say, request(10) an hour later and will get 10 elements that were generated an hour earlier.

If this prefetch logic is intentional, is there any way to turn it off in certain situations?

Thank you.
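The extra request(255) in the log above can be reproduced with a tiny plain-Java toy model of a prefetching stage. This is an illustration of the observed behavior only, not Reactor's actual limitRate implementation; the class and method names are made up:

```java
import java.util.ArrayList;
import java.util.List;

public class PrefetchDemo {
    // Toy model: the remote demand is forwarded first, then the stage
    // speculatively tops its local buffer up to `prefetch` elements.
    static class PrefetchingStage {
        final long prefetch;
        final List<Long> upstreamRequests = new ArrayList<>();

        PrefetchingStage(long prefetch) { this.prefetch = prefetch; }

        void requestFromDownstream(long n) {
            long forwarded = Math.min(n, prefetch);
            upstreamRequests.add(forwarded);            // the client's own demand
            long topUp = prefetch - forwarded;
            if (topUp > 0) upstreamRequests.add(topUp); // speculative prefetch
        }
    }

    public static void main(String[] args) {
        PrefetchingStage stage = new PrefetchingStage(256);
        stage.requestFromDownstream(1);                 // the JS client's request(1)
        System.out.println(stage.upstreamRequests);     // prints [1, 255]
    }
}
```

With a prefetch of 256 and a downstream request(1), the upstream sees exactly the two demands from the log: 1 and then 255.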

@OlegDokuka
Member

OlegDokuka commented May 8, 2020

As I said, this is expected. It is done to prevent the potential overwhelming that may happen when request(Long.MAX_VALUE) is sent.

It means that, starting from RC7, it will always pre-request 256 elements in advance regardless of the number of elements originally requested (this applies to the requestStream and requestChannel cases only).

This is the result of applying the .limitRate operator, which enables such behavior.

We did that on purpose, but if it is critical for you, we can of course add a configuration option to disable it.

@maxim-bandurko-lsvt
Author

@OlegDokuka I see.

How can I disable it? This is somewhat critical and breaks the backpressure logic that was originally developed.

Thank you!

@OlegDokuka
Member

Unfortunately, for now, you cannot. I will create a PR that provides a way to configure it.

Please stay tuned

@OlegDokuka OlegDokuka added the bug label May 8, 2020
@maxim-bandurko-lsvt
Author

@OlegDokuka btw, maybe limitRate could have an option to not add additional requests if the rate has not been reached? I mean, in the situation where the requested amount is lower than the minimal rate.

@OlegDokuka
Member

@maxim-bandurko-lsvt

Previously we had such a mechanism, but then we decided to use the built-in option from Project Reactor.

cc/ @simonbasle @rstoyanchev

@maxim-bandurko-lsvt
Author

@OlegDokuka I see,

So it may even make sense to implement such a mechanism inside limitRate in Project Reactor.

The other thing is that such a limiter is needed only at certain levels of the inner streams, not at the root level, for example directly around database operations. The developer can then decide at exactly which place to inject the limiter. If it is injected by default, without correct backpressure propagation to the server, it will break a lot of existing logic implemented by developers. It is also confusing that in some places Reactive Streams backpressure in RSocket does not work as it is supposed to.

@OlegDokuka
Member

Funny) Feel free to leave your comment on my PR -> reactor/reactor-core#1879

@OlegDokuka
Member

Oh, well, you have already left one, which I was not aware of

@OlegDokuka
Member

Anyway, let me talk with @rstoyanchev in the next day or so to find a proper resolution for this.

1.0.0 is scheduled for next Monday, so I really hope we can solve it by then.

@maxim-bandurko-lsvt
Author

@OlegDokuka Thank you very much!

Also, will 1.0.0 be part of the upcoming Spring Boot 2.3.0 release?

@OlegDokuka
Member

Yup. 1.0.0 is dedicated exactly to the SB release.

@maxim-bandurko-lsvt
Author

@OlegDokuka Awesome! Thanks!!!

@rstoyanchev
Contributor

Because of this, the server prefetches 255 items internally that may hold stale data: the client can do, say, request(10) an hour later and will get 10 elements that were generated an hour earlier.

@maxim-bandurko-lsvt thanks for the extra details. That's very helpful and timely feedback. I'm wondering, have you considered the case where a client requests a very large number (intentionally or not), or is that not possible?

@OlegDokuka not sure if that helps to focus the efforts for reactor/reactor-core#1879 but I think ideally RSocket streams should only be protected against demand greater than N but otherwise respect anything less than that.

In addition we can also think about an option along the lines of #748 to create a RateLimitInterceptor that is configurable with the desired prefetchRate, and expose that via RSocketConnector and RSocketServer to be turned on/off. This would be pretty easy to achieve and provide control while separately making improvements to how it works.

@maxim-bandurko-lsvt
Author

maxim-bandurko-lsvt commented May 10, 2020

@rstoyanchev requestStream and requestChannel give us two usage patterns: on-demand requests, where the amount of data is initiated and controlled solely by the client, and a flow of all available data originated by the server as soon as it is ready to be delivered. In real-life applications, logic that requests a certain amount of data when it is needed turns out to be many times more common.

Even comparing with the imperative way web applications work, all data transport logic relies on simple AJAX requests that deliver a certain amount of data when it is needed. You rarely see AJAX/HTTP2 requests that just stay open and await incoming data from the server.

That said, one more selling point RSocket gives developers is that a client can request a large amount of data, which allows the server to propagate parts of it to the client once they are ready to be delivered. Basic examples: push notifications, chat feeds, etc. There are also GitHub questions like "How can I send data from server to client when it is needed?". A front-end app may not deal with route mappings on the client side and may just request the maximum from the server. It is all up to the developer to decide how to build the data-delivery strategy.

I agree that a large number in a request can cause issues for the server. That mostly happens with an SQL select query that has no limit/offset (pagination), but only when the developer has missed limiting the requests into batches at certain chains of inner streams. Coming from that, I really think it would be beneficial to have a bold notice pointing out that developers may need to place limitRate at certain places in their app, or use pagination (btw, Spring still has no pageable implementation that works with Flux signals, and I am using my own). Moreover, this is not even specific to RSocket's way of client/server communication: with plain WebFlux, a request to SQL with no limits has exactly the same issue and may break the server. So it is mostly up to the developer to inject the limiters, pagination, etc. that keep the server working correctly in all situations.

In my practice, around 95% of requests are small and only 5% are infinite. But in all the small-request mappings I always use my own pageable implementation, which works similarly to limitRate but exposes a BiFunction providing a Long limit and a Long offset, which are used as parameters in the returned stream that should be batched. Spring modules like r2dbc, elasticsearch, etc. do not control the amount of requested data via native Flux signals.
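As a rough illustration of that pageable idea (purely hypothetical code, not the author's actual implementation), a (limit, offset) BiFunction standing in for an SQL LIMIT/OFFSET query might look like:

```java
import java.util.List;
import java.util.function.BiFunction;

public class PageableSketch {
    // In-memory stand-in for a database table.
    static final List<String> ROWS = List.of("a", "b", "c", "d", "e", "f");

    // (limit, offset) -> one page of results, playing the role of an
    // SQL "LIMIT ? OFFSET ?" query against a real database.
    static final BiFunction<Long, Long, List<String>> PAGE = (limit, offset) ->
        ROWS.subList((int) Math.min(offset, ROWS.size()),
                     (int) Math.min(offset + limit, ROWS.size()));

    public static void main(String[] args) {
        System.out.println(PAGE.apply(2L, 0L)); // prints [a, b]
        System.out.println(PAGE.apply(2L, 4L)); // prints [e, f]
    }
}
```

Each batch of downstream demand would then be translated into one such (limit, offset) call, instead of letting raw Reactive Streams demand reach the database driver directly.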

@OlegDokuka Thanks for removing the limitRate operator!!!

@maxim-bandurko-lsvt
Author

maxim-bandurko-lsvt commented May 10, 2020

@rstoyanchev @OlegDokuka Seems I wrote too much in the previous comment and forgot to add a couple more points.

The limitRate operator barely solves the problem of the server breaking on stream endpoints that work with databases, and on top of that it breaks other logic. Even with the ability to define the default prefetch and highTide numbers on a LimitRateInterceptor, rate limiting becomes global everywhere, with no ability to override it per specific stream.

For example, some streams that deal with SQL may need no prefetch and a maximum limit of 100, or a prefetch of 100 with a limit of 1k. With another database, such as Elasticsearch, these values should be different, say a prefetch of 1k with a limit of 10k (to utilize the database with better performance). I have a case with a custom operator that switches streams to different databases based on a single number in the request (lower than 1k: use SQL with no prefetch and limit 100; above 1k: use Elasticsearch with prefetch and limit 10k).

So plugging in limitRate globally makes the logic of working with databases inflexible, as a limit of around 100 will work well for one database and hurt the performance of another.

And last, since limitRate is mostly not needed everywhere, only in certain cases, having it everywhere will just consume server resources.

@OlegDokuka
Member

OlegDokuka commented May 10, 2020

@maxim-bandurko-lsvt Right. That is why I completely removed it and just kept an example of how limitRate might be installed.

Please keep in mind that limitRate is not for databases or upstream systems; it was there for RSocket's safety.

Remember that along with requester demand there is network bandwidth, and if your app overwhelms the network bandwidth, it is going to end up with an OOM exception and simply crash the requester/responder app.

Thus, it is crucial to respect both demands.

As of now we do not do that, and we decided to first work around it with the limitRate operator.

However, in 1.1 we will redesign the transport and provide proper backpressure management that takes the transport demand into account along with the requester demand.

What does that mean? It means that if the user requested 512 elements but the transport is ready to consume 128, the upstream will be asked for 128, then for another batch as the transport exposes more demand, until the whole requester demand has been delivered.

The same applies in the reverse case: if the transport demands 128 but the requester only asks for 5 elements, 5 elements will be delivered and a demand of 123 will remain outstanding on the transport side.
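The two scenarios above can be sketched in plain Java. This is a toy model of the described 1.1 behavior, not a real RSocket API; the upstreamDemands name is made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class DemandNegotiationDemo {
    // The upstream only ever sees min(outstanding requester demand,
    // transport window), repeated until the requester's demand is met.
    static List<Long> upstreamDemands(long requested, long transportWindow) {
        List<Long> demands = new ArrayList<>();
        long remaining = requested;
        while (remaining > 0) {
            long batch = Math.min(remaining, transportWindow);
            demands.add(batch);
            remaining -= batch;
        }
        return demands;
    }

    public static void main(String[] args) {
        // 512 requested, transport ready for 128 at a time:
        System.out.println(upstreamDemands(512, 128)); // prints [128, 128, 128, 128]
        // transport window of 128 but only 5 elements requested:
        System.out.println(upstreamDemands(5, 128));   // prints [5]
    }
}
```

In the second case the remaining 123 elements of transport window would simply stay outstanding on the transport side, as described above.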

@maxim-bandurko-lsvt
Author

@OlegDokuka I see. Thanks for the detailed explanation!

I was thinking about the original idea of backpressure. With plain WebSockets (not RSocket), any app using that transport had to consider bandwidth usage in its operational logic, so apps were engineered to avoid overwhelming the client or server with produced data (I am not talking about apps developed with no awareness of such situations). Switching to RSocket already makes it a lot easier to control bandwidth and demand, but having that handled natively by RSocket (client and server) is an awesome idea!

Will 1.1 keep the existing request semantics, with some kind of new internal signal that controls backpressure just between client and server (not affecting the requested quantity)?

@OlegDokuka
Member

Yes, that will be configurable.

@maxim-bandurko-lsvt
Author

Awesome, thanks!

@hjohn

hjohn commented Feb 3, 2022

However, in 1.1 we will redesign the transport and provide proper backpressure management that takes the transport demand into account along with the requester demand.

What does that mean? It means that if the user requested 512 elements but the transport is ready to consume 128, the upstream will be asked for 128, then for another batch as the transport exposes more demand, until the whole requester demand has been delivered.

The same applies in the reverse case: if the transport demands 128 but the requester only asks for 5 elements, 5 elements will be delivered and a demand of 123 will remain outstanding on the transport side.

Was this done? The backpressure mechanism doesn't seem to work for me at all in 1.1.0. On a request-stream where the server sends a (potentially) infinite amount of data as fast as the system allows, the data is buffered on my client, which goes OOM. A limitRate on either side does nothing to alleviate this. This was working relatively well in previous versions, specifically 1.0.0-RC5.

I'm now going to implement the "example of how limitRate might be installed", but it is somewhat odd that backpressure no longer works out of the box.
