From 9ae8f53328e9750ed4486c3ccd3df4cc51519a10 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Apr 2024 15:58:42 +0300 Subject: [PATCH 01/21] feat(processors/process_metadata): support reporting group id and name --- .../add_process_metadata.go | 23 +++++++++++++------ .../processors/add_process_metadata/config.go | 4 ++++ .../gosysinfo_provider.go | 9 +++++++- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 60a533a8e77..beabb42fe42 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -68,13 +68,12 @@ type addProcessMetadata struct { } type processMetadata struct { - name, title, exe, username, userid string - args []string - env map[string]string - startTime time.Time - pid, ppid int - // - fields mapstr.M + name, title, exe, username, userid, groupname, groupid string + args []string + env map[string]string + startTime time.Time + pid, ppid int + fields mapstr.M } type processMetadataProvider interface { @@ -332,6 +331,16 @@ func (p *processMetadata) toMap() mapstr.M { } process["owner"] = user } + if p.groupname != "" || p.groupid != "" { + group := mapstr.M{} + if p.groupname != "" { + group["name"] = p.groupname + } + if p.groupid != "" { + group["id"] = p.groupid + } + process["group"] = group + } return mapstr.M{ "process": process, diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index f16ba6771a8..582712b12bc 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -85,6 +85,10 @@ var defaultFields = mapstr.M{ "name": nil, "id": nil, }, + "group": mapstr.M{ + "name": nil, + "id": nil, + }, }, "container": mapstr.M{ "id": nil, diff --git a/libbeat/processors/add_process_metadata/gosysinfo_provider.go b/libbeat/processors/add_process_metadata/gosysinfo_provider.go index ecc94233dce..90de4025a86 100644 --- a/libbeat/processors/add_process_metadata/gosysinfo_provider.go +++ b/libbeat/processors/add_process_metadata/gosysinfo_provider.go @@ -44,12 +44,17 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, env, _ = e.Environment() } - username, userid := "", "" + username, userid, groupname, groupid := "", "", "", "" if userInfo, err := proc.User(); err == nil { userid = userInfo.UID if u, err := user.LookupId(userInfo.UID); err == nil { username = u.Username } + + groupid = userInfo.GID + if g, err := user.LookupGroupId(userInfo.GID); err == nil { + groupname = g.Name + } } r := processMetadata{ @@ -63,6 +68,8 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, startTime: info.StartTime, username: username, userid: userid, + groupname: groupname, + groupid: groupid, } r.fields = r.toMap() return &r, nil From d022a61549369516d3d2059bb2038e9a14687ac1 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Apr 2024 16:36:10 +0300 Subject: [PATCH 02/21] feat(processors/process_metadata): support reporting process entity_id --- .../add_process_metadata.go | 25 +++++++--- .../add_process_metadata_test.go | 24 ++++++--- .../processors/add_process_metadata/cache.go | 2 +- .../add_process_metadata/cache_test.go | 5 +- .../processors/add_process_metadata/config.go | 1 + .../gosysinfo_provider.go | 49 +++++++++++++++++++ 6 files changed, 90 insertions(+), 16 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index beabb42fe42..168a9f291d0 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" + "github.com/elastic/go-sysinfo" ) const ( @@ -65,15 +66,19 @@ type addProcessMetadata struct { cidProvider cidProvider log *logp.Logger mappings mapstr.M + uniqueID []byte } type processMetadata struct { - name, title, exe, username, userid, groupname, groupid string - args []string - env map[string]string - startTime time.Time - pid, ppid int - fields mapstr.M + entityID string + name, title, exe string + username, userid string + groupname, groupid string + args []string + env map[string]string + startTime time.Time + pid, ppid int + fields mapstr.M } type processMetadataProvider interface { @@ -133,6 +138,13 @@ func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetada log: log, mappings: mappings, } + + if host, _ := sysinfo.Host(); host != nil { + if uniqueID := host.Info().UniqueID; uniqueID != "" { + p.uniqueID = []byte(uniqueID) + } + } + // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled if ok := containsValue(mappings, "container.id"); ok { if withCache && config.CgroupCacheExpireTime != 0 { @@ -310,6 +322,7 @@ func (p *addProcessMetadata) String() string { func (p *processMetadata) toMap() mapstr.M { process := mapstr.M{ + "entity_id": p.entityID, "name": p.name, "title": p.title, "executable": p.exe, diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 9dd1a7eb4dd..9e7675ca991 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -43,10 +43,11 @@ func TestAddProcessMetadata(t *testing.T) { startTime := time.Now() testProcs := testProvider{ 1: { - name: "systemd", - title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", - exe: "/usr/lib/systemd/systemd", - args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, + name: "systemd", + entityID: "XCOVE56SVVEOKBNX", + title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", + exe: "/usr/lib/systemd/systemd", + args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, env: map[string]string{ "HOME": "/", "TERM": "linux", @@ -60,10 +61,11 @@ func TestAddProcessMetadata(t *testing.T) { userid: "0", }, 3: { - name: "systemd", - title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", - exe: "/usr/lib/systemd/systemd", - args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, + name: "systemd", + entityID: "XCOVE56SVVEOKBNX", + title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", + exe: "/usr/lib/systemd/systemd", + args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, env: map[string]string{ "HOME": "/", "TERM": "linux", @@ -150,6 +152,7 @@ func TestAddProcessMetadata(t *testing.T) { }, "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -235,6 +238,7 @@ func TestAddProcessMetadata(t *testing.T) { "parent": mapstr.M{ "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -269,6 +273,7 @@ func TestAddProcessMetadata(t *testing.T) { "parent": mapstr.M{ "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -310,6 +315,7 @@ func TestAddProcessMetadata(t *testing.T) { "parent": mapstr.M{ "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -508,6 +514,7 @@ func TestAddProcessMetadata(t *testing.T) { }, "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -633,6 +640,7 @@ func TestAddProcessMetadata(t *testing.T) { }, "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, diff --git a/libbeat/processors/add_process_metadata/cache.go b/libbeat/processors/add_process_metadata/cache.go index e3435aa92af..713c2bb730e 100644 --- a/libbeat/processors/add_process_metadata/cache.go +++ b/libbeat/processors/add_process_metadata/cache.go @@ -53,7 +53,7 @@ func (pc *processCache) getEntryUnlocked(pid int) (entry processCacheEntry, vali if entry, valid = pc.cache[pid]; valid { valid = entry.expiration.After(time.Now()) } - return + return entry, valid } func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) { diff --git a/libbeat/processors/add_process_metadata/cache_test.go b/libbeat/processors/add_process_metadata/cache_test.go index 9d9886d9932..882349b6ae6 100644 --- a/libbeat/processors/add_process_metadata/cache_test.go +++ b/libbeat/processors/add_process_metadata/cache_test.go @@ -21,6 +21,8 @@ import ( "math/rand" "testing" "time" + + "github.com/stretchr/testify/require" ) var cacheEvictionTests = []struct { @@ -96,7 +98,8 @@ func TestCacheEviction(t *testing.T) { for i := 0; i < test.iters; i++ { pid := rnd.Intn(test.maxPID) - c.GetProcessMetadata(pid) + _, err := c.GetProcessMetadata(pid) + require.NoError(t, err) if len(c.cache) > test.cap { t.Errorf("cache overflow for %s after %d iterations", test.name, i) break diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index 582712b12bc..e06a005b0b8 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -80,6 +80,7 @@ var defaultFields = mapstr.M{ "parent": mapstr.M{ "pid": nil, }, + "entity_id": nil, "start_time": nil, "owner": mapstr.M{ "name": nil, diff --git a/libbeat/processors/add_process_metadata/gosysinfo_provider.go b/libbeat/processors/add_process_metadata/gosysinfo_provider.go index 90de4025a86..0fc4f63ae26 100644 --- a/libbeat/processors/add_process_metadata/gosysinfo_provider.go +++ b/libbeat/processors/add_process_metadata/gosysinfo_provider.go @@ -18,13 +18,29 @@ package add_process_metadata import ( + "crypto/sha256" + "encoding/base64" + "encoding/binary" "os/user" "strings" + "sync" + "time" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" ) +var hostInfoOnce = sync.OnceValues(func() ([]byte, error) { + host, err := sysinfo.Host() + if err == nil { + if uniqueID := host.Info().UniqueID; uniqueID != "" { + return []byte(uniqueID), err + } + } + + return nil, err +}) + type gosysinfoProvider struct{} func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, err error) { @@ -57,7 +73,10 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, } } + eID, _ := entityID(pid, info.StartTime) + r := processMetadata{ + entityID: eID, name: info.Name, args: info.Args, env: env, @@ -71,6 +90,36 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, groupname: groupname, groupid: groupid, } + r.fields = r.toMap() return &r, nil } + +// entityID creates an ID that uniquely identifies this process across machines. +func entityID(pid int, start time.Time) (string, error) { + uniqueID, err := hostInfoOnce() + if err != nil && len(uniqueID) == 0 { + return "", err + } + + if len(uniqueID) == 0 || start.IsZero() { + return "", nil + } + + h := sha256.New() + if _, err := h.Write(uniqueID); err != nil { + return "", err + } + if err := binary.Write(h, binary.LittleEndian, int64(pid)); err != nil { + return "", err + } + if err := binary.Write(h, binary.LittleEndian, int64(start.Nanosecond())); err != nil { + return "", err + } + + sum := h.Sum(nil) + if len(sum) > 12 { + sum = sum[:12] + } + return base64.RawStdEncoding.EncodeToString(sum), nil +} From 79be3a649869e0255c651d3c2bc3114c060b6d26 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Apr 2024 16:00:25 +0300 Subject: [PATCH 03/21] feat(fim/kprobes): allow metricsSets to expose beat processors after initialisation --- .../file_integrity/eventreader_kprobes.go | 47 +++++++++++++++++++ .../file_integrity/eventreader_linux.go | 6 +-- auditbeat/module/file_integrity/metricset.go | 20 ++++++++ metricbeat/mb/module/connector.go | 11 +++++ metricbeat/mb/module/factory.go | 9 ++++ 5 files changed, 88 insertions(+), 5 deletions(-) diff --git a/auditbeat/module/file_integrity/eventreader_kprobes.go b/auditbeat/module/file_integrity/eventreader_kprobes.go index 7cddd7f60cd..095c4325131 100644 --- a/auditbeat/module/file_integrity/eventreader_kprobes.go +++ b/auditbeat/module/file_integrity/eventreader_kprobes.go @@ -23,11 +23,16 @@ import ( "errors" "fmt" "path/filepath" + "sync" "time" "github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" "golang.org/x/sys/unix" ) @@ -39,6 +44,41 @@ type kProbesReader struct { log *logp.Logger parsers []FileParser + + processor beat.Processor +} + +var processMetadataProcessorOnce = sync.OnceValues(func() (beat.Processor, error) { + config, err := conf.NewConfigFrom(mapstr.M{ + "match_pids": []string{"process.pid"}, + "overwrite_keys": true, + }) + + if err != nil { + return nil, err + } + + return add_process_metadata.NewWithCache(config) +}) + +func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { + + processor, err := processMetadataProcessorOnce() + if err != nil { + return nil, err + } + + return &kProbesReader{ + config: config, + eventC: make(chan Event), + log: l, + parsers: parsers, + processor: processor, + }, nil +} + +func (r kProbesReader) Processor() beat.Processor { + return r.processor } func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) { @@ -152,6 +192,13 @@ func (r kProbesReader) nextEvent(done <-chan struct{}) *Event { start := time.Now() e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes, r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers) + + if e.Process == nil { + e.Process = &Process{} + } + + e.Process.PID = event.PID + e.rtt = time.Since(start) return &e diff --git a/auditbeat/module/file_integrity/eventreader_linux.go b/auditbeat/module/file_integrity/eventreader_linux.go index ac9ce7de60d..c6b3d330c77 100644 --- a/auditbeat/module/file_integrity/eventreader_linux.go +++ b/auditbeat/module/file_integrity/eventreader_linux.go @@ -58,11 +58,7 @@ func NewEventReader(c Config, logger *logp.Logger) (EventProducer, error) { if c.Backend == BackendKprobes { l := logger.Named("kprobes") l.Info("selected backend: kprobes") - return &kProbesReader{ - config: c, - log: l, - parsers: FileParsers(c), - }, nil + return newKProbesReader(c, l, FileParsers(c)) } // unimplemented diff --git a/auditbeat/module/file_integrity/metricset.go b/auditbeat/module/file_integrity/metricset.go index eeaaa67b365..2efb19d8a6b 100644 --- a/auditbeat/module/file_integrity/metricset.go +++ b/auditbeat/module/file_integrity/metricset.go @@ -27,6 +27,7 @@ import ( bolt "go.etcd.io/bbolt" "github.com/elastic/beats/v7/auditbeat/datastore" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" "github.com/elastic/elastic-agent-libs/logp" @@ -62,6 +63,11 @@ type EventProducer interface { Start(done <-chan struct{}) (<-chan Event, error) } +// eventProducerWithProcessor is an EventProducer that requires a Processor +type eventProducerWithProcessor interface { + Processor() beat.Processor +} + // MetricSet for monitoring file integrity. type MetricSet struct { mb.BaseMetricSet @@ -78,6 +84,9 @@ type MetricSet struct { // Used when a hash can't be calculated nullHashes map[HashType]Digest + + // Processors + processors []beat.Processor } // New returns a new file.MetricSet. @@ -105,6 +114,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { log: logger, } + // reader supports a processor + if rWithProcessor, ok := r.(eventProducerWithProcessor); ok { + if proc := rWithProcessor.Processor(); proc != nil { + ms.processors = append(ms.processors, proc) + } + } + ms.nullHashes = make(map[HashType]Digest, len(config.HashTypes)) for _, hashType := range ms.config.HashTypes { // One byte is enough so that the hashes are persisted to the datastore. @@ -117,6 +133,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return ms, nil } +func (ms *MetricSet) Processors() []beat.Processor { + return ms.processors +} + // Run runs the MetricSet. The method will not return control to the caller // until it is finished (to stop it close the reporter.Done() channel). func (ms *MetricSet) Run(reporter mb.PushReporterV2) { diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 6e6b0ca6113..1bfe84bcfec 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -97,6 +97,17 @@ func (c *Connector) UseMetricSetProcessors(r metricSetRegister, moduleName, metr return nil } +// addProcessors appends processors to the connector properties. +func (c *Connector) addProcessors(procs []beat.Processor) { + if c.processors == nil { + c.processors = processors.NewList(nil) + } + + for _, p := range procs { + c.processors.AddProcessor(p) + } +} + func (c *Connector) Connect() (beat.Client, error) { return c.pipeline.ConnectWith(beat.ClientConfig{ Processing: beat.ProcessingConfig{ diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index be8999a84d1..da6c7dfdce5 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -31,6 +31,11 @@ type Factory struct { options []Option } +// metricSetWithProcessors is an interface to check if a MetricSet has directly attached Processors +type metricSetWithProcessors interface { + Processors() []beat.Processor +} + // NewFactory creates new Reloader instance for the given config func NewFactory(beatInfo beat.Info, options ...Option) *Factory { return &Factory{ @@ -63,6 +68,10 @@ func (r *Factory) Create(p beat.PipelineConnector, c *conf.C) (cfgfile.Runner, e return nil, err } + if msWithProcs, ok := metricSet.(metricSetWithProcessors); ok { + connector.addProcessors(msWithProcs.Processors()) + } + client, err := connector.Connect() if err != nil { return nil, err From de77ec47abb80b652468d7ba529c0f40ff1cf1e0 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Apr 2024 17:05:41 +0300 Subject: [PATCH 04/21] doc: update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 377c3c9f133..1a88215ab81 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -150,6 +150,11 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] - Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] - Add container id to file events (Linux only, eBPF backend). {pull}38328[38328] +- Add linux capabilities to processes in the system/process. {pull}37453[37453] +- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] +- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] +- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328] +- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38716[38716] *Filebeat* From 4f3149d68a0924273b5a01cd151b989d34f778b1 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Apr 2024 09:13:43 +0300 Subject: [PATCH 05/21] fix(linter): SA1015 prevent leaking the ticker --- metricbeat/mb/module/testing.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/module/testing.go b/metricbeat/mb/module/testing.go index 6d903fe62e5..779a630dd96 100644 --- a/metricbeat/mb/module/testing.go +++ b/metricbeat/mb/module/testing.go @@ -36,8 +36,11 @@ func receiveOneEvent(d testing.Driver, events <-chan beat.Event, timeout time.Du go func() { defer close(done) + ticker := time.NewTicker(timeout) + defer ticker.Stop() + select { - case <-time.Tick(timeout): + case <-ticker.C: d.Error("error", errors.New("timeout waiting for an event")) case event, ok := <-events: if !ok { From fe8973e64f435822b085de1d9506e814992fa3ca Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Apr 2024 09:15:25 +0300 Subject: [PATCH 06/21] fix(linter): SA1019 mark metricbeat/mb deprecation warnings that are not removed yet --- metricbeat/mb/module/wrapper.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 6df0aaa2364..d41bdf01497 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -199,13 +199,13 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { } switch ms := msw.MetricSet.(type) { - case mb.PushMetricSet: + case mb.PushMetricSet: //nolint:staticcheck // PushMetricSet is deprecated but not removed ms.Run(reporter.V1()) case mb.PushMetricSetV2: ms.Run(reporter.V2()) case mb.PushMetricSetV2WithContext: ms.Run(&channelContext{done}, reporter.V2()) - case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: + case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: //nolint:staticcheck // ReportingMetricSet is deprecated but not removed msw.startPeriodicFetching(&channelContext{done}, reporter) default: // Earlier startup stages prevent this from happening. @@ -242,7 +242,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter // and log a stack track if one occurs. func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { switch fetcher := msw.MetricSet.(type) { - case mb.ReportingMetricSet: + case mb.ReportingMetricSet: //nolint:staticcheck // ReportingMetricSet is deprecated but not removed reporter.StartFetchTimer() fetcher.Fetch(reporter.V1()) case mb.ReportingMetricSetV2: @@ -292,7 +292,7 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { type reporter interface { StartFetchTimer() - V1() mb.PushReporter + V1() mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed V2() mb.PushReporterV2 } @@ -309,7 +309,7 @@ type eventReporter struct { // startFetchTimer demarcates the start of a new fetch. The elapsed time of a // fetch is computed based on the time of this call. func (r *eventReporter) StartFetchTimer() { r.start = time.Now() } -func (r *eventReporter) V1() mb.PushReporter { +func (r *eventReporter) V1() mb.PushReporter { //nolint:staticcheck // PushReporter is deprecated but not removed return reporterV1{v2: r.V2(), module: r.msw.module.Name()} } func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} } From 8f9958697d7d634c0b52844914d00e91021eedd0 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Apr 2024 09:19:51 +0300 Subject: [PATCH 07/21] fix(linter): check for return err --- metricbeat/mb/module/configuration.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 1e69d6094c4..5c1ef60c0f1 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -40,7 +40,9 @@ func ConfiguredModules(modulesData []*conf.C, configModulesData *conf.C, moduleO // Add dynamic modules if configModulesData.Enabled() { config := cfgfile.DefaultDynamicConfig - configModulesData.Unpack(&config) + if err := configModulesData.Unpack(&config); err != nil { + return nil, err + } modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") if err != nil { From ae86af4068e702324e57f6f439d77289cfda8c38 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Apr 2024 09:20:19 +0300 Subject: [PATCH 08/21] fix(linter): prealloc slices --- metricbeat/mb/module/configuration.go | 2 +- metricbeat/mb/module/factory.go | 2 +- metricbeat/mb/module/runner_group.go | 2 +- metricbeat/mb/module/runner_group_test.go | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 5c1ef60c0f1..65f3220a804 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -27,7 +27,7 @@ import ( // ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings. func ConfiguredModules(modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option) ([]*Wrapper, error) { - var modules []*Wrapper + var modules []*Wrapper //nolint:prealloc //can't be preallocated for _, moduleCfg := range modulesData { module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...) diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index da6c7dfdce5..60e91406bee 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -51,7 +51,7 @@ func (r *Factory) Create(p beat.PipelineConnector, c *conf.C) (cfgfile.Runner, e return nil, err } - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, len(metricSets)) for _, metricSet := range metricSets { wrapper, err := NewWrapperForMetricSet(module, metricSet, r.options...) if err != nil { diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index 542926325f6..e020cd87d55 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -57,7 +57,7 @@ func (rg *runnerGroup) Stop() { } func (rg *runnerGroup) String() string { - var entries []string + entries := make([]string, 0, len(rg.runners)) for _, runner := range rg.runners { entries = append(entries, runner.String()) } diff --git a/metricbeat/mb/module/runner_group_test.go b/metricbeat/mb/module/runner_group_test.go index 036396a3103..1d462359968 100644 --- a/metricbeat/mb/module/runner_group_test.go +++ b/metricbeat/mb/module/runner_group_test.go @@ -79,7 +79,7 @@ func TestStartStop(t *testing.T) { startCounter := atomic.NewInt(0) stopCounter := atomic.NewInt(0) - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, @@ -98,7 +98,7 @@ func TestStartStop(t *testing.T) { } func TestDiagnosticsUnsupported(t *testing.T) { - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, @@ -119,7 +119,7 @@ func TestDiagnosticsUnsupported(t *testing.T) { } func TestDiagosticsSupported(t *testing.T) { - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunnerDiag{ id: i, @@ -134,7 +134,7 @@ func TestDiagosticsSupported(t *testing.T) { } func TestString(t *testing.T) { - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, From 46457aa804f86976105621ddf9e4bd316bd2c042 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Apr 2024 09:20:42 +0300 Subject: [PATCH 09/21] fix(linter): remove unused field --- metricbeat/mb/module/connector.go | 1 - 1 file changed, 1 deletion(-) diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 1bfe84bcfec..960280bc747 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -34,7 +34,6 @@ type Connector struct { pipeline beat.PipelineConnector processors *processors.Processors eventMeta mapstr.EventMetadata - timeSeries bool keepNull bool } From da3ca7c93071d5947f49f2fdea957c1d76def97a Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Apr 2024 09:23:12 +0300 Subject: [PATCH 10/21] fix(linter): G601 prevent implicit memory aliasing in for loop --- metricbeat/mb/module/connector_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metricbeat/mb/module/connector_test.go b/metricbeat/mb/module/connector_test.go index ed7008889c0..5079fbb23f9 100644 --- a/metricbeat/mb/module/connector_test.go +++ b/metricbeat/mb/module/connector_test.go @@ -67,7 +67,8 @@ func TestProcessorsForConfig(t *testing.T) { t.Errorf("[%s] %v", description, err) continue } - processedEvent, err := processors.Run(&test.event) + testEvent := testCases[description].event + processedEvent, err := processors.Run(&testEvent) // We don't check if err != nil, because we are testing the final outcome // of running the processors, including when some of them fail. if processedEvent == nil { From 63fd97da40ba86978e886181cd787071ca77ad9d Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Mon, 8 Apr 2024 21:00:44 +0300 Subject: [PATCH 11/21] doc: update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1a88215ab81..315490d14bb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -154,7 +154,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] - Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] - Add container id to file events (Linux only, eBPF backend). {pull}38328[38328] -- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38716[38716] +- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776] *Filebeat* From 586d0274ceac4ada3683be6194d85c10547cc383 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 9 Apr 2024 13:22:41 +0300 Subject: [PATCH 12/21] fix: update filebaet fields.asciidoc (unrelated to this work) --- filebeat/docs/fields.asciidoc | 70 +++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 5b03cdfb0a9..0c897ca3b1e 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -111136,6 +111136,13 @@ type: keyword -- +*`o365.audit.Activity`*:: ++ +-- +type: keyword + +-- + *`o365.audit.Actor`*:: + -- @@ -111360,6 +111367,13 @@ type: object -- +*`o365.audit.Experience`*:: ++ +-- +type: keyword + +-- + *`o365.audit.ExtendedProperties.*`*:: + -- @@ -111605,6 +111619,20 @@ type: keyword -- +*`o365.audit.ObjectDisplayName`*:: ++ +-- +type: keyword + +-- + +*`o365.audit.ObjectType`*:: ++ +-- +type: keyword + +-- + *`o365.audit.Operation`*:: + -- @@ -111612,6 +111640,20 @@ type: keyword -- +*`o365.audit.OperationId`*:: ++ +-- +type: keyword + +-- + +*`o365.audit.OperationProperties`*:: ++ +-- +type: object + +-- + *`o365.audit.OrganizationId`*:: + -- @@ -111661,6 +111703,13 @@ type: keyword -- +*`o365.audit.RequestId`*:: ++ +-- +type: keyword + +-- + *`o365.audit.ResultStatus`*:: + -- @@ -111801,6 +111850,13 @@ type: keyword -- +*`o365.audit.Timestamp`*:: ++ +-- +type: keyword + +-- + *`o365.audit.UniqueSharingId`*:: + -- @@ -111857,6 +111913,20 @@ type: keyword -- +*`o365.audit.WorkspaceId`*:: ++ +-- +type: keyword + +-- + +*`o365.audit.WorkspaceName`*:: ++ +-- +type: keyword + +-- + *`o365.audit.YammerNetworkId`*:: + -- From 740f35bd25a8d69dfb5f196676d917ace8a756ee Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 9 Apr 2024 17:24:02 +0300 Subject: [PATCH 13/21] doc: remove irrelevant changes from CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 4 ---- 1 file changed, 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 52ccc974059..66eccbedcbb 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -151,10 +151,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] - Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] - Add container id to file events (Linux only, eBPF backend). {pull}38328[38328] -- Add linux capabilities to processes in the system/process. {pull}37453[37453] -- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] -- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] -- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328] - Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776] *Filebeat* From bbb9d4f562abb620f2e0b65fa5323584e86c9acf Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 12 Apr 2024 23:40:47 +0300 Subject: [PATCH 14/21] feat(processor/metadata): introduce new type based allocation func --- .../file_integrity/eventreader_kprobes.go | 13 +++----- .../add_process_metadata.go | 33 ++++++++++++++----- .../add_process_metadata_test.go | 26 ++++++++++----- .../processors/add_process_metadata/config.go | 14 ++++++++ 4 files changed, 61 insertions(+), 25 deletions(-) diff --git a/auditbeat/module/file_integrity/eventreader_kprobes.go b/auditbeat/module/file_integrity/eventreader_kprobes.go index 095c4325131..5e708cae785 100644 --- a/auditbeat/module/file_integrity/eventreader_kprobes.go +++ b/auditbeat/module/file_integrity/eventreader_kprobes.go @@ -30,10 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" - conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" - "golang.org/x/sys/unix" ) @@ -49,16 +46,16 @@ type kProbesReader struct { } var processMetadataProcessorOnce = sync.OnceValues(func() (beat.Processor, error) { - config, err := conf.NewConfigFrom(mapstr.M{ - "match_pids": []string{"process.pid"}, - "overwrite_keys": true, - }) + processor, err := add_process_metadata.NewWithConfig( + add_process_metadata.ConfigOverwriteKeys(true), + add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}), + ) if err != nil { return nil, err } - return add_process_metadata.NewWithCache(config) + return processor, nil }) func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 168a9f291d0..84427a1045a 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -96,33 +96,48 @@ func init() { // New constructs a new add_process_metadata processor. func New(cfg *conf.C) (beat.Processor, error) { - return newProcessMetadataProcessorWithProvider(cfg, &procCache, false) + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) + } + + return newProcessMetadataProcessorWithProvider(config, &procCache, false) } // NewWithCache construct a new add_process_metadata processor with cache for container IDs. // Resulting processor implements `Close()` to release the cache resources. func NewWithCache(cfg *conf.C) (beat.Processor, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) + } + + return newProcessMetadataProcessorWithProvider(config, &procCache, true) +} + +func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) { + cfg := defaultConfig() + + for _, o := range opts { + o(&cfg) + } + return newProcessMetadataProcessorWithProvider(cfg, &procCache, true) } -func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) { +func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) { // Logging (each processor instance has a unique ID). var ( id = int(instanceID.Inc()) log = logp.NewLogger(processorName).With("instance_id", id) ) - config := defaultConfig() - if err = cfg.Unpack(&config); err != nil { - return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) - } - // If neither option is configured, then add a default. A default cgroup_regex // cannot be added to the struct returned by defaultConfig() because if // config_regex is set, it would take precedence over any user-configured // cgroup_prefixes. - hasCgroupPrefixes, _ := cfg.Has("cgroup_prefixes", -1) - hasCgroupRegex, _ := cfg.Has("cgroup_regex", -1) + hasCgroupPrefixes := len(config.CgroupPrefixes) > 0 + hasCgroupRegex := config.CgroupRegex != nil if !hasCgroupPrefixes && !hasCgroupRegex { config.CgroupRegex = defaultCgroupRegex } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 9e7675ca991..ef410ea8617 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -731,7 +731,7 @@ func TestAddProcessMetadata(t *testing.T) { config: mapstr.M{ "cgroup_regex": "", }, - initErr: errors.New("fail to unpack the add_process_metadata configuration: cgroup_regexp must contain exactly one capturing group for the container ID accessing config"), + initErr: errors.New("cgroup_regexp must contain exactly one capturing group for the container ID accessing config"), }, { description: "cgroup_prefixes configured", @@ -752,17 +752,23 @@ func TestAddProcessMetadata(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - config, err := conf.NewConfigFrom(test.config) - if err != nil { - t.Fatal(err) + configC, err := conf.NewConfigFrom(test.config) + assert.NoError(t, err) + + config := defaultConfig() + if err := configC.Unpack(&config); err != nil { + if test.initErr == nil { + t.Fatal(err) + } + assert.EqualError(t, err, test.initErr.Error()) + return } proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true) - if test.initErr == nil { - if err != nil { + if err != nil { + if test.initErr == nil { t.Fatal(err) } - } else { assert.EqualError(t, err, test.initErr.Error()) return } @@ -793,7 +799,11 @@ func TestAddProcessMetadata(t *testing.T) { "include_fields": []string{"process.name"}, } - config, err := conf.NewConfigFrom(c) + configC, err := conf.NewConfigFrom(c) + assert.NoError(t, err) + + config := defaultConfig() + err = configC.Unpack(&config) assert.NoError(t, err) proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true) diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index e06a005b0b8..5a6ada5b09e 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -119,6 +119,20 @@ func defaultConfig() config { } } +type ConfigOption func(c *config) + +func ConfigOverwriteKeys(overwriteKeys bool) ConfigOption { + return func(c *config) { + c.OverwriteKeys = overwriteKeys + } +} + +func ConfigMatchPIDs(matchPIDs []string) ConfigOption { + return func(c *config) { + c.MatchPIDs = matchPIDs + } +} + func (c *config) getMappings() (mappings mapstr.M, err error) { mappings = mapstr.M{} validFields := defaultFields From 0caf10d7937be5a8329e1741d714c518caa8ca1a Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 12 Apr 2024 23:41:49 +0300 Subject: [PATCH 15/21] feat(fim/kprobe): instantiate new processor alongside a new kprobes event reader --- .../module/file_integrity/eventreader_kprobes.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/auditbeat/module/file_integrity/eventreader_kprobes.go b/auditbeat/module/file_integrity/eventreader_kprobes.go index 5e708cae785..f125a52c132 100644 --- a/auditbeat/module/file_integrity/eventreader_kprobes.go +++ b/auditbeat/module/file_integrity/eventreader_kprobes.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "path/filepath" - "sync" "time" "github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes" @@ -45,22 +44,12 @@ type kProbesReader struct { processor beat.Processor } -var processMetadataProcessorOnce = sync.OnceValues(func() (beat.Processor, error) { +func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { + processor, err := add_process_metadata.NewWithConfig( add_process_metadata.ConfigOverwriteKeys(true), add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}), ) - - if err != nil { - return nil, err - } - - return processor, nil -}) - -func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { - - processor, err := processMetadataProcessorOnce() if err != nil { return nil, err } From ca94e5f28c0220b65fac3977a0b2f51d436cc75e Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 12 Apr 2024 23:47:48 +0300 Subject: [PATCH 16/21] fix(fim): remove redundant whitespace --- auditbeat/module/file_integrity/metricset.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auditbeat/module/file_integrity/metricset.go b/auditbeat/module/file_integrity/metricset.go index 2efb19d8a6b..dc4b7c9169c 100644 --- a/auditbeat/module/file_integrity/metricset.go +++ b/auditbeat/module/file_integrity/metricset.go @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { log: logger, } - // reader supports a processor + // reader supports a processor if rWithProcessor, ok := r.(eventProducerWithProcessor); ok { if proc := rWithProcessor.Processor(); proc != nil { ms.processors = append(ms.processors, proc) From e335b2f9ba3ea45bfd630705e7881ddb39b6a125 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Sat, 13 Apr 2024 00:02:50 +0300 Subject: [PATCH 17/21] doc(metricbeat): enrich documentation about Processors attached to a Metricbeat --- metricbeat/mb/module/factory.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 60e91406bee..dab4bc8816f 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -32,6 +32,9 @@ type Factory struct { } // metricSetWithProcessors is an interface to check if a MetricSet has directly attached Processors +// NOTE: Processors that implement the Closer interface are going to be closed from the pipeline when required, +// namely during dynamic configuration reloading. Thus, it is critical for the Metricset to always instantiate +// properly the processor and not consider it as always running. type metricSetWithProcessors interface { Processors() []beat.Processor } From 32c5c7c7b20e50723ab62b6ac8d594e80b5c11ad Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Sat, 13 Apr 2024 00:18:14 +0300 Subject: [PATCH 18/21] fix(fim): gofumpt eventreader_kprobes.go --- auditbeat/module/file_integrity/eventreader_kprobes.go | 1 - 1 file changed, 1 deletion(-) diff --git a/auditbeat/module/file_integrity/eventreader_kprobes.go b/auditbeat/module/file_integrity/eventreader_kprobes.go index f125a52c132..9eeb106167f 100644 --- a/auditbeat/module/file_integrity/eventreader_kprobes.go +++ b/auditbeat/module/file_integrity/eventreader_kprobes.go @@ -45,7 +45,6 @@ type kProbesReader struct { } func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { - processor, err := add_process_metadata.NewWithConfig( add_process_metadata.ConfigOverwriteKeys(true), add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}), From a121cd8136e669144ee867aa1d7fc27c6fc371c4 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Sat, 13 Apr 2024 00:32:38 +0300 Subject: [PATCH 19/21] fix(add_process_metadata): gofmt add_process_metadata.go gosysinfo_provider.go --- .../processors/add_process_metadata/add_process_metadata.go | 6 +++--- .../processors/add_process_metadata/gosysinfo_provider.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 0830964d455..8bb8ecea5a9 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -70,15 +70,15 @@ type addProcessMetadata struct { } type processMetadata struct { - entityID string + entityID string name, title, exe, username, userid string args []string env map[string]string startTime time.Time pid, ppid int - groupname, groupid string + groupname, groupid string capEffective, capPermitted []string - fields mapstr.M + fields mapstr.M } type processMetadataProvider interface { diff --git a/libbeat/processors/add_process_metadata/gosysinfo_provider.go b/libbeat/processors/add_process_metadata/gosysinfo_provider.go index ce7d25e5575..350acc81000 100644 --- a/libbeat/processors/add_process_metadata/gosysinfo_provider.go +++ b/libbeat/processors/add_process_metadata/gosysinfo_provider.go @@ -83,7 +83,7 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, capEffective, _ := capabilities.FromPid(capabilities.Effective, pid) r := processMetadata{ - entityID: eID, + entityID: eID, name: info.Name, args: info.Args, env: env, @@ -96,8 +96,8 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, startTime: info.StartTime, username: username, userid: userid, - groupname: groupname, - groupid: groupid, + groupname: groupname, + groupid: groupid, } r.fields = r.toMap() From 1fbddbfff2675d135867fb0d6979e7212e810e63 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Sat, 13 Apr 2024 02:45:26 +0300 Subject: [PATCH 20/21] fix(lint): goimports eventreader_kprobes.go --- auditbeat/module/file_integrity/eventreader_kprobes.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/auditbeat/module/file_integrity/eventreader_kprobes.go b/auditbeat/module/file_integrity/eventreader_kprobes.go index 9eeb106167f..e5cdd76f4b7 100644 --- a/auditbeat/module/file_integrity/eventreader_kprobes.go +++ b/auditbeat/module/file_integrity/eventreader_kprobes.go @@ -29,8 +29,9 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" - "github.com/elastic/elastic-agent-libs/logp" "golang.org/x/sys/unix" + + "github.com/elastic/elastic-agent-libs/logp" ) type kProbesReader struct { From 7bfae2621d9cdc3735eea98f857222998b4bcdfb Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Sat, 13 Apr 2024 02:59:59 +0300 Subject: [PATCH 21/21] fix(winlogbeat): generate include list [unrelated to this PR] --- x-pack/winlogbeat/include/list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/winlogbeat/include/list.go b/x-pack/winlogbeat/include/list.go index af2071e249e..6ee9c51eefb 100644 --- a/x-pack/winlogbeat/include/list.go +++ b/x-pack/winlogbeat/include/list.go @@ -7,7 +7,7 @@ package include import ( - // Import packages that need to register themselves. + // Import packages that perform 'func init()'. _ "github.com/elastic/beats/v7/x-pack/winlogbeat/module/powershell" _ "github.com/elastic/beats/v7/x-pack/winlogbeat/module/security" _ "github.com/elastic/beats/v7/x-pack/winlogbeat/module/sysmon"