
fixes performance degradation when fragmentation is used #995

Merged
merged 1 commit into rsocket:1.0.x from the fragment_fix branch on Mar 4, 2021

Conversation

koldat
Contributor

@koldat koldat commented Mar 3, 2021

When one defines a custom MTU to be used as the fragment size, it significantly degrades performance. The attached example code sends 1M records of 5 bytes each. Before the fix it takes 39 seconds; after the fix it takes 5 seconds (the same time as with no custom fragmentation). We need to enable it because the WebSocket max data size is 64 kB and we support both transports.

See #994
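
A minimal sketch of that kind of benchmark, assuming the rsocket-java 1.0.x client API over the TCP transport; the endpoint, the 16 KiB MTU, and the 5-byte payload are placeholders, not the attached code:

  import io.rsocket.RSocket;
  import io.rsocket.core.RSocketConnector;
  import io.rsocket.transport.netty.client.TcpClientTransport;
  import io.rsocket.util.DefaultPayload;
  import reactor.core.publisher.Flux;

  public class FragmentationBenchmark {
    public static void main(String[] args) {
      // Any custom MTU enabled the slow path before the fix; 16 KiB is an arbitrary choice.
      RSocket requester =
          RSocketConnector.create()
              .fragment(16_384)
              .connect(TcpClientTransport.create("localhost", 7000)) // assumed local responder
              .block();

      long start = System.nanoTime();
      Flux.range(0, 1_000_000)
          // 5-byte records, sent as fire-and-forget frames
          .concatMap(i -> requester.fireAndForget(DefaultPayload.create("12345")))
          .blockLast();
      System.out.printf("sent 1M records in %.1f s%n", (System.nanoTime() - start) / 1e9);

      requester.dispose();
    }
  }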

@OlegDokuka
Member

Unfortunately, this fix breaks the idea of fragmentation and introduces head-of-line blocking, since now basically all frames are fragmented and sent one by one, meaning that if one huge frame has to be sent, it will block the others. I believe what you need is another fragmentation level, which WebSocket provides by default.

@koldat
Contributor Author

koldat commented Mar 3, 2021

@OlegDokuka " that if one huge frame has to be sent, it will be blocking the others."

I am not sure I understand. Why it should be blocking others? original code is doing the same. There is only exception that concatMap has condition that when no fragmentation is not needed we do not need to send with "fragmented" branch.

@OlegDokuka
Member

I did. What you do is basically have a concatMap that sends all the frames through a single delegate.send. Am I missing something?

@koldat
Contributor Author

koldat commented Mar 3, 2021

Is delegate.send blocking the others? Then multiple streams could never work on the same channel.

@OlegDokuka
Member

OlegDokuka commented Mar 3, 2021

I am not sure I understand. Why should it block the others? The original code does the same thing; the only difference is that concatMap now has a condition so that when fragmentation is not needed, we do not go through the "fragmented" branch.

concatMap allows only a single Flux to produce frames at a time, which means that if we have a huge payload it will produce a long Flux of small frames, so that payload will take a long time to complete and will queue up all the other frames; they will be waiting their turn to be fragmented.

Is delegate.send blocking the others? Then multiple streams could never work on the same channel.

It drains frames sequentially, one by one.
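
A small standalone Reactor sketch of the behavior described above: concatMap drains one inner Flux to completion before starting the next, while flatMap subscribes to the inner publishers eagerly and may interleave their elements (plain Reactor, no RSocket types):

  import java.time.Duration;
  import reactor.core.publisher.Flux;

  public class ConcatVsFlatMap {
    public static void main(String[] args) {
      // "A" stands for a huge frame fragmented into many small pieces emitted slowly;
      // "B" stands for a small frame that is ready immediately.
      Flux<String> slowA = Flux.interval(Duration.ofMillis(10)).take(5).map(i -> "A" + i);
      Flux<String> fastB = Flux.just("B0");

      // concatMap: A0 A1 A2 A3 A4 B0 -- B waits behind A (head-of-line blocking).
      Flux.just(slowA, fastB).concatMap(f -> f).doOnNext(System.out::println).blockLast();

      // flatMap: B0 is emitted right away, interleaved with A's elements.
      Flux.just(slowA, fastB).flatMap(f -> f).doOnNext(System.out::println).blockLast();
    }
  }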

@OlegDokuka
Member

I mean, looking at your fix and the explanation, I believe that you need something different, at the Netty level. Can you please look at my comments on #994?

@koldat
Contributor Author

koldat commented Mar 3, 2021

What about flatMap then?

@koldat
Contributor Author

koldat commented Mar 3, 2021

@OlegDokuka Why is this code not blocking in the same way you say my version is? That concatMap also processes frames one by one, and if there is a huge data frame then the others wait in the queue:

  @Override
  public Mono<Void> send(Publisher<ByteBuf> frames) {
    return Flux.from(frames).concatMap(this::sendOne).then();
  }

@OlegDokuka
Member

OlegDokuka commented Mar 3, 2021

@koldat this code is weird, and it is kind of the same and kind of not. The main difference is that every sub-Flux generated within concatMap is sent through its own delegate.send. Under the hood, reactor-netty allocates a whole MonoSendMany for that purpose (which is why you see such tremendous overhead: it creates a couple of queues and objects). MonoSendMany has a prefetch mechanism and technically uses its own channel (if I'm not wrong), so it can write some amount of data into the channel's queue before being blocked. That said, onComplete comes back fast enough compared to the case where all the data is written into the same MonoSendMany (before, it was one MonoSendMany per Flux generated by concatMap; after your changes it is a single MonoSendMany for everything, so whenever the channel says "I'm full" we have to wait and only prefetch after that). (Apologies if the above reads hard, it is ~1 AM local time.)

What about flatMap then?

Actually, it can be a good alternative and I guess we can land it as an improvement. So, your code is good; let's try to migrate from concatMap to flatMap and see if nothing breaks.
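
For context, a rough sketch of the two shapes being compared in this discussion. The helper names (delegate, mtu, alloc(), shouldFragment, fragmentFrame) are borrowed from the snippets quoted in this thread; this is an approximation, not the actual 1.0.x sources:

  import io.netty.buffer.ByteBuf;
  import io.netty.buffer.ByteBufAllocator;
  import io.rsocket.DuplexConnection;
  import io.rsocket.frame.FrameHeaderCodec;
  import io.rsocket.frame.FrameType;
  import org.reactivestreams.Publisher;
  import reactor.core.publisher.Flux;
  import reactor.core.publisher.Mono;

  abstract class FragmentationSendSketch {
    final DuplexConnection delegate;
    final int mtu;

    FragmentationSendSketch(DuplexConnection delegate, int mtu) {
      this.delegate = delegate;
      this.mtu = mtu;
    }

    abstract ByteBufAllocator alloc();

    abstract boolean shouldFragment(FrameType frameType, int readableBytes);

    abstract Publisher<ByteBuf> fragmentFrame(
        ByteBufAllocator alloc, int mtu, ByteBuf frame, FrameType frameType);

    // Before: one delegate.send(...) call -- and one reactor-netty MonoSendMany -- per frame.
    Mono<Void> sendBefore(Publisher<ByteBuf> frames) {
      return Flux.from(frames)
          .concatMap(
              frame -> {
                FrameType frameType = FrameHeaderCodec.frameType(frame);
                if (!shouldFragment(frameType, frame.readableBytes())) {
                  return delegate.send(Mono.just(frame));
                }
                return delegate.send(fragmentFrame(alloc(), mtu, frame, frameType));
              })
          .then();
    }

    // After: fragment inline and hand a single Publisher to a single delegate.send(...).
    Mono<Void> sendAfter(Publisher<ByteBuf> frames) {
      return delegate.send(
          Flux.from(frames)
              .concatMap(
                  frame -> {
                    FrameType frameType = FrameHeaderCodec.frameType(frame);
                    if (!shouldFragment(frameType, frame.readableBytes())) {
                      return Flux.just(frame);
                    }
                    return Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
                  }));
    }
  }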

@OlegDokuka OlegDokuka added this to the 1.0.4 milestone Mar 3, 2021
@koldat
Contributor Author

koldat commented Mar 3, 2021

Changed to flatMap. Performance is almost the same (5 seconds vs. 30 or more without the fix). Tests pass (locally).

Member

@OlegDokuka OlegDokuka left a comment

LGTM

@OlegDokuka OlegDokuka self-requested a review March 4, 2021 09:59
Member

@OlegDokuka OlegDokuka left a comment

From my private discussion with @rstoyanchev, we figured out that flatMap may break frame ordering, so frames for the same stream could potentially be reordered, which is something we don't want to have. Actually, it turned out that the previous implementation may do the same, so we need to iterate a little more to ensure we have good performance and do not break frame ordering.

@rstoyanchev
Contributor

@koldat the head-of-line and performance issues with fragmentation are well-known limitations in 1.0.x that required the significant rework done for 1.1; take a look at #761 and the issues linked to it. Given there are no easy solutions, this will likely remain a limitation in 1.0.x, and you'll need to upgrade to 1.1 to get the benefits of the rework. You mentioned Spring Boot 2.3, which has 3 months of support remaining as well, so you'll need to upgrade to 2.4, which is based on RSocket Java 1.1.

As @OlegDokuka mentioned, using flatMap is likely an issue and we can't roll it in with only 3 months of OSS support left, but if it works for you, feel free to apply it in your environment. Another idea: I don't know whether reducing the fragmentation size to less than 60K might give you slightly better performance for the remaining time before you upgrade.

@OlegDokuka
Member

OlegDokuka commented Mar 4, 2021

@koldat as it turned out, you were right about the behavior; we were kind of exploiting a reactor-netty bug that shuffled frames, and that bug is fixed now, which means we do have head-of-line blocking at the moment, which is unfortunate.

We chatted with @rstoyanchev and figured out that we can use flatMap; however, we need to put a groupBy operator in front of it to ensure that flatMap does not reorder frames for the same streamId.

I suggest doing the following:

delegate.send(
    Flux.from(frames)
        .groupBy(frame -> FrameHeaderCodec.streamId(frame), Integer.MAX_VALUE)
        .flatMap(
            groupedById ->
                groupedById.concatMap(
                    frame -> {
                      FrameType frameType = FrameHeaderCodec.frameType(frame);
                      int readableBytes = frame.readableBytes();
                      if (!shouldFragment(frameType, readableBytes)) {
                        return Flux.just(frame);
                      }

                      return logFragments(
                          Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)));
                    }),
            Integer.MAX_VALUE));

Can you please check whether that solution is still good enough for you?

@OlegDokuka
Member

OlegDokuka commented Mar 4, 2021

If the above does not work well for you, I guess we can stick with concatMap (instead of flatMap) and just state that we have a head-of-line blocking problem in 1.0.x even though we support fragmentation (which is useless in that case).

@koldat
Contributor Author

koldat commented Mar 4, 2021

I think groupBy is not a good idea, as it keeps every stream in the groupBy operator forever (internally in a Map).

I would go with concatMap; as you have said, it is the same performance and has the least chance of introducing an issue. Scaling this can easily be done with load balancing (more connections). Still, I do not think it is an issue, because the connection serves only the application that uses it, so a fully utilized wire cannot go any faster; having a way to interleave the streams does not increase the final throughput.

@rstoyanchev yes, 3 months sounds short, but some deployments do not move that fast, especially in production. Yes, we plan to move forward and upgrade, but we also want the current version to be stable and performant. Regarding the comment on changing the fragmentation size: that is actually the problem. Setting any value causes this issue; it does not matter what the value is.

Should I switch back to concatMap, or do you not want to include it in the release?

@OlegDokuka
Member

OlegDokuka commented Mar 4, 2021

I think groupBy is not a good idea, as it keeps every stream in the groupBy operator forever (internally in a Map).

Technically, we can track the end of the stream, so it will not be a problem, and we can cancel the inner Flux when we see the terminal frame for that streamId. I will try to do that for 1.0.5 (if we end up doing 1.0.5)
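
A pure-Reactor illustration of that idea, with strings standing in for frames and a hypothetical COMPLETE marker playing the role of the terminal frame; takeUntil completes (and cancels) the group, so groupBy can drop the finished stream from its internal map:

  import reactor.core.publisher.Flux;

  public class GroupByWithTermination {
    public static void main(String[] args) {
      // Simulated frames in "streamId:type" form; COMPLETE ends that stream.
      Flux<String> frames =
          Flux.just("1:NEXT", "2:NEXT", "1:NEXT", "1:COMPLETE", "2:NEXT", "2:COMPLETE");

      frames
          .groupBy(f -> f.split(":")[0])
          .flatMap(
              group ->
                  group
                      // Emit the terminal frame, then complete the group so it is
                      // not retained by groupBy forever.
                      .takeUntil(f -> f.endsWith("COMPLETE"))
                      // Per-frame work (e.g. fragmentation) would go here.
                      .map(f -> "send " + f))
          .doOnNext(System.out::println)
          .blockLast();
    }
  }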

Should I switch back to concatMap, or do you not want to include it in the release?

@koldat yes, please

@koldat
Contributor Author

koldat commented Mar 4, 2021

Change done

@OlegDokuka OlegDokuka changed the title from "Fix performance degradation when fragmentation is used (#994)" to "fixes performance degradation when fragmentation is used" on Mar 4, 2021
@OlegDokuka OlegDokuka merged commit e4d62b6 into rsocket:1.0.x Mar 4, 2021
@OlegDokuka
Member

OlegDokuka commented Mar 4, 2021

@koldat thanks for your contribution

@koldat koldat deleted the fragment_fix branch March 4, 2021 14:26