Skip to content

Commit

Permalink
Refactor internal buffer chain in the memory queue (elastic#37795)
Browse files Browse the repository at this point in the history
- Refactor the buffered and unbuffered memory queue implementations into a single object
- Use a ring buffer for both cases (previously the buffered case used a linked list of independent buffers)
- Simplify the behavior of `queue.mem.flush.min_events` so it can't starve output workers when enough data is ready (previously it interacted in complicated ways with `bulk_max_size`, now it serves as a simple maximum on allowable event batch size).
  • Loading branch information
faec authored Feb 13, 2024
1 parent be66efa commit ecfb88f
Show file tree
Hide file tree
Showing 13 changed files with 708 additions and 1,024 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
platform, and when viewed from a metadata API standpoint, it is impossible to differentiate it from OpenStack. If you
know that your deployments run on Huawei Cloud exclusively, and you wish to have `cloud.provider` value as `huawei`,
you can achieve this by overwriting the value using an `add_fields` processor. {pull}35184[35184]
- In managed mode, Beats running under Elastic Agent will report the package
- In managed mode, Beats running under Elastic Agent will report the package
version of Elastic Agent as their own version. This includes all additional
fields added to events containing the Beats version. {pull}37553[37553]
- The behavior of `queue.mem.flush.min_events` has been simplified. It now serves as a simple maximum on the size of all event batches. There are no longer performance implications in its relationship to `bulk_max_size`. {pull}37795[37795]

*Auditbeat*

Expand Down
54 changes: 33 additions & 21 deletions libbeat/docs/queueconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,32 @@ The memory queue waits for the output to acknowledge or drop events. If
the queue is full, no new events can be inserted into the memory queue. Only
after the signal from the output will the queue free up space for more events to be accepted.

The memory queue is controlled by the parameters `flush.min_events` and `flush.timeout`. If
`flush.timeout` is `0s` or `flush.min_events` is `0` or `1` then events can be sent by the output as
soon as they are available. If the output supports a `bulk_max_size` parameter it controls the
maximum batch size that can be sent.
The memory queue is controlled by the parameters `flush.min_events` and `flush.timeout`.
`flush.min_events` gives a limit on the number of events that can be included in a
single batch, and `flush.timeout` specifies how long the queue should wait to completely
fill an event request. If the output supports a `bulk_max_size` parameter, the maximum
batch size will be the smaller of `bulk_max_size` and `flush.min_events`.

If `flush.min_events` is greater than `1` and `flush.timeout` is greater than `0s`, events will only
be sent to the output when the queue contains at least `flush.min_events` events or the
`flush.timeout` period has expired. In this mode the maximum size batch that that can be sent by the
output is `flush.min_events`. If the output supports a `bulk_max_size` parameter, values of
`bulk_max_size` greater than `flush.min_events` have no effect. The value of `flush.min_events`
should be evenly divisible by `bulk_max_size` to avoid sending partial batches to the output.
`flush.min_events` is a legacy parameter, and new configurations should prefer to control
batch size with `bulk_max_size`. As of 8.13, there is never a performance advantage to
limiting batch size with `flush.min_events` instead of `bulk_max_size`.

This sample configuration forwards events to the output if 512 events are available or the oldest
available event has been waiting for 5s in the queue:
In synchronous mode, an event request is always filled as soon as events are available,
even if there are not enough events to fill the requested batch. This is useful when
latency must be minimized. To use synchronous mode, set `flush.timeout` to 0.

For backwards compatibility, synchronous mode can also be activated by setting `flush.min_events`
to 0 or 1. In this case, batch size will be capped at 1/2 the queue capacity.

In asynchronous mode, an event request will wait up to the specified timeout to try
and fill the requested batch completely. If the timeout expires, the queue returns a
partial batch with all available events. To use asynchronous mode, set `flush.timeout`
to a positive duration, e.g. `5s`.

This sample configuration forwards events to the output when there are enough events
to fill the output's request (usually controlled by `bulk_max_size`, and limited to at
most 512 events by `flush.min_events`), or when events have been waiting for 5s without
filling the requested size:

