Skip to content

Commit

Permalink
add enable-all-filesets flag to setup command (#33114)
Browse files Browse the repository at this point in the history
* add `enable-all-filesets` flag to setup command

- use with `--pipelines` to enable all modules and filesets, to load
  all ingest pipelines.

Closes #30916

Co-authored-by: Craig MacKenzie <[email protected]>
  • Loading branch information
2 people authored and chrisberkhout committed Jun 1, 2023
1 parent 6536030 commit ae0a9f5
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Improve performance of disk queue by coalescing writes. {pull}31935[31935]
- Update `elastic/go-structform` from `v0.0.9` to `v0.0.10` to reduce memory usage. {pull}32536[32536]
- Added `--enable-all-filesets` to the `setup` command to simplify loading all ingest pipelines. {issue}30916[30916] {pull}33114[33114]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/autodiscover/builder/hints/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewLogHints(cfg *conf.C) (autodiscover.Builder, error) {
return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err)
}

moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false)
moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false, false)
if err != nil {
return nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
return nil, err
}

moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true)
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -173,8 +173,17 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
// When running the subcommand setup, configuration from modules.d directories
// have to be loaded using cfg.Reloader. Otherwise those configurations are skipped.
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory)
enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1)
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, enableAllFilesets)
if fb.config.ConfigModules.Enabled() {
if enableAllFilesets {
//All module configs need to be loaded to enable all the filesets
//contained in the modules. The default glob just loads the enabled
//ones. Switching the glob pattern from *.yml to * achieves this.
origPath, _ := fb.config.ConfigModules.String("path", -1)
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}
Expand Down
52 changes: 43 additions & 9 deletions filebeat/docs/howto/load-ingest-pipelines.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,74 @@
The ingest pipelines used to parse log lines are set up automatically the first
time you run {beatname_uc}, assuming the {es} output is enabled. If you're sending
events to {ls} you need to load the ingest pipelines manually. To do this, run the
`setup` command with the `--pipelines` option specified. If you used the
<<modules-command,`modules`>> command to enable modules in the `modules.d`
directory, also specify the `--modules` flag. For example, the following command
loads the ingest pipelines used by all filesets enabled in the system, nginx,
and mysql modules:
`setup` command with the `--pipelines` option specified. You also need to enable
the modules and filesets, this can be accomplished one of two ways.

First you can use the `--modules` option to enable the module, and the
`-M` option to enable the fileset. For example, the following command
loads the access pipeline from the nginx module.

*deb and rpm:*

["source","sh",subs="attributes"]
----
{beatname_lc} setup --pipelines --modules nginx -M "nginx.access.enabled=true"
----

*mac:*

["source","sh",subs="attributes"]
----
./{beatname_lc} setup --pipelines --modules nginx -M "nginx.access.enabled=true"
----

*linux:*

["source","sh",subs="attributes"]
----
./{beatname_lc} setup --pipelines --modules nginx -M "nginx.access.enabled=true"
----

*win:*

["source","sh",subs="attributes"]
----
PS > .{backslash}{beatname_lc}.exe setup --pipelines --modules nginx -M "nginx.access.enabled=true"
----

The second option is to use the `--enable-all-filesets` option to
enable all the modules and all the filesets so all of the ingest
pipelines are loaded. For example, the following command loads all
the ingest pipelines.

//TODO: Replace with the platform tab widget.

*deb and rpm:*

["source","sh",subs="attributes"]
----
{beatname_lc} setup --pipelines --modules system,nginx,mysql
{beatname_lc} setup --pipelines --enable-all-filesets
----

*mac:*

["source","sh",subs="attributes"]
----
./{beatname_lc} setup --pipelines --modules system,nginx,mysql
./{beatname_lc} setup --pipelines --enable-all-filesets
----

*linux:*

["source","sh",subs="attributes"]
----
./{beatname_lc} setup --pipelines --modules system,nginx,mysql
./{beatname_lc} setup --pipelines --enable-all-filesets
----

*win:*

["source","sh",subs="attributes"]
----
PS > .{backslash}{beatname_lc}.exe setup --pipelines --modules system,nginx,mysql
PS > .{backslash}{beatname_lc}.exe setup --pipelines --enable-all-filesets
----

TIP: If you're loading ingest pipelines manually because you want to send events
Expand Down
2 changes: 1 addition & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (f *Factory) CheckConfig(c *conf.C) error {
// createRegistry starts a registry for a set of filesets, it returns the registry and
// its input configurations
func (f *Factory) createRegistry(c *conf.C) (*ModuleRegistry, []*conf.C, error) {
m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false)
m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, false)
if err != nil {
return nil, nil, err
}
Expand Down
46 changes: 29 additions & 17 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ package fileset

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -51,12 +51,19 @@ func newModuleRegistry(modulesPath string,
moduleConfigs []*ModuleConfig,
overrides *ModuleOverrides,
beatInfo beat.Info,
enableAllFilesets bool,
) (*ModuleRegistry, error) {
reg := ModuleRegistry{
registry: []Module{},
log: logp.NewLogger(logName),
}

for _, mcfg := range moduleConfigs {
// an empty ModuleConfig can reach this so we only force enable a
// config if the Module name is set and Enabled pointer is valid.
if enableAllFilesets && mcfg.Module != "" && mcfg.Enabled != nil {
*mcfg.Enabled = true
}
if mcfg.Module == "" || (mcfg.Enabled != nil && !(*mcfg.Enabled)) {
continue
}
Expand All @@ -67,7 +74,7 @@ func newModuleRegistry(modulesPath string,
}
moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module)
if err != nil {
return nil, fmt.Errorf("error getting filesets for module %s: %v", mcfg.Module, err)
return nil, fmt.Errorf("error getting filesets for module %s: %w", mcfg.Module, err)
}
module := Module{
config: *mcfg,
Expand All @@ -77,9 +84,14 @@ func newModuleRegistry(modulesPath string,

fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides)
if err != nil {
return nil, fmt.Errorf("error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err)
return nil, fmt.Errorf("error applying overrides on fileset %s/%s: %w", mcfg.Module, filesetName, err)
}

// ModuleConfig can have empty Filesets so we only force
// enable if the Enabled pointer is valid
if enableAllFilesets && fcfg.Enabled != nil {
*fcfg.Enabled = true
}
if fcfg.Enabled != nil && !(*fcfg.Enabled) {
continue
}
Expand All @@ -98,7 +110,7 @@ func newModuleRegistry(modulesPath string,
return nil, err
}
if err = fileset.Read(beatInfo); err != nil {
return nil, fmt.Errorf("error reading fileset %s/%s: %v", mcfg.Module, filesetName, err)
return nil, fmt.Errorf("error reading fileset %s/%s: %w", mcfg.Module, filesetName, err)
}
module.filesets = append(module.filesets, *fileset)
}
Expand All @@ -109,14 +121,14 @@ func newModuleRegistry(modulesPath string,
for _, mod := range reg.registry {
filesets := reg.ModuleConfiguredFilesets(mod)
if len(filesets) == 0 {
return nil, errors.Errorf("module %s is configured but has no enabled filesets", mod.config.Module)
return nil, fmt.Errorf("module %s is configured but has no enabled filesets", mod.config.Module)
}
}
return &reg, nil
}

// NewModuleRegistry reads and loads the configured module into the registry.
func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool) (*ModuleRegistry, error) {
func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, enableAllFilesets bool) (*ModuleRegistry, error) {
modulesPath := paths.Resolve(paths.Home, "module")

stat, err := os.Stat(modulesPath)
Expand All @@ -143,7 +155,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool) (

moduleConfig, err := mcfgFromConfig(cfg)
if err != nil {
return nil, errors.Wrap(err, "error unpacking module config")
return nil, fmt.Errorf("error unpacking module config :%w", err)
}
mcfgs = append(mcfgs, moduleConfig)
}
Expand All @@ -154,7 +166,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool) (
}

enableFilesetsFromOverrides(mcfgs, modulesOverrides)
return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo)
return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo, enableAllFilesets)
}

