diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index 4e1c0b66da4d..e2ea723e51ac 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -22,20 +22,30 @@ package journald import ( "errors" + "sync" "time" + "github.com/elastic/go-ucfg" + "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread" + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/reader/parser" ) +var ( + // includeMatchesWarnOnce allow for a config deprecation warning to be + // logged only once if an old config format is detected. + includeMatchesWarnOnce sync.Once +) + // Config stores the options of a journald input. type config struct { // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` // Backoff is the current interval to wait before - // attemting to read again from the journal. + // attempting to read again from the journal. Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` // MaxBackoff is the limit of the backoff time. @@ -48,7 +58,7 @@ type config struct { CursorSeekFallback journalread.SeekMode `config:"cursor_seek_fallback"` // Matches store the key value pairs to match entries. - Matches journalfield.IncludeMatches `config:"include_matches"` + Matches bwcIncludeMatches `config:"include_matches"` // Units stores the units to monitor. Units []string `config:"units"` @@ -66,6 +76,33 @@ type config struct { Parsers parser.Config `config:",inline"` } +// bwcIncludeMatches is a wrapper that accepts include_matches configuration +// from 7.x to allow old config to remain compatible. +type bwcIncludeMatches journalfield.IncludeMatches + +func (im *bwcIncludeMatches) Unpack(c *ucfg.Config) error { + // Handle 7.x config format in a backwards compatible manner. Old format: + // include_matches: [_SYSTEMD_UNIT=foo.service, _SYSTEMD_UNIT=bar.service] + if c.IsArray() { + var matches []journalfield.Matcher + if err := c.Unpack(&matches); err != nil { + return err + } + for _, x := range matches { + im.OR = append(im.OR, journalfield.IncludeMatches{ + Matches: []journalfield.Matcher{x}, + }) + } + includeMatchesWarnOnce.Do(func() { + cfgwarn.Deprecate("", "Please migrate your journald input's "+ + "include_matches config to the new more expressive format.") + }) + return nil + } + + return c.Unpack((*journalfield.IncludeMatches)(im)) +} + var errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback") func defaultConfig() config { diff --git a/filebeat/input/journald/config_test.go b/filebeat/input/journald/config_test.go new file mode 100644 index 000000000000..5bf3d2fc69b8 --- /dev/null +++ b/filebeat/input/journald/config_test.go @@ -0,0 +1,65 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build linux && cgo && withjournald +// +build linux,cgo,withjournald + +package journald + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestConfigIncludeMatches(t *testing.T) { + verify := func(t *testing.T, yml string) { + t.Helper() + + c, err := common.NewConfigWithYAML([]byte(yml), "source") + require.NoError(t, err) + + conf := defaultConfig() + require.NoError(t, c.Unpack(&conf)) + + assert.EqualValues(t, "_SYSTEMD_UNIT=foo.service", conf.Matches.OR[0].Matches[0].String()) + assert.EqualValues(t, "_SYSTEMD_UNIT=bar.service", conf.Matches.OR[1].Matches[0].String()) + } + + t.Run("normal", func(t *testing.T) { + const yaml = ` +include_matches: + or: + - match: _SYSTEMD_UNIT=foo.service + - match: _SYSTEMD_UNIT=bar.service +` + verify(t, yaml) + }) + + t.Run("backwards-compatible", func(t *testing.T) { + const yaml = ` +include_matches: + - _SYSTEMD_UNIT=foo.service + - _SYSTEMD_UNIT=bar.service +` + + verify(t, yaml) + }) +} diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 41b6c649f90d..bf86aa59626a 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -107,7 +107,7 @@ func configure(cfg *common.Config) ([]cursor.Source, cursor.Input, error) { MaxBackoff: config.MaxBackoff, Seek: config.Seek, CursorSeekFallback: config.CursorSeekFallback, - Matches: config.Matches, + Matches: journalfield.IncludeMatches(config.Matches), Units: config.Units, Transports: config.Transports, Identifiers: config.Identifiers,