Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #41356) Restore memory queue's internal event cleanup after a batch is vended #41364

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Support Elastic Agent control protocol chunking support {pull}37343[37343]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
events = append(events, event)
}
}
original.FreeEntries()

b := &ttlBatch{
done: original.Done,
Expand Down
10 changes: 10 additions & 0 deletions libbeat/publisher/pipeline/ttl_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) {
require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback")
}

func TestNewBatchFreesEvents(t *testing.T) {
queueBatch := &mockQueueBatch{}
_ = newBatch(nil, queueBatch, 0)
assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch")
}

type mockQueueBatch struct {
freeEntriesCalled int
}
Expand All @@ -127,6 +133,10 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry {
return fmt.Sprintf("event %v", i)
}

func (b *mockQueueBatch) FreeEntries() {
b.freeEntriesCalled++
}

type mockRetryer struct {
batches []*ttlBatch
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry {
return batch.frames[i].event
}

func (batch *diskQueueBatch) FreeEntries() {
}

func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
9 changes: 9 additions & 0 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@
}

func newBatch(queue *broker, start, count int) *batch {
batch := batchPool.Get().(*batch)

Check failure on line 293 in libbeat/publisher/queue/memqueue/broker.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value is not checked (errcheck)
batch.next = nil
batch.queue = queue
batch.start = start
Expand Down Expand Up @@ -398,6 +398,15 @@
return b.rawEntry(i).event
}

func (b *batch) FreeEntries() {
// This signals that the event data has been copied out of the batch, and is
// safe to free from the queue buffer, so set all the event pointers to nil.
for i := 0; i < b.count; i++ {
index := (b.start + i) % len(b.queue.buf)
b.queue.buf[index].event = nil
}
}

func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
38 changes: 38 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,41 @@ func TestAdjustInputQueueSize(t *testing.T) {
assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue))
})
}

func TestBatchFreeEntries(t *testing.T) {
const queueSize = 10
const batchSize = 5
// 1. Add 10 events to the queue, request two batches with 5 events each
// 2. Make sure the queue buffer has 10 non-nil events
// 3. Call FreeEntries on the second batch
// 4. Make sure only events 6-10 are nil
// 5. Call FreeEntries on the first batch
// 6. Make sure all events are nil
testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil)
producer := testQueue.Producer(queue.ProducerConfig{})
for i := 0; i < queueSize; i++ {
_, ok := producer.Publish(i)
require.True(t, ok, "Queue publish must succeed")
}
batch1, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request")
batch2, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request")
// Slight concurrency subtlety: we check events are non-nil after the queue
// reads, since if we do it before we have no way to be sure the insert
// has been completed.
for i := 0; i < queueSize; i++ {
require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil")
}
batch2.FreeEntries()
for i := 0; i < batchSize; i++ {
require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i)
require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i)
}
batch1.FreeEntries()
for i := 0; i < queueSize; i++ {
require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches")
}
}
4 changes: 4 additions & 0 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ type Batch interface {
Count() int
Entry(i int) Entry
Done()
// Release internal references to the contained events if supported
// (the disk queue does not currently implement this).
// Entry() should not be used after this call.
FreeEntries()
}

// Outputs can provide an EncoderFactory to enable early encoding, in which
Expand Down
Loading