-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add specialized buffers to memqueue #5148
Conversation
Move the buffer and message handling from memqueue broker type to the actual go routines handling the messaging.
Use specialized buffer types and eventloops depending on the memqueue configuration. If flushing is disabled, a region based ring buffer is used, as loads of small batches can be generated and we want to minimize additional allocations of small objects. If flushing is enabled a list of active and flushed buffers is managed by the queue. If a buffer is flushed by timeout, but not yet processed (and not full), additional events being published are added to the already flushed buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite challenging to follow the full change as I think the tricky parts are in the nitty gritty details. As test almost didn't change, it seems things are working as before.
Would be nice if we could an additional pair of eyes on this PR.
minEvents int | ||
idleTimeout time.Duration | ||
bufSize int | ||
// buf brokerBuffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leftover?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep
count := len(buf) | ||
if count == 0 { | ||
panic("empty batch returned") | ||
} | ||
|
||
// log.Debug("newACKChan: ", b.ackSeq, count) | ||
ackCH := newACKChan(l.ackSeq, start, count) | ||
ackCH := newACKChan(l.ackSeq, start, count, l.buf.buf.clients) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming smells a bit here ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's due to the old code. Some more cleanups will come at some point.
|
||
// Internal event ring buffer. | ||
// The ring is split into 2 regions. | ||
// Region A contains active events to be send to consumers, while region B can |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/send/sent/
} | ||
|
||
func (b *ringBuffer) insert(event publisher.Event, client clientState) (bool, int) { | ||
// log := b.buf.logger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leftover?
|
||
// region B does not exist yet, check if region A is available for use | ||
idx := b.regA.index + b.regA.size | ||
// log.Debug(" - index: ", idx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite a few logging leftsovers in the code here and below I think.
Debug Logging 'leftovers' are on purpose. It's some 'ExtraDebug' :) |
type ringBuffer struct { | ||
buf eventBuffer | ||
|
||
regA, regB region |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for the record: I think regA
and regB
could be renamed to regConsumer
and regProducer
, so the could can make more sense without the comments. But as it is just moved from a different a file I can live with it. It is already an improvement over the previous state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tl;dr: Only purpose of regB is to handle the filling the ring buffers when the indices in the ring buffer 'wrap around'.
regA is regConsumer and regProducer as long as there is space left in regA :)
regB only comes into existence if the ring buffer wraps around (no more space in regA).
Once regB exists, regA is indeed consumer only and regB is producer only.
After wrapping around regB becomes regA, once regA becomes empty. From then on regA is again shared by producers and consumers until another 'wrap around' on the ring buffer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have seen that. I was just trying to come up with names which could help the understanding more. But ofc, the proposed names were not descriptive either. :(
* memqueue cleanup Move the buffer and message handling from memqueue broker type to the actual go routines handling the messaging. * Add specialized buffers to memqueue Use specialized buffer types and eventloops depending on the memqueue configuration. If flushing is disabled, a region based ring buffer is used, as loads of small batches can be generated and we want to minimize additional allocations of small objects. If flushing is enabled a list of active and flushed buffers is managed by the queue. If a buffer is flushed by timeout, but not yet processed (and not full), additional events being published are added to the already flushed buffer. * ensure flush list does not contain empty buffers (cherry picked from commit ffd23a9)
* memqueue cleanup Move the buffer and message handling from memqueue broker type to the actual go routines handling the messaging. * Add specialized buffers to memqueue Use specialized buffer types and eventloops depending on the memqueue configuration. If flushing is disabled, a region based ring buffer is used, as loads of small batches can be generated and we want to minimize additional allocations of small objects. If flushing is enabled a list of active and flushed buffers is managed by the queue. If a buffer is flushed by timeout, but not yet processed (and not full), additional events being published are added to the already flushed buffer. * ensure flush list does not contain empty buffers (cherry picked from commit ffd23a9)
* memqueue cleanup Move the buffer and message handling from memqueue broker type to the actual go routines handling the messaging. * Add specialized buffers to memqueue Use specialized buffer types and eventloops depending on the memqueue configuration. If flushing is disabled, a region based ring buffer is used, as loads of small batches can be generated and we want to minimize additional allocations of small objects. If flushing is enabled a list of active and flushed buffers is managed by the queue. If a buffer is flushed by timeout, but not yet processed (and not full), additional events being published are added to the already flushed buffer. * ensure flush list does not contain empty buffers (cherry picked from commit 7968572)
Use specialized buffer types and eventloops depending on the memqueue
configuration.
If flushing is disabled, a region based ring buffer is used, as loads of
small batches can be generated and we want to minimize additional
allocations of small objects.
If flushing is enabled a list of active and flushed buffers is managed
by the queue. If a buffer is flushed by timeout, but not yet processed
(and not full), additional events being published are added to the
already flushed buffer.
This PR fixes a bug in beta2 regarding the flush timeout not working properly. Having distinct state machines and buffer types for flush based and non-flush based queue configurations helps solving the issue, due to having more simple use-case specific state-machines.