From 01af03bfea55ea4218a60d208d10cf78e3109d5a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 18 Dec 2024 11:01:57 -0500 Subject: [PATCH 1/4] Add upstream deadlock warning to the logstash output (#41960) Add an internal timeout in the Logstash output so that it logs an error when published events stop making progress for an extended time, which can indicate that the Logstash host is silently deadlocked. (cherry picked from commit 0e62bf8f03efc8bf5f408ec02aae3b0835b4c38c) # Conflicts: # libbeat/outputs/logstash/async.go --- libbeat/outputs/logstash/async.go | 56 ++++++------- libbeat/outputs/logstash/deadlock.go | 95 +++++++++++++++++++++++ libbeat/outputs/logstash/deadlock_test.go | 51 ++++++++++++ libbeat/outputs/logstash/sync.go | 7 +- 4 files changed, 178 insertions(+), 31 deletions(-) create mode 100644 libbeat/outputs/logstash/deadlock.go create mode 100644 libbeat/outputs/logstash/deadlock_test.go diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index a980d1cef32c..b37f93c5cd07 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -46,13 +46,14 @@ type asyncClient struct { } type msgRef struct { - client *asyncClient - count atomic.Uint32 - batch publisher.Batch - slice []publisher.Event - err error - win *window - batchSize int + client *asyncClient + count atomic.Uint32 + batch publisher.Batch + slice []publisher.Event + err error + win *window + batchSize int + deadlockListener *deadlockListener } func newAsyncClient( @@ -146,6 +147,7 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { } ref := &msgRef{ +<<<<<<< HEAD client: c, count: atomic.MakeUint32(1), batch: batch, @@ -153,6 +155,15 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { batchSize: len(events), win: c.win, err: nil, +======= + client: c, + batch: batch, + slice: events, + batchSize: len(events), + win: c.win, + err: nil, + deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout), +>>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) } defer ref.dec() @@ -229,34 +240,21 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -func (r *msgRef) callback(seq uint32, err error) { - if err != nil { - r.fail(seq, err) - } else { - r.done(seq) - } -} - -func (r *msgRef) done(n uint32) { +func (r *msgRef) callback(n uint32, err error) { r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] - if r.win != nil { - r.win.tryGrowWindow(r.batchSize) - } - r.dec() -} - -func (r *msgRef) fail(n uint32, err error) { + r.deadlockListener.ack(int(n)) if r.err == nil { r.err = err } - r.slice = r.slice[n:] + // If publishing is windowed, update the window size. if r.win != nil { - r.win.shrinkWindow() + if err != nil { + r.win.shrinkWindow() + } else { + r.win.tryGrowWindow(r.batchSize) + } } - - r.client.observer.AckedEvents(int(n)) - r.dec() } @@ -266,6 +264,8 @@ func (r *msgRef) dec() { return } + r.deadlockListener.close() + if L := len(r.slice); L > 0 { r.client.observer.RetryableErrors(L) } diff --git a/libbeat/outputs/logstash/deadlock.go b/libbeat/outputs/logstash/deadlock.go new file mode 100644 index 000000000000..9a291baeda02 --- /dev/null +++ b/libbeat/outputs/logstash/deadlock.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "time" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type deadlockListener struct { + log *logp.Logger + timeout time.Duration + ticker *time.Ticker + + ackChan chan int + + doneChan chan struct{} +} + +const logstashDeadlockTimeout = 5 * time.Minute + +func newDeadlockListener(log *logp.Logger, timeout time.Duration) *deadlockListener { + if timeout <= 0 { + return nil + } + r := &deadlockListener{ + log: log, + timeout: timeout, + ticker: time.NewTicker(timeout), + + ackChan: make(chan int), + doneChan: make(chan struct{}), + } + go r.run() + return r +} + +func (r *deadlockListener) run() { + defer r.ticker.Stop() + defer close(r.doneChan) + for { + select { + case n, ok := <-r.ackChan: + if !ok { + // Listener has been closed + return + } + if n > 0 { + // If progress was made, reset the countdown. + r.ticker.Reset(r.timeout) + } + case <-r.ticker.C: + // No progress was made within the timeout, log error so users + // know there is likely a problem with the upstream host + r.log.Errorf("Logstash batch hasn't reported progress in the last %v, the Logstash host may be stalled. This problem can be prevented by configuring Logstash to use PipelineBusV1 or by upgrading Logstash to 8.17+, for details see https://github.com/elastic/logstash/issues/16657", r.timeout) + return + } + } +} + +func (r *deadlockListener) ack(n int) { + if r == nil { + return + } + // Send the new ack to the run loop, unless it has already shut down in + // which case it can be safely ignored. + select { + case r.ackChan <- n: + case <-r.doneChan: + } +} + +func (r *deadlockListener) close() { + if r == nil { + return + } + // Signal the run loop to shut down + close(r.ackChan) +} diff --git a/libbeat/outputs/logstash/deadlock_test.go b/libbeat/outputs/logstash/deadlock_test.go new file mode 100644 index 000000000000..15c3716b9971 --- /dev/null +++ b/libbeat/outputs/logstash/deadlock_test.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestDeadlockListener(t *testing.T) { + const timeout = 5 * time.Millisecond + log := logp.NewLogger("test") + listener := newDeadlockListener(log, timeout) + + // Verify that the listener doesn't trigger when receiving regular acks + for i := 0; i < 5; i++ { + time.Sleep(timeout / 2) + listener.ack(1) + } + select { + case <-listener.doneChan: + require.Fail(t, "Deadlock listener should not trigger unless there is no progress for the configured time interval") + case <-time.After(timeout / 2): + } + + // Verify that the listener does trigger when the acks stop + select { + case <-time.After(timeout): + require.Fail(t, "Deadlock listener should trigger when there is no progress for the configured time interval") + case <-listener.doneChan: + } +} diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 6a4569073650..9f74fbf49676 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -113,6 +113,8 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { return nil } + deadlockListener := newDeadlockListener(c.log, logstashDeadlockTimeout) + defer deadlockListener.close() for len(events) > 0 { // check if we need to reconnect @@ -150,13 +152,11 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { events = events[n:] st.AckedEvents(n) + deadlockListener.ack(n) if err != nil { // return batch to pipeline before reporting/counting error batch.RetryEvents(events) - if c.win != nil { - c.win.shrinkWindow() - } _ = c.Close() c.log.Errorf("Failed to publish events caused by: %+v", err) @@ -186,6 +186,7 @@ func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) { n, err := c.sendEvents(events) if err != nil { + c.win.shrinkWindow() return n, err } From 682f092663eb88aa9e90a1ef729d49711b593ab8 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 19 Dec 2024 14:44:46 -0500 Subject: [PATCH 2/4] fix merge --- libbeat/outputs/logstash/async.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index b37f93c5cd07..0c6dda4d0902 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -147,15 +147,6 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { } ref := &msgRef{ -<<<<<<< HEAD - client: c, - count: atomic.MakeUint32(1), - batch: batch, - slice: events, - batchSize: len(events), - win: c.win, - err: nil, -======= client: c, batch: batch, slice: events, @@ -163,7 +154,6 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { win: c.win, err: nil, deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout), ->>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) } defer ref.dec() From 29ccc6f00a75be9edd400047af0848be997288e9 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 19 Dec 2024 16:26:14 -0500 Subject: [PATCH 3/4] working on test failures --- libbeat/outputs/logstash/deadlock.go | 2 +- libbeat/outputs/logstash/deadlock_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/logstash/deadlock.go b/libbeat/outputs/logstash/deadlock.go index 9a291baeda02..715266e8aab8 100644 --- a/libbeat/outputs/logstash/deadlock.go +++ b/libbeat/outputs/logstash/deadlock.go @@ -75,7 +75,7 @@ func (r *deadlockListener) run() { } func (r *deadlockListener) ack(n int) { - if r == nil { + if r == nil || n <= 0 { return } // Send the new ack to the run loop, unless it has already shut down in diff --git a/libbeat/outputs/logstash/deadlock_test.go b/libbeat/outputs/logstash/deadlock_test.go index 15c3716b9971..ffe9850e1205 100644 --- a/libbeat/outputs/logstash/deadlock_test.go +++ b/libbeat/outputs/logstash/deadlock_test.go @@ -27,7 +27,7 @@ import ( ) func TestDeadlockListener(t *testing.T) { - const timeout = 5 * time.Millisecond + const timeout = 25 * time.Millisecond log := logp.NewLogger("test") listener := newDeadlockListener(log, timeout) From b2ef6d3b5707206b4236492a7304aabba6752513 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 30 Dec 2024 14:09:48 -0500 Subject: [PATCH 4/4] make test failures more verbose --- .../logstash/logstash_integration_test.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 286717e49ede..241d8c8778eb 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -24,7 +24,6 @@ import ( "encoding/json" "fmt" "os" - "sync" "testing" "time" @@ -424,8 +423,8 @@ func testSendMultipleBatchesViaLogstash( } for _, batch := range batches { - ok := ls.BulkPublish(batch) - assert.Equal(t, true, ok) + result := ls.BulkPublish(batch) + assert.Equal(t, outest.BatchACK, result) } // wait for logstash event flush + elasticsearch @@ -569,20 +568,17 @@ func (t *testOutputer) PublishEvent(event beat.Event) { t.Publish(context.Background(), batch) } -func (t *testOutputer) BulkPublish(events []beat.Event) bool { - ok := false +func (t *testOutputer) BulkPublish(events []beat.Event) outest.BatchSignalTag { + resultChan := make(chan outest.BatchSignalTag, 1) batch := encodeBatch(t.encoder, outest.NewBatch(events...)) - var wg sync.WaitGroup - wg.Add(1) batch.OnSignal = func(sig outest.BatchSignal) { - ok = sig.Tag == outest.BatchACK - wg.Done() + resultChan <- sig.Tag + close(resultChan) } t.Publish(context.Background(), batch) - wg.Wait() - return ok + return <-resultChan } // encodeBatch encodes a publisher.Batch so it can be provided to