// enableFilesetsFromOverrides enables in mcfgs the filesets mentioned in overrides,
Expand Down Expand Up @@ -189,7 +201,7 @@ func mcfgFromConfig(cfg *conf.C) (*ModuleConfig, error) {

err = cfg.Unpack(&dict)
if err != nil {
return nil, fmt.Errorf("error unpacking module %s in a dict: %v", mcfg.Module, err)
return nil, fmt.Errorf("error unpacking module %s in a dict: %w", mcfg.Module, err)
}

mcfg.Filesets = map[string]*FilesetConfig{}
Expand All @@ -203,16 +215,16 @@ func mcfgFromConfig(cfg *conf.C) (*ModuleConfig, error) {
continue
}

filesetConfig, _ := dict[name] // Nil config if name is not present.
filesetConfig := dict[name] // Nil config if name is not present.

tmpCfg, err := conf.NewConfigFrom(filesetConfig)
if err != nil {
return nil, fmt.Errorf("error creating config from fileset %s/%s: %v", mcfg.Module, name, err)
return nil, fmt.Errorf("error creating config from fileset %s/%s: %w", mcfg.Module, name, err)
}

fcfg, err := NewFilesetConfig(tmpCfg)
if err != nil {
return nil, fmt.Errorf("error creating config from fileset %s/%s: %v", mcfg.Module, name, err)
return nil, fmt.Errorf("error creating config from fileset %s/%s: %w", mcfg.Module, name, err)
}
mcfg.Filesets[name] = fcfg
}
Expand Down Expand Up @@ -274,20 +286,20 @@ func applyOverrides(fcfg *FilesetConfig,

config, err := conf.NewConfigFrom(fcfg)
if err != nil {
return nil, fmt.Errorf("error creating vars config object: %v", err)
return nil, fmt.Errorf("error creating vars config object: %w", err)
}

toMerge := []*conf.C{config}
toMerge = append(toMerge, overridesConfigs...)

resultConfig, err := conf.MergeConfigs(toMerge...)
if err != nil {
return nil, fmt.Errorf("error merging configs: %v", err)
return nil, fmt.Errorf("error merging configs: %w", err)
}

res, err := NewFilesetConfig(resultConfig)
if err != nil {
return nil, fmt.Errorf("error unpacking configs: %v", err)
return nil, fmt.Errorf("error unpacking configs: %w", err)
}

return res, nil
Expand Down Expand Up @@ -324,7 +336,7 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*conf.C, error) {
for _, fileset := range module.filesets {
fcfg, err := fileset.getInputConfig()
if err != nil {
return result, fmt.Errorf("error getting config for fileset %s/%s: %v", module.config.Module, fileset.name, err)
return result, fmt.Errorf("error getting config for fileset %s/%s: %w", module.config.Module, fileset.name, err)
}
result = append(result, fcfg)
}
Expand Down Expand Up @@ -367,7 +379,7 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc
}
status, body, err := esClient.Request("GET", "/_nodes/ingest", "", nil, nil)
if err != nil {
return fmt.Errorf("error querying _nodes/ingest: %v", err)
return fmt.Errorf("error querying _nodes/ingest: %w", err)
}
if status > 299 {
return fmt.Errorf("error querying _nodes/ingest. Status: %d. Response body: %s", status, body)
Expand Down
6 changes: 3 additions & 3 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestSetupNginx(t *testing.T) {
},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0"))
reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0"), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestLoadMultiplePipelines(t *testing.T) {
{"foo", &enabled, filesetConfigs},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"))
reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
{"foo", &enabled, filesetConfigs},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"))
reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), false)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions filebeat/fileset/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestNewModuleRegistry(t *testing.T) {
},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"})
reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}, false)
require.NoError(t, err)
assert.NotNil(t, reg)

Expand Down Expand Up @@ -149,7 +149,7 @@ func TestNewModuleRegistryConfig(t *testing.T) {
},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"})
reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}, false)
require.NoError(t, err)
assert.NotNil(t, reg)

Expand All @@ -175,7 +175,7 @@ func TestMovedModule(t *testing.T) {
},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"})
reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}, false)
require.NoError(t, err)
assert.NotNil(t, reg)
}
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestMissingModuleFolder(t *testing.T) {
load(t, map[string]interface{}{"module": "nginx"}),
}

reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0"}, true)
reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0"}, true, false)
require.NoError(t, err)
assert.NotNil(t, reg)

Expand Down
Loading

0 comments on commit ae0a9f5

Please sign in to comment.