
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...) #959

Closed
lkolisko opened this issue Nov 18, 2020 · 34 comments · Fixed by #985
Labels
bug · superseded (Issue is superseded by another)

Comments

@lkolisko

We use RSocket REQUEST_STREAM to implement a query whose results are streamed back to the client. The client sends a REQUEST_N frame with a demand of 512 and successfully receives results. Once the client has received more than 256 results, it requests an additional 256. The communication works fine, and the FrameLogger shows the expected received and sent frames. After some time, the following exception is thrown on the client on the event of an additional REQUEST_N:

2020-11-18 13:27:47.944 DEBUG 1 --- [tor-tcp-epoll-2] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 296
Data:
......

2020-11-18 13:27:47.945 DEBUG 1 --- [pool-5-thread-5] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: REQUEST_N Flags: 0b0 Length: 10 RequestN: 256
Data:

2020-11-18 13:27:47.956 ERROR 1 --- [tor-tcp-epoll-2]  : 

reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxSourceFuseable] :
	reactor.core.publisher.Flux.from(Flux.java:957)
	io.rsocket.core.RSocketRequester.handleRequestStream(RSocketRequester.java:338)
Error has been observed at the following site(s):
	|_        Flux.from ⇢ at io.rsocket.core.RSocketRequester.handleRequestStream(RSocketRequester.java:338)
	|_ Flux.subscribeOn ⇢ at io.rsocket.core.RSocketRequester.handleRequestStream(RSocketRequester.java:393)

This happens as part of a large application, therefore I do not have a standalone reproduction example at the moment.
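The consumption on the client is roughly of the following shape; this is only an illustrative sketch of the pattern (the limitRate operator and the handler names are placeholders, not our actual code), matching the REQUEST_N(512) / REQUEST_N(256) frames above:

// Illustrative sketch only -- not the real application code.
// limitRate(512, 256) requests 512 payloads up front and then tops the demand
// up by 256 once 256 have been consumed, which matches the frames in the log.
socket.requestStream(DefaultPayload.create(queryBytes))   // queryBytes is a placeholder
      .limitRate(512, 256)
      .map(payload -> decodeResult(payload))               // decodeResult is a placeholder
      .subscribe(result -> handleResult(result));          // handleResult is a placeholder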

  • RSocket version(s) used: 1.0.3
  • Other relevant libraries versions: spring-boot: 2.3.5, netty 4.1.53
  • Platform: OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.8+10, mixed mode)
@OlegDokuka OlegDokuka added the bug label Nov 18, 2020
@OlegDokuka OlegDokuka added this to the 1.0.4 milestone Nov 18, 2020
@OlegDokuka
Member

Looks like it is possible with fusion enabled. I will try to arrange something similar with the .publishOn operator to see if this reproduces.

@koldat
Contributor

koldat commented Nov 19, 2020

It is happening for us in more places. So far we have not found any workaround. How can fusion be disabled?

@koldat
Contributor

koldat commented Nov 19, 2020

@OlegDokuka why do handleRequestStream/handleChannel assume that the receiver queue has a bounded size of 1? I would understand it in handleRequestResponse, but not in the Flux variants.

io.rsocket.core.RSocketRequester.handleRequestStream(Payload)

final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());

The handle-frame function can consume network frames as fast as they arrive, so it can happen that the processor has not yet dispatched the previous frame downstream. When I used an unbounded queue, it started working without the exception.

@OlegDokuka
Member

OlegDokuka commented Nov 19, 2020

@koldat

It is happening for us in more places. So far we have not found any workaround. How can fusion be disabled?

There is no way to do so on your end.
For now, to work around what you have, you can try to add a requester-side interceptor like the following:

new RSocketProxy(source) {

   @Override
   public Flux<Payload> requestStream(Payload p) {
      return source.requestStream(p).hide();
   }

   @Override
   public Flux<Payload> requestChannel(Publisher<Payload> p) {
      return source.requestChannel(p).hide();
   }

}
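To register it on the connector, something along these lines should work (just a sketch; the transport, host, and port are placeholders):

// Sketch: wrap the requester RSocket in the proxy above so that every
// requestStream/requestChannel result is .hide()-den before your operators see it.
Mono<RSocket> client =
    RSocketConnector.create()
        .interceptors(registry -> registry.forRequester((RSocketInterceptor) rsocket ->
            new RSocketProxy(rsocket) {
              @Override
              public Flux<Payload> requestStream(Payload p) {
                return super.requestStream(p).hide();
              }

              @Override
              public Flux<Payload> requestChannel(Publisher<Payload> p) {
                return super.requestChannel(p).hide();
              }
            }))
        .connect(TcpClientTransport.create("localhost", 7000)); // placeholder transport/port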

Please find a full example here -> https://github.com/rsocket/rsocket-java/blob/master/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/plugins/LimitRateInterceptorExample.java

The .hide() operator should disable fusion, and the issue should no longer appear after that.

@OlegDokuka why do handleRequestStream/handleChannel assume that the receiver queue has a bounded size of 1? I would understand it in handleRequestResponse, but not in the Flux variants.

io.rsocket.core.RSocketRequester.handleRequestStream(Payload)

final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());

The handle-frame function can consume network frames as fast as they arrive, so it can happen that the processor has not yet dispatched the previous frame downstream. When I used an unbounded queue, it started working without the exception.

There is a strict reactive streams relationship: if you requested N elements, you clearly stated that you are ready to receive N elements. Following reactive streams, the producer honors your demand and produces exactly N elements (or fewer).

That said, there is no reason for us (the rsocket core logic) to cache N elements anywhere before your event handler. That is why, to avoid redundant allocations, we use a queue of a single element, since we expect no values to be enqueued at all.

The mentioned issue is the result of Reactor optimizations that we have not taken into account, which fuse with the UnicastProcessor queue and do not take the queue capacity into account.

@koldat
Contributor

koldat commented Nov 19, 2020

@OlegDokuka I am not sure I described that properly.

Of course I know that N requested elements are ready to be consumed. But there is also an asynchronous effect: the downstream can consume, but it can take time. Our application uses the usual backpressure (512 elements) and normal operators.

As I said, the issue is fixed when the queue is unbounded, because in rare cases it hits the condition where one element in the queue is still not dispatched (randomly after 10k - 1M items received). But it also sometimes happens with much shorter streams.

I tried to remove fusion everywhere. I will also try your suggestion, but again:
why do handleRequestStream/handleChannel assume the receiver queue has a bounded size of 1, when the receiving side is processed on different threads than the consuming side?

@OlegDokuka
Member

OlegDokuka commented Nov 19, 2020

Here is the point. Your backpressure operator is responsible for storing up to 512 elements; RSocket has no clue about your request. It just transparently transforms your requestN into a binary frame and ensures that the frame is decoded back, upon receipt, into the same requestN method invocation on your subscription on the receiver side.

The problem is that Reactor is a smart framework and tries to do lots of optimizations. That said, if you have a workflow of UnicastProcessor[Queue] -> PublishOn[Queue], then Reactor will optimize your workflow into PublishOn[UnicastProcessor[Queue]], which means that one less queue will be allocated.

But here comes the problem. We use UnicastProcessor just because it is a convenient API that lets us send elements manually. On the other hand, we don't need to store any elements, because we know the strict relationship between publisher and subscriber: if the subscriber says request(5), it is ready to consume 5; if later on it says request(10) while the previous 5 have not yet been delivered, it is ready to receive 15 (5 + 10). Again, we don't have to store anything.

The problem comes from the fact that UnicastProcessor fuses with any operator that creates its own queue. This means that if we create queue(1) and publishOn wanted queue(512), the UnicastProcessor will have no clue about that requirement; hence we see overflow exceptions because of the requirements mismatch.

As I said, to avoid this issue for now, just place the .hide() operator before your publishOn or any other operator that can fuse with our internal UnicastProcessor, such as groupBy.
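For example (a sketch; your actual pipeline and handler will differ):

socket.requestStream(payload)
      .hide()                                  // breaks fusion with our internal UnicastProcessor
      .publishOn(Schedulers.boundedElastic())  // or limitRate / groupBy / any other fusing consumer
      .subscribe(p -> process(p));             // process is a placeholder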

Also, feel free to migrate to 1.1.0, which provides custom implementations of the request operators and does not use UnicastProcessor anymore.

@OlegDokuka
Member

OlegDokuka commented Nov 19, 2020

On the other hand, we have subscribeOn between the UnicastProcessor and your logic. SubscribeOn should disable fusion, so at this point I don't have a clue what is going on there.

@lkolisko apologies for the confusion. Let me reproduce the issue locally and then I can provide you with more updates. For now, please use either 1.0.2 or 1.1.0.

@koldat
Contributor

koldat commented Nov 19, 2020

I am working with Lukas on this issue. Unfortunately, we use spring-boot and I am not sure we can upgrade to 1.1.0, since it brings some incompatibilities in the interfaces. On the other hand, we tried 1.0.2 and it did not help. We placed .hide() almost everywhere without any effect, as we cannot get below the subscribeOn in handleRequestStream. The only fix that helped is to use an unbounded queue in handleRequestStream to cover that glitch. In the worst case there will be 2 elements in memory, which is still better than crashing the stream.

@koldat
Contributor

koldat commented Nov 19, 2020

The change (unbounded vs. one()) was introduced in 18050ed, after 1.0.2. Hmm... We will try again to see whether it really happens even with 1.0.2.

@OlegDokuka
Member

@koldat @lkolisko It should not be happening in 1.0.2. We will do our best to land a fix for this ASAP and release 1.0.4.

@koldat
Contributor

koldat commented Nov 19, 2020

You are right, we have just verified that. The issue was that we got the same exception text (overrun by more signals than expected) from the server side as well and thus were confused. It looked the same, but was not. It means a similar problem also exists on the other side (RSocketResponder). When we changed both the server and the client, the issue was solved.

Thanks a lot for looking into this.

@OlegDokuka
Member

OlegDokuka commented Nov 20, 2020

@koldat one more question. Do you have any custom publishers on the producer side?

I did a couple of different test cases, and I cannot reproduce this issue. Also, we have an extensive number of tests inside RSocket, which means that if we had a bug, it would eventually appear on the CI, but it does not. Thus, I suspect that something is on your end. To make sure it is our bug, I would appreciate it if you could share more details on what kind of operators you use: what the producer is, what the consumer is, and which Reactor operators you use (if not Reactor, which libraries you use in combination with rsocket).

@koldat
Contributor

koldat commented Nov 20, 2020

We use Spring Boot 2.3.5. The Flux we return is a flatMapMany:

    public Flux<MetadataMetricQueryMatch> query(Mono<MetadataMetricQueryReactiveRequest> request, RoleBasedSession session) {
        return request.flatMapMany(query -> metadataManager.executeAsFlux(session, query));
    }

where the inner Flux uses Flux.using and Flux.fromIterable:

        return Flux.using(() -> new AtomicReference<>(new DatabaseIteratorContext()),
                ctx -> {
                    DatabaseIterator<MetadataMetricQueryMatch> it = execute(session, query, matcher, ctx.get(), false);
                    return Flux.fromIterable(() -> it);
                }, ctx -> {
                    ctx.get().close();
                });

@OlegDokuka
Member

OlegDokuka commented Nov 20, 2020

@koldat Anything special on the consumer side, e.g. custom subscriber, etc?

@koldat
Contributor

koldat commented Nov 20, 2020

It is failing in two different cases:

  1. stream - our API uses a BaseSubscriber
  2. channel - our API uses windowUntil, flatMap, and block()

The API call is built like this for stream:

       rSocket.flatMapMany(socket -> socket.requestStream(
            DefaultPayload.create(encodeMessage(request, mapper), encodeMetadata()))
            .map(data -> decodeMessage(data, mapper, type)))
            .onErrorMap(applicationExceptionMapper());

The API call is built like this for channel:

            rSocket.flatMapMany(socket -> socket.requestChannel(
                Flux.from(request)
                .map(r -> encodeMessage(r, mapper))
                .switchOnFirst((signal, inner) -> {
                    if (signal.hasValue()) {
                        Payload firstPayload = DefaultPayload.create(signal.get(), encodeMetadata());
                        return Mono.just(firstPayload)
                            .concatWith(inner.skip(1).map(DefaultPayload::create));
                    }
                    return inner.map(DefaultPayload::create);
                }))
            .map(data -> decodeMessage(data, mapper, type)))
            .onErrorMap(applicationExceptionMapper());

@OlegDokuka
Member

Yup. Looks like it is not related to the business logic at all.

@koldat
Contributor

koldat commented Jan 6, 2021

@OlegDokuka any update on this issue? Using version 1.0.2 we have not hit a single issue in the last month, whereas with 1.0.3 (queue with bounded size == 1) we were hitting it almost immediately. I would still like to use the latest official version provided by Spring, which also has other fixes.

Thank you very much for your support so far.

@OlegDokuka
Member

OlegDokuka commented Jan 6, 2021

@koldat can we have a call so I can look at your application setup / how you write your dataflow, etc?

Unfortunately, I was not able to reproduce the mentioned issue with any of the setups you mentioned. So... basically, I'm stuck on making progress with this.

Cheers,
Oleh

@koldat
Contributor

koldat commented Jan 25, 2021

@OlegDokuka We are still trying to collect more information to help you resolve this.

So far it looks to me like an issue that is also happening to us with WebFlux. Basically, using limitRate (I think you are the author) together with schedulers leads to some multithreading issue, I think. You can find more information on the Reactor issue below, including a simple unit test. When we removed limitRate, WebFlux at least started to work properly. I will retry the same for rsocket (I just found that).

reactor/reactor-core#2567

@OlegDokuka
Member

OlegDokuka commented Jan 25, 2021

@koldat Is it the same set of operators you are using with rsocket Java?

@koldat
Contributor

koldat commented Jan 25, 2021

Unfortunately not exactly, but we are using a similar combination, like this:

    @Override
    public Flux<MetadataQueryMatch> query(Mono<MetadataQueryReactiveRequest> request, RoleBasedSession session) {
        request = request.publishOn(readScheduler);
        // Some standard operators here
        return flux.subscribeOn(readScheduler);
    }

What can happen is that the server side does not follow the rule, because it emits some value twice. So in very rare cases the client throws the exception we see. Version 1.0.2 maybe just hides the Reactor defect (double emit of the same item).

Just brainstorming, I have no evidence... But if it can happen in the unit test, why not in any other operator combination together with subscribeOn/publishOn?

I have tried elastic and also executorService schedulers. No difference.

@OlegDokuka
Member

It can be a bug in Reactor. I will check that.

@OlegDokuka
Member

OlegDokuka commented Jan 25, 2021

@koldat Also, I would appreciate it if you could enable rsocket frame logging and your Flux logging and share those logs with me. That can help with debugging as well.

@OlegDokuka
Member

OlegDokuka commented Feb 17, 2021

@koldat @lkolisko do you still observe the issue with the combination of 1.0.3, the latest reactor-core 3.3.14, and the fix that you recently added?

@koldat
Contributor

koldat commented Feb 18, 2021

@OlegDokuka no, the latest Reactor did not help; it is another issue. The problem is still in this queue:

final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());

The issue is that there are two threads: one is requesting and one is dispatching to the queue. As the request from downstream is higher than one, there can be more than one in-flight message coming to the receiver.

Now imagine you are dispatching to the queue (io.rsocket.core.RSocketRequester.handleFrame(int, FrameType, ByteBuf)), but at the same time there is a request call from the downstream side. It acquires WIP, so onNext will NOT drain. Then a second message from the wire is dispatched immediately, which ends up with two elements in the queue -> overflow.
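Just to illustrate the capacity side of it (not the actual thread race), two undelivered payloads are already enough to overflow a processor backed by Queues.one():

// Minimal illustration of the capacity constraint only -- the real failure needs
// the concurrent request()/handleFrame() interleaving described above.
UnicastProcessor<Integer> receiver = UnicastProcessor.create(Queues.<Integer>one().get());

receiver.onNext(1); // buffered: the single-slot queue is now full
receiver.onNext(2); // offer fails -> the processor terminates with the OverflowException

receiver.subscribe(
    v -> System.out.println("value: " + v),
    e -> System.out.println("error: " + e)); // "The receiver is overrun by more signals than expected (bounded queue...)"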

@OlegDokuka
Member

Indeed. I see what you mean. Good catch 👍

@koldat
Contributor

koldat commented Feb 18, 2021

Do you want me to do a PR for this, or will you handle it? I would simply revert to the default create (unbounded). It does not matter much if, for a tiny bit of time, there are a couple of elements in the queue.

@OlegDokuka
Member

@koldat thank you for offering!

Let's keep what we have right now and do it differently. Having an unbounded queue behind UnboundedProcessor brings other DoS-related problems (for more info see -> #887). Thus, I don't wanna circle back to that problem in 1.0.x again and would rather relax UnboundedProcessor (taking into account our execution model, which should never fail because of overflow) and have that relaxed impl locally. We are planning the release of 1.0.4 next week, and this fix should be included in it.

However, if you see what I mean by "relaxed impl", which has to be queue-free and WIP-free, feel free to send a PR before me. (I'm planning to work on this over the weekend.)

Thanks,
Oleh

@koldat
Contributor

koldat commented Feb 19, 2021

#887 makes sense. What I would personally do is add a requestN assertion, which checks that the queue is drained and filled in accordance with the downstream requests. If the client does not follow the rules, it throws an exception. It could maybe even be a nice operator in Reactor. The point is that even if you fix this place, the same type of attack can be applied downstream, so every downstream service would maybe need the same protection. So having it at the entry point would be best, I think.
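Roughly what I have in mind, just as a sketch (this is not an existing Reactor or rsocket operator, and capping of unbounded demand is omitted for brevity):

// Hypothetical "requestN assertion": fail fast if the upstream emits more than
// the downstream ever requested, instead of silently buffering the excess.
static <T> Flux<T> assertDemand(Flux<T> source) {
    AtomicLong outstanding = new AtomicLong();
    return source
        .doOnRequest(outstanding::addAndGet)   // demand flowing upstream increases the budget
        .doOnNext(v -> {
            if (outstanding.decrementAndGet() < 0) {
                throw Exceptions.failWithOverflow("producer emitted more than was requested");
            }
        });
}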

I am not sure what is meant by a relaxed impl, but if that is easier, then I would leave it up to you as the expert in this area.

@OlegDokuka
Member

OlegDokuka commented Feb 19, 2021

Right. The more challenging part is to have a Processor API which does that + a single-subscriber assertion + a requestN assertion, though that is a part of the rsocket impl. The point here is to avoid any queue on our end. Thus the first attempt was implementing UnicastProcessor with Queues.one(), but it seems that we have to come back to a custom UnicastProcessor impl again...

Although... there is a simpler way. What if we request(unbounded) from the UnicastProcessor upfront, at the first request, in the RequestOperator? What we shall do is:

  1. Remove this.s.request(n) from here -> https://github.com/rsocket/rsocket-java/blob/1.0.x/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java#L96
  2. Do this.s.request(Long.MAX_VALUE) here -> https://github.com/rsocket/rsocket-java/blob/1.0.x/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java#L118 to ensure that the UnicastProcessor is requested once and will never be touched by the downstream request

Having that, we can keep UnicastProcessor.create(Queues.one()), and the requestN assertion is deferred to the downstream logic (e.g. the publishOn operator, or, after all, the actual subscriber must ensure the produced number of elements matches what was requested); in case of an attack we will not be queueing anything on our end.
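In pseudo-form the idea looks like this (the names are illustrative and this is not the actual RequestOperator source, just a sketch of the change):

// Sketch: forward the demand over the wire as before, but drain the local
// UnicastProcessor eagerly by requesting Long.MAX_VALUE exactly once.
final class FirstRequestSketch {
  private final Subscription upstream;                 // subscription to the UnicastProcessor
  private final AtomicBoolean first = new AtomicBoolean(true);

  FirstRequestSketch(Subscription upstream) {
    this.upstream = upstream;
  }

  void onDownstreamRequest(long n) {
    sendRequestNFrame(n);                              // demand still travels over the wire
    if (first.compareAndSet(true, false)) {
      upstream.request(Long.MAX_VALUE);                // requested once, never touched by later requests
    }
  }

  private void sendRequestNFrame(long n) {
    // placeholder: in rsocket this writes a REQUEST_N frame for the stream
  }
}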

@koldat, do you wish to make a PR for that and add a test ensuring such a race no longer reproduces?

@OlegDokuka
Member

@koldat this issue should be fixed in 1.0.4 release by #985. Let me know if it works for you!

@OlegDokuka OlegDokuka removed this from the 1.0.4 milestone Feb 26, 2021
@OlegDokuka OlegDokuka added the superseded (Issue is superseded by another) label Feb 26, 2021
@chaoyoung

chaoyoung commented Sep 13, 2022

@koldat this issue should be fixed in 1.0.4 release by #985. Let me know if it works for you!

There are many such issues in our production environment.

dependency: spring-boot:2.3.12.RELEASE, rsocket:1.0.5, reactor-core:3.3.17

public Flux<Void> voiceChat(RSocketRequester requester, @Header String callId, @Payload final Flux<byte[]> payloads) {
    payloads.subscribeOn(callRxScheduler, false)
            .onBackpressureBuffer(100, rxOnBufferOverflow, BufferOverflowStrategy.DROP_OLDEST)
            .subscribe(consumer, errorConsumer, completeConsumer);
}

error:

reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxDoFinallyFuseable] :
	reactor.core.publisher.Flux.doFinally
	io.rsocket.core.RSocketResponder.handleChannel(RSocketResponder.java:563)
Error has been observed at the following site(s):
	|_            Flux.doFinally ⇢ at io.rsocket.core.RSocketResponder.handleChannel(RSocketResponder.java:563)
	|_          Flux.doOnDiscard ⇢ at io.rsocket.core.RSocketResponder.handleChannel(RSocketResponder.java:576)
	|_                 Flux.from ⇢ at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.requestChannel(MessagingRSocket.java:131)
	|_                  Flux.map ⇢ at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:169)
	|_        Flux.doOnSubscribe ⇢ at org.springframework.messaging.rsocket.annotation.support.MessagingRSocket.handleAndReply(MessagingRSocket.java:169)
	|_                 Flux.from ⇢ at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.extractContent(PayloadMethodArgumentResolver.java:178)
	|_                  Flux.map ⇢ at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.extractContent(PayloadMethodArgumentResolver.java:178)
	|_                  Flux.map ⇢ at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.decodeContent(PayloadMethodArgumentResolver.java:235)
	|_                Flux.error ⇢ at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.lambda$decodeContent$2(PayloadMethodArgumentResolver.java:236)
	|_        Flux.onErrorResume ⇢ at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.decodeContent(PayloadMethodArgumentResolver.java:236)
	|_        Flux.switchIfEmpty ⇢ at org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver.decodeContent(PayloadMethodArgumentResolver.java:238)
	|_          Flux.subscribeOn ⇢ at com.xxx.dm.controller.VoiceCallController.voiceChat(VoiceCallController.java:371)
	|_ Flux.onBackpressureBuffer ⇢ at com.xxx.dm.controller.VoiceCallController.voiceChat(VoiceCallController.java:372)
Stack trace:
		at reactor.core.Exceptions.failWithOverflow(Exceptions.java:221)
		at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:394)
		at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:336)
		at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
		at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:560)
		at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:645)
		at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:685)
		at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:178)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:265)
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:371)
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358)
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
		at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
		at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.lang.Thread.run(Thread.java:748)

@chaoyoung

Here is the point. Your backpressure operator is responsible for storing up to 512 elements; RSocket has no clue about your request. It just transparently transforms your requestN into a binary frame and ensures that the frame is decoded back, upon receipt, into the same requestN method invocation on your subscription on the receiver side.

The problem is that Reactor is a smart framework and tries to do lots of optimizations. That said, if you have a workflow of UnicastProcessor[Queue] -> PublishOn[Queue], then Reactor will optimize your workflow into PublishOn[UnicastProcessor[Queue]], which means that one less queue will be allocated.

But here comes the problem. We use UnicastProcessor just because it is a convenient API that lets us send elements manually. On the other hand, we don't need to store any elements, because we know the strict relationship between publisher and subscriber: if the subscriber says request(5), it is ready to consume 5; if later on it says request(10) while the previous 5 have not yet been delivered, it is ready to receive 15 (5 + 10). Again, we don't have to store anything.

The problem comes from the fact that UnicastProcessor fuses with any operator that creates its own queue. This means that if we create queue(1) and publishOn wanted queue(512), the UnicastProcessor will have no clue about that requirement; hence we see overflow exceptions because of the requirements mismatch.

As I said, to avoid this issue for now, just place the .hide() operator before your publishOn or any other operator that can fuse with our internal UnicastProcessor, such as groupBy.

Also, feel free to migrate to 1.1.0, which provides custom implementations of the request operators and does not use UnicastProcessor anymore.

It does not work for me with rsocket 1.0.5.

dependency and context: #959 (comment)
