diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6e585dcceeca..7479100c859d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -107,6 +107,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Allow port number 0 in the community ID flowhash processor {pull}40259[40259] - Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446] - Update Go version to 1.22.6. {pull}40528[40528] +- Aborts all active connections for Elasticsearch output. {pull}40572[40572] +- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index fc5e2ae9fec4..936ecf0caaa0 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -35,6 +35,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "time" "github.com/gofrs/uuid" @@ -392,6 +393,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } outputFactory := b.makeOutputFactory(b.Config.Output) settings := pipeline.Settings{ + // Since now publisher is closed on Stop, we want to give some + // time to ack any pending events by default to avoid + // changing on stop behavior too much. + WaitClose: time.Second, Processors: b.processors, InputQueueSize: b.InputQueueSize, } @@ -402,10 +407,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) - // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, - // but refine publisher to disconnect clients on stop automatically - // defer pipeline.Close() - b.Publisher = publisher beater, err := bt(&b.Beat, sub) if err != nil { @@ -518,12 +519,25 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { } ctx, cancel := context.WithCancel(context.Background()) + + // stopBeat must be idempotent since it will be called both from a signal and by the manager. + // Since publisher.Close is not safe to be called more than once this is necessary. + var once sync.Once stopBeat := func() { - b.Instrumentation.Tracer().Close() - beater.Stop() + once.Do(func() { + b.Instrumentation.Tracer().Close() + // If the publisher has a Close() method, call it before stopping the beater. + if c, ok := b.Publisher.(io.Closer); ok { + c.Close() + } + beater.Stop() + }) } svc.HandleSignals(stopBeat, cancel) + // Allow the manager to stop a currently running beats out of bound. + b.Manager.SetStopCallback(stopBeat) + err = b.loadDashboards(ctx, false) if err != nil { return err @@ -531,9 +545,6 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) - // Allow the manager to stop a currently running beats out of bound. - b.Manager.SetStopCallback(beater.Stop) - err = beater.Run(&b.Beat) if b.shouldReexec { if err := b.reexec(); err != nil { diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 122a47b80815..6f98935fab7f 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -19,6 +19,7 @@ package eslegclient import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -62,6 +63,11 @@ type Connection struct { responseBuffer *bytes.Buffer isServerless bool + + // requests will share the same cancellable context + // so they can be aborted on Close() + reqsContext context.Context + cancelReqs func() } // ConnectionSettings are the settings needed for a Connection @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } + ctx, cancelFunc := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), + reqsContext: ctx, + cancelReqs: cancelFunc, } if s.APIKey != "" { @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) { // Close closes a connection. func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() + conn.cancelReqs() return nil } @@ -391,7 +401,7 @@ func (conn *Connection) execRequest( method, url string, body io.Reader, ) (int, []byte, error) { - req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour + req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body) if err != nil { conn.log.Warnf("Failed to create request %+v", err) return 0, nil, err diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 4c27494fa68b..4ac2373bcea6 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error { c.consumer.close() close(c.workerChan) - // Signal the output workers to close. This step is a hint, and carries - // no guarantees. For example, on close the Elasticsearch output workers - // will close idle connections, but will not change any behavior for - // active connections, giving any remaining events a chance to ingest - // before we terminate. + // Signal the output workers to close. for _, out := range c.workers { out.Close() } @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) { // pipeline but it was shut down before any output was set. // In this case, return nil and Pipeline.ConnectWith will pass on a // real error to the caller. - // NOTE: under the current shutdown process, Pipeline.Close (and hence - // outputController.Close) is ~never called. So even if we did have - // blocked callers here, in a real shutdown they will never be woken - // up. But in hopes of a day when the shutdown process is more robust, - // I've decided to do the right thing here anyway. req.responseChan <- nil } } diff --git a/libbeat/tests/integration/http_test.go b/libbeat/tests/integration/http_test.go index bb2f7bde924a..41382ab9e090 100644 --- a/libbeat/tests/integration/http_test.go +++ b/libbeat/tests/integration/http_test.go @@ -21,7 +21,7 @@ package integration import ( "encoding/json" - "io/ioutil" + "io" "net/http" "testing" "time" @@ -57,12 +57,14 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066") + r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) + r.Body.Close() require.NoError(t, err) var m map[string]interface{} err = json.Unmarshal(body, &m) @@ -88,12 +90,14 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/stats") + r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) + r.Body.Close() require.NoError(t, err) var m Stats @@ -121,8 +125,10 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/not-exist") + r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests + r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } @@ -143,8 +149,10 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/debug/pprof/") + r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests + r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") }