diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 5230f3c57a3f..bbb1b0794331 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -19,6 +19,7 @@ package elasticsearch import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -49,6 +50,7 @@ type Client struct { timeout time.Duration // buffered bulk requests + ctx clientContext bulkRequ *bulkRequest // buffered json response reader @@ -61,6 +63,11 @@ type Client struct { observer outputs.Observer } +type clientContext struct { + ctx context.Context + cancel context.CancelFunc +} + // ClientSettings contains the settings for a client. type ClientSettings struct { URL string @@ -669,7 +676,14 @@ func (client *Client) String() string { // the configured host, updates the known Elasticsearch version and calls // globally configured handlers. func (client *Client) Connect() error { - return client.Connection.Connect() + err := client.Connection.Connect() + if err != nil { + return err + } + + client.ctx.init() + client.bulkRequ.requ = client.bulkRequ.requ.WithContext(client.ctx.Context()) + return nil } // Connect connects the client. It runs a GET request against the root URL of @@ -725,8 +739,19 @@ func (conn *Connection) Ping() (string, error) { return response.Version.Number, nil } +// Close closes idle connections and can cancel active requests. +func (client *Client) Close() error { + client.ctx.close() // cancel active requests using the client context + client.Connection.Close() + return nil +} + // Close closes a connection. func (conn *Connection) Close() error { + t := conn.http.Transport + if ci, ok := t.(interface{ CloseIdleConnections() }); ok { + ci.CloseIdleConnections() + } return nil } @@ -826,3 +851,16 @@ func closing(c io.Closer) { logp.Warn("Close failed with: %v", err) } } + +func (c *clientContext) init() { + c.ctx, c.cancel = context.WithCancel(context.Background()) +} + +func (c *clientContext) close() { + c.cancel() + c.cancel = func() {} +} + +func (c *clientContext) Context() context.Context { + return c.ctx +} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 60ed3519ae0c..eeb715e19a57 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -18,7 +18,9 @@ package pipeline import ( - "github.com/elastic/beats/libbeat/common/atomic" + "errors" + "sync" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" ) @@ -28,24 +30,36 @@ type clientWorker struct { observer outputObserver qu workQueue client outputs.Client - closed atomic.Bool + done chan struct{} + wg sync.WaitGroup } // netClientWorker manages reconnectable output clients of type outputs.NetworkClient. type netClientWorker struct { observer outputObserver qu workQueue - client outputs.NetworkClient - closed atomic.Bool + client netClient + done chan struct{} + wg sync.WaitGroup batchSize int batchSizer func() int } +type netClient struct { + client outputs.NetworkClient + + mu sync.Mutex + err error + active bool +} + +var errOutputDisabled = errors.New("output disabled") + func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { if nc, ok := client.(outputs.NetworkClient); ok { - c := &netClientWorker{observer: observer, qu: qu, client: nc} - go c.run() + c := &netClientWorker{observer: observer, qu: qu, client: makeNetClient(nc)} + c.start() return c } c := &clientWorker{observer: observer, qu: qu, client: client} @@ -54,74 +68,229 @@ func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Clie } func (w *clientWorker) Close() error { - w.closed.Store(true) - return w.client.Close() + close(w.done) + err := w.client.Close() + w.wg.Wait() + return err +} + +func (w *clientWorker) start() { + w.wg.Add(1) + go func() { + defer w.wg.Done() + w.run() + }() } func (w *clientWorker) run() { - for !w.closed.Load() { - for batch := range w.qu { - w.observer.outBatchSend(len(batch.events)) + for w.active() { + batch, ok := w.next() + if !ok { + return + } + + w.observer.outBatchSend(len(batch.events)) + if err := w.client.Publish(batch); err != nil { + return + } + } +} - if err := w.client.Publish(batch); err != nil { - return +func (w *clientWorker) active() bool { + select { + case <-w.done: + return false + default: + return true + } +} + +func (w *clientWorker) next() (*Batch, bool) { + for { + select { + case <-w.done: + return nil, false + case b := <-w.qu: + if b != nil { + return b, true } } } } func (w *netClientWorker) Close() error { - w.closed.Store(true) - return w.client.Close() + close(w.done) + err := w.client.Disable() // async close and disable client from reconnecting + w.wg.Wait() + return err } -func (w *netClientWorker) run() { - for !w.closed.Load() { - reconnectAttempts := 0 +func (w *netClientWorker) start() { + w.wg.Add(1) + go func() { + defer w.wg.Done() + w.run() + }() +} - // start initial connect loop from first batch, but return - // batch to pipeline for other outputs to catch up while we're trying to connect - for batch := range w.qu { - batch.Cancelled() +func (w *netClientWorker) run() { + var ( + connected bool + reconnectAttempts int + ) - if w.closed.Load() { - logp.Info("Closed connection to %v", w.client) - return - } + for w.active() { + batch, ok := w.next() + if !ok { + break + } + if !connected { + batch.Cancelled() if reconnectAttempts > 0 { - logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) + logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client.String(), reconnectAttempts) } else { - logp.Info("Connecting to %v", w.client) + logp.Info("Connecting to %v", w.client.String()) } - err := w.client.Connect() - if err != nil { - logp.Err("Failed to connect to %v: %v", w.client, err) + err := w.connect() + connected = err == nil + if connected { + reconnectAttempts = 0 + } else { reconnectAttempts++ - continue } + continue + } - logp.Info("Connection to %v established", w.client) - reconnectAttempts = 0 - break + err := w.client.Publish(batch) + if err != nil { + logp.Err("Failed to publish events: %v", err) + // on error return to connect loop + connected = false + + w.client.Close() } + } +} - // send loop - for batch := range w.qu { - if w.closed.Load() { - if batch != nil { - batch.Cancelled() - } - return +func (w *netClientWorker) next() (*Batch, bool) { + for { + select { + case <-w.done: + return nil, false + case b := <-w.qu: + if b != nil { + return b, true } + } + } +} - err := w.client.Publish(batch) - if err != nil { - logp.Err("Failed to publish events: %v", err) - // on error return to connect loop - break - } +func (w *netClientWorker) connect() error { + err := w.client.Connect() + if err != nil { + logp.Err("Failed to connect to %v: %v", w.client.String(), err) + } else { + logp.Info("Connection to %v established", w.client.String()) + } + return err +} + +func (w *netClientWorker) active() bool { + select { + case <-w.done: + return false + default: + return true + } +} + +func makeNetClient(c outputs.NetworkClient) netClient { + return netClient{ + client: c, + active: true, + } +} + +func (c *netClient) Disable() error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.active { + return c.err + } + + c.active = false + err := c.Close() + if err != nil { + c.err = err + } + return err +} + +func (c *netClient) Err() error { + c.mu.Lock() + defer c.mu.Unlock() + + return c.err +} + +func (c *netClient) String() string { + return c.client.String() +} + +func (c *netClient) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.active { + return c.err + } + return c.client.Close() +} + +func (c *netClient) Connect() error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.active { + return errOutputDisabled + } + + c.mu.Unlock() + err := c.Connect() + c.mu.Lock() + + if !c.active { + if err == nil { + // connection has been closed concurrently during Connect + // attempt to close in case of race + c.updErr(c.Close()) } + err = errOutputDisabled } + return err +} + +func (c *netClient) updErr(err error) { + if c.err == nil && err != nil { + c.err = err + } +} + +func (c *netClient) Publish(batch *Batch) error { + if !c.isActive() { + return errOutputDisabled + } + + // assume that concurrent Close or close before Publish call becomes + // effective making Publish fail immediately. + return c.client.Publish(batch) +} + +func (c *netClient) isActive() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.active }