Skip to content

Commit

Permalink
Let users configure multiple instances of the same module from filebe…
Browse files Browse the repository at this point in the history
…at.yml (#29952)
  • Loading branch information
legoguy1000 authored Feb 10, 2022
1 parent e61173f commit 5617b65
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 96 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif

- tcp/unix input: Stop accepting connections after socket is closed. {pull}29712[29712]
- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
- Fix multiple instances of the same module configured within `filebeat.modules` in filebeat.yml. {issue}29649[29649] {pull}29952[29952]
- aws-s3: fix race condition in states used by s3-poller. {issue}30123[30123] {pull}30131[30131]
- cisco module: Fix change the broke ASA and FTD configs that used `var.input: syslog`. {pull}30072[30072]
- Fix broken Kafka input {issue}29746[29746] {pull}30277[30277]


*Heartbeat*


*Metricbeat*

- Enhance metricbeat on openshift documentation {pull}30054[30054]
Expand Down
12 changes: 6 additions & 6 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ func (p *inputsRunner) Start() {
input.Start()
}

// Loop through and add modules, only 1 normally
for m := range p.moduleRegistry.registry {
moduleList.Add(m)
// Loop through and add modules
for _, module := range p.moduleRegistry.registry {
moduleList.Add(module.config.Module)
}
}

Expand All @@ -193,9 +193,9 @@ func (p *inputsRunner) Stop() {
input.Stop()
}

// Loop through and remove modules, only 1 normally
for m := range p.moduleRegistry.registry {
moduleList.Remove(m)
// Loop through and remove modules
for _, module := range p.moduleRegistry.registry {
moduleList.Remove(module.config.Module)
}
}

Expand Down
20 changes: 10 additions & 10 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
// Fileset struct is the representation of a fileset.
type Fileset struct {
name string
mcfg *ModuleConfig
mname string
fcfg *FilesetConfig
modulePath string
manifest *manifest
Expand All @@ -63,25 +63,25 @@ type pipeline struct {
func New(
modulesPath string,
name string,
mcfg *ModuleConfig,
mname string,
fcfg *FilesetConfig) (*Fileset, error) {

modulePath := filepath.Join(modulesPath, mcfg.Module)
modulePath := filepath.Join(modulesPath, mname)
if _, err := os.Stat(modulePath); os.IsNotExist(err) {
return nil, fmt.Errorf("module %s (%s) doesn't exist", mcfg.Module, modulePath)
return nil, fmt.Errorf("module %s (%s) doesn't exist", mname, modulePath)
}

return &Fileset{
name: name,
mcfg: mcfg,
mname: mname,
fcfg: fcfg,
modulePath: modulePath,
}, nil
}

// String returns the module and the name of the fileset.
func (fs *Fileset) String() string {
return fs.mcfg.Module + "/" + fs.name
return fs.mname + "/" + fs.name
}

// Read reads the manifest file and evaluates the variables.
Expand Down Expand Up @@ -336,7 +336,7 @@ func (fs *Fileset) getBuiltinVars(info beat.Info) (map[string]interface{}, error
"prefix": info.IndexPrefix,
"hostname": hostname,
"domain": domain,
"module": fs.mcfg.Module,
"module": fs.mname,
"fileset": fs.name,
"beatVersion": info.Version,
}, nil
Expand Down Expand Up @@ -391,7 +391,7 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
}

// force our the module/fileset name
err = cfg.SetString("_module_name", -1, fs.mcfg.Module)
err = cfg.SetString("_module_name", -1, fs.mname)
if err != nil {
return nil, fmt.Errorf("Error setting the _module_name cfg in the input config: %v", err)
}
Expand All @@ -400,7 +400,7 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) {
return nil, fmt.Errorf("Error setting the _fileset_name cfg in the input config: %v", err)
}

cfg.PrintDebugf("Merged input config for fileset %s/%s", fs.mcfg.Module, fs.name)
cfg.PrintDebugf("Merged input config for fileset %s/%s", fs.mname, fs.name)

return cfg, nil
}
Expand All @@ -414,7 +414,7 @@ func (fs *Fileset) getPipelineIDs(info beat.Info) ([]string, error) {
return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

pipelineIDs = append(pipelineIDs, FormatPipelineID(info.IndexPrefix, fs.mcfg.Module, fs.name, path, info.Version))
pipelineIDs = append(pipelineIDs, FormatPipelineID(info.IndexPrefix, fs.mname, fs.name, path, info.Version))
}

