Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filestream: validate input id on startup #41731

Merged
merged 11 commits into from
Dec 3, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]

*Heartbeat*

Expand Down
6 changes: 6 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
Expand Down Expand Up @@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
defer stateStore.Close()

err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
if err != nil {
logp.Err("invalid filestream configuration: %+v", err)
return err
}
err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
Expand Down
59 changes: 59 additions & 0 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package filestream

import (
"fmt"
"strings"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Config stores the options of a file stream.
Expand Down Expand Up @@ -142,3 +144,60 @@ func (c *config) Validate() error {

return nil
}

// ValidateInputIDs checks all filestream inputs to ensure all input IDs are
// unique. If there is a duplicated ID, it logs an error containing the offending
// input configurations and returns an error containing the duplicated IDs.
// A single empty ID is a valid ID as it's unique, however multiple empty IDs
// are not unique and are therefore are treated as any other duplicated ID.
func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
duplicatedConfigs := make(map[string][]*conf.C)
var duplicates []string
for _, input := range inputs {
fsInput := struct {
ID string `config:"id"`
Type string `config:"type"`
}{}
err := input.Unpack(&fsInput)
if err != nil {
return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
}
if fsInput.Type == "filestream" {
duplicatedConfigs[fsInput.ID] = append(duplicatedConfigs[fsInput.ID], input)
// we just need to collect the duplicated IDs once, therefore collect
// it only the first time we see a duplicated ID.
if len(duplicatedConfigs[fsInput.ID]) == 2 {
duplicates = append(duplicates, fsInput.ID)
}
}
}

if len(duplicates) != 0 {
jsonDupCfg := collectOffendingInputs(duplicates, duplicatedConfigs)
logger.Errorw("filestream inputs with duplicated IDs", "inputs", jsonDupCfg)
var quotedDuplicates []string
for _, dup := range duplicates {
quotedDuplicates = append(quotedDuplicates, fmt.Sprintf("%q", dup))
}
return fmt.Errorf("filestream inputs validation error: filestream inputs with duplicated IDs: %v", strings.Join(quotedDuplicates, ","))
}

return nil
}

func collectOffendingInputs(duplicates []string, ids map[string][]*conf.C) []map[string]interface{} {
var cfgs []map[string]interface{}

for _, id := range duplicates {
for _, dupcfgs := range ids[id] {
toJson := map[string]interface{}{}
err := dupcfgs.Unpack(&toJson)
if err != nil {
toJson[id] = fmt.Sprintf("failed to unpack config: %v", err)
}
cfgs = append(cfgs, toJson)
}
}
rdner marked this conversation as resolved.
Show resolved Hide resolved

return cfgs
}
190 changes: 190 additions & 0 deletions filebeat/input/filestream/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
package filestream

import (
"encoding/json"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest/observer"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestConfigValidate(t *testing.T) {
Expand All @@ -30,3 +37,186 @@ func TestConfigValidate(t *testing.T) {
require.Error(t, err)
})
}

func TestValidateInputIDs(t *testing.T) {
tcs := []struct {
name string
cfg []string
assertErr func(t *testing.T, err error)
assertLogs func(t *testing.T, buff *observer.ObservedLogs)
}{
{
name: "empty config",
cfg: []string{""},
assertErr: func(t *testing.T, err error) {
assert.NoError(t, err, "empty config should not return an error")
},
},
{
name: "one empty ID is allowed",
cfg: []string{`
type: filestream
`, `
type: filestream
id: some-id-1
`, `
type: filestream
id: some-id-2
`,
},
assertErr: func(t *testing.T, err error) {
assert.NoError(t, err, "one empty id is allowed")
},
},
{
name: "duplicated empty ID",
cfg: []string{`
type: filestream
paths:
- "/tmp/empty-1"
`, `
type: filestream
paths:
- "/tmp/empty-2"
`, `
type: filestream
id: unique-id-1
`, `
type: filestream
id: unique-id-2
`, `
type: filestream
id: unique-ID
`,
},
assertErr: func(t *testing.T, err error) {
assert.ErrorContains(t, err, `filestream inputs with duplicated IDs: ""`)

},
assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
want := `[{"paths":["/tmp/empty-1"],"type":"filestream"},{"paths":["/tmp/empty-2"],"type":"filestream"}]`

logs := obs.TakeAll()
require.Len(t, logs, 1, "there should be only one log entry")

got, err := json.Marshal(logs[0].ContextMap()["inputs"])
require.NoError(t, err, "could not marshal duplicated IDs inputs")
assert.Equal(t, want, string(got))
},
}, {
name: "duplicated IDs",
rdner marked this conversation as resolved.
Show resolved Hide resolved
cfg: []string{`
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
},
assertErr: func(t *testing.T, err error) {
assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
assert.ErrorContains(t, err, "duplicated-id-1")
assert.ErrorContains(t, err, "duplicated-id-2")
assert.Equal(t, strings.Count(err.Error(), "duplicated-id-1"), 1, "each IDs should appear only once")
assert.Equal(t, strings.Count(err.Error(), "duplicated-id-2"), 1, "each IDs should appear only once")

},
assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

logs := obs.TakeAll()
require.Len(t, logs, 1, "there should be only one log entry")

got, err := json.Marshal(logs[0].ContextMap()["inputs"])
require.NoError(t, err, "could not marshal duplicated IDs inputs")
assert.Equal(t, want, string(got))
},
},
{
name: "duplicated IDs and empty ID",
cfg: []string{`
type: filestream
`, `
type: filestream
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
},
assertErr: func(t *testing.T, err error) {
assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
},
assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

logs := obs.TakeAll()
require.Len(t, logs, 1, "there should be only one log entry")

got, err := json.Marshal(logs[0].ContextMap()["inputs"])
require.NoError(t, err, "could not marshal duplicated IDs inputs")
assert.Equal(t, want, string(got))

},
},
{
name: "only unique IDs",
cfg: []string{`
type: filestream
id: unique-id-1
`, `
type: filestream
id: unique-id-2
`, `
type: filestream
id: unique-id-3
`,
},
assertErr: func(t *testing.T, err error) {
assert.NoError(t, err, "only unique IDs should not return an error")
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
var inputs []*conf.C
for _, c := range tc.cfg {
cfg, err := conf.NewConfigFrom(c)
require.NoError(t, err, "could not create input configuration")
inputs = append(inputs, cfg)
}
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err, "could not setup log for development")

err = ValidateInputIDs(inputs, logp.L())
tc.assertErr(t, err)
if tc.assertLogs != nil {
tc.assertLogs(t, logp.ObserverLogs())
}
})
}
}
Loading
Loading