Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor #38776

Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
9ae8f53
feat(processors/process_metadata): support reporting group id and name
pkoutsovasilis Apr 3, 2024
d022a61
feat(processors/process_metadata): support reporting process entity_id
pkoutsovasilis Apr 3, 2024
79be3a6
feat(fim/kprobes): allow metricsSets to expose beat processors after …
pkoutsovasilis Apr 3, 2024
de77ec4
doc: update CHANGELOG.next.asciidoc
pkoutsovasilis Apr 3, 2024
4f3149d
fix(linter): SA1015 prevent leaking the ticker
pkoutsovasilis Apr 4, 2024
fe8973e
fix(linter): SA1019 mark metricbeat/mb deprecation warnings that are …
pkoutsovasilis Apr 4, 2024
8f99586
fix(linter): check for return err
pkoutsovasilis Apr 4, 2024
ae86af4
fix(linter): prealloc slices
pkoutsovasilis Apr 4, 2024
46457aa
fix(linter): remove unused field
pkoutsovasilis Apr 4, 2024
da3ca7c
fix(linter): G601 prevent implicit memory aliasing in for loop
pkoutsovasilis Apr 4, 2024
63fd97d
doc: update CHANGELOG.next.asciidoc
pkoutsovasilis Apr 8, 2024
b68c2df
Merge remote-tracking branch 'refs/remotes/beats/main' into pkoutsova…
pkoutsovasilis Apr 9, 2024
586d027
fix: update filebaet fields.asciidoc (unrelated to this work)
pkoutsovasilis Apr 9, 2024
740f35b
doc: remove irrelevant changes from CHANGELOG.next.asciidoc
pkoutsovasilis Apr 9, 2024
bbb9d4f
feat(processor/metadata): introduce new type based allocation func
pkoutsovasilis Apr 12, 2024
0caf10d
feat(fim/kprobe): instantiate new processor alongside a new kprobes e…
pkoutsovasilis Apr 12, 2024
ca94e5f
fix(fim): remove redundant whitespace
pkoutsovasilis Apr 12, 2024
e335b2f
doc(metricbeat): enrich documentation about Processors attached to a …
pkoutsovasilis Apr 12, 2024
89b5128
Merge remote-tracking branch 'refs/remotes/beats/main' into pkoutsova…
pkoutsovasilis Apr 12, 2024
32c5c7c
fix(fim): gofumpt eventreader_kprobes.go
pkoutsovasilis Apr 12, 2024
a121cd8
fix(add_process_metadata): gofmt add_process_metadata.go gosysinfo_pr…
pkoutsovasilis Apr 12, 2024
1fbddbf
fix(lint): goimports eventreader_kprobes.go
pkoutsovasilis Apr 12, 2024
7bfae26
fix(winlogbeat): generate include list [unrelated to this PR]
pkoutsovasilis Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223]
- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199]
- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]

*Filebeat*

Expand Down
34 changes: 33 additions & 1 deletion auditbeat/module/file_integrity/eventreader_kprobes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"time"

"github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"

"github.com/elastic/elastic-agent-libs/logp"

"golang.org/x/sys/unix"
)

Expand All @@ -39,6 +40,30 @@ type kProbesReader struct {
log *logp.Logger

parsers []FileParser

processor beat.Processor
}

func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) {
processor, err := add_process_metadata.NewWithConfig(
add_process_metadata.ConfigOverwriteKeys(true),
add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}),
)
if err != nil {
return nil, err
}

return &kProbesReader{
config: config,
eventC: make(chan Event),
log: l,
parsers: parsers,
processor: processor,
}, nil
}

func (r kProbesReader) Processor() beat.Processor {
return r.processor
}

