From 3f39dd8a9dc435fcaf649e7ddc44b8477c2301ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 19 Apr 2021 15:13:22 +0200 Subject: [PATCH] Add support for ignore_inactive in filestream input (#25036) This PR adds support for a more flexible file ignoring in `filestream` input. A new setting is introduced named `ignore_inactive`. At the moment it only supports two values: `since_last_start` and `since_first_start`. If `since_last_start` is selected, the input ignores every file that has not been updated since Filebeat has been started. If `since_first_start` is chosen files that haven't been written since Filebeat has been started the first time on a given host are ignored. --- .../config/filebeat.inputs.reference.yml.tmpl | 5 ++ filebeat/filebeat.reference.yml | 5 ++ filebeat/input/filestream/config.go | 1 + filebeat/input/filestream/prospector.go | 57 +++++++++++++++++-- libbeat/beat/info.go | 8 ++- libbeat/cmd/instance/beat.go | 16 +++++- libbeat/cmd/instance/beat_test.go | 28 +++++++++ x-pack/filebeat/filebeat.reference.yml | 5 ++ 8 files changed, 115 insertions(+), 10 deletions(-) diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index dd459a2cfac..d84d14c5983 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -320,6 +320,11 @@ filebeat.inputs: # Time strings like 2h (2 hours), 5m (5 minutes) can be used. #ignore_older: 0 + # Ignore files that have not been updated since the selected event. + # ignore_inactive is disabled by default, so no files are ignored by setting it to "". + # Available options: since_first_start, since_last_start. + #ignore_inactive: "" + # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384 diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 38ecc9fb0b5..450b0f58f26 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -727,6 +727,11 @@ filebeat.inputs: # Time strings like 2h (2 hours), 5m (5 minutes) can be used. #ignore_older: 0 + # Ignore files that have not been updated since the selected event. + # ignore_inactive is disabled by default, so no files are ignored by setting it to "". + # Available options: since_first_start, since_last_start. + #ignore_inactive: "" + # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384 diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index e199a6e6870..e460e2627e0 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -40,6 +40,7 @@ type config struct { CleanRemoved bool `config:"clean_removed"` HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"` IgnoreOlder time.Duration `config:"ignore_older"` + IgnoreInactive ignoreInactiveType `config:"ignore_inactive"` } type closerConfig struct { diff --git a/filebeat/input/filestream/prospector.go b/filebeat/input/filestream/prospector.go index 322004421bf..97bf14efa7c 100644 --- a/filebeat/input/filestream/prospector.go +++ b/filebeat/input/filestream/prospector.go @@ -18,18 +18,35 @@ package filestream import ( + "fmt" "time" "github.com/urso/sderr" loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" input "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/go-concert/unison" ) +type ignoreInactiveType uint8 + const ( - prospectorDebugKey = "file_prospector" + InvalidIgnoreInactive = iota + IgnoreInactiveSinceLastStart + IgnoreInactiveSinceFirstStart + + ignoreInactiveSinceLastStartStr = "since_last_start" + ignoreInactiveSinceFirstStartStr = "since_first_start" + prospectorDebugKey = "file_prospector" +) + +var ( + ignoreInactiveSettings = map[string]ignoreInactiveType{ + ignoreInactiveSinceLastStartStr: IgnoreInactiveSinceLastStart, + ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart, + } ) // fileProspector implements the Prospector interface. @@ -37,11 +54,12 @@ const ( // The FS events then trigger either new Harvester runs or updates // the statestore. type fileProspector struct { - filewatcher loginp.FSWatcher - identifier fileIdentifier - ignoreOlder time.Duration - cleanRemoved bool - stateChangeCloser stateChangeCloserConfig + filewatcher loginp.FSWatcher + identifier fileIdentifier + ignoreOlder time.Duration + ignoreInactiveSince ignoreInactiveType + cleanRemoved bool + stateChangeCloser stateChangeCloserConfig } func (p *fileProspector) Init(cleaner loginp.ProspectorCleaner) error { @@ -101,6 +119,8 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h }) tg.Go(func() error { + ignoreInactiveSince := getIgnoreSince(p.ignoreInactiveSince, ctx.Agent) + for ctx.Cancelation.Err() == nil { fe := p.filewatcher.Event() @@ -130,6 +150,11 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h break } } + if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 { + log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath) + break + } + hg.Start(ctx, src) case loginp.OpTruncate: @@ -217,3 +242,23 @@ func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.Harveste func (p *fileProspector) Test() error { panic("TODO: implement me") } + +func getIgnoreSince(t ignoreInactiveType, info beat.Info) time.Time { + switch t { + case IgnoreInactiveSinceLastStart: + return info.StartTime + case IgnoreInactiveSinceFirstStart: + return info.FirstStart + default: + return time.Time{} + } +} + +func (t *ignoreInactiveType) Unpack(v string) error { + val, ok := ignoreInactiveSettings[v] + if !ok { + return fmt.Errorf("invalid ignore_inactive setting: %s", v) + } + *t = val + return nil +} diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index c808ba214ea..078833b3bde 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -17,7 +17,11 @@ package beat -import "github.com/gofrs/uuid" +import ( + "time" + + "github.com/gofrs/uuid" +) // Info stores a beats instance meta data. type Info struct { @@ -29,6 +33,8 @@ type Info struct { Hostname string // hostname ID uuid.UUID // ID assigned to beat machine EphemeralID uuid.UUID // ID assigned to beat process invocation (PID) + FirstStart time.Time // The time of the first start of the Beat. + StartTime time.Time // The time of last start of the Beat. Updated when the Beat is started or restarted. // Monitoring-related fields Monitoring struct { diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index b4c8dae9d89..f3254857505 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -239,6 +239,8 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool) (*Beat, error) { Name: hostname, Hostname: hostname, ID: id, + FirstStart: time.Now(), + StartTime: time.Now(), EphemeralID: metrics.EphemeralID(), }, Fields: fields, @@ -695,7 +697,8 @@ func (b *Beat) configure(settings Settings) error { func (b *Beat) loadMeta(metaPath string) error { type meta struct { - UUID uuid.UUID `json:"uuid"` + UUID uuid.UUID `json:"uuid"` + FirstStart time.Time `json:"first_start"` } logp.Debug("beat", "Beat metadata path: %v", metaPath) @@ -713,14 +716,21 @@ func (b *Beat) loadMeta(metaPath string) error { } f.Close() + + if !m.FirstStart.IsZero() { + b.Info.FirstStart = m.FirstStart + } valid := m.UUID != uuid.Nil if valid { b.Info.ID = m.UUID + } + + if valid && !m.FirstStart.IsZero() { return nil } } - // file does not exist or ID is invalid, let's create a new one + // file does not exist or ID is invalid or first start time is not defined, let's create a new one // write temporary file first tempFile := metaPath + ".new" @@ -729,7 +739,7 @@ func (b *Beat) loadMeta(metaPath string) error { return fmt.Errorf("Failed to create Beat meta file: %s", err) } - encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID}) + encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart}) err = f.Sync() if err != nil { return fmt.Errorf("Beat meta file failed to write: %s", err) diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index a0db00c853c..86ae2697d10 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -114,3 +114,31 @@ func TestEmptyMetaJson(t *testing.T) { assert.Equal(t, nil, err, "Unable to load meta file properly") assert.NotEqual(t, uuid.Nil, b.Info.ID, "Beats UUID is not set") } + +func TestMetaJsonWithTimestamp(t *testing.T) { + firstBeat, err := NewBeat("filebeat", "testidx", "0.9", false) + if err != nil { + panic(err) + } + firstStart := firstBeat.Info.FirstStart + + metaFile, err := ioutil.TempFile("../test", "meta.json") + assert.Equal(t, nil, err, "Unable to create temporary meta file") + + metaPath := metaFile.Name() + metaFile.Close() + defer os.Remove(metaPath) + + err = firstBeat.loadMeta(metaPath) + assert.Equal(t, nil, err, "Unable to load meta file properly") + + secondBeat, err := NewBeat("filebeat", "testidx", "0.9", false) + if err != nil { + panic(err) + } + assert.False(t, firstStart.Equal(secondBeat.Info.FirstStart), "Before meta.json is loaded, first start must be different") + secondBeat.loadMeta(metaPath) + + assert.Equal(t, nil, err, "Unable to load meta file properly") + assert.True(t, firstStart.Equal(secondBeat.Info.FirstStart), "Cannot load first start") +} diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 0773814bb2d..ca901cf26dc 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2578,6 +2578,11 @@ filebeat.inputs: # Time strings like 2h (2 hours), 5m (5 minutes) can be used. #ignore_older: 0 + # Ignore files that have not been updated since the selected event. + # ignore_inactive is disabled by default, so no files are ignored by setting it to "". + # Available options: since_first_start, since_last_start. + #ignore_inactive: "" + # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384