Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filestream returns error when an input with duplicated ID is created #41954

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]

*Heartbeat*

Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (c *crawler) Start(
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %w", err)
}
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

Expand Down
28 changes: 22 additions & 6 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/go-concert/unison"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -164,17 +165,32 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
}

if settings.ID == "" {
cim.Logger.Error("filestream input ID without ID might lead to data" +
" duplication, please add an ID and restart Filebeat")
cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat")
}

metricsID := settings.ID
cim.idsMux.Lock()
if _, exists := cim.ids[settings.ID]; exists {
cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID. Metrics "+
"collection has been disabled on this input.", settings.ID)
metricsID = ""
duplicatedInput := map[string]any{}
unpackErr := config.Unpack(&duplicatedInput)
if unpackErr != nil {
duplicatedInput["error"] = fmt.Errorf("failed to unpack duplicated input config: %w", unpackErr).Error()
}

cim.Logger.Errorw(
fmt.Sprintf(
"filestream input '%s' is duplicated: input will NOT start",
settings.ID,
),
"input.cfg", conf.DebugString(config, true))

cim.idsMux.Unlock()
return nil, &common.ErrNonReloadable{
Err: fmt.Errorf(
"filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID",
settings.ID,
)}
}

// TODO: improve how inputs with empty IDs are tracked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package input_logfile

import (
"bytes"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -42,6 +45,18 @@ func (s *testSource) Name() string {
return s.name
}

// noopProspector is a Prospector stub for tests: every operation
// succeeds without doing any work.
type noopProspector struct{}

// Init implements Prospector and always reports success.
func (noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error { return nil }

// Run implements Prospector as a no-op.
func (noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {}

// Test implements Prospector and always reports success.
func (noopProspector) Test() error { return nil }

func TestSourceIdentifier_ID(t *testing.T) {
testCases := map[string]struct {
userID string
Expand Down Expand Up @@ -198,6 +213,67 @@ func TestInputManager_Create(t *testing.T) {
assert.NotContains(t, buff.String(),
"already exists")
})

t.Run("does not start an input with duplicated ID", func(t *testing.T) {
tcs := []struct {
name string
id string
}{
{name: "ID is empty", id: ""},
{name: "non-empty ID", id: "non-empty-ID"},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
testStore, err := storeReg.Get("test")
require.NoError(t, err)

log, buff := newBufferLogger()

cim := &InputManager{
Logger: log,
StateStore: testStateStore{Store: testStore},
Configure: func(_ *config.C) (Prospector, Harvester, error) {
var wg sync.WaitGroup

return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
}}
cfg1 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/foo
`, tc.id))

// Create a different 2nd config with duplicated ID to ensure
// the ID itself is the only requirement to prevent the 2nd input
// from being created.
cfg2 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/bar
`, tc.id))

_, err = cim.Create(cfg1)
require.NoError(t, err, "1st input should have been created")

// Attempt to create an input with a duplicated ID
_, err = cim.Create(cfg2)
require.Error(t, err, "filestream should not have created an input with a duplicated ID")

logs := buff.String()
// Assert the logs contain the correct log message
assert.Contains(t, logs,
fmt.Sprintf("filestream input '%s' is duplicated:", tc.id))

// Assert the error contains the correct text
assert.Contains(t, err.Error(),
fmt.Sprintf("filestream input with ID '%s' already exists", tc.id))
})
}
})
}

func newBufferLogger() (*logp.Logger, *bytes.Buffer) {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/integration/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,5 @@ logging:
filebeat.WaitForLogs(
"Input 'filestream' starting",
10*time.Second,
"Filebeat did log a validation error")
"Filebeat did not log a validation error")
}
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}

