Skip to content

Commit

Permalink
Add regex pattern matching to add_kubernetes_metadata processor (#41903
Browse files Browse the repository at this point in the history
…) (#42085)

* Add regex pattern matching to add_kubernetes_metadata processor

* Add changelog entry

* Add documentation

* Fix changelog

(cherry picked from commit da7bbf6)

Co-authored-by: Mikołaj Świątek <[email protected]>
  • Loading branch information
mergify[bot] and swiatekm authored Dec 18, 2024
1 parent 073be89 commit 305edac
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 17 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]
- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions.
- Update to Go 1.22.7. {pull}41018[41018]
- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942]
- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled
- Add `lowercase` processor. {issue}22254[22254] {pull}41424[41424]
- Add `uppercase` processor. {issue}22254[22254] {pull}41535[41535]
- Replace `compress/gzip` with https://github.com/klauspost/compress/gzip library for gzip compression {pull}41584[41584]
- Add regex pattern matching to add_kubernetes_metadata processor {pull}41903[41903]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,27 @@ processors:
lookup_fields: ['destination.ip', 'server.ip']
-------------------------------------------------------------------------------

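It's also possible to extract the matching key from fields using a regex pattern.
The optional `regex_pattern` field can be used to set the pattern. The pattern
*must* contain a capture group named `key`, whose value will be used as the matching key.
If the pattern does not match the value of a lookup field, or the `key` group matches
an empty string, that field is skipped and the next lookup field is tried.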

For example, the following configuration uses the `container` indexer to identify
containers by their id, and extracts the matching key from the cgroup id field added
to system process metrics. This field has the form `cri-containerd-<id>.scope`, so
we need a regex pattern to obtain the container id.

[source,yaml]
-------------------------------------------------------------------------------
processors:
- add_kubernetes_metadata:
    indexers:
      - container:
    matchers:
      - fields:
          lookup_fields: ['system.process.cgroup.id']
          regex_pattern: 'cri-containerd-(?P<key>[0-9a-z]+)\.scope'
-------------------------------------------------------------------------------

ifdef::has_kubernetes_logs_path_matcher[]
===== `logs_path`

Expand Down
62 changes: 45 additions & 17 deletions libbeat/processors/add_kubernetes_metadata/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package add_kubernetes_metadata

import (
"fmt"
"regexp"
"slices"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
Expand All @@ -32,6 +34,7 @@ import (
const (
// FieldMatcherName is the configuration name of the `fields` matcher.
FieldMatcherName = "fields"
// FieldFormatMatcherName is the configuration name of the `field_format` matcher.
FieldFormatMatcherName = "field_format"
// regexKeyGroupName is the name of the capture group that a `regex_pattern`
// must define; the text captured by this group is used as the matching key.
regexKeyGroupName = "key"
)

// Matcher takes a new event and returns the index
Expand Down Expand Up @@ -87,42 +90,67 @@ func (m *Matchers) MetadataIndex(event mapstr.M) string {
}

// Empty reports whether m holds no matchers.
func (m *Matchers) Empty() bool {
	return len(m.matchers) == 0
}

// FieldMatcher derives the metadata index key from the value of an event field.
type FieldMatcher struct {
// MatchFields lists the event fields to look up, tried in order.
MatchFields []string
// Regexp is optional. When nil, the string value of the field is used as the
// key unchanged; otherwise the key is taken from the capture group named `key`.
Regexp *regexp.Regexp
}

func NewFieldMatcher(cfg config.C) (Matcher, error) {
config := struct {
matcherConfig := struct {
LookupFields []string `config:"lookup_fields"`
RegexPattern string `config:"regex_pattern"`
}{}

err := cfg.Unpack(&config)
err := cfg.Unpack(&matcherConfig)
if err != nil {
return nil, fmt.Errorf("fail to unpack the `lookup_fields` configuration: %s", err)
return nil, fmt.Errorf("fail to unpack the fields matcher configuration: %w", err)
}

if len(config.LookupFields) == 0 {
if len(matcherConfig.LookupFields) == 0 {
return nil, fmt.Errorf("lookup_fields can not be empty")
}

return &FieldMatcher{MatchFields: config.LookupFields}, nil
if len(matcherConfig.RegexPattern) == 0 {
return &FieldMatcher{MatchFields: matcherConfig.LookupFields}, nil
}
regex, err := regexp.Compile(matcherConfig.RegexPattern)
if err != nil {
return nil, fmt.Errorf("invalid regex: %w", err)
}

captureGroupNames := regex.SubexpNames()
if !slices.Contains(captureGroupNames, regexKeyGroupName) {
return nil, fmt.Errorf("regex missing required capture group `key`")
}

return &FieldMatcher{MatchFields: matcherConfig.LookupFields, Regexp: regex}, nil
}

func (f *FieldMatcher) MetadataIndex(event mapstr.M) string {
for _, field := range f.MatchFields {
keyIface, err := event.GetValue(field)
if err == nil {
key, ok := keyIface.(string)
if ok {
return key
}
fieldIface, err := event.GetValue(field)
if err != nil {
continue
}
fieldValue, ok := fieldIface.(string)
if !ok {
continue
}
if f.Regexp == nil {
return fieldValue
}

matches := f.Regexp.FindStringSubmatch(fieldValue)
if matches == nil {
continue
}
keyIndex := f.Regexp.SubexpIndex(regexKeyGroupName)
key := matches[keyIndex]
if key != "" {
return key
}
}

Expand All @@ -140,7 +168,7 @@ func NewFieldFormatMatcher(cfg config.C) (Matcher, error) {

err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %s", err)
return nil, fmt.Errorf("fail to unpack the `format` configuration of `field_format` matcher: %w", err)
}

if config.Format == "" {
Expand Down
45 changes: 45 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/matchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package add_kubernetes_metadata
import (
"testing"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -35,6 +37,7 @@ func TestFieldMatcher(t *testing.T) {
assert.NoError(t, err)
matcher, err := NewFieldMatcher(*fieldCfg)
assert.Error(t, err)
assert.Nil(t, matcher)

testCfg["lookup_fields"] = "foo"
fieldCfg, _ = config.NewConfigFrom(testCfg)
Expand All @@ -58,13 +61,55 @@ func TestFieldMatcher(t *testing.T) {
assert.Equal(t, out, "")
}

// TestFieldMatcherRegex covers the `regex_pattern` option of the fields
// matcher: rejection of invalid patterns, rejection of patterns without the
// `key` capture group, key extraction on a match, and an empty result when no
// lookup field matches.
func TestFieldMatcherRegex(t *testing.T) {
	// Go's regexp syntax (RE2) does not support lookahead, so this pattern
	// fails to compile.
	testCfg := map[string]interface{}{
		"lookup_fields": []string{"foo"},
		"regex_pattern": "(?!)",
	}
	fieldCfg, err := config.NewConfigFrom(testCfg)
	require.NoError(t, err)
	matcher, err := NewFieldMatcher(*fieldCfg)
	assert.ErrorContains(t, err, "invalid regex:")
	assert.Nil(t, matcher)

	// A valid pattern whose only named group is not `key` must be rejected.
	testCfg["regex_pattern"] = "(?P<invalid>.*)"
	fieldCfg, err = config.NewConfigFrom(testCfg)
	require.NoError(t, err)

	matcher, err = NewFieldMatcher(*fieldCfg)
	assert.ErrorContains(t, err, "regex missing required capture group `key`")
	assert.Nil(t, matcher)

	testCfg["regex_pattern"] = "bar-(?P<key>[^-]+)-suffix"
	fieldCfg, err = config.NewConfigFrom(testCfg)
	require.NoError(t, err)

	matcher, err = NewFieldMatcher(*fieldCfg)
	require.NoError(t, err)
	require.NotNil(t, matcher)

	input := mapstr.M{
		"foo": "bar-keyvalue-suffix",
	}

	// testify expects (expected, actual); keeping that order makes failure
	// messages report the right side as "expected".
	out := matcher.MetadataIndex(input)
	assert.Equal(t, "keyvalue", out)

	nonMatchInput := mapstr.M{
		"not": "match",
		"foo": "nomatch",
	}

	out = matcher.MetadataIndex(nonMatchInput)
	assert.Equal(t, "", out)
}

func TestFieldFormatMatcher(t *testing.T) {
testCfg := map[string]interface{}{}
fieldCfg, err := config.NewConfigFrom(testCfg)

assert.NoError(t, err)
matcher, err := NewFieldFormatMatcher(*fieldCfg)
assert.Error(t, err)
assert.Nil(t, matcher)

testCfg["format"] = `%{[namespace]}/%{[pod]}`
fieldCfg, _ = config.NewConfigFrom(testCfg)
Expand Down

0 comments on commit 305edac

Please sign in to comment.