From 6b97469490f60d415eb9453b4fb9a3bf5596734a Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 21 Aug 2024 00:34:38 -0700 Subject: [PATCH 01/12] Stop publisher properly --- CHANGELOG.next.asciidoc | 2 ++ libbeat/cmd/instance/beat.go | 13 ++++++------- libbeat/esleg/eslegclient/connection.go | 11 +++++++++++ libbeat/publisher/pipeline/controller.go | 11 +---------- 4 files changed, 20 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9b73c52df06..947330fc67d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -105,6 +105,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Allow port number 0 in the community ID flowhash processor {pull}40259[40259] - Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446] - Update Go version to 1.22.6. {pull}40528[40528] +- Aborts all active connections for Elasticsearch output. {pull}40572[40572] +- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index fc5e2ae9fec..436048cc2d1 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -402,10 +402,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) - // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, - // but refine publisher to disconnect clients on stop automatically - // defer pipeline.Close() - b.Publisher = publisher beater, err := bt(&b.Beat, sub) if err != nil { @@ -520,9 +516,15 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { ctx, cancel := context.WithCancel(context.Background()) stopBeat := func() { b.Instrumentation.Tracer().Close() + // If the publisher has a Close() method, call it before stopping the beater. + if c, ok := b.Publisher.(io.Closer); ok { + c.Close() + } beater.Stop() } svc.HandleSignals(stopBeat, cancel) + // Allow the manager to stop a currently running beats out of bound. + b.Manager.SetStopCallback(stopBeat) err = b.loadDashboards(ctx, false) if err != nil { @@ -531,9 +533,6 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) - // Allow the manager to stop a currently running beats out of bound. - b.Manager.SetStopCallback(beater.Stop) - err = beater.Run(&b.Beat) if b.shouldReexec { if err := b.reexec(); err != nil { diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 122a47b8081..0c83851b4f5 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -19,6 +19,7 @@ package eslegclient import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -62,6 +63,11 @@ type Connection struct { responseBuffer *bytes.Buffer isServerless bool + + // requests will share the same cancellable context + // so they can be aborted on Close() + reqsContext context.Context + cancelReqs func() } // ConnectionSettings are the settings needed for a Connection @@ -178,12 +184,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } + ctx, cancelFunc := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), + reqsContext: ctx, + cancelReqs: cancelFunc, } if s.APIKey != "" { @@ -317,6 +326,7 @@ func (conn *Connection) Ping() (ESPingData, error) { // Close closes a connection. func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() + conn.cancelReqs() return nil } @@ -486,6 +496,7 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) req.Host = host } + req = req.WithContext(conn.reqsContext) resp, err := conn.HTTP.Do(req) if err != nil { return 0, nil, err diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 4c27494fa68..4ac2373bcea 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -113,11 +113,7 @@ func (c *outputController) WaitClose(timeout time.Duration) error { c.consumer.close() close(c.workerChan) - // Signal the output workers to close. This step is a hint, and carries - // no guarantees. For example, on close the Elasticsearch output workers - // will close idle connections, but will not change any behavior for - // active connections, giving any remaining events a chance to ingest - // before we terminate. + // Signal the output workers to close. for _, out := range c.workers { out.Close() } @@ -209,11 +205,6 @@ func (c *outputController) closeQueue(timeout time.Duration) { // pipeline but it was shut down before any output was set. // In this case, return nil and Pipeline.ConnectWith will pass on a // real error to the caller. - // NOTE: under the current shutdown process, Pipeline.Close (and hence - // outputController.Close) is ~never called. So even if we did have - // blocked callers here, in a real shutdown they will never be woken - // up. But in hopes of a day when the shutdown process is more robust, - // I've decided to do the right thing here anyway. req.responseChan <- nil } } From e43c52ef83c1b4408464f794f181712a96187231 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 21 Aug 2024 11:27:33 +0200 Subject: [PATCH 02/12] Just call beater.Stop from manager --- libbeat/cmd/instance/beat.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 436048cc2d1..0ea4d495ac6 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -524,7 +524,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { } svc.HandleSignals(stopBeat, cancel) // Allow the manager to stop a currently running beats out of bound. - b.Manager.SetStopCallback(stopBeat) + b.Manager.SetStopCallback(beater.Stop) err = b.loadDashboards(ctx, false) if err != nil { @@ -533,6 +533,9 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) + // Allow the manager to stop a currently running beats out of bound. + b.Manager.SetStopCallback(beater.Stop) + err = beater.Run(&b.Beat) if b.shouldReexec { if err := b.reexec(); err != nil { From 8be1a177d8f0fded435765eb7e5758d56f872d29 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Wed, 21 Aug 2024 13:25:45 +0200 Subject: [PATCH 03/12] Delete duplicated lines --- libbeat/cmd/instance/beat.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 0ea4d495ac6..9bffac2e56c 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -523,8 +523,6 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { beater.Stop() } svc.HandleSignals(stopBeat, cancel) - // Allow the manager to stop a currently running beats out of bound. - b.Manager.SetStopCallback(beater.Stop) err = b.loadDashboards(ctx, false) if err != nil { From 0870605a7108c298361bec03477be9bcdade2433 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 22 Aug 2024 10:40:44 +0200 Subject: [PATCH 04/12] Make call to stopBeat idempotent --- libbeat/cmd/instance/beat.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 9bffac2e56c..f57c0148b2e 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -35,6 +35,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "time" "github.com/gofrs/uuid" @@ -514,16 +515,25 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { } ctx, cancel := context.WithCancel(context.Background()) + + // stopBeat must be idempotent since it will be called both from a signal and by the manager. + // Since beater.Stop is not safe to be called more than once this is necessary. + var once sync.Once stopBeat := func() { - b.Instrumentation.Tracer().Close() - // If the publisher has a Close() method, call it before stopping the beater. - if c, ok := b.Publisher.(io.Closer); ok { - c.Close() - } - beater.Stop() + once.Do(func() { + b.Instrumentation.Tracer().Close() + // If the publisher has a Close() method, call it before stopping the beater. + if c, ok := b.Publisher.(io.Closer); ok { + c.Close() + } + beater.Stop() + }) } svc.HandleSignals(stopBeat, cancel) + // Allow the manager to stop a currently running beats out of bound. + b.Manager.SetStopCallback(stopBeat) + err = b.loadDashboards(ctx, false) if err != nil { return err @@ -531,9 +541,6 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) - // Allow the manager to stop a currently running beats out of bound. - b.Manager.SetStopCallback(beater.Stop) - err = beater.Run(&b.Beat) if b.shouldReexec { if err := b.reexec(); err != nil { From 464055b91738ef37c70ac527c32a52e0f3b73441 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 22 Aug 2024 13:40:34 +0200 Subject: [PATCH 05/12] Add context at request creation to not break tracing --- libbeat/esleg/eslegclient/connection.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 0c83851b4f5..a09973c6ab9 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -401,7 +401,7 @@ func (conn *Connection) execRequest( method, url string, body io.Reader, ) (int, []byte, error) { - req, err := http.NewRequest(method, url, body) //nolint:noctx // keep legacy behaviour + req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body) //nolint:noctx // keep legacy behaviour if err != nil { conn.log.Warnf("Failed to create request %+v", err) return 0, nil, err @@ -496,7 +496,6 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) req.Host = host } - req = req.WithContext(conn.reqsContext) resp, err := conn.HTTP.Do(req) if err != nil { return 0, nil, err From 6cb525f31effe208ecdb5d6b0c917a82a8842eb9 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 22 Aug 2024 13:54:35 +0200 Subject: [PATCH 06/12] Remove unused lint --- libbeat/esleg/eslegclient/connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index a09973c6ab9..6f98935fab7 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -401,7 +401,7 @@ func (conn *Connection) execRequest( method, url string, body io.Reader, ) (int, []byte, error) { - req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body) //nolint:noctx // keep legacy behaviour + req, err := http.NewRequestWithContext(conn.reqsContext, method, url, body) if err != nil { conn.log.Warnf("Failed to create request %+v", err) return 0, nil, err From 9be31459119f81d18989a7c902e634531353e2fb Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Fri, 23 Aug 2024 10:19:38 +0200 Subject: [PATCH 07/12] Add default WaitClose timeout --- libbeat/cmd/instance/beat.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index f57c0148b2e..fb497bbe7c8 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -393,6 +393,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } outputFactory := b.makeOutputFactory(b.Config.Output) settings := pipeline.Settings{ + // Since now publisher is closed on Stop, we want to give some + // time to ack any pending events by default to avoid + // changing on stop behavior too much. + WaitClose: 5 * time.Second, Processors: b.processors, InputQueueSize: b.InputQueueSize, } @@ -517,7 +521,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { ctx, cancel := context.WithCancel(context.Background()) // stopBeat must be idempotent since it will be called both from a signal and by the manager. - // Since beater.Stop is not safe to be called more than once this is necessary. + // Since publisher.Close is not safe to be called more than once this is necessary. var once sync.Once stopBeat := func() { once.Do(func() { From 97908ca2824600843f0524b52859cf8a3f50c7a1 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Fri, 23 Aug 2024 11:30:32 +0200 Subject: [PATCH 08/12] Adjust wait on close time --- libbeat/cmd/instance/beat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index fb497bbe7c8..936ecf0caaa 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -396,7 +396,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { // Since now publisher is closed on Stop, we want to give some // time to ack any pending events by default to avoid // changing on stop behavior too much. - WaitClose: 5 * time.Second, + WaitClose: time.Second, Processors: b.processors, InputQueueSize: b.InputQueueSize, } From 97af6fb631fb7bd5d04fb8e00f967009c42b7bb3 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Aug 2024 10:12:16 +0200 Subject: [PATCH 09/12] Add delay to account for the stop of the publisher --- libbeat/tests/integration/http_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libbeat/tests/integration/http_test.go b/libbeat/tests/integration/http_test.go index bb2f7bde924..f8bdc0e604f 100644 --- a/libbeat/tests/integration/http_test.go +++ b/libbeat/tests/integration/http_test.go @@ -57,6 +57,7 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) r, err := http.Get("http://localhost:5066") require.NoError(t, err) @@ -88,6 +89,7 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) r, err := http.Get("http://localhost:5066/stats") require.NoError(t, err) @@ -121,6 +123,7 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) r, err := http.Get("http://localhost:5066/not-exist") require.NoError(t, err) @@ -143,6 +146,7 @@ output.console: mockbeat.WriteConfigFile(cfg) mockbeat.Start() mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) + time.Sleep(time.Second) r, err := http.Get("http://localhost:5066/debug/pprof/") require.NoError(t, err) From d1565e3da23b81ab5f0eb9a1b3a88308b7605e2c Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Aug 2024 10:25:11 +0200 Subject: [PATCH 10/12] Fix lint issues --- libbeat/tests/integration/http_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/libbeat/tests/integration/http_test.go b/libbeat/tests/integration/http_test.go index f8bdc0e604f..fc4c8ed0521 100644 --- a/libbeat/tests/integration/http_test.go +++ b/libbeat/tests/integration/http_test.go @@ -21,7 +21,6 @@ package integration import ( "encoding/json" - "io/ioutil" "net/http" "testing" "time" @@ -63,7 +62,8 @@ output.console: require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := ioutil.ReadAll(r.Body) + body, err := os.ReadAll(r.Body) + r.Body.Close() require.NoError(t, err) var m map[string]interface{} err = json.Unmarshal(body, &m) @@ -95,7 +95,8 @@ output.console: require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := ioutil.ReadAll(r.Body) + body, err := os.ReadAll(r.Body) + r.Body.Close() require.NoError(t, err) var m Stats @@ -126,6 +127,7 @@ output.console: time.Sleep(time.Second) r, err := http.Get("http://localhost:5066/not-exist") + r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } @@ -149,6 +151,7 @@ output.console: time.Sleep(time.Second) r, err := http.Get("http://localhost:5066/debug/pprof/") + r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } From 46dee35789a5bd2ef54971f70fc6a3e6d227853d Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Aug 2024 11:14:20 +0200 Subject: [PATCH 11/12] Fix lint issues --- libbeat/tests/integration/http_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libbeat/tests/integration/http_test.go b/libbeat/tests/integration/http_test.go index fc4c8ed0521..1b68dfc8723 100644 --- a/libbeat/tests/integration/http_test.go +++ b/libbeat/tests/integration/http_test.go @@ -21,6 +21,7 @@ package integration import ( "encoding/json" + "io" "net/http" "testing" "time" @@ -62,7 +63,7 @@ output.console: require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := os.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) r.Body.Close() require.NoError(t, err) var m map[string]interface{} @@ -95,7 +96,7 @@ output.console: require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") - body, err := os.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) r.Body.Close() require.NoError(t, err) var m Stats From 747b36a117ea6a6b66124a7dae0971fc91faed70 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 26 Aug 2024 11:33:34 +0200 Subject: [PATCH 12/12] Fix lint --- libbeat/tests/integration/http_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/tests/integration/http_test.go b/libbeat/tests/integration/http_test.go index 1b68dfc8723..41382ab9e09 100644 --- a/libbeat/tests/integration/http_test.go +++ b/libbeat/tests/integration/http_test.go @@ -59,7 +59,7 @@ output.console: mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066") + r, err := http.Get("http://localhost:5066") //nolint:noctx // fine for tests require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") @@ -92,7 +92,7 @@ output.console: mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/stats") + r, err := http.Get("http://localhost:5066/stats") //nolint:noctx // fine for tests require.NoError(t, err) require.Equal(t, http.StatusOK, r.StatusCode, "incorrect status code") @@ -127,7 +127,7 @@ output.console: mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/not-exist") + r, err := http.Get("http://localhost:5066/not-exist") //nolint:noctx // fine for tests r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") @@ -151,7 +151,7 @@ output.console: mockbeat.WaitForLogs("Starting stats endpoint", 60*time.Second) time.Sleep(time.Second) - r, err := http.Get("http://localhost:5066/debug/pprof/") + r, err := http.Get("http://localhost:5066/debug/pprof/") //nolint:noctx // fine for tests r.Body.Close() require.NoError(t, err) require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")