
filestream: validate input id on startup #41731

Merged (11 commits) on Dec 3, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
@@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input without ID or with a duplicated ID {pull}41731[41731]

*Heartbeat*

6 changes: 6 additions & 0 deletions filebeat/beater/filebeat.go
@@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
@@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
	}
	defer stateStore.Close()

	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("filestream"))
Review comment (Contributor): The filestream input actually uses input.filestream as the logger name, so I suggest keeping the pattern.

Suggested change:
-	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("filestream"))
+	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))

	if err != nil {
		logp.Err("invalid filestream configuration: %+v", err)
		return err
	}
	err = processLogInputTakeOver(stateStore, config)
	if err != nil {
		logp.Err("Failed to attempt filestream state take over: %+v", err)
77 changes: 77 additions & 0 deletions filebeat/input/filestream/config.go
@@ -19,6 +19,7 @@ package filestream

import (
"fmt"
"strings"
"time"

"github.com/dustin/go-humanize"
@@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Config stores the options of a file stream.
@@ -142,3 +144,78 @@ func (c *config) Validate() error {

	return nil
}

// ValidateInputIDs checks all filestream inputs to ensure all have an ID and
// the ID is unique. If there is any empty or duplicated ID, it logs an error
// containing the offending input configurations and returns an error containing
// the duplicated IDs.
func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
	ids := make(map[string][]*conf.C)
Review comment (Contributor): [Suggestion] Rename this map to something that better describes its contents. It is a map of inputs (or configs) grouped by ID, so ids does not seem to be the best name for it.

	var duplicates []string
	var empty []*conf.C
	for _, input := range inputs {
		fsInput := struct {
			ID   string `config:"id"`
			Type string `config:"type"`
		}{}
		err := input.Unpack(&fsInput)
		if err != nil {
			return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
		}
		if fsInput.Type == "filestream" {
			if fsInput.ID == "" {
				empty = append(empty, input)
				continue
			}
			ids[fsInput.ID] = append(ids[fsInput.ID], input)
			if len(ids[fsInput.ID]) > 1 {
				duplicates = append(duplicates, fsInput.ID)
			}
		}
	}

	var errs []string
	if empty != nil {
		errs = append(errs, "input without ID")
	}
	if len(duplicates) != 0 {
		errs = append(errs, fmt.Sprintf("filestream inputs with duplicated IDs: %v", strings.Join(duplicates, ",")))
	}

	if len(errs) != 0 {
		jsonDupCfg := collectOffendingInputs(duplicates, empty, ids)
		logger.Errorw("filestream inputs with invalid IDs", "inputs", jsonDupCfg)

		return fmt.Errorf("filestream inputs validation error: %s", strings.Join(errs, ", "))
	}

	return nil
}
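Referring back to the review suggestion above about the name of the ids map: purely as an illustration (not part of this PR or of the reviewer's comment), the same grouping logic with a more descriptive map name could look like the sketch below. The helper name groupFilestreamConfigsByID and the variable cfgsByID are hypothetical.

// Illustrative sketch only: group filestream configurations by their ID.
func groupFilestreamConfigsByID(inputs []*conf.C) (map[string][]*conf.C, error) {
	cfgsByID := make(map[string][]*conf.C)
	for _, input := range inputs {
		fsInput := struct {
			ID   string `config:"id"`
			Type string `config:"type"`
		}{}
		if err := input.Unpack(&fsInput); err != nil {
			return nil, fmt.Errorf("failed to unpack filestream input configuration: %w", err)
		}
		if fsInput.Type == "filestream" {
			cfgsByID[fsInput.ID] = append(cfgsByID[fsInput.ID], input)
		}
	}
	return cfgsByID, nil
}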

func collectOffendingInputs(duplicates []string, empty []*conf.C, ids map[string][]*conf.C) []map[string]interface{} {
	var cfgs []map[string]interface{}

	if len(empty) > 0 {
		for _, cfg := range empty {
			toJson := map[string]interface{}{}
			err := cfg.Unpack(&toJson)
			if err != nil {
				toJson["emptyID"] = fmt.Sprintf("failed to unpack config: %v", err)
			}
			cfgs = append(cfgs, toJson)
		}
	}

	for _, id := range duplicates {
		for _, dupcfgs := range ids[id] {
			toJson := map[string]interface{}{}
			err := dupcfgs.Unpack(&toJson)
			if err != nil {
				toJson[id] = fmt.Sprintf("failed to unpack config: %v", err)
			}
			cfgs = append(cfgs, toJson)
		}
	}

	return cfgs
}
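For context, here is a minimal standalone sketch of calling the new validator outside of Filebeat's startup path. The import paths are assumed to be the canonical elastic/beats and elastic-agent-libs module paths, and the program itself is illustrative, not part of the PR.

package main

import (
	"fmt"

	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"

	"github.com/elastic/beats/v7/filebeat/input/filestream"
)

func main() {
	// Two filestream inputs sharing the same ID: ValidateInputIDs is expected
	// to log the offending configurations and return an error.
	raw := []string{
		"type: filestream\nid: duplicated-id",
		"type: filestream\nid: duplicated-id",
	}

	var inputs []*conf.C
	for _, r := range raw {
		c, err := conf.NewConfigFrom(r)
		if err != nil {
			panic(err)
		}
		inputs = append(inputs, c)
	}

	if err := filestream.ValidateInputIDs(inputs, logp.NewLogger("input.filestream")); err != nil {
		fmt.Println("validation failed:", err)
	}
}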
161 changes: 161 additions & 0 deletions filebeat/input/filestream/config_test.go
@@ -18,9 +18,15 @@
package filestream

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest/observer"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestConfigValidate(t *testing.T) {
@@ -30,3 +36,158 @@ func TestConfigValidate(t *testing.T) {
		require.Error(t, err)
	})
}

func TestValidateInputIDs(t *testing.T) {
	tcs := []struct {
		name       string
		cfg        []string
		assertErr  func(t *testing.T, err error)
		assertLogs func(t *testing.T, buff *observer.ObservedLogs)
	}{
		{
			name: "empty config",
			cfg:  []string{""},
			assertErr: func(t *testing.T, err error) {
				assert.NoError(t, err, "empty config should not return an error")
			},
		},
		{
			name: "empty ID",
			cfg: []string{`
type: filestream
`, `
type: filestream
id: some-id-1
`, `
type: filestream
id: some-id-2
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.ErrorContains(t, err, "input without ID")
			},
			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
				want := `[{"type":"filestream"}]`

				logs := obs.TakeAll()
				require.Len(t, logs, 1, "there should be only one log entry")

				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
				require.NoError(t, err, "could not marshal duplicated IDs inputs")
				assert.Equal(t, want, string(got))

			},
		},
		{
			name: "duplicated IDs",
			cfg: []string{`
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
				assert.ErrorContains(t, err, "duplicated-id-1")
				assert.ErrorContains(t, err, "duplicated-id-2")
			},
			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
				want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

				logs := obs.TakeAll()
				require.Len(t, logs, 1, "there should be only one log entry")

				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
				require.NoError(t, err, "could not marshal duplicated IDs inputs")
				assert.Equal(t, want, string(got))
			},
		},
		{
			name: "duplicated IDs and empty ID",
			cfg: []string{`
type: filestream
`, `
type: filestream
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.ErrorContains(t, err, "input without ID")
				assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
			},
			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
				want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

				logs := obs.TakeAll()
				require.Len(t, logs, 1, "there should be only one log entry")

				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
				require.NoError(t, err, "could not marshal duplicated IDs inputs")
				assert.Equal(t, want, string(got))

			},
		},
		{
			name: "only unique IDs",
			cfg: []string{`
type: filestream
id: unique-id-1
`, `
type: filestream
id: unique-id-2
`, `
type: filestream
id: unique-id-3
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.NoError(t, err, "only unique IDs should not return an error")
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			var inputs []*conf.C
			for _, c := range tc.cfg {
				cfg, err := conf.NewConfigFrom(c)
				require.NoError(t, err, "could not create input configuration")
				inputs = append(inputs, cfg)
			}
			err := logp.DevelopmentSetup(logp.ToObserverOutput())
			require.NoError(t, err, "could not setup log for development")

			err = ValidateInputIDs(inputs, logp.L())
			tc.assertErr(t, err)
			if tc.assertLogs != nil {
				tc.assertLogs(t, logp.ObserverLogs())
			}
		})
	}
}