From cb0fd85424db1beac24c922eedaa2f46755557f7 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 14 Oct 2024 15:41:29 -0400 Subject: [PATCH] Use context to client worker cancellation --- libbeat/publisher/pipeline/client_worker.go | 28 ++++++++++----------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/libbeat/publisher/pipeline/client_worker.go b/libbeat/publisher/pipeline/client_worker.go index dc51979d0333..3e6b8202dd21 100644 --- a/libbeat/publisher/pipeline/client_worker.go +++ b/libbeat/publisher/pipeline/client_worker.go @@ -29,8 +29,8 @@ import ( ) type worker struct { - qu chan publisher.Batch - done chan struct{} + qu chan publisher.Batch + cancel func() } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -50,14 +50,15 @@ type netClientWorker struct { } func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker { + ctx, cancel := context.WithCancel(context.Background()) w := worker{ - qu: qu, - done: make(chan struct{}), + qu: qu, + cancel: cancel, } var c interface { outputWorker - run() + run(context.Context) } if nc, ok := client.(outputs.NetworkClient); ok { @@ -71,12 +72,12 @@ func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger log c = &clientWorker{worker: w, client: client} } - go c.run() + go c.run(ctx) return c } func (w *worker) close() { - close(w.done) + w.cancel() } func (w *clientWorker) Close() error { @@ -84,20 +85,20 @@ func (w *clientWorker) Close() error { return w.client.Close() } -func (w *clientWorker) run() { +func (w *clientWorker) run(ctx context.Context) { for { // We wait for either the worker to be closed or for there to be a batch of // events to publish. select { - case <-w.done: + case <-ctx.Done(): return case batch := <-w.qu: if batch == nil { continue } - if err := w.client.Publish(context.TODO(), batch); err != nil { + if err := w.client.Publish(ctx, batch); err != nil { return } } @@ -109,21 +110,18 @@ func (w *netClientWorker) Close() error { return w.client.Close() } -func (w *netClientWorker) run() { +func (w *netClientWorker) run(ctx context.Context) { var ( connected = false reconnectAttempts = 0 ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - for { // We wait for either the worker to be closed or for there to be a batch of // events to publish. select { - case <-w.done: + case <-ctx.Done(): return case batch := <-w.qu: