Possible OoME due to lack of coordination between Transport and RSocket #514

Closed
mostroverkhov opened this issue Jul 10, 2018 · 7 comments

@mostroverkhov
Member

mostroverkhov commented Jul 10, 2018

Untrusted requesters may send a large RequestN, and if the handler is fast compared to the Transport, response frames queued on the UnboundedProcessor may lead to large memory usage.
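
A minimal model of the failure mode, with a plain queue standing in for the UnboundedProcessor (class name and numbers are illustrative assumptions, not the actual internals):

import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueGrowthDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<byte[]> sendQueue = new ConcurrentLinkedQueue<>();
        long requestN = 1_000_000; // huge demand sent by an untrusted requester
        // A fast handler honours the demand immediately, while a slow transport
        // has not drained anything yet, so every frame stays resident.
        for (long i = 0; i < requestN; i++) {
            sendQueue.offer(new byte[1024]); // ~1 GiB queued before a single write
        }
        System.out.println("queued frames: " + sendQueue.size());
    }
}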

@yschimke
Member

+1

Given accounting is frame-count based, supporting max message sizes (fragmentation-aware) sounds like a good tuning parameter. I would expect to be able to size the memory of an RSocket server predictably from the number of supported clients, the configured credits, etc.
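
Back-of-envelope arithmetic for that sizing, assuming credit-based flow control with a fragmentation-aware frame cap (all numbers are illustrative, not rsocket-java defaults):

public class MemoryBudget {
    public static void main(String[] args) {
        long clients = 10_000;
        long streamsPerClient = 1;
        long creditsPerStream = 128;   // bounded requestN window per stream
        long maxFrameSize = 16 * 1024; // fragmentation-aware message size cap
        long worstCaseBytes = clients * streamsPerClient * creditsPerStream * maxFrameSize;
        System.out.printf("worst-case buffered bytes: %,d%n", worstCaseBytes); // ~20 GB
    }
}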

@mostroverkhov
Member Author

mostroverkhov commented Jul 11, 2018

That may help too. More specifically, my suggestion is to limit the number of frames in the queue: if a Subscription does request(1_000_000) over the network, I'd expect the responder's Processor not to blindly request that 1M items from the RSocketHandler, but to also account for the demand coming from the transport's DuplexConnection.send(Publisher).
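
A sketch of that accounting, with illustrative names rather than the actual rsocket-java internals: the network demand is recorded, but only what the transport can absorb is forwarded to the handler.

import org.reactivestreams.Subscription;

final class CoordinatedDemand {
    // networkDemand: taken from the REQUEST_STREAM / REQUEST_N frame
    // transportDemand: what DuplexConnection.send(Publisher) has requested so far
    static void propagate(Subscription handler, long networkDemand, long transportDemand) {
        handler.request(Math.min(networkDemand, transportDemand));
    }
}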

@yschimke
Member

Just in case you haven't hit it, we also inherit some behaviour from reactor-core here, e.g. take(5) by default requests an unbounded amount upstream (request(Long.MAX_VALUE)) and cancels after 5 elements. limitRate can work around that.

Mentioning for context
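
A minimal Reactor sketch of that inherited behaviour (assuming the reactor-core 3.x semantics of the time):

import reactor.core.publisher.Flux;

public class TakeVsLimitRate {
    public static void main(String[] args) {
        Flux.range(1, 100)
            .doOnRequest(n -> System.out.println("take requested: " + n))
            .take(5)
            .subscribe(); // prints 9223372036854775807, then cancels after 5

        Flux.range(1, 100)
            .doOnRequest(n -> System.out.println("limitRate requested: " + n))
            .limitRate(5)
            .take(5)
            .subscribe(); // upstream demand arrives in batches of at most 5
    }
}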

@mostroverkhov
Member Author

Yeah, I remember the heated discussion on reactor/reactor-core#879; sadly it is not used here yet despite insistent demand. Still, I believe this is a slightly different problem: limitRequest tackles it for a well-behaved requester, so no huge requestN goes over the network, but one cannot assume an untrusted client is well behaved. IMO it is currently too easy to blow up a server with a rogue request (even unintentionally).
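
For context, the operator from that discussion caps the total demand a well-behaved requester will ever signal upstream; it is no defence against a rogue client that writes a large REQUEST_N frame itself. A minimal sketch:

import reactor.core.publisher.Flux;

public class LimitRequestExample {
    public static void main(String[] args) {
        Flux.range(1, 1_000_000)
            .doOnRequest(n -> System.out.println("requested: " + n))
            .limitRequest(100) // total upstream demand capped at 100
            .subscribe();      // prints "requested: 100"
    }
}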

@robertroeser
Member

The LimitableRequestPublisher arbitrates between the underlying transport and the requestN from the caller. It shouldn't use up large amounts of memory, because it shouldn't be processing more data than the transport buffers can handle. We could try adding limitRate with low and high water marks somewhere in the chain to keep things running smoothly.

I guess the problem is that, in the Java implementation, this would cause a requestN of 1,000,000 to be sent as a requestN of 128 over the network. That is probably fine, but it might confuse people debugging it.
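
A sketch of that limitRate placement on the responder's outbound stream (water marks are illustrative; assumes a reactor-core version providing the two-argument limitRate):

import reactor.core.publisher.Flux;

public class WaterMarkedDemand {
    public static void main(String[] args) {
        Flux.range(1, 10_000)
            .doOnRequest(n -> System.out.println("upstream requested: " + n))
            .limitRate(256, 64) // request up to 256, then replenish in batches of 64
            .subscribe();       // upstream never sees the subscriber's unbounded demand
    }
}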

@OlegDokuka
Member

OlegDokuka commented Feb 27, 2019

I discovered the same issue while debugging https://github.com/scalecube/reactor-aeron/tree/develop/reactor-aeron-tools from the @scalecube team.

I guess the transport's capabilities should drive the source backpressure, and we should not propagate the original request frame from the requester as-is. That would keep the UnboundedProcessor from being overwhelmed.
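
A conceptual sketch of transport-driven demand (illustrative, not the actual rsocket-java transport code), using Netty's writability signal as the pacing source:

import io.netty.channel.Channel;
import org.reactivestreams.Subscription;

final class WritabilityDrivenDemand {
    private final Channel channel;       // the underlying connection
    private final Subscription upstream; // the responder's source of frames

    WritabilityDrivenDemand(Channel channel, Subscription upstream) {
        this.channel = channel;
        this.upstream = upstream;
    }

    // Invoked after each write completes and from channelWritabilityChanged().
    void tryRequestMore() {
        if (channel.isWritable()) {
            upstream.request(1); // pull one frame at a time while the socket keeps up
        }
    }
}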

This issue is critical because doing .subscribe() on the requester side will kill the responder thread; see the following example:

https://gist.github.com/artem-v/5f0ec7cf417bf733177262ff181f61b2#file-rsocketnettyservertps-java-L81

// BUFFER and tcpServer are defined earlier in the linked gist.
RSocketFactory
    .receive()
    .frameDecoder(Frame::retain)
    .acceptor(
        (setupPayload, rsocket) -> {
          System.out.println(rsocket);
          return Mono.just(
              new AbstractRSocket() {
                @Override
                public Flux<Payload> requestStream(Payload payload) {
                  payload.release();

                  return Flux.range(1, 2000000000)
                      .map(i -> ByteBufPayload.create(BUFFER.retainedSlice()))
                      .subscribeOn(Schedulers.parallel());
                }
              });
        })
    .transport(() -> TcpServerTransport.create(tcpServer))
    .start()
    .block()
    .onClose()
    .block();

In the above example, we create a Flux.range with 2,000,000,000 messages to send.
Once the subscriber requests Long.MAX_VALUE, which is more than can ever be delivered, the thread on the responder side that calls Subscription#request on the FluxRange is stolen. That would be acceptable if we wrote directly to the connection, but the sender publisher fuses with the UnboundedProcessor in ASYNC mode, which prevents messages from propagating down the stream as they are produced. Thus the thread keeps populating the UnboundedProcessor almost indefinitely (until every message from the given FluxRange has been consumed) and only then starts draining messages from the UnboundedProcessor.

Therefore, the only way to work around that is to reschedule onto a different thread.

BOTH ISSUES ARE RELATED TO THE SAME ROOT CAUSE

The RSocket Transport SHOULD (I would say MUST) control backpressure independently of the requester side. At the same time, accounting of the originally requested number should be managed internally, so that the responder source does not overwhelm the UnboundedProcessor.
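
A conceptual sketch of that coordination (class and field names and the serialized drain loop are illustrative simplifications, not the actual rsocket-java internals): the requester's demand is only accounted, while pulls from the handler are paced by a transport-sized window.

import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscription;

final class DemandArbiter {
    private final AtomicLong remoteDemand = new AtomicLong(); // accumulated REQUEST_N credits
    private final AtomicLong inFlight = new AtomicLong();     // frames handed over, not yet written
    private final long transportWindow = 128;                 // assumed per-connection cap
    private final Subscription handler;                       // the responder's source

    DemandArbiter(Subscription handler) {
        this.handler = handler;
    }

    void onRequestN(long n) {      // a REQUEST_N frame arrived from the requester
        remoteDemand.addAndGet(n); // (overflow handling elided for brevity)
        drain();
    }

    void onFrameWritten() {        // the transport finished writing one frame
        inFlight.decrementAndGet();
        drain();
    }

    private void drain() {         // simplified: assumes serialized invocation
        while (remoteDemand.get() > 0 && inFlight.get() < transportWindow) {
            remoteDemand.decrementAndGet();
            inFlight.incrementAndGet();
            handler.request(1);    // pull from the handler only within the window
        }
    }
}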