[source,yaml]
------------------------------------------------------------------------------
Expand All @@ -64,29 +76,29 @@ You can specify the following options in the `queue.mem` section of the +{beatna
[[queue-mem-events-option]]
===== `events`

Number of events the queue can store. This value should be evenly divisible by `flush.min_events` to
avoid sending partial batches to the output.
Number of events the queue can store.

The default value is 3200 events.

[float]
[[queue-mem-flush-min-events-option]]
===== `flush.min_events`

Minimum number of events required for publishing. If this value is set to 0 or 1, events are
available to the output immediately. If this value is greater than 1 the output must wait for the
queue to accumulate this minimum number of events or for `flush.timeout` to expire before
publishing. When greater than `1` this value also defines the maximum possible batch that can be
sent by the output.
If greater than 1, specifies the maximum number of events per batch. In this case the
output must wait for the
queue to accumulate the requested number of events or for `flush.timeout` to expire before
publishing.

If 0 or 1, sets the maximum number of events per batch to half the queue size, and sets
the queue to synchronous mode (equivalent to `flush.timeout` of 0).

The default value is 1600.

[float]
[[queue-mem-flush-timeout-option]]
===== `flush.timeout`

Maximum wait time for `flush.min_events` to be fulfilled. If set to 0s, events are available to the
output immediately.
Maximum wait time for event requests from the output to be fulfilled. If set to 0s, events are returned immediately.

The default value is 10s.

Expand Down
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func TestClient(t *testing.T) {

// a small in-memory queue with a very short flush interval
q := memqueue.NewQueue(l, nil, memqueue.Settings{
Events: 5,
FlushMinEvents: 1,
FlushTimeout: time.Millisecond,
Events: 5,
MaxGetRequest: 1,
FlushTimeout: time.Millisecond,
}, 5)

// model a processor that we're going to make produce errors after
Expand Down
89 changes: 66 additions & 23 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,28 @@ package memqueue
type ackLoop struct {
broker *broker

// A list of ACK channels given to queue consumers,
// A list of batches given to queue consumers,
// used to maintain sequencing of event acknowledgements.
ackChans chanList
pendingBatches batchList
}

processACK func(chanList, int)
func newACKLoop(broker *broker) *ackLoop {
return &ackLoop{broker: broker}
}

func (l *ackLoop) run() {
b := l.broker
for {
nextBatchChan := l.ackChans.nextBatchChannel()
nextBatchChan := l.pendingBatches.nextBatchChannel()

select {
case <-l.broker.done:
case <-b.ctx.Done():
// The queue is shutting down.
return

case chanList := <-l.broker.scheduledACKs:
// A new batch has been generated, add its ACK channel to the end of
// the pending list.
l.ackChans.concat(&chanList)
case chanList := <-b.consumedChan:
// New batches have been generated, add them to the pending list
l.pendingBatches.concat(&chanList)

case <-nextBatchChan:
// The oldest outstanding batch has been acknowledged, advance our
Expand All @@ -57,11 +59,11 @@ func (l *ackLoop) run() {
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
lst := l.collectAcked()
ackedBatches := l.collectAcked()

count := 0
for current := lst.front(); current != nil; current = current.next {
count += current.count
for batch := ackedBatches.front(); batch != nil; batch = batch.next {
count += batch.count
}

if count > 0 {
Expand All @@ -70,11 +72,12 @@ func (l *ackLoop) handleBatchSig() int {
}

// report acks to waiting clients
l.processACK(lst, count)
l.processACK(ackedBatches, count)
}

for !lst.empty() {
releaseACKChan(lst.pop())
for !ackedBatches.empty() {
// Release finished batch structs into the shared memory pool
releaseBatch(ackedBatches.pop())
}

// return final ACK to EventLoop, in order to clean up internal buffer
Expand All @@ -84,23 +87,63 @@ func (l *ackLoop) handleBatchSig() int {
return count
}

func (l *ackLoop) collectAcked() chanList {
lst := chanList{}
func (l *ackLoop) collectAcked() batchList {
ackedBatches := batchList{}

acks := l.ackChans.pop()
lst.append(acks)
acks := l.pendingBatches.pop()
ackedBatches.append(acks)

done := false
for !l.ackChans.empty() && !done {
acks := l.ackChans.front()
for !l.pendingBatches.empty() && !done {
acks := l.pendingBatches.front()
select {
case <-acks.doneChan:
lst.append(l.ackChans.pop())
ackedBatches.append(l.pendingBatches.pop())

default:
done = true
}
}

return lst
return ackedBatches
}

// Called by ackLoop. This function exists to decouple the work of collecting
// and running producer callbacks from logical deletion of the events, so
// input callbacks can't block the queue by occupying the runLoop goroutine.
func (l *ackLoop) processACK(lst batchList, N int) {
ackCallbacks := []func(){}
// First we traverse the entries we're about to remove, collecting any callbacks
// we need to run.
lst.reverse()
for !lst.empty() {
batch := lst.pop()

// Traverse entries from last to first, so we can acknowledge the most recent
// ones first and skip subsequent producer callbacks.
for i := batch.count - 1; i >= 0; i-- {
entry := batch.rawEntry(i)
if entry.producer == nil {
continue
}

if entry.producerID <= entry.producer.state.lastACK {
// This index was already acknowledged on a previous iteration, skip.
entry.producer = nil
continue
}
producerState := entry.producer.state
count := int(entry.producerID - producerState.lastACK)
ackCallbacks = append(ackCallbacks, func() { producerState.cb(count) })
entry.producer.state.lastACK = entry.producerID
entry.producer = nil
}
}
// Signal runLoop to delete the events
l.broker.deleteChan <- N

// The events have been removed; notify their listeners.
for _, f := range ackCallbacks {
f()
}
}
53 changes: 0 additions & 53 deletions libbeat/publisher/queue/memqueue/batchbuf.go

This file was deleted.

Loading

0 comments on commit ecfb88f

Please sign in to comment.