Skip to content

Commit

Permalink
[add_session_metadata processor] Enrich events with user and group na…
Browse files Browse the repository at this point in the history
…mes (elastic#39537)

Update the add_session_metadata processor to add user and group names to enriched events, rather than just IDs, as it was doing previously.

This also renames the UpdateDB function to SyncDB. Previously this function was confusing, because it didn't always update the DB. With ebpf, the DB update is done separately. By renaming and updating the func comment, it should be more clear that the function should synchronize the DB so it's ready for enriching events, either by waiting until the DB is updated, or doing the synchronization itself, as it does with procfs backend.
  • Loading branch information
mjwolf authored May 15, 2024
1 parent 0ac96f4 commit 4f65d8c
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add procfs backend to the `add_session_metadata` processor. {pull}38799[38799]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]
- Reduce data size for add_session_metadata processor by removing unneeded fields {pull}39500[39500]
- Enrich process events with user and group names, with add_session_metadata processor {pull}39537[39537]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
}

err = p.provider.UpdateDB(ev, pid)
err = p.provider.SyncDB(ev, pid)
if err != nil {
return ev, err
}
Expand Down
76 changes: 50 additions & 26 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ var (
Pgid: uint32(100),
Sid: uint32(40),
},
Creds: types.CredInfo{
Ruid: 0,
Euid: 0,
Suid: 0,
Rgid: 0,
Egid: 0,
Sgid: 0,
},
CWD: "/",
Filename: "/bin/ls",
},
Expand Down Expand Up @@ -78,12 +86,24 @@ var (
"pid": uint32(100),
"parent": mapstr.M{
"pid": uint32(50),
"user": mapstr.M{
"id": "0",
"name": "root",
},
},
"session_leader": mapstr.M{
"pid": uint32(40),
"user": mapstr.M{
"id": "0",
"name": "root",
},
},
"group_leader": mapstr.M{
"pid": uint32(100),
"user": mapstr.M{
"id": "0",
"name": "root",
},
},
},
},
Expand Down Expand Up @@ -318,33 +338,35 @@ var (

func TestEnrich(t *testing.T) {
for _, tt := range enrichTests {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
require.Nil(t, err)
t.Run(tt.testName, func(t *testing.T) {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
require.Nil(t, err)

for _, ev := range tt.mockProcesses {
db.InsertExec(ev)
}
s := addSessionMetadata{
logger: logger,
db: db,
config: tt.config,
}
for _, ev := range tt.mockProcesses {
db.InsertExec(ev)
}
s := addSessionMetadata{
logger: logger,
db: db,
config: tt.config,
}

// avoid taking address of loop variable
i := tt.input
actual, err := s.enrich(&i)
if tt.expect_error {
require.Error(t, err, "%s: error unexpectedly nil", tt.testName)
} else {
require.Nil(t, err, "%s: enrich error: %w", tt.testName, err)
require.NotNil(t, actual, "%s: returned nil event", tt.testName)
// avoid taking address of loop variable
i := tt.input
actual, err := s.enrich(&i)
if tt.expect_error {
require.Error(t, err, "%s: error unexpectedly nil", tt.testName)
} else {
require.Nil(t, err, "%s: enrich error: %w", tt.testName, err)
require.NotNil(t, actual, "%s: returned nil event", tt.testName)

//Validate output
if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" {
t.Errorf("field mismatch:\n%s", diff)
//Validate output
if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" {
t.Errorf("field mismatch:\n%s", diff)
}
}
}
})
}
}

