From fef001fdbce7b5d7fa702328fa63a83e9601da72 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 9 Nov 2023 18:03:18 +0100 Subject: [PATCH] Fix memqueue getting stuck on shutdown There was a case when openState.publish could get stuck and ignore its shutdown signal, effectively preventing a Filebeat (or any Beat) from gracefully terminating. This commit fixes this by ensuring every channel read/write also checks for the shutdown signal. --- CHANGELOG.next.asciidoc | 1 + libbeat/publisher/queue/memqueue/produce.go | 12 ++- .../publisher/queue/memqueue/queue_test.go | 75 +++++++++++++++++++ 3 files changed, 87 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 462af8725a1e..f1293b7d1725 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Upgraded apache arrow library used in x-pack/libbeat/reader/parquet from v11 to v12.0.1 in order to fix cross-compilation issues {pull}35640[35640] - Fix panic when MaxRetryInterval is specified, but RetryInterval is not {pull}35820[35820] - Support build of projects outside of beats directory {pull}36126[36126] +- Fix memqueue producer blocking indefinitely even after being cancelled {issue}22813[22813] {pull}37077[37077] *Auditbeat* diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go index 11a792af5761..954ea055f4a4 100644 --- a/libbeat/publisher/queue/memqueue/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -143,7 +143,17 @@ func (st *openState) Close() { func (st *openState) publish(req pushRequest) (queue.EntryID, bool) { select { case st.events <- req: - return <-req.resp, true + // If the output is blocked and the queue is full, `req` is written + // to `st.events`, however the queue never writes back to `req.resp`, + // which effectively blocks for ever. So we also need to select on the + // done channel to ensure we don't miss the shutdown signal. + select { + case resp := <-req.resp: + return resp, true + case <-st.done: + st.events = nil + return 0, false + } case <-st.done: st.events = nil return 0, false diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 28cc38025c38..531acdce3e98 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -23,11 +23,14 @@ import ( "math" "math/rand" "sync" + "sync/atomic" "testing" "time" "gotest.tools/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" "github.com/elastic/elastic-agent-libs/mapstr" @@ -74,6 +77,78 @@ func TestProduceConsumer(t *testing.T) { t.Run("flush", testWith(makeTestQueue(bufferSize, batchSize/2, 100*time.Millisecond))) } +// TestProducerDoesNotBlockWhenCancelled ensures the producer Publish +// does not block indefinitely. +// +// Once we get a producer `p` from the queue we want to ensure +// that if p.Publish is called and blocks it will unblock once +// p.Cancel is called. +// +// For this test we start a queue with size 2 and try to add more +// than 2 events to it, p.Publish will block, once we call p.Cancel, +// we ensure the 3rd event was not successfully published. +func TestProducerDoesNotBlockWhenCancelled(t *testing.T) { + q := NewQueue(nil, nil, + Settings{ + Events: 2, // Queue size + FlushMinEvents: 1, // make sure the queue won't buffer events + FlushTimeout: time.Millisecond, + }, 0) + + p := q.Producer(queue.ProducerConfig{ + // We do not read from the queue, so the callbacks are never called + ACK: func(count int) {}, + OnDrop: func(e interface{}) {}, + DropOnCancel: false, + }) + + success := atomic.Bool{} + publishCount := atomic.Int32{} + go func() { + // Publish 2 events, this will make the queue full, but + // both will be accepted + for i := 0; i < 2; i++ { + id, ok := p.Publish(fmt.Sprintf("Event %d", i)) + if !ok { + t.Errorf("failed to publish to the queue, event ID: %v", id) + return + } + publishCount.Add(1) + } + _, ok := p.Publish("Event 3") + if ok { + t.Errorf("publishing the 3rd event must fail") + return + } + + // Flag the test as successful + success.Store(true) + }() + + // Allow the producer to run and the queue to do its thing. + // Two events should be accepted and the third call to p.Publish + // must block + // time.Sleep(100 * time.Millisecond) + + // Ensure we published two events + require.Eventually( + t, + func() bool { return publishCount.Load() == 2 }, + 200*time.Millisecond, + time.Millisecond, + "the first two events were not successfully published") + + // Cancel the producer, this should unblock its Publish method + p.Cancel() + + require.Eventually( + t, + success.Load, + 200*time.Millisecond, + 1*time.Millisecond, + "test not flagged as successful, p.Publish likely blocked indefinitely") +} + func TestQueueMetricsDirect(t *testing.T) { eventsToTest := 5 maxEvents := 10