Skip to content

Commit

Permalink
Restore memory queue's internal event cleanup after a batch is vended (
Browse files Browse the repository at this point in the history
…#41356)

Fix #41355, where event data in the memory queue was not being freed when event batches were acknowledged, but only gradually as the queue buffer was overwritten by later events. This gave the same effect as if all beat instances, even low-volume ones, were running with a full / saturated event queue.

The root cause, found by @swiatekm, is [this PR](#39584), an unrelated cleanup of old code that accidentally included one live call along with the deprecated ones. (There was an old `FreeEntries` hook in pipeline batches that was only used for deprecated shipper configs, but the cleanup also removed the `FreeEntries` call _inside_ the queue which was essential for releasing event memory.)
  • Loading branch information
faec authored Oct 22, 2024
1 parent ec92e02 commit fdb912a
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Support Elastic Agent control protocol chunking support {pull}37343[37343]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
events = append(events, event)
}
}
original.FreeEntries()

b := &ttlBatch{
done: original.Done,
Expand Down
10 changes: 10 additions & 0 deletions libbeat/publisher/pipeline/ttl_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) {
require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback")
}

func TestNewBatchFreesEvents(t *testing.T) {
queueBatch := &mockQueueBatch{}
_ = newBatch(nil, queueBatch, 0)
assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch")
}

type mockQueueBatch struct {
freeEntriesCalled int
}
Expand All @@ -127,6 +133,10 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry {
return fmt.Sprintf("event %v", i)
}

func (b *mockQueueBatch) FreeEntries() {
b.freeEntriesCalled++
}

type mockRetryer struct {
batches []*ttlBatch
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry {
return batch.frames[i].event
}

func (batch *diskQueueBatch) FreeEntries() {
}

func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
9 changes: 9 additions & 0 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,15 @@ func (b *batch) Entry(i int) queue.Entry {
return b.rawEntry(i).event
}

func (b *batch) FreeEntries() {
// This signals that the event data has been copied out of the batch, and is
// safe to free from the queue buffer, so set all the event pointers to nil.
for i := 0; i < b.count; i++ {
index := (b.start + i) % len(b.queue.buf)
b.queue.buf[index].event = nil
}
}

func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
38 changes: 38 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,41 @@ func TestAdjustInputQueueSize(t *testing.T) {
assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue))
})
}

func TestBatchFreeEntries(t *testing.T) {
const queueSize = 10
const batchSize = 5
// 1. Add 10 events to the queue, request two batches with 5 events each
// 2. Make sure the queue buffer has 10 non-nil events
// 3. Call FreeEntries on the second batch
// 4. Make sure only events 6-10 are nil
// 5. Call FreeEntries on the first batch
// 6. Make sure all events are nil
testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil)
producer := testQueue.Producer(queue.ProducerConfig{})
for i := 0; i < queueSize; i++ {
_, ok := producer.Publish(i)
require.True(t, ok, "Queue publish must succeed")
}
batch1, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request")
batch2, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request")
// Slight concurrency subtlety: we check events are non-nil after the queue
// reads, since if we do it before we have no way to be sure the insert
// has been completed.
for i := 0; i < queueSize; i++ {
require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil")
}
batch2.FreeEntries()
for i := 0; i < batchSize; i++ {
require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i)
require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i)
}
batch1.FreeEntries()
for i := 0; i < queueSize; i++ {
require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches")
}
}
4 changes: 4 additions & 0 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ type Batch interface {
Count() int
Entry(i int) Entry
Done()
// Release internal references to the contained events if supported
// (the disk queue does not currently implement this).
// Entry() should not be used after this call.
FreeEntries()
}

// Outputs can provide an EncoderFactory to enable early encoding, in which
Expand Down

0 comments on commit fdb912a

Please sign in to comment.