return pipelineIDs, nil
Expand Down
6 changes: 3 additions & 3 deletions filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func makeTestInfo(version string) beat.Info {
func getModuleForTesting(t *testing.T, module, fileset string) *Fileset {
modulesPath, err := filepath.Abs("../module")
require.NoError(t, err)
fs, err := New(modulesPath, fileset, &ModuleConfig{Module: module}, &FilesetConfig{})
fs, err := New(modulesPath, fileset, module, &FilesetConfig{})
require.NoError(t, err)

return fs
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestEvaluateVarsNginx(t *testing.T) {
func TestEvaluateVarsNginxOverride(t *testing.T) {
modulesPath, err := filepath.Abs("../module")
require.NoError(t, err)
fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{
fs, err := New(modulesPath, "access", "nginx", &FilesetConfig{
Var: map[string]interface{}{
"pipeline": "no_plugins",
},
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestGetInputConfigNginxOverrides(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
fs, err := New(modulesPath, "access", &ModuleConfig{Module: "nginx"}, &FilesetConfig{
fs, err := New(modulesPath, "access", "nginx", &FilesetConfig{
Input: test.input,
})
require.NoError(t, err)
Expand Down
86 changes: 41 additions & 45 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,42 @@ import (
const logName = "modules"

type ModuleRegistry struct {
registry map[string]map[string]*Fileset // module -> fileset -> Fileset
registry []Module // []Module -> []Fileset
log *logp.Logger
}

type Module struct {
filesets []Fileset
config ModuleConfig
}

// newModuleRegistry reads and loads the configured module into the registry.
func newModuleRegistry(modulesPath string,
moduleConfigs []*ModuleConfig,
overrides *ModuleOverrides,
beatInfo beat.Info,
) (*ModuleRegistry, error) {
reg := ModuleRegistry{
registry: map[string]map[string]*Fileset{},
registry: []Module{},
log: logp.NewLogger(logName),
}

for _, mcfg := range moduleConfigs {
if mcfg.Enabled != nil && !(*mcfg.Enabled) {
if mcfg.Module == "" || (mcfg.Enabled != nil && !(*mcfg.Enabled)) {
continue
}

// Look for moved modules
if module, moved := getCurrentModuleName(modulesPath, mcfg.Module); moved {
reg.log.Warnf("Configuration uses the old name %q for module %q, please update your configuration.", mcfg.Module, module)
mcfg.Module = module
if moduleName, moved := getCurrentModuleName(modulesPath, mcfg.Module); moved {
reg.log.Warnf("Configuration uses the old name %q for module %q, please update your configuration.", mcfg.Module, moduleName)
mcfg.Module = moduleName
}

reg.registry[mcfg.Module] = map[string]*Fileset{}
moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module)
if err != nil {
return nil, fmt.Errorf("error getting filesets for module %s: %v", mcfg.Module, err)
}

module := Module{
config: *mcfg,
filesets: []Fileset{},
}
for filesetName, fcfg := range mcfg.Filesets {

fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides)
Expand All @@ -89,29 +93,23 @@ func newModuleRegistry(modulesPath string,
return nil, fmt.Errorf("fileset %s/%s is configured but doesn't exist", mcfg.Module, filesetName)
}

fileset, err := New(modulesPath, filesetName, mcfg, fcfg)
fileset, err := New(modulesPath, filesetName, mcfg.Module, fcfg)
if err != nil {
return nil, err
}
if err = fileset.Read(beatInfo); err != nil {
return nil, fmt.Errorf("error reading fileset %s/%s: %v", mcfg.Module, filesetName, err)
}
reg.registry[mcfg.Module][filesetName] = fileset
module.filesets = append(module.filesets, *fileset)
}
reg.registry = append(reg.registry, module)
}

logp.Info("Enabled modules/filesets: %s", reg.InfoString())
for _, mod := range reg.ModuleNames() {
if mod == "" {
continue
}
filesets, err := reg.ModuleConfiguredFilesets(mod)
if err != nil {
logp.Err("Failed listing filesets for module %s", mod)
continue
}
reg.log.Infof("Enabled modules/filesets: %s", reg.InfoString())
for _, mod := range reg.registry {
filesets := reg.ModuleConfiguredFilesets(mod)
if len(filesets) == 0 {
return nil, errors.Errorf("module %s is configured but has no enabled filesets", mod)
return nil, errors.Errorf("module %s is configured but has no enabled filesets", mod.config.Module)
}
}
return &reg, nil
Expand Down Expand Up @@ -322,12 +320,11 @@ func appendWithoutDuplicates(moduleConfigs []*ModuleConfig, modules []string) ([

func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) {
var result []*common.Config
for module, filesets := range reg.registry {
for name, fileset := range filesets {
for _, module := range reg.registry {
for _, fileset := range module.filesets {
fcfg, err := fileset.getInputConfig()
if err != nil {
return result, fmt.Errorf("error getting config for fileset %s/%s: %v",
module, name, err)
return result, fmt.Errorf("error getting config for fileset %s/%s: %v", module.config.Module, fileset.name, err)
}
result = append(result, fcfg)
}
Expand All @@ -339,18 +336,18 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*common.Config, error) {
// be shown to the user
func (reg *ModuleRegistry) InfoString() string {
var result string
for module, filesets := range reg.registry {
var filesetNames string
for name := range filesets {
for _, module := range reg.registry {
for _, fileset := range module.filesets {
var filesetNames string
if filesetNames != "" {
filesetNames += ", "
}
filesetNames += name
}
if result != "" {
result += ", "
filesetNames += fileset.name
if result != "" {
result += ", "
}
result += fmt.Sprintf("%s (%s)", module.config.Module, filesetNames)
}
result += fmt.Sprintf("%s (%s)", module, filesetNames)
}
return result
}
Expand Down Expand Up @@ -416,17 +413,17 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
count += len(filesets)
for _, module := range reg.registry {
count += len(module.filesets)
}
return count == 0
}

// ModuleNames returns the names of modules in the ModuleRegistry.
func (reg *ModuleRegistry) ModuleNames() []string {
var modules []string
for m := range reg.registry {
modules = append(modules, m)
for _, m := range reg.registry {
modules = append(modules, m.config.Module)
}
return modules
}
Expand All @@ -440,10 +437,9 @@ func (reg *ModuleRegistry) ModuleAvailableFilesets(module string) ([]string, err

// ModuleConfiguredFilesets return the list of configured filesets for the given module
// it returns an empty list if the module doesn't exist
func (reg *ModuleRegistry) ModuleConfiguredFilesets(module string) (list []string, err error) {
filesets, _ := reg.registry[module]
for name := range filesets {
list = append(list, name)
func (reg *ModuleRegistry) ModuleConfiguredFilesets(module Module) (list []string) {
for _, fileset := range module.filesets {
list = append(list, fileset.name)
}
return
return list
}
41 changes: 21 additions & 20 deletions filebeat/fileset/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,37 +85,37 @@ func TestNewModuleRegistry(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, reg)

expectedModules := map[string][]string{
"auditd": {"log"},
"nginx": {"access", "error"},
"mysql": {"slowlog", "error"},
"system": {"syslog", "auth"},
expectedModules := []map[string][]string{
{"nginx": {"access", "error"}},
{"mysql": {"slowlog", "error"}},
{"system": {"syslog", "auth"}},
{"auditd": {"log"}},
}

assert.Equal(t, len(expectedModules), len(reg.registry))
for name, filesets := range reg.registry {
expectedFilesets, exists := expectedModules[name]
for i, module := range reg.registry {
expectedFilesets, exists := expectedModules[i][module.config.Module]
assert.True(t, exists)

assert.Equal(t, len(expectedFilesets), len(filesets))
for _, fileset := range expectedFilesets {
fs := filesets[fileset]
assert.NotNil(t, fs)
assert.Equal(t, len(expectedFilesets), len(module.filesets))
var filesetList []string
for _, fileset := range module.filesets {
filesetList = append(filesetList, fileset.name)
}
assert.Equal(t, filesetList, expectedFilesets)
}

for module, filesets := range reg.registry {
for name, fileset := range filesets {
for _, module := range reg.registry {
for _, fileset := range module.filesets {
cfg, err := fileset.getInputConfig()
require.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module, name))
require.NoError(t, err, fmt.Sprintf("module: %s, fileset: %s", module.config.Module, fileset.name))

moduleName, err := cfg.String("_module_name", -1)
require.NoError(t, err)
assert.Equal(t, module, moduleName)
assert.Equal(t, module.config.Module, moduleName)

filesetName, err := cfg.String("_fileset_name", -1)
require.NoError(t, err)
assert.Equal(t, name, filesetName)
assert.Equal(t, fileset.name, filesetName)
}
}
}
Expand Down Expand Up @@ -150,12 +150,13 @@ func TestNewModuleRegistryConfig(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, reg)

nginxAccess := reg.registry["nginx"]["access"]
nginxAccess := reg.registry[0].filesets[0]
if assert.NotNil(t, nginxAccess) {
assert.Equal(t, []interface{}{"/hello/test"}, nginxAccess.vars["paths"])
}

assert.NotContains(t, reg.registry["nginx"], "error")
for _, fileset := range reg.registry[0].filesets {
assert.NotEqual(t, fileset.name, "error")
}
}

func TestMovedModule(t *testing.T) {
Expand Down
Loading

0 comments on commit 5617b65

Please sign in to comment.