Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memqueue getting stuck on shutdown #37077

Merged
merged 1 commit into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Upgraded apache arrow library used in x-pack/libbeat/reader/parquet from v11 to v12.0.1 in order to fix cross-compilation issues {pull}35640[35640]
- Fix panic when MaxRetryInterval is specified, but RetryInterval is not {pull}35820[35820]
- Support build of projects outside of beats directory {pull}36126[36126]
- Fix memqueue producer blocking indefinitely even after being cancelled {issue}22813[22813] {pull}37077[37077]

*Auditbeat*

Expand Down
12 changes: 11 additions & 1 deletion libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,17 @@ func (st *openState) Close() {
func (st *openState) publish(req pushRequest) (queue.EntryID, bool) {
select {
case st.events <- req:
return <-req.resp, true
// If the output is blocked and the queue is full, `req` is written
// to `st.events` but the queue never writes back to `req.resp`, so
// a bare receive on `req.resp` would block forever. We therefore also
// select on the done channel to ensure we don't miss the shutdown signal.
select {
case resp := <-req.resp:
return resp, true
case <-st.done:
st.events = nil
return 0, false
}
case <-st.done:
st.events = nil
return 0, false
Expand Down
75 changes: 75 additions & 0 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"math"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"

"gotest.tools/assert"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -74,6 +77,78 @@ func TestProduceConsumer(t *testing.T) {
t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond)))
}

// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish
// does not block indefinitely.
//
// Once we get a producer `p` from the queue we want to ensure
// that if p.Publish is called and blocks it will unblock once
// p.Cancel is called.
//
// For this test we start a queue with size 2 and try to add more
// than 2 events to it, p.Publish will block, once we call p.Cancel,
// we ensure the 3rd event was not successfully published.
func TestProducerDoesNotBlockWhenCancelled(t *testing.T) {
q := NewQueue(nil, nil,
Settings{
Events: 2, // Queue size
FlushMinEvents: 1, // make sure the queue won't buffer events
FlushTimeout: time.Millisecond,
}, 0)

p := q.Producer(queue.ProducerConfig{
// We do not read from the queue, so the callbacks are never called
ACK: func(count int) {},
OnDrop: func(e interface{}) {},
DropOnCancel: false,
})

success := atomic.Bool{}
publishCount := atomic.Int32{}
go func() {
// Publish 2 events, this will make the queue full, but
// both will be accepted
for i := 0; i < 2; i++ {
id, ok := p.Publish(fmt.Sprintf("Event %d", i))
if !ok {
t.Errorf("failed to publish to the queue, event ID: %v", id)
return
}
publishCount.Add(1)
}
_, ok := p.Publish("Event 3")
if ok {
t.Errorf("publishing the 3rd event must fail")
return
}

// Flag the test as successful
success.Store(true)
}()

// Allow the producer to run and the queue to do its thing.
// Two events should be accepted and the third call to p.Publish
// must block
// time.Sleep(100 * time.Millisecond)

// Ensure we published two events
require.Eventually(
t,
func() bool { return publishCount.Load() == 2 },
200*time.Millisecond,
time.Millisecond,
"the first two events were not successfully published")

// Cancel the producer, this should unblock its Publish method
p.Cancel()

require.Eventually(
t,
success.Load,
200*time.Millisecond,
1*time.Millisecond,
"test not flagged as successful, p.Publish likely blocked indefinitely")
}

func TestQueueMetricsDirect(t *testing.T) {
eventsToTest := 5
maxEvents := 10
Expand Down
Loading