if bt.config.ConfigMonitors.Enabled() {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunReloadableMonitors()
Expand Down
8 changes: 3 additions & 5 deletions libbeat/cfgfile/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"sync"

"github.com/joeshaw/multierror"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -71,8 +70,7 @@ func (r *RunnerList) Runners() []Runner {
//
// Runners might fail to start, it's the callers responsibility to
// handle any error. During execution, any encountered errors are
// accumulated in a `multierror.Errors` and returned as
// a `multierror.MultiError` upon completion.
// accumulated in a []errors and returned as errors.Join(errs) upon completion.
//
// While the stopping of runners occurs on separate goroutines,
// Reload will wait for all runners to finish before starting any new runners.
Expand All @@ -85,7 +83,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
r.mutex.Lock()
defer r.mutex.Unlock()

var errs multierror.Errors
var errs []error

startList := map[uint64]*reload.ConfigWithMeta{}
stopList := r.copyRunnerList()
Expand Down Expand Up @@ -179,7 +177,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
// above it is done asynchronously.
moduleRunning.Set(int64(len(r.runners)))

return errs.Err()
return errors.Join(errs...)
}

// Stop all runners
Expand Down
57 changes: 44 additions & 13 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cfgfile

import (
"errors"
"fmt"
"path/filepath"
"sync"
Expand All @@ -26,6 +27,7 @@
"github.com/joeshaw/multierror"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -42,8 +44,6 @@
},
}

debugf = logp.MakeDebug("cfgfile")

// configScans measures how many times the config dir was scanned for
// changes, configReloads measures how many times there were changes that
// triggered an actual reload.
Expand Down Expand Up @@ -101,10 +101,11 @@
path string
done chan struct{}
wg sync.WaitGroup
logger *logp.Logger
}

// NewReloader creates new Reloader instance for the given config
func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader {
func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C) *Reloader {
conf := DefaultDynamicConfig
_ = cfg.Unpack(&conf)

Expand All @@ -118,6 +119,7 @@
config: conf,
path: path,
done: make(chan struct{}),
logger: logger,
}
}

Expand All @@ -128,7 +130,7 @@
return nil
}

debugf("Checking module configs from: %s", rl.path)
rl.logger.Debugf("Checking module configs from: %s", rl.path)
gw := NewGlobWatcher(rl.path)

files, _, err := gw.Scan()
Expand All @@ -142,7 +144,7 @@
return fmt.Errorf("loading configs: %w", err)
}

debugf("Number of module configs found: %v", len(configs))
rl.logger.Debugf("Number of module configs found: %v", len(configs))

// Initialize modules
for _, c := range configs {
Expand Down Expand Up @@ -190,7 +192,7 @@
return

case <-time.After(rl.config.Reload.Period):
debugf("Scan for new config files")
rl.logger.Debug("Scan for new config files")
configScans.Add(1)

files, updated, err := gw.Scan()
Expand All @@ -209,13 +211,19 @@
// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))
rl.logger.Debugf("Number of module configs found: %v", len(configs))

err = list.Reload(configs)
// Force reload on the next iteration if and only if this one failed.
// (Any errors are already logged by list.Reload, so we don't need to
// propagate the details further.)
forceReload = err != nil
// Force reload on the next iteration if and only if the error
// can be retried.
// Errors are already logged by list.Reload, so we don't need to
// propagate details any further.
forceReload = isReloadable(err)
if forceReload {
rl.logger.Debugf("error '%v' can be retried. Will try again in %s", err, rl.config.Reload.Period.String())
} else {
rl.logger.Debugf("error '%v' cannot be retried. Modify any input file to reload.", err)
}
}

// Path loading is enabled but not reloading. Loads files only once and then stops.
Expand All @@ -228,6 +236,29 @@
}
}

func isReloadable(err error) bool {
if err == nil {
return false
}

type unwrapList interface {
Unwrap() []error
}

errList, isErrList := err.(unwrapList)

Check failure on line 248 in libbeat/cfgfile/reload.go

View workflow job for this annotation

GitHub Actions / lint (linux)

type assertion on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)
if !isErrList {
return !errors.Is(err, common.ErrNonReloadable{})
}

for _, e := range errList.Unwrap() {
if !errors.Is(e, common.ErrNonReloadable{}) {
return true
}
}

return false
}

// Load loads configuration files once.
func (rl *Reloader) Load(runnerFactory RunnerFactory) {
list := NewRunnerList("load", runnerFactory, rl.pipeline)
Expand All @@ -240,7 +271,7 @@

gw := NewGlobWatcher(rl.path)

debugf("Scan for config files")
rl.logger.Debug("Scan for config files")
files, _, err := gw.Scan()
if err != nil {
logp.Err("Error fetching new config files: %v", err)
Expand All @@ -249,7 +280,7 @@
// Load all config objects
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))
rl.logger.Debugf("Number of module configs found: %v", len(configs))

if err := list.Reload(configs); err != nil {
logp.Err("Error loading configuration files: %+v", err)
Expand Down
Loading
Loading