Expand All @@ -364,8 +386,10 @@ func ignoreMissingFrom(m mapstr.M) cmp.Option {
// Note: This validates test code only
func TestFilter(t *testing.T) {
for _, tt := range filterTests {
if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected {
t.Errorf("%s: unexpected comparator result", tt.testName)
}
t.Run(tt.testName, func(t *testing.T) {
if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected {
t.Errorf("%s: unexpected comparator result", tt.testName)
}
})
}
}
40 changes: 40 additions & 0 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,15 @@ func fullProcessFromDBProcess(p Process) types.Process {
euid := p.Creds.Euid
egid := p.Creds.Egid
ret.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := getUserName(ret.User.ID)
if ok {
ret.User.Name = username
}
ret.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := getGroupName(ret.Group.ID)
if ok {
ret.Group.Name = groupname
}
ret.Thread.Capabilities.Permitted, _ = capabilities.FromUint64(p.Creds.CapPermitted)
ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective)
ret.TTY.CharDevice.Major = p.CTTY.Major
Expand All @@ -471,7 +479,15 @@ func fillParent(process *types.Process, parent Process) {
process.Parent.WorkingDirectory = parent.Cwd
process.Parent.Interactive = &interactive
process.Parent.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := getUserName(process.Parent.User.ID)
if ok {
process.Parent.User.Name = username
}
process.Parent.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := getGroupName(process.Parent.Group.ID)
if ok {
process.Parent.Group.Name = groupname
}
}

func fillGroupLeader(process *types.Process, groupLeader Process) {
Expand All @@ -488,7 +504,15 @@ func fillGroupLeader(process *types.Process, groupLeader Process) {
process.GroupLeader.WorkingDirectory = groupLeader.Cwd
process.GroupLeader.Interactive = &interactive
process.GroupLeader.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := getUserName(process.GroupLeader.User.ID)
if ok {
process.GroupLeader.User.Name = username
}
process.GroupLeader.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := getGroupName(process.GroupLeader.Group.ID)
if ok {
process.GroupLeader.Group.Name = groupname
}
}

func fillSessionLeader(process *types.Process, sessionLeader Process) {
Expand All @@ -505,7 +529,15 @@ func fillSessionLeader(process *types.Process, sessionLeader Process) {
process.SessionLeader.WorkingDirectory = sessionLeader.Cwd
process.SessionLeader.Interactive = &interactive
process.SessionLeader.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := getUserName(process.SessionLeader.User.ID)
if ok {
process.SessionLeader.User.Name = username
}
process.SessionLeader.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := getGroupName(process.SessionLeader.Group.ID)
if ok {
process.SessionLeader.Group.Name = groupname
}
}

func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Process) {
Expand All @@ -522,7 +554,15 @@ func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Pr
process.EntryLeader.WorkingDirectory = entryLeader.Cwd
process.EntryLeader.Interactive = &interactive
process.EntryLeader.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := getUserName(process.EntryLeader.User.ID)
if ok {
process.EntryLeader.User.Name = username
}
process.EntryLeader.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := getGroupName(process.EntryLeader.Group.ID)
if ok {
process.EntryLeader.Group.Name = groupname
}

process.EntryLeader.EntryMeta.Type = string(entryType)
}
Expand Down
29 changes: 29 additions & 0 deletions x-pack/auditbeat/processors/sessionmd/processdb/names.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux

package processdb

import (
"os/user"
)

// getUserName will return the name associated with the user ID, if it exists
func getUserName(id string) (string, bool) {
user, err := user.LookupId(id)
if err != nil {
return "", false
}
return user.Username, true
}

// getGroupName will return the name associated with the group ID, if it exists
func getGroupName(id string) (string, bool) {
group, err := user.LookupGroupId(id)
if err != nil {
return "", false
}
return group.Name, true
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr
}

const (
maxWaitLimit = 200 * time.Millisecond // Maximum time UpdateDB will wait for process
combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration
backoffDuration = 10 * time.Second // UpdateDB will stop waiting for processes for this time
maxWaitLimit = 200 * time.Millisecond // Maximum time SyncDB will wait for process
combinedWaitLimit = 2 * time.Second // Multiple SyncDB calls will wait up to this amount within resetDuration
backoffDuration = 10 * time.Second // SyncDB will stop waiting for processes for this time
resetDuration = 5 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset
)

Expand All @@ -176,7 +176,7 @@ var (
// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during
// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up
// waiting for these processes, at the cost of possibly not enriching some processes.
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
func (s prvdr) SyncDB(ev *beat.Event, pid uint32) error {
if s.db.HasProcess(pid) {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea
}, nil
}

// UpdateDB will update the process DB with process info from procfs or the event itself
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
// SyncDB will update the process DB with process info from procfs or the event itself
func (s prvdr) SyncDB(ev *beat.Event, pid uint32) error {
syscall, err := ev.GetValue(syscallField)
if err != nil {
return fmt.Errorf("event not supported, no syscall data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event, expected.PIDs.Tgid)
err = provider.SyncDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event, expected.PIDs.Tgid)
err = provider.SyncDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event, expected.PIDs.Tgid)
err = provider.SyncDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event, expected.PIDs.Tgid)
err = provider.SyncDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event, expected.PIDs.Tgid)
err = provider.SyncDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down
3 changes: 2 additions & 1 deletion x-pack/auditbeat/processors/sessionmd/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
)

// SyncDB should ensure the DB is in a state to handle the event before returning.
type Provider interface {
UpdateDB(*beat.Event, uint32) error
SyncDB(event *beat.Event, pid uint32) error
}

0 comments on commit 4f65d8c

Please sign in to comment.