Skip to content

Commit

Permalink
Add support for ignore_inactive in filestream input (elastic#25036)
Browse files Browse the repository at this point in the history
This PR adds support for more flexible file ignoring in the `filestream` input. A new setting named `ignore_inactive` is introduced. At the moment it supports only two values: `since_last_start` and `since_first_start`.

If `since_last_start` is selected, the input ignores every file that has not been updated since Filebeat was last started. If `since_first_start` is chosen, files that have not been written to since Filebeat was first started on a given host are ignored.
  • Loading branch information
kvch authored Apr 19, 2021
1 parent 7561201 commit 3f39dd8
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 10 deletions.
5 changes: 5 additions & 0 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ filebeat.inputs:
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 0

# Ignore files that have not been updated since the selected event.
# ignore_inactive is disabled by default: when it is set to "", no files are ignored.
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,11 @@ filebeat.inputs:
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 0

# Ignore files that have not been updated since the selected event.
# ignore_inactive is disabled by default: when it is set to "", no files are ignored.
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down
1 change: 1 addition & 0 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type config struct {
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"`
IgnoreOlder time.Duration `config:"ignore_older"`
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
}

type closerConfig struct {
Expand Down
57 changes: 51 additions & 6 deletions filebeat/input/filestream/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,48 @@
package filestream

import (
"fmt"
"time"

"github.com/urso/sderr"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

// ignoreInactiveType is the unpacked value of the ignore_inactive setting.
// It selects which start time of the Beat getIgnoreSince returns as the
// cutoff for ignoring files.
type ignoreInactiveType uint8

const (
prospectorDebugKey = "file_prospector"
// Modes of the ignore_inactive setting as consumed by getIgnoreSince.
// InvalidIgnoreInactive matches neither case there, so getIgnoreSince
// returns the zero time for it.
InvalidIgnoreInactive = iota
// IgnoreInactiveSinceLastStart makes getIgnoreSince return beat.Info.StartTime.
IgnoreInactiveSinceLastStart
// IgnoreInactiveSinceFirstStart makes getIgnoreSince return beat.Info.FirstStart.
IgnoreInactiveSinceFirstStart

// Configuration strings that ignoreInactiveSettings maps to the modes above.
ignoreInactiveSinceLastStartStr = "since_last_start"
ignoreInactiveSinceFirstStartStr = "since_first_start"
prospectorDebugKey = "file_prospector"
)

// ignoreInactiveSettings maps every accepted ignore_inactive configuration
// string to its ignoreInactiveType value. Unpack uses it as a lookup table.
var ignoreInactiveSettings = map[string]ignoreInactiveType{
	ignoreInactiveSinceFirstStartStr: IgnoreInactiveSinceFirstStart,
	ignoreInactiveSinceLastStartStr:  IgnoreInactiveSinceLastStart,
}

// fileProspector implements the Prospector interface.
// It contains a file scanner which returns file system events.
// The FS events then trigger either new Harvester runs or updates
// the statestore.
type fileProspector struct {
filewatcher loginp.FSWatcher
identifier fileIdentifier
ignoreOlder time.Duration
cleanRemoved bool
stateChangeCloser stateChangeCloserConfig
// filewatcher is the source of the file system events that Run consumes.
filewatcher loginp.FSWatcher
identifier fileIdentifier
ignoreOlder time.Duration
// ignoreInactiveSince is the ignore_inactive mode. Run resolves it to a
// cutoff time with getIgnoreSince and does not start a harvester for a file
// whose modification time is not after that cutoff.
ignoreInactiveSince ignoreInactiveType
cleanRemoved bool
stateChangeCloser stateChangeCloserConfig
}

func (p *fileProspector) Init(cleaner loginp.ProspectorCleaner) error {
Expand Down Expand Up @@ -101,6 +119,8 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
})

tg.Go(func() error {
ignoreInactiveSince := getIgnoreSince(p.ignoreInactiveSince, ctx.Agent)

for ctx.Cancelation.Err() == nil {
fe := p.filewatcher.Event()

Expand Down Expand Up @@ -130,6 +150,11 @@ func (p *fileProspector) Run(ctx input.Context, s loginp.StateMetadataUpdater, h
break
}
}
if !ignoreInactiveSince.IsZero() && fe.Info.ModTime().Sub(ignoreInactiveSince) <= 0 {
log.Debugf("Ignore file because ignore_since.* reached time %v. File %s", p.ignoreInactiveSince, fe.NewPath)
break
}

hg.Start(ctx, src)

case loginp.OpTruncate:
Expand Down Expand Up @@ -217,3 +242,23 @@ func (p *fileProspector) stopHarvesterGroup(log *logp.Logger, hg loginp.Harveste
// Test is not implemented yet; calling it panics.
func (p *fileProspector) Test() error {
panic("TODO: implement me")
}

// getIgnoreSince resolves an ignore_inactive mode to a cutoff time taken from
// the Beat's info: the last start time for IgnoreInactiveSinceLastStart, the
// first start time for IgnoreInactiveSinceFirstStart. For every other mode it
// returns the zero time.Time.
func getIgnoreSince(t ignoreInactiveType, info beat.Info) time.Time {
	if t == IgnoreInactiveSinceLastStart {
		return info.StartTime
	}
	if t == IgnoreInactiveSinceFirstStart {
		return info.FirstStart
	}
	return time.Time{}
}

// Unpack converts the ignore_inactive configuration string v into t.
//
// The empty string is accepted and resets t to its zero value, because the
// reference configuration documents `ignore_inactive: ""` as the way to keep
// the feature disabled. Any other string must be a key of
// ignoreInactiveSettings; otherwise an error naming the rejected value is
// returned and t is left untouched.
func (t *ignoreInactiveType) Unpack(v string) error {
	if v == "" {
		// Zero is neither since-start mode (both follow the first iota value),
		// so getIgnoreSince falls through to its default branch.
		*t = 0
		return nil
	}

	val, ok := ignoreInactiveSettings[v]
	if !ok {
		return fmt.Errorf("invalid ignore_inactive setting: %s", v)
	}
	*t = val
	return nil
}
8 changes: 7 additions & 1 deletion libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package beat

import "github.com/gofrs/uuid"
import (
"time"

"github.com/gofrs/uuid"
)

// Info stores a beats instance meta data.
type Info struct {
Expand All @@ -29,6 +33,8 @@ type Info struct {
Hostname string // hostname
ID uuid.UUID // ID assigned to beat machine
EphemeralID uuid.UUID // ID assigned to beat process invocation (PID)
FirstStart time.Time // The time of the first start of the Beat.
StartTime time.Time // The time of last start of the Beat. Updated when the Beat is started or restarted.

// Monitoring-related fields
Monitoring struct {
Expand Down
16 changes: 13 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool) (*Beat, error) {
Name: hostname,
Hostname: hostname,
ID: id,
FirstStart: time.Now(),
StartTime: time.Now(),
EphemeralID: metrics.EphemeralID(),
},
Fields: fields,
Expand Down Expand Up @@ -695,7 +697,8 @@ func (b *Beat) configure(settings Settings) error {

func (b *Beat) loadMeta(metaPath string) error {
type meta struct {
UUID uuid.UUID `json:"uuid"`
UUID uuid.UUID `json:"uuid"`
FirstStart time.Time `json:"first_start"`
}

logp.Debug("beat", "Beat metadata path: %v", metaPath)
Expand All @@ -713,14 +716,21 @@ func (b *Beat) loadMeta(metaPath string) error {
}

f.Close()

if !m.FirstStart.IsZero() {
b.Info.FirstStart = m.FirstStart
}
valid := m.UUID != uuid.Nil
if valid {
b.Info.ID = m.UUID
}

if valid && !m.FirstStart.IsZero() {
return nil
}
}

// file does not exist or ID is invalid, let's create a new one
// file does not exist or ID is invalid or first start time is not defined, let's create a new one

// write temporary file first
tempFile := metaPath + ".new"
Expand All @@ -729,7 +739,7 @@ func (b *Beat) loadMeta(metaPath string) error {
return fmt.Errorf("Failed to create Beat meta file: %s", err)
}

encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID})
encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
err = f.Sync()
if err != nil {
return fmt.Errorf("Beat meta file failed to write: %s", err)
Expand Down
28 changes: 28 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,31 @@ func TestEmptyMetaJson(t *testing.T) {
assert.Equal(t, nil, err, "Unable to load meta file properly")
assert.NotEqual(t, uuid.Nil, b.Info.ID, "Beats UUID is not set")
}

// TestMetaJsonWithTimestamp checks that the first start time a Beat writes to
// its meta file is restored by a second Beat instance loading the same file.
func TestMetaJsonWithTimestamp(t *testing.T) {
	firstBeat, err := NewBeat("filebeat", "testidx", "0.9", false)
	if err != nil {
		t.Fatalf("Unable to create first beat: %v", err)
	}
	firstStart := firstBeat.Info.FirstStart

	metaFile, err := ioutil.TempFile("../test", "meta.json")
	if err != nil {
		// metaFile is nil on failure; stop before it is dereferenced below.
		t.Fatalf("Unable to create temporary meta file: %v", err)
	}

	metaPath := metaFile.Name()
	metaFile.Close()
	defer os.Remove(metaPath)

	err = firstBeat.loadMeta(metaPath)
	assert.Equal(t, nil, err, "Unable to load meta file properly")

	secondBeat, err := NewBeat("filebeat", "testidx", "0.9", false)
	if err != nil {
		t.Fatalf("Unable to create second beat: %v", err)
	}
	assert.False(t, firstStart.Equal(secondBeat.Info.FirstStart), "Before meta.json is loaded, first start must be different")

	// Keep the result of this call so the assertion below checks it instead of
	// the stale error left over from NewBeat.
	err = secondBeat.loadMeta(metaPath)
	assert.Equal(t, nil, err, "Unable to load meta file properly")
	assert.True(t, firstStart.Equal(secondBeat.Info.FirstStart), "Cannot load first start")
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,11 @@ filebeat.inputs:
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 0

# Ignore files that have not been updated since the selected event.
# ignore_inactive is disabled by default: when it is set to "", no files are ignored.
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down

0 comments on commit 3f39dd8

Please sign in to comment.