-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[shipper] Make the memory queue accept opaque pointers #31356
Conversation
…ut the contained buffer type
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
// there might also be free space before region A. In that | ||
// case new events must be inserted in region B, but the | ||
// queue isn't at capacity. | ||
avail = len(b.entries) - b.regA.index - b.regA.size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the follow up from your comment about this possibly not being right? Is it too much work to fix, or not worth fixing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand the intention correctly then it's an easy fix, just removing b.regA.index
from the right side. I've been leaving it for last cause I don't want to intentionally change the functional logic until everything is at full parity with the old version (which right now is just pending on the stress tests).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hah -- this turned out to be the cause of the test failure 😅 This is the same computation as the old version, but before it was only made on a specific state transition, and now that its checked every loop iteration it ended up blocking the queue. Switching to the correct calculation here makes the tests pass locally, so fingers crossed on the CI now.
|
||
for { | ||
var pushChan chan pushRequest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this duplicated entirely between here and newDirectEventLoop
? It is hard for me to spot if there is some subtle difference between the two just scrolling up and down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not quite duplicated -- it's the same logical sequence, but because the containing struct is different the conditions don't match (e.g.: here we check if the queue is full by comparing eventCount
to maxEvents
, but in the version above, directEventLoop
has no field analogous to eventCount
so it uses a different test).
I suspect that having these two almost-identical objects with such completely divergent implementations just for a special case doesn't help performance enough to justify the complexity, and if I get a chance I'd like to merge these into a single helper, but that seemed out of scope for now :-)
@@ -99,80 +83,73 @@ func (l *directEventLoop) run() { | |||
) | |||
|
|||
for { | |||
var pushChan chan pushRequest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is significantly more obvious than what was going on before. Nice!
Just looking at the diff I can't spot any major issues. I'll try to check this out and build more of an understanding of what this is doing later (after your refactoring, which is much easier to follow). Also I had never seen the |
Yea, I've never triggered that one before either, but it's related to sending loads through the pipeline so it's almost certainly a real failure. Right now I'm debugging it expecting that I missed a race condition somewhere. |
LGTM, give the rest of the time some time to look at it before merging though |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, awesome as always!
What does this PR do?
Refactors the memory queue internal data structures to accept opaque pointers (
interface{}
) for its events rather than an explicitpublisher.Event
. This is needed for the queue to store the event representations anticipated in https://github.com/elastic/elastic-agent-shipper.This doesn't fully resolve #31307 because it doesn't yet expose a type-agnostic public interface. This PR is already pretty big and I don't want it to eat into ON week, so I'm deferring those questions until I can give them full attention.
This change should, in a perfect world, be a functional no-op: it changes internal handling but the exposed API is unchanged. The main changes are:
events
andclients
intoqueueEntry
. The memory queue previously stored events aspublisher.Event
, and their metadata inclientState
. These were stored in separate arrays with shared indices, propagated in various ways. The new code creates the typequeueEntry
as its underlying buffer type, which contains the event (aninterface{}
which in beats has underlying type*publisher.Event
) and its metadata. This change had to be propagated through a number of internal helpers likememqueue.ringBuffer
ackState
inmemqueue.batch
. There were also some fields that were duplicates of others -- ineventloop.go
the event loops had pointers to their associated broker and their own unaltered copies of several of its fields. I removed these when I could.pushRequest
,getRequest
etc) which were selectively nulled-out on appropriate state changes (e.g. if the queue is full after apushRequest
then the push channel is set to nil to block additional requests). This got quite hard to follow during the changes, since the fields were mutated throughout the code and their semantics were undocumented. I moved the channels into local variables in the run loop, initializing them immediately before their use inselect
. This keeps the logic in one place, and it's clearer now what specific circumstances can enable / disable each channel.Checklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesI have added an entry inCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Performance tests
I ran some extra benchmarks using
libbeat/publisher/pipeline/stress
. The main configurations I tested were buffered vs direct event loop. The tests were 1-minute samples sending a continuous stream of events through the queue. They were run with:and similarly after switching to the PR branch. The top-level results were:
Event throughput (per minute)
Total allocations:
In use allocations:
As expected for the nature of the change, the total allocations are noticeably higher, since a lot of the complexity of the
publisher.Event
handling was to avoid allocating temporary values. However, the throughput is fine. While in-use memory is up it is still reasonable (8MB to send 69 million events).I also tested these configurations using the
blocking
output test (out=blocking
in the test name) which adds amin_wait
to the configuration. Remarkably, both old and new queues had exactly the same throughputs (tho on the order of 30K rather than 70M). The total allocations were up in the new version but in-use was slightly down.Overall these results look to me like we are paying slightly for this simplification, but nothing that seems worrying. I'm expecting to do more pipeline performance work soon and this cleanup gives a good baseline for tracking down our real bottlenecks.
Related issues