From ec8ce0b09b9ed7ea57e5608c287ab033771c26e6 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 2 Jul 2024 19:57:54 +0530 Subject: [PATCH 01/18] chore: initial commit, without tests --- filebeat/input/input.go | 31 +++++++++++++++++++++---------- filebeat/input/log/input.go | 10 ++++++++++ filebeat/input/registry.go | 12 ++++++++---- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/filebeat/input/input.go b/filebeat/input/input.go index d52d63e0f85..2d8faa0b08f 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -48,12 +49,13 @@ type Input interface { // Runner encapsulate the lifecycle of the input type Runner struct { - config inputConfig - input Input - done chan struct{} - wg *sync.WaitGroup - Once bool - beatDone chan struct{} + config inputConfig + input Input + done chan struct{} + wg *sync.WaitGroup + Once bool + beatDone chan struct{} + statusReporter status.StatusReporter } // New instantiates a new Runner @@ -83,10 +85,11 @@ func New( } context := Context{ - States: states, - Done: input.done, - BeatDone: input.beatDone, - Meta: nil, + States: states, + Done: input.done, + BeatDone: input.beatDone, + Meta: nil, + GetStatusReporter: input.GetStatusReporter, } var ipt Input ipt, err = f(conf, connector, context) @@ -164,3 +167,11 @@ func (p *Runner) stop() { func (p *Runner) String() string { return fmt.Sprintf("input [type=%s]", p.config.Type) } + +func (p *Runner) SetStatusReporter(statusReporter status.StatusReporter) { + p.statusReporter = statusReporter +} + +func (p *Runner) GetStatusReporter() status.StatusReporter { + return p.statusReporter +} diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 4882735d36b..39fada257d8 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -78,6 +79,7 @@ type Input struct { meta map[string]string stopOnce sync.Once fileStateIdentifier file.StateIdentifier + getStatusReporter input.GetStatusReporter } // NewInput instantiates a new Log @@ -157,6 +159,7 @@ func NewInput( done: context.Done, meta: meta, fileStateIdentifier: identifier, + getStatusReporter: context.GetStatusReporter, } // Create empty harvester to check if configs are fine @@ -558,6 +561,7 @@ func (p *Input) scan() { continue } if err != nil { + p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err)) logger.Errorf(harvesterErrMsg, newState.Source, err) } } else { @@ -833,3 +837,9 @@ func (p *Input) stopWhenDone() { p.Wait() } + +func (p *Input) updateStatus(status status.Status, msg string) { + if reporter := p.getStatusReporter(); reporter != nil { + reporter.UpdateStatus(status, msg) + } +} diff --git a/filebeat/input/registry.go b/filebeat/input/registry.go index 829cf16866c..18e35fa2274 100644 --- a/filebeat/input/registry.go +++ b/filebeat/input/registry.go @@ -22,15 +22,19 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) +type GetStatusReporter func() status.StatusReporter + type Context struct { - States []file.State - Done chan struct{} - BeatDone chan struct{} - Meta map[string]string + States []file.State + Done chan struct{} + BeatDone chan struct{} + Meta map[string]string + GetStatusReporter GetStatusReporter } // Factory is used to register functions creating new Input instances. From 7286a1d2537c35e66f17c83c3d5515231b73c80d Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 2 Jul 2024 20:02:29 +0530 Subject: [PATCH 02/18] chore: tests --- .../tests/integration/status_reporter_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 x-pack/filebeat/tests/integration/status_reporter_test.go diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go new file mode 100644 index 00000000000..d7daae21311 --- /dev/null +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -0,0 +1,13 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import "testing" + +func TestLogStatusReporter(t *testing.T) { + +} From cff678519d5de23302135f8100d782be5e015b8e Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 18:59:42 +0530 Subject: [PATCH 03/18] chore: add test cases --- filebeat/input/log/input.go | 9 + .../tests/integration/status_reporter_test.go | 200 +++++++++++++++++- 2 files changed, 208 insertions(+), 1 deletion(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 39fada257d8..7e75ccd4444 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -80,6 +80,7 @@ type Input struct { stopOnce sync.Once fileStateIdentifier file.StateIdentifier getStatusReporter input.GetStatusReporter + healthy bool } // NewInput instantiates a new Log @@ -227,6 +228,8 @@ func (p *Input) loadStates(states []file.State) error { // Run runs the input func (p *Input) Run() { + p.healthy = true + p.updateStatus(status.Starting, "starting the scan") logger := p.logger logger.Debug("Start next scan") @@ -287,6 +290,10 @@ func (p *Input) Run() { p.cleanupStates() } } + + if p.healthy { + p.updateStatus(status.Running, "finished the scan") + } } func (p *Input) cleanupStates() { @@ -498,6 +505,7 @@ func (p *Input) scan() { var files []string paths := p.getFiles() + fmt.Println("HELLLLLLO ", paths) var err error @@ -561,6 +569,7 @@ func (p *Input) scan() { continue } if err != nil { + p.healthy = false p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err)) logger.Errorf(harvesterErrMsg, newState.Source, err) } diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index d7daae21311..f90100bf2c9 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -6,8 +6,206 @@ package integration -import "testing" +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + conf "github.com/elastic/elastic-agent-libs/config" +) func TestLogStatusReporter(t *testing.T) { + unitOneID := mock.NewID() + unitOutID := mock.NewID() + token := mock.NewID() + + tests.InitBeatsForTest(t, cmd.Filebeat()) + tmpDir := t.TempDir() + filename := fmt.Sprintf("test-%d", time.Now().Unix()) + outPath := filepath.Join(tmpDir, filename) + t.Logf("writing output to file %s", outPath) + err := os.Mkdir(outPath, 0775) + require.NoError(t, err) + defer func() { + err := os.RemoveAll(outPath) + require.NoError(t, err) + }() + + /* + * valid input stream, shouldn't raise any error. + */ + inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*"), 2) + + /* + * try to open an irregular file. + * This should throw "Tried to open non regular file:" and status to degraded + */ + inputStreamIrregular := getInputStream(unitOneID, "/dev/null", 1) + + outputExpectedStream := proto.UnitExpected{ + Id: unitOutID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "file", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), + }, + } + + observedStates := make(chan *proto.CheckinObserved) + expectedUnits := make(chan []*proto.UnitExpected) + done := make(chan struct{}) + // V2 mock server + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + select { + case observedStates <- observed: + return &proto.CheckinExpected{ + Units: <-expectedUnits, + } + case <-done: + return nil + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + server.Start() + defer server.Stop() + + // start the client + client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + c := management.DefaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits) + }) + + go func() { + t.Logf("Running beats...") + err := cmd.Filebeat().Execute() + require.NoError(t, err) + }() + + scenarios := []struct { + expectedStatus proto.State + nextInputunit *proto.UnitExpected + }{ + { + proto.State_HEALTHY, + &inputStreamIrregular, + }, + { + proto.State_DEGRADED, + &inputStream, + }, + { + proto.State_HEALTHY, + &inputStream, + }, + // wait for one more checkin, just to be sure it's healthy + { + proto.State_HEALTHY, + &inputStream, + }, + } + + timer := time.NewTimer(2 * time.Minute) + id := 0 + for id < len(scenarios) { + select { + case observed := <-observedStates: + state := extractState(observed.GetUnits(), unitOneID) + expectedUnits <- []*proto.UnitExpected{ + scenarios[id].nextInputunit, + &outputExpectedStream, + } + if state != scenarios[id].expectedStatus { + continue + } + fmt.Println("WHATSPUPP", state, scenarios[id].expectedStatus) + // always ensure that output is healthy + outputState := extractState(observed.GetUnits(), unitOutID) + require.Equal(t, outputState, proto.State_HEALTHY) + + timer.Reset(2 * time.Minute) + id++ + case <-timer.C: + t.Fatal("timeout waiting for checkin") + default: + } + } +} + +func extractState(units []*proto.UnitObserved, idx string) proto.State { + for _, unit := range units { + if unit.Id == idx { + return unit.GetState() + } + } + return -1 +} +func getInputStream(id string, path string, stateIdx int) proto.UnitExpected { + return proto.UnitExpected{ + Id: id, + Type: proto.UnitType_INPUT, + ConfigStateIdx: uint64(stateIdx), + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + Streams: []*proto.Stream{{ + Id: "filebeat/log-default-system", + Source: tests.RequireNewStruct(map[string]interface{}{ + "enabled": true, + "symlinks": true, + "type": "log", + "paths": []interface{}{path}, + }), + }}, + Type: "log", + Id: "log-input-test", + Name: "log-1", + Revision: 1, + }, + } } From bef95bfd390c5bd0e19f6c1eb591da38d8d88378 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 20:01:49 +0530 Subject: [PATCH 04/18] fix: add null check --- filebeat/input/log/input.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 7e75ccd4444..b80736e1531 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -848,6 +848,9 @@ func (p *Input) stopWhenDone() { } func (p *Input) updateStatus(status status.Status, msg string) { + if p.getStatusReporter == nil { + return + } if reporter := p.getStatusReporter(); reporter != nil { reporter.UpdateStatus(status, msg) } From ec66f08a741f534edead63dd9f001211d5cd11c0 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 22:27:40 +0530 Subject: [PATCH 05/18] fix: remove println --- filebeat/input/log/input.go | 1 - 1 file changed, 1 deletion(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index b80736e1531..ac35194c0be 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -505,7 +505,6 @@ func (p *Input) scan() { var files []string paths := p.getFiles() - fmt.Println("HELLLLLLO ", paths) var err error From facd3a060a59ae41af4533614539523e70df4be6 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 22:29:31 +0530 Subject: [PATCH 06/18] fix: lint --- .../tests/integration/status_reporter_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index f90100bf2c9..dc6c002f9e2 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -13,11 +13,6 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/common/reload" - lbmanagement "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/beats/v7/x-pack/filebeat/cmd" - "github.com/elastic/beats/v7/x-pack/libbeat/management" - "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -25,6 +20,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + conf "github.com/elastic/elastic-agent-libs/config" ) @@ -102,7 +103,7 @@ func TestLogStatusReporter(t *testing.T) { return nil }, } - server.Start() + require.NoError(t, server.Start()) defer server.Stop() // start the client From 05dfbac3230700920af937a32b5ba6e8499e4c0e Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 22:31:32 +0530 Subject: [PATCH 07/18] goimports --- x-pack/filebeat/tests/integration/status_reporter_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index dc6c002f9e2..604edb37bff 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -13,13 +13,14 @@ import ( "testing" "time" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/libbeat/common/reload" lbmanagement "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/x-pack/filebeat/cmd" From 0f00f0499e1a23026c376b0c8c0ea9c06208b3bb Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 4 Jul 2024 22:50:17 +0530 Subject: [PATCH 08/18] remove println --- x-pack/filebeat/tests/integration/status_reporter_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index 604edb37bff..194969e2a29 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -165,7 +165,6 @@ func TestLogStatusReporter(t *testing.T) { if state != scenarios[id].expectedStatus { continue } - fmt.Println("WHATSPUPP", state, scenarios[id].expectedStatus) // always ensure that output is healthy outputState := extractState(observed.GetUnits(), unitOutID) require.Equal(t, outputState, proto.State_HEALTHY) From 035619ffa4bee65dd4121c8c39aff2c22ad1104c Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 5 Jul 2024 14:57:27 +0530 Subject: [PATCH 09/18] fix: changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 02da7b9da1a..006f15cb1ea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] - Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] +- Enable StatusReporter for log input. {pull}40075[40075] *Heartbeat* From 99b5625e149797574611d5181f0985f410319192 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 5 Jul 2024 16:38:39 +0530 Subject: [PATCH 10/18] update test for windows --- x-pack/filebeat/tests/integration/status_reporter_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index 194969e2a29..de553641366 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -10,6 +10,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "testing" "time" @@ -56,7 +57,11 @@ func TestLogStatusReporter(t *testing.T) { * try to open an irregular file. * This should throw "Tried to open non regular file:" and status to degraded */ - inputStreamIrregular := getInputStream(unitOneID, "/dev/null", 1) + nullDeviceFile := "/dev/null" + if runtime.GOOS == "windows" { + nullDeviceFile = "NUL" + } + inputStreamIrregular := getInputStream(unitOneID, nullDeviceFile, 1) outputExpectedStream := proto.UnitExpected{ Id: unitOutID, From bf26bf35ed56799d4b6716fe3d979fcea0c6205c Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 11 Jul 2024 19:38:03 +0530 Subject: [PATCH 11/18] fix: fix some comments --- .../tests/integration/status_reporter_test.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index de553641366..702dadc879d 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -43,10 +43,6 @@ func TestLogStatusReporter(t *testing.T) { t.Logf("writing output to file %s", outPath) err := os.Mkdir(outPath, 0775) require.NoError(t, err) - defer func() { - err := os.RemoveAll(outPath) - require.NoError(t, err) - }() /* * valid input stream, shouldn't raise any error. @@ -69,17 +65,7 @@ func TestLogStatusReporter(t *testing.T) { ConfigStateIdx: 1, State: proto.State_HEALTHY, Config: &proto.UnitExpectedConfig{ - DataStream: &proto.DataStream{ - Namespace: "default", - }, - Type: "file", - Revision: 1, - Meta: &proto.Meta{ - Package: &proto.Package{ - Name: "system", - Version: "1.17.0", - }, - }, + Type: "file", Source: tests.RequireNewStruct(map[string]interface{}{ "type": "file", "enabled": true, @@ -115,9 +101,6 @@ func TestLogStatusReporter(t *testing.T) { // start the client client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ Name: "program", - Meta: map[string]string{ - "key": "value", - }, }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { From 2bf5257c4afc4e631e5cbfa00e8dd795dde185b6 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 11 Jul 2024 19:41:36 +0530 Subject: [PATCH 12/18] chore: add starting state in NewInput --- filebeat/input/log/input.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index ac35194c0be..f123bbbe10c 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -163,6 +163,8 @@ func NewInput( getStatusReporter: context.GetStatusReporter, } + p.updateStatus(status.Starting, "starting the log input") + // Create empty harvester to check if configs are fine // TODO: Do config validation instead _, err = p.createHarvester(logger, file.State{}, nil) @@ -229,7 +231,9 @@ func (p *Input) loadStates(states []file.State) error { // Run runs the input func (p *Input) Run() { p.healthy = true - p.updateStatus(status.Starting, "starting the scan") + // Mark it Running for now. + // Any errors encountered in this loop will change state to degraded + p.updateStatus(status.Running, "") logger := p.logger logger.Debug("Start next scan") From 965c285af6ec5faeffd14c902925bc4a93410fd6 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 11 Jul 2024 22:20:42 +0530 Subject: [PATCH 13/18] fix: add sample output to verify the status --- .../tests/integration/status_reporter_test.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index 702dadc879d..80390c0021f 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -2,8 +2,6 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build integration - package integration import ( @@ -47,8 +45,8 @@ func TestLogStatusReporter(t *testing.T) { /* * valid input stream, shouldn't raise any error. */ - inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*"), 2) - + inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*.log"), 2) + require.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test.log"), []byte("Line1\nLine2\nLine3\n"), 0777)) /* * try to open an irregular file. * This should throw "Tried to open non regular file:" and status to degraded @@ -164,6 +162,11 @@ func TestLogStatusReporter(t *testing.T) { default: } } + require.Eventually(t, func() bool { + events := tests.ReadLogLines(t, outPath) + fmt.Println(events, outPath) + return events > 0 // wait until we see one output event + }, 15*time.Second, 1*time.Second) } func extractState(units []*proto.UnitObserved, idx string) proto.State { @@ -185,10 +188,11 @@ func getInputStream(id string, path string, stateIdx int) proto.UnitExpected { Streams: []*proto.Stream{{ Id: "filebeat/log-default-system", Source: tests.RequireNewStruct(map[string]interface{}{ - "enabled": true, - "symlinks": true, - "type": "log", - "paths": []interface{}{path}, + "enabled": true, + "symlinks": true, + "type": "log", + "paths": []interface{}{path}, + "scan_frequency": "500ms", }), }}, Type: "log", From 616f40e5dd24f4aa4d20c31ef2ac24d15640b07f Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 11 Jul 2024 22:30:13 +0530 Subject: [PATCH 14/18] fix: remove println --- x-pack/filebeat/tests/integration/status_reporter_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index 80390c0021f..dcc31fd4480 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -164,7 +164,6 @@ func TestLogStatusReporter(t *testing.T) { } require.Eventually(t, func() bool { events := tests.ReadLogLines(t, outPath) - fmt.Println(events, outPath) return events > 0 // wait until we see one output event }, 15*time.Second, 1*time.Second) } From 9453b09226bff9d87f57a301055d76df2dc768a6 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Fri, 12 Jul 2024 20:43:05 +0530 Subject: [PATCH 15/18] fix: add integration tag --- x-pack/filebeat/tests/integration/status_reporter_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/filebeat/tests/integration/status_reporter_test.go b/x-pack/filebeat/tests/integration/status_reporter_test.go index dcc31fd4480..f9e31703038 100644 --- a/x-pack/filebeat/tests/integration/status_reporter_test.go +++ b/x-pack/filebeat/tests/integration/status_reporter_test.go @@ -2,6 +2,8 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +//go:build integration + package integration import ( From 142ff3ec31e85cb3b91f3faaabad525383df1fd1 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Tue, 16 Jul 2024 13:24:51 +0530 Subject: [PATCH 16/18] Update CHANGELOG.next.asciidoc Co-authored-by: Denis --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 13ed04454e8..05682c7a93d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -44,7 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update Salesforce module to use new Salesforce input. {pull}37509[37509] - Tag events that come from a filestream in "take over" mode. {pull}39828[39828] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] -- Enable StatusReporter for log input. {pull}40075[40075] +- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075] - Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121] - Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163] From e02783194f73fdc823da9dd8e58611d705c2c31a Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Tue, 16 Jul 2024 15:06:23 +0530 Subject: [PATCH 17/18] fix: remove redundant bool --- filebeat/input/log/input.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index f123bbbe10c..16b5a9db2f4 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -80,7 +80,6 @@ type Input struct { stopOnce sync.Once fileStateIdentifier file.StateIdentifier getStatusReporter input.GetStatusReporter - healthy bool } // NewInput instantiates a new Log @@ -230,7 +229,6 @@ func (p *Input) loadStates(states []file.State) error { // Run runs the input func (p *Input) Run() { - p.healthy = true // Mark it Running for now. // Any errors encountered in this loop will change state to degraded p.updateStatus(status.Running, "") @@ -294,10 +292,6 @@ func (p *Input) Run() { p.cleanupStates() } } - - if p.healthy { - p.updateStatus(status.Running, "finished the scan") - } } func (p *Input) cleanupStates() { @@ -572,7 +566,6 @@ func (p *Input) scan() { continue } if err != nil { - p.healthy = false p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err)) logger.Errorf(harvesterErrMsg, newState.Source, err) } From 25e6488a9ec55de50989a04ff853692e92f23c03 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Thu, 18 Jul 2024 13:55:57 +0530 Subject: [PATCH 18/18] fix: add degraded --- filebeat/input/log/input.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 16b5a9db2f4..40c42ddeebd 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -592,6 +592,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) err := p.startHarvester(logger, newState, oldState.Offset) if err != nil { + p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)) logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) } return @@ -602,6 +603,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol logger.Debugf("Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size()) err := p.startHarvester(logger, newState, 0) if err != nil { + p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)) logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) }