
Optimize SimpleSubscriber for Netty #3583

Merged: 9 commits, Mar 11, 2024
Conversation

@kciesielski (Member) commented Mar 8, 2024:

Fixes #3548

This PR updates the SimpleSubscriber with the following improvements:

  1. If there is only one chunk of data (by default, for body length < 8192), read it from the buffer and return immediately, requiring only one array allocation.
  2. If there are multiple chunks, collect all Netty ByteBufs without rewriting them into arrays, then copy them into the final array in onComplete. Disclaimer: initially I wanted to cover this case with Netty's CompositeByteBuf, but it turned out to be a bad idea: it performs reallocations underneath to resize its internal representation, which makes overall performance significantly worse.
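The multi-chunk strategy above can be sketched roughly as follows. This is a hypothetical, simplified model: the real subscriber works with Netty ByteBufs and Reactive Streams callbacks, while plain `Array[Byte]` chunks stand in here, and the class name is illustrative.

```scala
import scala.collection.mutable.ListBuffer

// Simplified sketch: collect chunk references in onNext (no per-chunk copy),
// then allocate the final array once and copy everything in onComplete.
final class ChunkCollector {
  private val chunks = new ListBuffer[Array[Byte]]()
  private var totalLength = 0

  def onNext(chunk: Array[Byte]): Unit = {
    chunks += chunk             // keep only the reference, no rewriting into arrays
    totalLength += chunk.length
  }

  def onComplete(): Array[Byte] = {
    val finalArray = new Array[Byte](totalLength) // single allocation for the whole body
    var offset = 0
    chunks.foreach { c =>
      System.arraycopy(c, 0, finalArray, offset, c.length)
      offset += c.length
    }
    finalArray
  }
}
```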

Results:
PostBytes Simulation

|             | Before | After |
|-------------|--------|-------|
| CPU samples | 30.86% | 29.73% |
| throughput  | 44738  | 56955 |

Latency improvement:
(latency chart: latency-postbytes)

PostLongBytes Simulation

|             | Before | After |
|-------------|--------|-------|
| CPU samples | 48.8%  | 38.69% |
| throughput  | 363    | 454   |

Latency improvement:
(latency chart: latency-postlongbytes)

I haven't measured it, but there should also be noteworthy gains in memory allocations.

@kciesielski kciesielski marked this pull request as ready for review March 8, 2024 16:01
@kciesielski kciesielski requested a review from adamw March 8, 2024 16:01
```scala
val finalArray = ByteBufUtil.getBytes(byteBuf)
byteBuf.release()
resultBlockingQueue.add(Right(finalArray))
resultPromise.success(finalArray)
```
Member:
Shouldn't the subscriber enter some state where further arriving content would report an exception? It's possible that the incoming header is malformed.

Member Author:
I added a handler for this case: now if the result queue.offer call returns false, we follow up by canceling the subscription; as far as I checked, that's the recommended way to 'abort'.
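The abort path described here can be sketched as follows. Names are illustrative, not the actual tapir API; a bounded queue of capacity 1 stands in for the result queue, and a minimal Subscription trait stands in for the Reactive Streams one.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Illustrative stand-in for org.reactivestreams.Subscription.
trait Subscription { def cancel(): Unit }

// Sketch: exactly one result is expected; if a second one arrives
// (e.g. because the incoming header was malformed), offer returns
// false and we cancel the subscription instead of blocking or
// silently dropping data.
final class SingleResultReceiver(subscription: Subscription) {
  private val result = new ArrayBlockingQueue[Either[Throwable, Array[Byte]]](1)

  def offerResult(r: Either[Throwable, Array[Byte]]): Unit =
    if (!result.offer(r)) {
      subscription.cancel() // recommended way to abort a misbehaving stream
    }

  def await(): Either[Throwable, Array[Byte]] = result.take()
}
```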

```scala
private val resultBlockingQueue = new LinkedBlockingQueue[Either[Throwable, Array[Byte]]]()
private val buffers = new ConcurrentLinkedQueue[ByteBuf]()
```
Member:
actually ... what are the concurrency guarantees for a subscriber - can multiple onNext be called concurrently? or maybe onNext + onError? I'm wondering if we (a) need a concurrent data structure here at all and (b) if concurrency is allowed, is the impl safe

Member Author:
The onNext, onError, and onComplete operations are guaranteed to be called sequentially without concurrency, so we are safe to replace the concurrent data structure with something simpler. I tried with a ListBuffer and it actually gave another noticeable boost to throughput and latency for PostLongBytes.

@adamw (Member) commented Mar 8, 2024:

Nice results! :)

```diff
 import scala.concurrent.{Future, Promise}

 private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends PromisingSubscriber[Array[Byte], HttpContent] {
   private var subscription: Subscription = _
   private val resultPromise = Promise[Array[Byte]]()
   private var totalLength = 0
   private val resultBlockingQueue = new LinkedBlockingQueue[Either[Throwable, Array[Byte]]]()
-  private val buffers = new ConcurrentLinkedQueue[ByteBuf]()
+  private val buffers = new mutable.ListBuffer[ByteBuf]()
```
Member:
follow-up question (sorry ;) ) - onNext/onComplete/onError are guaranteed to be called from one thread, but is it going to be the same thread? that is, does buffers need to be volatile?

Member Author:
We don't have such a guarantee, and maybe that was why I had a ConcurrentLinkedQueue for byte arrays in the previous implementation, but I forgot :) This means I either fall back to it, or use a volatile ListBuffer.
I guess var totalLength is also unsafe and should be converted to an AtomicInteger, right?

Member Author:
Update: You can't have a volatile val, so I fell back to ConcurrentLinkedQueue. For totalLength I chose @volatile instead of AtomicInteger, because we use this variable sequentially: the only scenario is increasing it in onNext and reading it in onComplete.
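A minimal sketch of that choice (hypothetical class name): a single sequential writer increments in onNext, and a possibly different thread reads in onComplete, so @volatile visibility is enough and AtomicInteger's compare-and-set machinery is unnecessary.

```scala
// Sketch: @volatile guarantees the onComplete reader sees the writes
// made (sequentially, but possibly from another thread) in onNext.
final class LengthTracker {
  @volatile private var totalLength = 0

  def onNext(chunkSize: Int): Unit =
    totalLength += chunkSize // safe: onNext calls are never concurrent

  def total: Int = totalLength // read once, in onComplete
}
```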

Member:
But we can have a volatile var holding an immutable list - always fewer synchronisations.
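The suggested pattern could look like this (a sketch under assumed names, not the PR's final code; Vector stands in for "an immutable list" because its appends are cheap): each append publishes a fresh immutable collection through the volatile reference, so a reader on another thread can only ever see fully constructed list nodes.

```scala
// Sketch: volatile var + immutable collection. Writes replace the whole
// reference; the volatile write/read pair forms the memory barrier, and
// the elements behind it are immutable, so no further synchronisation
// of the collection's internals is needed.
final class ChunkLog {
  @volatile private var chunks: Vector[Array[Byte]] = Vector.empty

  def add(c: Array[Byte]): Unit =
    chunks = chunks :+ c // safe: subscriber callbacks are sequential

  def all: Vector[Array[Byte]] = chunks
}
```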

Member Author:
Ok, I replaced the ConcurrentLinkedQueue with a volatile var ListBuffer. In theory, ListBuffer should have slightly cheaper appends than a Vector, and we don't need Vector's fast random access, which comes at the additional cost of maintaining a more complex underlying structure.
I don't see any throughput improvement, but there's a slight improvement in latency.

Member:
I'm not sure this properly protects the value - ListBuffer is mutable, and the volatile only ensures there's a memory barrier before reading the buffers reference (not references inside the ListBuffer). But maybe since we have a memory barrier, everything will be synchronized correctly ... as you can't really access the inner references before first reading the buffers reference (which creates the barrier).

Anyway, I thought about a simpler design, using immutable data structures, where you don't have to think that much ;-) But maybe this one works as well :)

@kciesielski kciesielski merged commit 7301c06 into master Mar 11, 2024
28 checks passed
@kciesielski kciesielski deleted the perf-netty-subscriber branch March 11, 2024 10:01
Linked issue: Optimize SimpleSubscriber for tapir-netty