From 668f0f2af5be2a46f36f487c26502212492e3964 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 22 Oct 2024 08:57:34 -0400 Subject: [PATCH] Restore memory queue's internal event cleanup after a batch is vended (#41356) Fix https://github.com/elastic/beats/issues/41355, where event data in the memory queue was not being freed when event batches were acknowledged, but only gradually as the queue buffer was overwritten by later events. This gave the same effect as if all beat instances, even low-volume ones, were running with a full / saturated event queue. The root cause, found by @swiatekm, is [this PR](https://github.com/elastic/beats/pull/39584), an unrelated cleanup of old code that accidentally included one live call along with the deprecated ones. (There was an old `FreeEntries` hook in pipeline batches that was only used for deprecated shipper configs, but the cleanup also removed the `FreeEntries` call _inside_ the queue which was essential for releasing event memory.) (cherry picked from commit fdb912a8ace4245dd64076a47b174059dd656b49) --- CHANGELOG.next.asciidoc | 1 + libbeat/publisher/pipeline/ttl_batch.go | 1 + libbeat/publisher/pipeline/ttl_batch_test.go | 10 +++++ libbeat/publisher/queue/diskqueue/consumer.go | 3 ++ libbeat/publisher/queue/memqueue/broker.go | 9 +++++ .../publisher/queue/memqueue/queue_test.go | 38 +++++++++++++++++++ libbeat/publisher/queue/queue.go | 4 ++ 7 files changed, 66 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 396338321ef7..3fed6771468a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -108,6 +108,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Support Elastic Agent control protocol chunking support {pull}37343[37343] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] +- Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356] *Auditbeat* diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index dab77fa56597..0ef4408b6136 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -77,6 +77,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { events = append(events, event) } } + original.FreeEntries() b := &ttlBatch{ done: original.Done, diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go index 769ccc37c35f..4c5207acbb07 100644 --- a/libbeat/publisher/pipeline/ttl_batch_test.go +++ b/libbeat/publisher/pipeline/ttl_batch_test.go @@ -112,6 +112,12 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) { require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback") } +func TestNewBatchFreesEvents(t *testing.T) { + queueBatch := &mockQueueBatch{} + _ = newBatch(nil, queueBatch, 0) + assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch") +} + type mockQueueBatch struct { freeEntriesCalled int } @@ -127,6 +133,10 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry { return fmt.Sprintf("event %v", i) } +func (b *mockQueueBatch) FreeEntries() { + b.freeEntriesCalled++ +} + type mockRetryer struct { batches []*ttlBatch } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 20e6648d927e..a0e5e944df31 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -97,6 +97,9 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry { return batch.frames[i].event } +func (batch *diskQueueBatch) FreeEntries() { +} + func (batch *diskQueueBatch) Done() { batch.queue.acks.addFrames(batch.frames) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b617bae61102..3e3e47e502cc 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -398,6 +398,15 @@ func (b *batch) Entry(i int) queue.Entry { return b.rawEntry(i).event } +func (b *batch) FreeEntries() { + // This signals that the event data has been copied out of the batch, and is + // safe to free from the queue buffer, so set all the event pointers to nil. + for i := 0; i < b.count; i++ { + index := (b.start + i) % len(b.queue.buf) + b.queue.buf[index].event = nil + } +} + func (b *batch) Done() { b.doneChan <- batchDoneMsg{} } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 9cd209bbd51e..168c923e5987 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -262,3 +262,41 @@ func TestAdjustInputQueueSize(t *testing.T) { assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue)) }) } + +func TestBatchFreeEntries(t *testing.T) { + const queueSize = 10 + const batchSize = 5 + // 1. Add 10 events to the queue, request two batches with 5 events each + // 2. Make sure the queue buffer has 10 non-nil events + // 3. Call FreeEntries on the second batch + // 4. Make sure only events 6-10 are nil + // 5. Call FreeEntries on the first batch + // 6. Make sure all events are nil + testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil) + producer := testQueue.Producer(queue.ProducerConfig{}) + for i := 0; i < queueSize; i++ { + _, ok := producer.Publish(i) + require.True(t, ok, "Queue publish must succeed") + } + batch1, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request") + batch2, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request") + // Slight concurrency subtlety: we check events are non-nil after the queue + // reads, since if we do it before we have no way to be sure the insert + // has been completed. + for i := 0; i < queueSize; i++ { + require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil") + } + batch2.FreeEntries() + for i := 0; i < batchSize; i++ { + require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i) + require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i) + } + batch1.FreeEntries() + for i := 0; i < queueSize; i++ { + require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches") + } +} diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 075d7ad66a46..983a835a0699 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -112,6 +112,10 @@ type Batch interface { Count() int Entry(i int) Entry Done() + // Release internal references to the contained events if supported + // (the disk queue does not currently implement this). + // Entry() should not be used after this call. + FreeEntries() } // Outputs can provide an EncoderFactory to enable early encoding, in which