func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) {
Expand Down Expand Up @@ -152,6 +177,13 @@ func (r kProbesReader) nextEvent(done <-chan struct{}) *Event {
start := time.Now()
e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes,
r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers)

if e.Process == nil {
e.Process = &Process{}
}

e.Process.PID = event.PID

e.rtt = time.Since(start)

return &e
Expand Down
6 changes: 1 addition & 5 deletions auditbeat/module/file_integrity/eventreader_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ func NewEventReader(c Config, logger *logp.Logger) (EventProducer, error) {
if c.Backend == BackendKprobes {
l := logger.Named("kprobes")
l.Info("selected backend: kprobes")
return &kProbesReader{
config: c,
log: l,
parsers: FileParsers(c),
}, nil
return newKProbesReader(c, l, FileParsers(c))
}

// unimplemented
Expand Down
20 changes: 20 additions & 0 deletions auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/elastic/beats/v7/auditbeat/ab"
"github.com/elastic/beats/v7/auditbeat/datastore"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -63,6 +64,11 @@ type EventProducer interface {
Start(done <-chan struct{}) (<-chan Event, error)
}

// eventProducerWithProcessor is an EventProducer that requires a Processor
type eventProducerWithProcessor interface {
Processor() beat.Processor
}

// MetricSet for monitoring file integrity.
type MetricSet struct {
mb.BaseMetricSet
Expand All @@ -79,6 +85,9 @@ type MetricSet struct {

// Used when a hash can't be calculated
nullHashes map[HashType]Digest

// Processors
processors []beat.Processor
}

// New returns a new file.MetricSet.
Expand Down Expand Up @@ -106,6 +115,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logger,
}

// reader supports a processor
if rWithProcessor, ok := r.(eventProducerWithProcessor); ok {
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
if proc := rWithProcessor.Processor(); proc != nil {
ms.processors = append(ms.processors, proc)
}
}

ms.nullHashes = make(map[HashType]Digest, len(config.HashTypes))
for _, hashType := range ms.config.HashTypes {
// One byte is enough so that the hashes are persisted to the datastore.
Expand All @@ -118,6 +134,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return ms, nil
}

func (ms *MetricSet) Processors() []beat.Processor {
return ms.processors
}

// Run runs the MetricSet. The method will not return control to the caller
// until it is finished (to stop it close the reporter.Done() channel).
func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
Expand Down
56 changes: 46 additions & 10 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo"
)

const (
Expand Down Expand Up @@ -65,16 +66,18 @@
cidProvider cidProvider
log *logp.Logger
mappings mapstr.M
uniqueID []byte
}

type processMetadata struct {
entityID string

Check failure on line 73 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (windows)

File is not `goimports`-ed with -local github.com/elastic (goimports)
name, title, exe, username, userid string
args []string
env map[string]string
startTime time.Time
pid, ppid int
groupname, groupid string
capEffective, capPermitted []string
//
fields mapstr.M
}

Expand All @@ -93,33 +96,48 @@

// New constructs a new add_process_metadata processor.
func New(cfg *conf.C) (beat.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache, false)
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, false)
}

// NewWithCache construct a new add_process_metadata processor with cache for container IDs.
// Resulting processor implements `Close()` to release the cache resources.
func NewWithCache(cfg *conf.C) (beat.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, true)
}

func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) {
cfg := defaultConfig()

for _, o := range opts {
o(&cfg)
}

return newProcessMetadataProcessorWithProvider(cfg, &procCache, true)
}

func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
// Logging (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
log = logp.NewLogger(processorName).With("instance_id", id)
)

config := defaultConfig()
if err = cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

// If neither option is configured, then add a default. A default cgroup_regex
// cannot be added to the struct returned by defaultConfig() because if
// config_regex is set, it would take precedence over any user-configured
// cgroup_prefixes.
hasCgroupPrefixes, _ := cfg.Has("cgroup_prefixes", -1)
hasCgroupRegex, _ := cfg.Has("cgroup_regex", -1)
hasCgroupPrefixes := len(config.CgroupPrefixes) > 0
hasCgroupRegex := config.CgroupRegex != nil
if !hasCgroupPrefixes && !hasCgroupRegex {
config.CgroupRegex = defaultCgroupRegex
}
Expand All @@ -135,6 +153,13 @@
log: log,
mappings: mappings,
}

