From 5617b65d7ed645ff706aa5922d527cfba91b1530 Mon Sep 17 00:00:00 2001 From: Alex Resnick Date: Thu, 10 Feb 2022 06:32:08 -0600 Subject: [PATCH] Let users configure multiple instances of the same module from filebeat.yml (#29952) --- CHANGELOG.next.asciidoc | 4 +- filebeat/fileset/factory.go | 12 ++--- filebeat/fileset/fileset.go | 20 +++---- filebeat/fileset/fileset_test.go | 6 +-- filebeat/fileset/modules.go | 86 ++++++++++++++---------------- filebeat/fileset/modules_test.go | 41 +++++++------- filebeat/fileset/pipelines.go | 12 ++--- filebeat/fileset/pipelines_test.go | 10 ++-- 8 files changed, 95 insertions(+), 96 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2b983eb7e8e..d12b9704f52 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -48,13 +48,13 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - tcp/unix input: Stop accepting connections after socket is closed. {pull}29712[29712] - Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695] +- Fix multiple instances of the same module configured within `filebeat.modules` in filebeat.yml. {issue}29649[29649] {pull}29952[29952] - aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131] -- cisco module: Fix change the broke ASA and FTD configs that used `var.input: syslog`. {pull}30072[30072] - Fix broken Kafka input {issue}29746[29746] {pull}30277[30277] - *Heartbeat* + *Metricbeat* - Enhance metricbeat on openshift documentation {pull}30054[30054] diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 43aa8e1636e..12c74d33e31 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -178,9 +178,9 @@ func (p *inputsRunner) Start() { input.Start() } - // Loop through and add modules, only 1 normally - for m := range p.moduleRegistry.registry { - moduleList.Add(m) + // Loop through and add modules + for _, module := range p.moduleRegistry.registry { + moduleList.Add(module.config.Module) } } @@ -193,9 +193,9 @@ func (p *inputsRunner) Stop() { input.Stop() } - // Loop through and remove modules, only 1 normally - for m := range p.moduleRegistry.registry { - moduleList.Remove(m) + // Loop through and remove modules + for _, module := range p.moduleRegistry.registry { + moduleList.Remove(module.config.Module) } } diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index dae801738f9..52947f0c67f 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -46,7 +46,7 @@ import ( // Fileset struct is the representation of a fileset. type Fileset struct { name string - mcfg *ModuleConfig + mname string fcfg *FilesetConfig modulePath string manifest *manifest @@ -63,17 +63,17 @@ type pipeline struct { func New( modulesPath string, name string, - mcfg *ModuleConfig, + mname string, fcfg *FilesetConfig) (*Fileset, error) { - modulePath := filepath.Join(modulesPath, mcfg.Module) + modulePath := filepath.Join(modulesPath, mname) if _, err := os.Stat(modulePath); os.IsNotExist(err) { - return nil, fmt.Errorf("module %s (%s) doesn't exist", mcfg.Module, modulePath) + return nil, fmt.Errorf("module %s (%s) doesn't exist", mname, modulePath) } return &Fileset{ name: name, - mcfg: mcfg, + mname: mname, fcfg: fcfg, modulePath: modulePath, }, nil @@ -81,7 +81,7 @@ func New( // String returns the module and the name of the fileset. func (fs *Fileset) String() string { - return fs.mcfg.Module + "/" + fs.name + return fs.mname + "/" + fs.name } // Read reads the manifest file and evaluates the variables. @@ -336,7 +336,7 @@ func (fs *Fileset) getBuiltinVars(info beat.Info) (map[string]interface{}, error "prefix": info.IndexPrefix, "hostname": hostname, "domain": domain, - "module": fs.mcfg.Module, + "module": fs.mname, "fileset": fs.name, "beatVersion": info.Version, }, nil @@ -391,7 +391,7 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { } // force our the module/fileset name - err = cfg.SetString("_module_name", -1, fs.mcfg.Module) + err = cfg.SetString("_module_name", -1, fs.mname) if err != nil { return nil, fmt.Errorf("Error setting the _module_name cfg in the input config: %v", err) } @@ -400,7 +400,7 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { return nil, fmt.Errorf("Error setting the _fileset_name cfg in the input config: %v", err) } - cfg.PrintDebugf("Merged input config for fileset %s/%s", fs.mcfg.Module, fs.name) + cfg.PrintDebugf("Merged input config for fileset %s/%s", fs.mname, fs.name) return cfg, nil } @@ -414,7 +414,7 @@ func (fs *Fileset) getPipelineIDs(info beat.Info) ([]string, error) { return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) } - pipelineIDs = append(pipelineIDs, FormatPipelineID(info.IndexPrefix, fs.mcfg.Module, fs.name, path, info.Version)) + pipelineIDs = append(pipelineIDs, FormatPipelineID(info.IndexPrefix, fs.mname, fs.name, path, info.Version)) } return pipelineIDs, nil diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index d87f99947e9..de79d570cdc 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -44,7 +44,7 @@ func makeTestInfo(version string) beat.Info { func getModuleForTesting(t *testing.T, module, fileset string) *Fileset { modulesPath, err := filepath.Abs("../module") require.NoError(t, err) - fs, err := New(modulesPath, fileset, &ModuleConfig{Module: module}, &FilesetConfig{}) + fs, err := New(modulesPath, fileset, module, &FilesetConfig{}) require.NoError(t, err) return fs @@ -98,7 +98,7 @@ func TestEvaluateVarsNginx(t *testing.T) { func TestEvaluateVarsNginxOverride(t *testing.T) { modulesPath, err := filepath.Abs("../module") require.NoError(t, err) - fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ + fs, err := New(modulesPath, "access", "nginx", &FilesetConfig{ Var: map[string]interface{}{ "pipeline": "no_plugins", }, @@ -239,7 +239,7 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{ + fs, err := New(modulesPath, "access", "nginx", &FilesetConfig{ Input: test.input, }) require.NoError(t, err) diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index b8c2c9e374a..3591fb97590 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -37,10 +37,15 @@ import ( const logName = "modules" type ModuleRegistry struct { - registry map[string]map[string]*Fileset // module -> fileset -> Fileset + registry []Module // []Module -> []Fileset log *logp.Logger } +type Module struct { + filesets []Fileset + config ModuleConfig +} + // newModuleRegistry reads and loads the configured module into the registry. func newModuleRegistry(modulesPath string, moduleConfigs []*ModuleConfig, @@ -48,27 +53,26 @@ func newModuleRegistry(modulesPath string, beatInfo beat.Info, ) (*ModuleRegistry, error) { reg := ModuleRegistry{ - registry: map[string]map[string]*Fileset{}, + registry: []Module{}, log: logp.NewLogger(logName), } - for _, mcfg := range moduleConfigs { - if mcfg.Enabled != nil && !(*mcfg.Enabled) { + if mcfg.Module == "" || (mcfg.Enabled != nil && !(*mcfg.Enabled)) { continue } - // Look for moved modules - if module, moved := getCurrentModuleName(modulesPath, mcfg.Module); moved { - reg.log.Warnf("Configuration uses the old name %q for module %q, please update your configuration.", mcfg.Module, module) - mcfg.Module = module + if moduleName, moved := getCurrentModuleName(modulesPath, mcfg.Module); moved { + reg.log.Warnf("Configuration uses the old name %q for module %q, please update your configuration.", mcfg.Module, moduleName) + mcfg.Module = moduleName } - - reg.registry[mcfg.Module] = map[string]*Fileset{} moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module) if err != nil { return nil, fmt.Errorf("error getting filesets for module %s: %v", mcfg.Module, err) } - + module := Module{ + config: *mcfg, + filesets: []Fileset{}, + } for filesetName, fcfg := range mcfg.Filesets { fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides) @@ -89,29 +93,23 @@ func newModuleRegistry(modulesPath string, return nil, fmt.Errorf("fileset %s/%s is configured but doesn't exist", mcfg.Module, filesetName) } - fileset, err := New(modulesPath, filesetName, mcfg, fcfg) + fileset, err := New(modulesPath, filesetName, mcfg.Module, fcfg) if err != nil { return nil, err } if err = fileset.Read(beatInfo); err != nil { return nil, fmt.Errorf("error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) } - reg.registry[mcfg.Module][filesetName] = fileset + module.filesets = append(module.filesets, *fileset) } + reg.registry = append(reg.registry, module) } - logp.Info("Enabled modules/filesets: %s", reg.InfoString()) - for _, mod := range reg.ModuleNames() { - if mod == "" { - continue - } - filesets, err := reg.ModuleConfiguredFilesets(mod) - if err != nil { - logp.Err("Failed listing filesets for module %s", mod) - continue - } + reg.log.Infof("Enabled modules/filesets: %s", reg.InfoString()) + for _, mod := range reg.registry { + filesets := reg.ModuleConfiguredFilesets(mod) if len(filesets) == 0 { - return nil, errors.Errorf("module %s is configured but has no enabled filesets", mod) + return nil, errors.Errorf("module %s is configured but has no enabled filesets", mod.config.Module) } } return ®, nil @@ -322,12 +320,11 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { var result []*common.Config - for module, filesets := range reg.registry { - for name, fileset := range filesets { + for _, module := range reg.registry { + for _, fileset := range module.filesets { fcfg, err := fileset.getInputConfig() if err != nil { - return result, fmt.Errorf("error getting config for fileset %s/%s: %v", - module, name, err) + return result, fmt.Errorf("error getting config for fileset %s/%s: %v", module.config.Module, fileset.name, err) } result = append(result, fcfg) } @@ -339,18 +336,18 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) { // be shown to the user func (reg *ModuleRegistry) InfoString() string { var result string - for module, filesets := range reg.registry { - var filesetNames string - for name := range filesets { + for _, module := range reg.registry { + for _, fileset := range module.filesets { + var filesetNames string if filesetNames != "" { filesetNames += ", " } - filesetNames += name - } - if result != "" { - result += ", " + filesetNames += fileset.name + if result != "" { + result += ", " + } + result += fmt.Sprintf("%s (%s)", module.config.Module, filesetNames) } - result += fmt.Sprintf("%s (%s)", module, filesetNames) } return result } @@ -416,8 +413,8 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc func (reg *ModuleRegistry) Empty() bool { count := 0 - for _, filesets := range reg.registry { - count += len(filesets) + for _, module := range reg.registry { + count += len(module.filesets) } return count == 0 } @@ -425,8 +422,8 @@ func (reg *ModuleRegistry) Empty() bool { // ModuleNames returns the names of modules in the ModuleRegistry. func (reg *ModuleRegistry) ModuleNames() []string { var modules []string - for m := range reg.registry { - modules = append(modules, m) + for _, m := range reg.registry { + modules = append(modules, m.config.Module) } return modules } @@ -440,10 +437,9 @@ func (reg *ModuleRegistry) ModuleAvailableFilesets(module string) ([]string, err // ModuleConfiguredFilesets return the list of configured filesets for the given module // it returns an empty list if the module doesn't exist -func (reg *ModuleRegistry) ModuleConfiguredFilesets(module string) (list []string, err error) { - filesets, _ := reg.registry[module] - for name := range filesets { - list = append(list, name) +func (reg *ModuleRegistry) ModuleConfiguredFilesets(module Module) (list []string) { + for _, fileset := range module.filesets { + list = append(list, fileset.name) } - return + return list } diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index d023c0aec83..82daecacef2 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -85,37 +85,37 @@ func TestNewModuleRegistry(t *testing.T) { require.NoError(t, err) assert.NotNil(t, reg) - expectedModules := map[string][]string{ - "auditd": {"log"}, - "nginx": {"access", "error"}, - "mysql": {"slowlog", "error"}, - "system": {"syslog", "auth"}, + expectedModules := []map[string][]string{ + {"nginx": {"access", "error"}}, + {"mysql": {"slowlog", "error"}}, + {"system": {"syslog", "auth"}}, + {"auditd": {"log"}}, } - assert.Equal(t, len(expectedModules), len(reg.registry)) - for name, filesets := range reg.registry { - expectedFilesets, exists := expectedModules[name] + for i, module := range reg.registry { + expectedFilesets, exists := expectedModules[i][module.config.Module] assert.True(t, exists) - assert.Equal(t, len(expectedFilesets), len(filesets)) - for _, fileset := range expectedFilesets { - fs := filesets[fileset] - assert.NotNil(t, fs) + assert.Equal(t, len(expectedFilesets), len(module.filesets)) + var filesetList []string + for _, fileset := range module.filesets { + filesetList = append(filesetList, fileset.name) } + assert.Equal(t, filesetList, expectedFilesets) } - for module, filesets := range reg.registry { - for name, fileset := range filesets { + for _, module := range reg.registry { + for _, fileset := range module.filesets { cfg, err := fileset.getInputConfig() - require.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name)) + require.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module.config.Module, fileset.name)) moduleName, err := cfg.String("_module_name", -1) require.NoError(t, err) - assert.Equal(t, module, moduleName) + assert.Equal(t, module.config.Module, moduleName) filesetName, err := cfg.String("_fileset_name", -1) require.NoError(t, err) - assert.Equal(t, name, filesetName) + assert.Equal(t, fileset.name, filesetName) } } } @@ -150,12 +150,13 @@ func TestNewModuleRegistryConfig(t *testing.T) { require.NoError(t, err) assert.NotNil(t, reg) - nginxAccess := reg.registry["nginx"]["access"] + nginxAccess := reg.registry[0].filesets[0] if assert.NotNil(t, nginxAccess) { assert.Equal(t, []interface{}{"/hello/test"}, nginxAccess.vars["paths"]) } - - assert.NotContains(t, reg.registry["nginx"], "error") + for _, fileset := range reg.registry[0].filesets { + assert.NotEqual(t, fileset.name, "error") + } } func TestMovedModule(t *testing.T) { diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index c93825ad09e..03ee9200a97 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -60,35 +60,35 @@ func (m MultiplePipelineUnsupportedError) Error() string { // LoadPipelines loads the pipelines for each configured fileset. func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error { - for module, filesets := range reg.registry { - for name, fileset := range filesets { + for _, module := range reg.registry { + for _, fileset := range module.filesets { // check that all the required Ingest Node plugins are available requiredProcessors := fileset.GetRequiredProcessors() reg.log.Debugf("Required processors: %s", requiredProcessors) if len(requiredProcessors) > 0 { err := checkAvailableProcessors(esClient, requiredProcessors) if err != nil { - return fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err) + return fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err) } } pipelines, err := fileset.GetPipelines(esClient.GetVersion()) if err != nil { - return fmt.Errorf("error getting pipeline for fileset %s/%s: %v", module, name, err) + return fmt.Errorf("error getting pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err) } // Filesets with multiple pipelines can only be supported by Elasticsearch >= 6.5.0 esVersion := esClient.GetVersion() minESVersionRequired := common.MustNewVersion("6.5.0") if len(pipelines) > 1 && esVersion.LessThan(minESVersionRequired) { - return MultiplePipelineUnsupportedError{module, name, esVersion, *minESVersionRequired} + return MultiplePipelineUnsupportedError{module.config.Module, fileset.name, esVersion, *minESVersionRequired} } var pipelineIDsLoaded []string for _, pipeline := range pipelines { err = LoadPipeline(esClient, pipeline.id, pipeline.contents, overwrite, reg.log.With("pipeline", pipeline.id)) if err != nil { - err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module, name, err) + err = fmt.Errorf("error loading pipeline for fileset %s/%s: %v", module.config.Module, fileset.name, err) break } pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index c7adf9cf78a..45c136481e7 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -69,7 +69,7 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }, IngestPipeline: []string{"pipeline-plain.json", "pipeline-json.json"}, } - testFileset := &Fileset{ + testFileset := Fileset{ name: "fls", modulePath: "./test/mod", manifest: testFilesetManifest, @@ -79,9 +79,11 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { pipelineIDs: []string{"filebeat-7.0.0-mod-fls-pipeline-plain", "filebeat-7.0.0-mod-fls-pipeline-json"}, } testRegistry := ModuleRegistry{ - registry: map[string]map[string]*Fileset{ - "mod": map[string]*Fileset{ - "fls": testFileset, + registry: []Module{ + { + filesets: []Fileset{ + testFileset, + }, }, }, log: logp.NewLogger(logName),