Skip to content

Commit

Permalink
Fix ignoring external input configuration in take_over: true mode (e…
Browse files Browse the repository at this point in the history
…lastic#36395)

The take-over mode didn't take into account external configuration
files. Now it's reading the merged configuration that contains the
externally defined inputs too.
  • Loading branch information
rdner authored and Scholar-Li committed Feb 5, 2024
1 parent 98251b4 commit c6e90e7
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496]
- Ensure winlog input retains metric collection when handling recoverable errors. {issue}36479[36479] {pull}36483[36483]
- Revert error introduced in {pull}35734[35734] when symlinks can't be resolved in filestream. {pull}36557[36557]
- Fix ignoring external input configuration in `take_over: true` mode {issue}36378[36378] {pull}36395[36395]

*Heartbeat*

Expand Down
56 changes: 54 additions & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,17 @@ func newPipelineLoaderFactory(esConfig *conf.C) fileset.PipelineLoaderFactory {
// some of the filestreams might want to take over the loginput state
// if their `take_over` flag is set to `true`.
func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
inputs, err := fetchInputConfiguration(config)
if err != nil {
return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err)
}
if len(inputs) == 0 {
return nil
}

store, err := stateStore.Access()
if err != nil {
return fmt.Errorf("Failed to access state for attempting take over: %w", err)
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
defer store.Close()
logger := logp.NewLogger("filestream-takeover")
Expand All @@ -514,5 +522,49 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {

backuper := backup.NewRegistryBackuper(logger, registryHome)

return takeover.TakeOverLogInputStates(logger, store, backuper, config)
return takeover.TakeOverLogInputStates(logger, store, backuper, inputs)
}

// fetches all the defined input configuration available at Filebeat startup including external files.
func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) {
if len(config.Inputs) == 0 {
inputs = []*conf.C{}
} else {
inputs = config.Inputs
}

// reading external input configuration if defined
var dynamicInputCfg cfgfile.DynamicConfig
if config.ConfigInput != nil {
err = config.ConfigInput.Unpack(&dynamicInputCfg)
if err != nil {
return nil, fmt.Errorf("failed to unpack the dynamic input configuration: %w", err)
}
}
if dynamicInputCfg.Path == "" {
return inputs, nil
}

cfgPaths, err := filepath.Glob(dynamicInputCfg.Path)
if err != nil {
return nil, fmt.Errorf("failed to resolve external input configuration paths: %w", err)
}

if len(cfgPaths) == 0 {
return inputs, nil
}

// making a copy so we can safely extend the slice
inputs = make([]*conf.C, len(config.Inputs))
copy(inputs, config.Inputs)

for _, p := range cfgPaths {
externalInputs, err := cfgfile.LoadList(p)
if err != nil {
return nil, fmt.Errorf("failed to load external input configuration: %w", err)
}
inputs = append(inputs, externalInputs...)
}

return inputs, nil
}
163 changes: 163 additions & 0 deletions filebeat/beater/filebeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
"os"
"path/filepath"
"testing"

"github.com/elastic/beats/v7/filebeat/config"
conf "github.com/elastic/elastic-agent-libs/config"

"github.com/stretchr/testify/require"
)

type inputEntry struct {
ID string `config:"id"`
}

func TestFetchInputConfiguration(t *testing.T) {
dir := t.TempDir()
err := os.WriteFile(filepath.Join(dir, "config1.yml"), []byte(`
- type: filestream
id: external-1
paths:
- "/some"
- type: filestream
id: external-2
paths:
- "/another"
`), 0777)
require.NoError(t, err)
err = os.WriteFile(filepath.Join(dir, "config2.yml.disabled"), []byte(`
- type: filestream
id: disabled
paths:
- "/some"
`), 0777)
require.NoError(t, err)

cases := []struct {
name string
configFile string
expected []inputEntry
}{
{
name: "loads mixed configuration",
configFile: `
filebeat.config.inputs:
enabled: true
path: ` + dir + `/*.yml
filebeat.inputs:
- type: filestream
id: internal
paths:
- "/another"
output.console:
enabled: true
`,
expected: []inputEntry{
{
ID: "internal",
},
{
ID: "external-1",
},
{
ID: "external-2",
},
},
},
{
name: "loads only internal configuration",
configFile: `
filebeat.inputs:
- type: filestream
id: internal
paths:
- "/another"
output.console:
enabled: true
`,
expected: []inputEntry{
{
ID: "internal",
},
},
},
{
name: "loads only external configuration",
configFile: `
filebeat.config.inputs:
enabled: true
path: ` + dir + `/*.yml
output.console:
enabled: true
`,
expected: []inputEntry{
{
ID: "external-1",
},
{
ID: "external-2",
},
},
},
{
name: "loads nothing",
configFile: `
filebeat.config.inputs:
enabled: true
path: ` + dir + `/*.nothing
output.console:
enabled: true
`,
expected: []inputEntry{},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
rawConfig, err := conf.NewConfigFrom(tc.configFile)
require.NoError(t, err)

cfg := struct {
Filebeat config.Config `config:"filebeat"`
}{
Filebeat: config.DefaultConfig,
}
err = rawConfig.Unpack(&cfg)
require.NoError(t, err)

inputs, err := fetchInputConfiguration(&cfg.Filebeat)
require.NoError(t, err)

actual := []inputEntry{}

for _, i := range inputs {
var entry inputEntry
err := i.Unpack(&entry)
require.NoError(t, err)
actual = append(actual, entry)
}

require.Equal(t, tc.expected, actual)
})
}
}
10 changes: 5 additions & 5 deletions filebeat/input/filestream/takeover/takeover.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"strings"

"github.com/elastic/beats/v7/filebeat/backup"
cfg "github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -47,8 +47,8 @@ type filestreamMatchers map[string]func(source string) bool
//
// This mode is created for a smooth loginput->filestream migration experience, so the filestream
// inputs would pick up ingesting files from the same point where a loginput stopped.
func TakeOverLogInputStates(log *logp.Logger, store backend.Store, backuper backup.Backuper, cfg *cfg.Config) error {
filestreamMatchers, err := findFilestreams(log, cfg)
func TakeOverLogInputStates(log *logp.Logger, store backend.Store, backuper backup.Backuper, inputsCfg []*conf.C) error {
filestreamMatchers, err := findFilestreams(log, inputsCfg)
if err != nil {
return fmt.Errorf("failed to read input configuration: %w", err)
}
Expand Down Expand Up @@ -141,10 +141,10 @@ func takeOverStates(log *logp.Logger, store backend.Store, matchers filestreamMa
// findFilestreams finds filestream inputs that are marked as `take_over: true`
// and creates a file matcher for each such filestream for the future use in state
// processing
func findFilestreams(log *logp.Logger, cfg *cfg.Config) (matchers filestreamMatchers, err error) {
func findFilestreams(log *logp.Logger, inputs []*conf.C) (matchers filestreamMatchers, err error) {
matchers = make(filestreamMatchers)

for _, input := range cfg.Inputs {
for _, input := range inputs {
inputCfg := defaultInputConfig()
err := input.Unpack(&inputCfg)
if err != nil {
Expand Down
Loading

0 comments on commit c6e90e7

Please sign in to comment.