Skip to content

Commit

Permalink
[filebeat][log] Enable status reporter for log input (elastic#40075)
Browse files Browse the repository at this point in the history
* chore: initial commit, without tests

* chore: tests

* chore: add test cases

* fix: add null check

* fix: remove println

* fix: lint

* goimports

* remove println

* fix: changelog

* update test for windows

* fix: fix some comments

* chore: add starting state in NewInput

* fix: add sample output to verify the status

* fix: remove println

* fix: add integration tag

* Update CHANGELOG.next.asciidoc

Co-authored-by: Denis <[email protected]>

* fix: remove redundant bool

* fix: add degraded

---------

Co-authored-by: Pierre HILBERT <[email protected]>
Co-authored-by: Denis <[email protected]>
  • Loading branch information
3 people authored Jul 19, 2024
1 parent 463bbb4 commit f3f772f
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]
Expand Down
31 changes: 21 additions & 10 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand All @@ -48,12 +49,13 @@ type Input interface {

// Runner encapsulate the lifecycle of the input
type Runner struct {
config inputConfig
input Input
done chan struct{}
wg *sync.WaitGroup
Once bool
beatDone chan struct{}
config inputConfig
input Input
done chan struct{}
wg *sync.WaitGroup
Once bool
beatDone chan struct{}
statusReporter status.StatusReporter
}

// New instantiates a new Runner
Expand Down Expand Up @@ -83,10 +85,11 @@ func New(
}

context := Context{
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
States: states,
Done: input.done,
BeatDone: input.beatDone,
Meta: nil,
GetStatusReporter: input.GetStatusReporter,
}
var ipt Input
ipt, err = f(conf, connector, context)
Expand Down Expand Up @@ -164,3 +167,11 @@ func (p *Runner) stop() {
func (p *Runner) String() string {
return fmt.Sprintf("input [type=%s]", p.config.Type)
}

func (p *Runner) SetStatusReporter(statusReporter status.StatusReporter) {
p.statusReporter = statusReporter
}

func (p *Runner) GetStatusReporter() status.StatusReporter {
return p.statusReporter
}
20 changes: 20 additions & 0 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Input struct {
meta map[string]string
stopOnce sync.Once
fileStateIdentifier file.StateIdentifier
getStatusReporter input.GetStatusReporter
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -157,8 +159,11 @@ func NewInput(
done: context.Done,
meta: meta,
fileStateIdentifier: identifier,
getStatusReporter: context.GetStatusReporter,
}

p.updateStatus(status.Starting, "starting the log input")

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
_, err = p.createHarvester(logger, file.State{}, nil)
Expand Down Expand Up @@ -224,6 +229,9 @@ func (p *Input) loadStates(states []file.State) error {

// Run runs the input
func (p *Input) Run() {
// Mark it Running for now.
// Any errors encountered in this loop will change state to degraded
p.updateStatus(status.Running, "")
logger := p.logger
logger.Debug("Start next scan")

Expand Down Expand Up @@ -558,6 +566,7 @@ func (p *Input) scan() {
continue
}
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf(harvesterErrMsg, newState.Source, err))
logger.Errorf(harvesterErrMsg, newState.Source, err)
}
} else {
Expand All @@ -583,6 +592,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, oldState.Offset)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
}
return
Expand All @@ -593,6 +603,7 @@ func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, ol
logger.Debugf("Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size())
err := p.startHarvester(logger, newState, 0)
if err != nil {
p.updateStatus(status.Degraded, fmt.Sprintf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err))
logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

Expand Down Expand Up @@ -833,3 +844,12 @@ func (p *Input) stopWhenDone() {

p.Wait()
}

func (p *Input) updateStatus(status status.Status, msg string) {
if p.getStatusReporter == nil {
return
}
if reporter := p.getStatusReporter(); reporter != nil {
reporter.UpdateStatus(status, msg)
}
}
12 changes: 8 additions & 4 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@ import (

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

type GetStatusReporter func() status.StatusReporter

type Context struct {
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
States []file.State
Done chan struct{}
BeatDone chan struct{}
Meta map[string]string
GetStatusReporter GetStatusReporter
}

// Factory is used to register functions creating new Input instances.
Expand Down
205 changes: 205 additions & 0 deletions x-pack/filebeat/tests/integration/status_reporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration

import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/x-pack/filebeat/cmd"
"github.com/elastic/beats/v7/x-pack/libbeat/management"
"github.com/elastic/beats/v7/x-pack/libbeat/management/tests"

conf "github.com/elastic/elastic-agent-libs/config"
)

func TestLogStatusReporter(t *testing.T) {
unitOneID := mock.NewID()
unitOutID := mock.NewID()
token := mock.NewID()

tests.InitBeatsForTest(t, cmd.Filebeat())
tmpDir := t.TempDir()
filename := fmt.Sprintf("test-%d", time.Now().Unix())
outPath := filepath.Join(tmpDir, filename)
t.Logf("writing output to file %s", outPath)
err := os.Mkdir(outPath, 0775)
require.NoError(t, err)

/*
* valid input stream, shouldn't raise any error.
*/
inputStream := getInputStream(unitOneID, filepath.Join(tmpDir, "*.log"), 2)
require.NoError(t, os.WriteFile(filepath.Join(tmpDir, "test.log"), []byte("Line1\nLine2\nLine3\n"), 0777))
/*
* try to open an irregular file.
* This should throw "Tried to open non regular file:" and status to degraded
*/
nullDeviceFile := "/dev/null"
if runtime.GOOS == "windows" {
nullDeviceFile = "NUL"
}
inputStreamIrregular := getInputStream(unitOneID, nullDeviceFile, 1)

outputExpectedStream := proto.UnitExpected{
Id: unitOutID,
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
Config: &proto.UnitExpectedConfig{
Type: "file",
Source: tests.RequireNewStruct(map[string]interface{}{
"type": "file",
"enabled": true,
"path": outPath,
"filename": "beat-out",
"number_of_files": 7,
}),
},
}

observedStates := make(chan *proto.CheckinObserved)
expectedUnits := make(chan []*proto.UnitExpected)
done := make(chan struct{})
// V2 mock server
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
select {
case observedStates <- observed:
return &proto.CheckinExpected{
Units: <-expectedUnits,
}
case <-done:
return nil
}
},
ActionImpl: func(response *proto.ActionResponse) error {
return nil
},
}
require.NoError(t, server.Start())
defer server.Stop()

// start the client
client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{
Name: "program",
}, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))

lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) {
c := management.DefaultConfig()
if err := cfg.Unpack(&c); err != nil {
return nil, err
}
return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits)
})

go func() {
t.Logf("Running beats...")
err := cmd.Filebeat().Execute()
require.NoError(t, err)
}()

scenarios := []struct {
expectedStatus proto.State
nextInputunit *proto.UnitExpected
}{
{
proto.State_HEALTHY,
&inputStreamIrregular,
},
{
proto.State_DEGRADED,
&inputStream,
},
{
proto.State_HEALTHY,
&inputStream,
},
// wait for one more checkin, just to be sure it's healthy
{
proto.State_HEALTHY,
&inputStream,
},
}

timer := time.NewTimer(2 * time.Minute)
id := 0
for id < len(scenarios) {
select {
case observed := <-observedStates:
state := extractState(observed.GetUnits(), unitOneID)
expectedUnits <- []*proto.UnitExpected{
scenarios[id].nextInputunit,
&outputExpectedStream,
}
if state != scenarios[id].expectedStatus {
continue
}
// always ensure that output is healthy
outputState := extractState(observed.GetUnits(), unitOutID)
require.Equal(t, outputState, proto.State_HEALTHY)

timer.Reset(2 * time.Minute)
id++
case <-timer.C:
t.Fatal("timeout waiting for checkin")
default:
}
}
require.Eventually(t, func() bool {
events := tests.ReadLogLines(t, outPath)
return events > 0 // wait until we see one output event
}, 15*time.Second, 1*time.Second)
}

func extractState(units []*proto.UnitObserved, idx string) proto.State {
for _, unit := range units {
if unit.Id == idx {
return unit.GetState()
}
}
return -1
}

func getInputStream(id string, path string, stateIdx int) proto.UnitExpected {
return proto.UnitExpected{
Id: id,
Type: proto.UnitType_INPUT,
ConfigStateIdx: uint64(stateIdx),
State: proto.State_HEALTHY,
Config: &proto.UnitExpectedConfig{
Streams: []*proto.Stream{{
Id: "filebeat/log-default-system",
Source: tests.RequireNewStruct(map[string]interface{}{
"enabled": true,
"symlinks": true,
"type": "log",
"paths": []interface{}{path},
"scan_frequency": "500ms",
}),
}},
Type: "log",
Id: "log-input-test",
Name: "log-1",
Revision: 1,
},
}
}

0 comments on commit f3f772f

Please sign in to comment.