From 305edac91f6d9146cdde667c0eb8c8ef4254ac9f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:10:05 +0100 Subject: [PATCH] Add regex pattern matching to add_kubernetes_metadata processor (#41903) (#42085) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add regex pattern matching to add_kubernetes_metadata processor * Add changelog entry * Add documentation * Fix changelog (cherry picked from commit da7bbf68ff9380602b48c4c62be8fbfea2638429) Co-authored-by: Mikołaj Świątek --- CHANGELOG.next.asciidoc | 9 +++ .../docs/indexers_and_matchers.asciidoc | 21 +++++++ .../add_kubernetes_metadata/matchers.go | 62 ++++++++++++++----- .../add_kubernetes_metadata/matchers_test.go | 45 ++++++++++++++ 4 files changed, 120 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5ecac8856b2..290ac194726 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -151,6 +151,15 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843] - Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572] - The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669] +- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415] +- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions. +- Update to Go 1.22.7. 
{pull}41018[41018] +- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942] +- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled +- Add `lowercase` processor. {issue}22254[22254] {pull}41424[41424] +- Add `uppercase` processor. {issue}22254[22254] {pull}41535[41535] +- Replace `compress/gzip` with https://github.com/klauspost/compress/gzip library for gzip compression {pull}41584[41584] +- Add regex pattern matching to add_kubernetes_metadata processor {pull}41903[41903] *Auditbeat* diff --git a/libbeat/processors/add_kubernetes_metadata/docs/indexers_and_matchers.asciidoc b/libbeat/processors/add_kubernetes_metadata/docs/indexers_and_matchers.asciidoc index 6f69c4803fb..bf205cc4846 100644 --- a/libbeat/processors/add_kubernetes_metadata/docs/indexers_and_matchers.asciidoc +++ b/libbeat/processors/add_kubernetes_metadata/docs/indexers_and_matchers.asciidoc @@ -74,6 +74,27 @@ processors: lookup_fields: ['destination.ip', 'server.ip'] ------------------------------------------------------------------------------- +It's also possible to extract the matching key from fields using a regex pattern. +The optional `regex_pattern` field can be used to set the pattern. The pattern +*must* contain a capture group named `key`, whose value will be used as the matching key. + +For example, the following configuration uses the `container` indexer to identify +containers by their id, and extracts the matching key from the cgroup id field added +to system process metrics. This field has the form `cri-containerd-<id>.scope`, so +we need a regex pattern to obtain the container id.
+ +[source,yaml] +------------------------------------------------------------------------------- +processors: + - add_kubernetes_metadata: + indexers: + - container: + matchers: + - fields: + lookup_fields: ['system.process.cgroup.id'] + regex_pattern: 'cri-containerd-(?P<key>[0-9a-z]+)\.scope' +------------------------------------------------------------------------------- + ifdef::has_kubernetes_logs_path_matcher[] ===== `logs_path` diff --git a/libbeat/processors/add_kubernetes_metadata/matchers.go b/libbeat/processors/add_kubernetes_metadata/matchers.go index 75b3bbbf696..53df5835f78 100644 --- a/libbeat/processors/add_kubernetes_metadata/matchers.go +++ b/libbeat/processors/add_kubernetes_metadata/matchers.go @@ -19,6 +19,8 @@ package add_kubernetes_metadata import ( "fmt" + "regexp" + "slices" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/fmtstr" @@ -32,6 +34,7 @@ import ( const ( FieldMatcherName = "fields" FieldFormatMatcherName = "field_format" + regexKeyGroupName = "key" ) // Matcher takes a new event and returns the index @@ -87,42 +90,67 @@ func (m *Matchers) MetadataIndex(event mapstr.M) string { } func (m *Matchers) Empty() bool { - if len(m.matchers) == 0 { - return true - } - - return false + return len(m.matchers) == 0 } type FieldMatcher struct { MatchFields []string + Regexp *regexp.Regexp } func NewFieldMatcher(cfg config.C) (Matcher, error) { - config := struct { + matcherConfig := struct { LookupFields []string `config:"lookup_fields"` + RegexPattern string `config:"regex_pattern"` }{} - err := cfg.Unpack(&config) + err := cfg.Unpack(&matcherConfig) if err != nil { - return nil, fmt.Errorf("fail to unpack the `lookup_fields` configuration: %s", err) + return nil, fmt.Errorf("fail to unpack the fields matcher configuration: %w", err) } - if len(config.LookupFields) == 0 { + if len(matcherConfig.LookupFields) == 0 { return nil, fmt.Errorf("lookup_fields can not be empty") } - return &FieldMatcher{MatchFields:
config.LookupFields}, nil + if len(matcherConfig.RegexPattern) == 0 { + return &FieldMatcher{MatchFields: matcherConfig.LookupFields}, nil + } + regex, err := regexp.Compile(matcherConfig.RegexPattern) + if err != nil { + return nil, fmt.Errorf("invalid regex: %w", err) + } + + captureGroupNames := regex.SubexpNames() + if !slices.Contains(captureGroupNames, regexKeyGroupName) { + return nil, fmt.Errorf("regex missing required capture group `key`") + } + + return &FieldMatcher{MatchFields: matcherConfig.LookupFields, Regexp: regex}, nil } func (f *FieldMatcher) MetadataIndex(event mapstr.M) string { for _, field := range f.MatchFields { - keyIface, err := event.GetValue(field) - if err == nil { - key, ok := keyIface.(string) - if ok { - return key - } + fieldIface, err := event.GetValue(field) + if err != nil { + continue + } + fieldValue, ok := fieldIface.(string) + if !ok { + continue + } + if f.Regexp == nil { + return fieldValue + } + + matches := f.Regexp.FindStringSubmatch(fieldValue) + if matches == nil { + continue + } + keyIndex := f.Regexp.SubexpIndex(regexKeyGroupName) + key := matches[keyIndex] + if key != "" { + return key } } @@ -140,7 +168,7 @@ func NewFieldFormatMatcher(cfg config.C) (Matcher, error) { err := cfg.Unpack(&config) if err != nil { - return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %s", err) + return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %w", err) } if config.Format == "" { diff --git a/libbeat/processors/add_kubernetes_metadata/matchers_test.go b/libbeat/processors/add_kubernetes_metadata/matchers_test.go index 48d6fae51f3..cd66ccc18ca 100644 --- a/libbeat/processors/add_kubernetes_metadata/matchers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/matchers_test.go @@ -20,6 +20,8 @@ package add_kubernetes_metadata import ( "testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" 
"github.com/elastic/elastic-agent-libs/config" @@ -35,6 +37,7 @@ func TestFieldMatcher(t *testing.T) { assert.NoError(t, err) matcher, err := NewFieldMatcher(*fieldCfg) assert.Error(t, err) + assert.Nil(t, matcher) testCfg["lookup_fields"] = "foo" fieldCfg, _ = config.NewConfigFrom(testCfg) @@ -58,6 +61,47 @@ func TestFieldMatcher(t *testing.T) { assert.Equal(t, out, "") } +func TestFieldMatcherRegex(t *testing.T) { + testCfg := map[string]interface{}{ + "lookup_fields": []string{"foo"}, + "regex_pattern": "(?!)", + } + fieldCfg, err := config.NewConfigFrom(testCfg) + assert.NoError(t, err) + matcher, err := NewFieldMatcher(*fieldCfg) + assert.ErrorContains(t, err, "invalid regex:") + assert.Nil(t, matcher) + + testCfg["regex_pattern"] = "(?P.*)" + fieldCfg, _ = config.NewConfigFrom(testCfg) + + matcher, err = NewFieldMatcher(*fieldCfg) + assert.ErrorContains(t, err, "regex missing required capture group `key`") + assert.Nil(t, matcher) + + testCfg["regex_pattern"] = "bar-(?P[^-]+)-suffix" + fieldCfg, _ = config.NewConfigFrom(testCfg) + + matcher, err = NewFieldMatcher(*fieldCfg) + require.NoError(t, err) + require.NotNil(t, matcher) + + input := mapstr.M{ + "foo": "bar-keyvalue-suffix", + } + + out := matcher.MetadataIndex(input) + assert.Equal(t, out, "keyvalue") + + nonMatchInput := mapstr.M{ + "not": "match", + "foo": "nomatch", + } + + out = matcher.MetadataIndex(nonMatchInput) + assert.Equal(t, out, "") +} + func TestFieldFormatMatcher(t *testing.T) { testCfg := map[string]interface{}{} fieldCfg, err := config.NewConfigFrom(testCfg) @@ -65,6 +109,7 @@ func TestFieldFormatMatcher(t *testing.T) { assert.NoError(t, err) matcher, err := NewFieldFormatMatcher(*fieldCfg) assert.Error(t, err) + assert.Nil(t, matcher) testCfg["format"] = `%{[namespace]}/%{[pod]}` fieldCfg, _ = config.NewConfigFrom(testCfg)