Skip to content

Commit

Permalink
Use context to client worker cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Oct 18, 2024
1 parent 7bac99e commit cb0fd85
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions libbeat/publisher/pipeline/client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
)

type worker struct {
qu chan publisher.Batch
done chan struct{}
qu chan publisher.Batch
cancel func()
}

// clientWorker manages output client of type outputs.Client, not supporting reconnect.
Expand All @@ -50,14 +50,15 @@ type netClientWorker struct {
}

func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker {
ctx, cancel := context.WithCancel(context.Background())
w := worker{
qu: qu,
done: make(chan struct{}),
qu: qu,
cancel: cancel,
}

var c interface {
outputWorker
run()
run(context.Context)
}

if nc, ok := client.(outputs.NetworkClient); ok {
Expand All @@ -71,33 +72,33 @@ func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger log
c = &clientWorker{worker: w, client: client}
}

go c.run()
go c.run(ctx)
return c
}

func (w *worker) close() {
close(w.done)
w.cancel()
}

func (w *clientWorker) Close() error {
w.worker.close()
return w.client.Close()
}

func (w *clientWorker) run() {
func (w *clientWorker) run(ctx context.Context) {
for {
// We wait for either the worker to be closed or for there to be a batch of
// events to publish.
select {

case <-w.done:
case <-ctx.Done():
return

case batch := <-w.qu:
if batch == nil {
continue
}
if err := w.client.Publish(context.TODO(), batch); err != nil {
if err := w.client.Publish(ctx, batch); err != nil {
return
}
}
Expand All @@ -109,21 +110,18 @@ func (w *netClientWorker) Close() error {
return w.client.Close()
}

func (w *netClientWorker) run() {
func (w *netClientWorker) run(ctx context.Context) {
var (
connected = false
reconnectAttempts = 0
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
// We wait for either the worker to be closed or for there to be a batch of
// events to publish.
select {

case <-w.done:
case <-ctx.Done():
return

case batch := <-w.qu:
Expand Down

0 comments on commit cb0fd85

Please sign in to comment.