if host, _ := sysinfo.Host(); host != nil {
if uniqueID := host.Info().UniqueID; uniqueID != "" {
p.uniqueID = []byte(uniqueID)
}
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand Down Expand Up @@ -312,6 +337,7 @@

func (p *processMetadata) toMap() mapstr.M {
process := mapstr.M{
"entity_id": p.entityID,
"name": p.name,
"title": p.title,
"executable": p.exe,
Expand Down Expand Up @@ -339,6 +365,16 @@
if len(p.capPermitted) > 0 {
process.Put("thread.capabilities.permitted", p.capPermitted)
}
if p.groupname != "" || p.groupid != "" {
group := mapstr.M{}
if p.groupname != "" {
group["name"] = p.groupname
}
if p.groupid != "" {
group["id"] = p.groupid
}
process["group"] = group
}

return mapstr.M{
"process": process,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ func TestAddProcessMetadata(t *testing.T) {
startTime := time.Now()
testProcs := testProvider{
1: {
name: "systemd",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
name: "systemd",
entityID: "XCOVE56SVVEOKBNX",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
env: map[string]string{
"HOME": "/",
"TERM": "linux",
Expand All @@ -67,10 +68,11 @@ func TestAddProcessMetadata(t *testing.T) {
capPermitted: capMock,
},
3: {
name: "systemd",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
name: "systemd",
entityID: "XCOVE56SVVEOKBNX",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
env: map[string]string{
"HOME": "/",
"TERM": "linux",
Expand Down Expand Up @@ -159,6 +161,7 @@ func TestAddProcessMetadata(t *testing.T) {
},
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -250,6 +253,7 @@ func TestAddProcessMetadata(t *testing.T) {
"parent": mapstr.M{
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -290,6 +294,7 @@ func TestAddProcessMetadata(t *testing.T) {
"parent": mapstr.M{
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -337,6 +342,7 @@ func TestAddProcessMetadata(t *testing.T) {
"parent": mapstr.M{
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -541,6 +547,7 @@ func TestAddProcessMetadata(t *testing.T) {
},
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -672,6 +679,7 @@ func TestAddProcessMetadata(t *testing.T) {
},
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -768,7 +776,7 @@ func TestAddProcessMetadata(t *testing.T) {
config: mapstr.M{
"cgroup_regex": "",
},
initErr: errors.New("fail to unpack the add_process_metadata configuration: cgroup_regexp must contain exactly one capturing group for the container ID accessing config"),
initErr: errors.New("cgroup_regexp must contain exactly one capturing group for the container ID accessing config"),
},
{
description: "cgroup_prefixes configured",
Expand All @@ -789,17 +797,23 @@ func TestAddProcessMetadata(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
config, err := conf.NewConfigFrom(test.config)
if err != nil {
t.Fatal(err)
configC, err := conf.NewConfigFrom(test.config)
assert.NoError(t, err)

config := defaultConfig()
if err := configC.Unpack(&config); err != nil {
if test.initErr == nil {
t.Fatal(err)
}
assert.EqualError(t, err, test.initErr.Error())
return
}

proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
if test.initErr == nil {
if err != nil {
if err != nil {
if test.initErr == nil {
t.Fatal(err)
}
} else {
assert.EqualError(t, err, test.initErr.Error())
return
}
Expand Down Expand Up @@ -830,7 +844,11 @@ func TestAddProcessMetadata(t *testing.T) {
"include_fields": []string{"process.name"},
}

config, err := conf.NewConfigFrom(c)
configC, err := conf.NewConfigFrom(c)
assert.NoError(t, err)

config := defaultConfig()
err = configC.Unpack(&config)
assert.NoError(t, err)

proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_process_metadata/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (pc *processCache) getEntryUnlocked(pid int) (entry processCacheEntry, vali
if entry, valid = pc.cache[pid]; valid {
valid = entry.expiration.After(time.Now())
}
return
return entry, valid
}

func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) {
Expand Down
Loading
Loading