Skip to content

Commit

Permalink
Session view processor procfs (elastic#38799)
Browse files Browse the repository at this point in the history
Add a procfs backend to the auditbeat add_session_metadata processor. For systems that don't support ebpf, this procfs backend can be used instead to enrich auditbeat events with the data needed for session view.

However the backend is expected to be less reliable than the ebpf backend; if processes exit before their info is read from procfs, the info will not be available, and some processes will be not be enriched (particularly very short-lived processes).
  • Loading branch information
mjwolf authored Apr 15, 2024
1 parent f1934b5 commit 1898d37
Show file tree
Hide file tree
Showing 9 changed files with 723 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223]
- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199]
- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328]
- Add procfs backend to the `add_session_metadata` processor. {pull}38799[38799]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]

*Filebeat*
Expand Down
32 changes: 23 additions & 9 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider"
cfg "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -59,24 +60,37 @@ func New(cfg *cfg.C) (beat.Processor, error) {
backfilledPIDs := db.ScrapeProcfs()
logger.Debugf("backfilled %d processes", len(backfilledPIDs))

var p provider.Provider

switch c.Backend {
case "auto":
// "auto" always uses ebpf, as it's currently the only backend
fallthrough
p, err = ebpf_provider.NewProvider(ctx, logger, db)
if err != nil {
// Most likely cause of error is not supporting ebpf on system, try procfs
p, err = procfs_provider.NewProvider(ctx, logger, db, reader, c.PIDField)
if err != nil {
return nil, fmt.Errorf("failed to create provider: %w", err)
}
}
case "ebpf":
p, err := ebpf_provider.NewProvider(ctx, logger, db)
p, err = ebpf_provider.NewProvider(ctx, logger, db)
if err != nil {
return nil, fmt.Errorf("failed to create ebpf provider: %w", err)
}
case "procfs":
p, err = procfs_provider.NewProvider(ctx, logger, db, reader, c.PIDField)
if err != nil {
return nil, fmt.Errorf("failed to create ebpf provider: %w", err)
}
return &addSessionMetadata{
config: c,
logger: logger,
db: db,
provider: p,
}, nil
default:
return nil, fmt.Errorf("unknown backend configuration")
}
return &addSessionMetadata{
config: c,
logger: logger,
db: db,
provider: p,
}, nil
}

func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb"
Expand Down Expand Up @@ -327,7 +327,7 @@ func TestEnrich(t *testing.T) {
for _, tt := range enrichTests {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
assert.Nil(t, err)
require.Nil(t, err)

for _, ev := range tt.mockProcesses {
db.InsertExec(ev)
Expand All @@ -342,10 +342,10 @@ func TestEnrich(t *testing.T) {
i := tt.input
actual, err := s.enrich(&i)
if tt.expect_error {
assert.Error(t, err, "%s: error unexpectedly nil", tt.testName)
require.Error(t, err, "%s: error unexpectedly nil", tt.testName)
} else {
assert.Nil(t, err, "%s: enrich error: %w", tt.testName, err)
assert.NotNil(t, actual, "%s: returned nil event", tt.testName)
require.Nil(t, err, "%s: enrich error: %w", tt.testName, err)
require.NotNil(t, actual, "%s: returned nil event", tt.testName)

//Validate output
if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" {
Expand Down
66 changes: 58 additions & 8 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
ttyMajor = 4
consoleMaxMinor = 63
ttyMaxMinor = 255
retryCount = 2
)

type Process struct {
Expand Down Expand Up @@ -229,6 +230,8 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) {

pid := fork.ChildPIDs.Tgid
ppid := fork.ParentPIDs.Tgid
db.scrapeAncestors(db.processes[pid])

if entry, ok := db.processes[ppid]; ok {
entry.PIDs = pidInfoFromProto(fork.ChildPIDs)
entry.Creds = credInfoFromProto(fork.Creds)
Expand Down Expand Up @@ -271,6 +274,7 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) {
}

db.processes[exec.PIDs.Tgid] = proc
db.scrapeAncestors(proc)
entryLeaderPID := db.evaluateEntryLeader(proc)
if entryLeaderPID != nil {
db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID
Expand Down Expand Up @@ -431,6 +435,8 @@ func fullProcessFromDBProcess(p Process) types.Process {
ret.Group.ID = strconv.FormatUint(uint64(egid), 10)
ret.Thread.Capabilities.Permitted, _ = capabilities.FromUint64(p.Creds.CapPermitted)
ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective)
ret.TTY.CharDevice.Major = p.CTTY.Major
ret.TTY.CharDevice.Minor = p.CTTY.Minor

return ret
}
Expand Down Expand Up @@ -555,27 +561,48 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {

ret := fullProcessFromDBProcess(process)

if parent, ok := db.processes[process.PIDs.Ppid]; ok {
fillParent(&ret, parent)
if process.PIDs.Ppid != 0 {
for i := 0; i < retryCount; i++ {
if parent, ok := db.processes[process.PIDs.Ppid]; ok {
fillParent(&ret, parent)
break
}
db.logger.Debugf("failed to find %d in DB (parent of %d), attempting to scrape", process.PIDs.Ppid, pid)
db.scrapeAncestors(process)
}
}

if groupLeader, ok := db.processes[process.PIDs.Pgid]; ok {
fillGroupLeader(&ret, groupLeader)
if process.PIDs.Pgid != 0 {
for i := 0; i < retryCount; i++ {
if groupLeader, ok := db.processes[process.PIDs.Pgid]; ok {
fillGroupLeader(&ret, groupLeader)
break
}
db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid)
db.scrapeAncestors(process)
}
}

if sessionLeader, ok := db.processes[process.PIDs.Sid]; ok {
fillSessionLeader(&ret, sessionLeader)
if process.PIDs.Sid != 0 {
for i := 0; i < retryCount; i++ {
if sessionLeader, ok := db.processes[process.PIDs.Sid]; ok {
fillSessionLeader(&ret, sessionLeader)
break
}
db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid)
db.scrapeAncestors(process)
}
}

if entryLeaderPID, foundEntryLeaderPID := db.entryLeaderRelationships[process.PIDs.Tgid]; foundEntryLeaderPID {
if entryLeader, foundEntryLeader := db.processes[entryLeaderPID]; foundEntryLeader {
// if there is an entry leader then there is a matching member in the entryLeaders table
fillEntryLeader(&ret, db.entryLeaders[entryLeaderPID], entryLeader)
} else {
db.logger.Errorf("failed to find entry leader entry %d for %d (%s)", entryLeaderPID, pid, db.processes[pid].Filename)
db.logger.Debugf("failed to find entry leader entry %d for %d (%s)", entryLeaderPID, pid, db.processes[pid].Filename)
}
} else {
db.logger.Errorf("failed to find entry leader for %d (%s)", pid, db.processes[pid].Filename)
db.logger.Debugf("failed to find entry leader for %d (%s)", pid, db.processes[pid].Filename)
}

db.setEntityID(&ret)
Expand Down Expand Up @@ -666,3 +693,26 @@ func getTTYType(major uint16, minor uint16) TTYType {

return TTYUnknown
}

func (db *DB) scrapeAncestors(proc Process) {
for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} {
if _, exists := db.processes[pid]; pid == 0 || exists {
continue
}
procInfo, err := db.procfs.GetProcess(pid)
if err != nil {
db.logger.Debugf("couldn't get %v from procfs: %w", pid, err)
continue
}
p := Process{
PIDs: pidInfoFromProto(procInfo.PIDs),
Creds: credInfoFromProto(procInfo.Creds),
CTTY: ttyDevFromProto(procInfo.CTTY),
Argv: procInfo.Argv,
Cwd: procInfo.Cwd,
Env: procInfo.Env,
Filename: procInfo.Filename,
}
db.insertProcess(p)
}
}
10 changes: 5 additions & 5 deletions x-pack/auditbeat/processors/sessionmd/processdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ package processdb
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
)

var logger = logp.NewLogger("processdb")

func TestGetTTYType(t *testing.T) {
assert.Equal(t, TTYConsole, getTTYType(4, 0))
assert.Equal(t, Pts, getTTYType(136, 0))
assert.Equal(t, TTY, getTTYType(4, 64))
assert.Equal(t, TTYUnknown, getTTYType(1000, 1000))
require.Equal(t, TTYConsole, getTTYType(4, 0))
require.Equal(t, Pts, getTTYType(136, 0))
require.Equal(t, TTY, getTTYType(4, 64))
require.Equal(t, TTYUnknown, getTTYType(1000, 1000))
}
2 changes: 1 addition & 1 deletion x-pack/auditbeat/processors/sessionmd/procfs/procfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

func MajorTTY(ttyNr uint32) uint16 {
return uint16((ttyNr >> 8) & 0xf)
return uint16((ttyNr >> 8) & 0xff)
}

func MinorTTY(ttyNr uint32) uint16 {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux

package procfs_provider

import (
"context"
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/processdb"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/procfs"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider"
"github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
syscallField = "auditd.data.syscall"
)

type prvdr struct {
ctx context.Context
logger *logp.Logger
db *processdb.DB
reader procfs.Reader
pidField string
}

func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, reader procfs.Reader, pidField string) (provider.Provider, error) {
return prvdr{
ctx: ctx,
logger: logger,
db: db,
reader: reader,
pidField: pidField,
}, nil
}

// UpdateDB will update the process DB with process info from procfs or the event itself
func (s prvdr) UpdateDB(ev *beat.Event) error {
pi, err := ev.Fields.GetValue(s.pidField)
if err != nil {
return fmt.Errorf("event not supported, no pid")
}
pid, ok := pi.(int)
if !ok {
return fmt.Errorf("pid field not int")
}

syscall, err := ev.GetValue(syscallField)
if err != nil {
return fmt.Errorf("event not supported, no syscall data")
}

switch syscall {
case "execveat", "execve":
pe := types.ProcessExecEvent{}
proc_info, err := s.reader.GetProcess(uint32(pid))
if err == nil {
pe.PIDs = proc_info.PIDs
pe.Creds = proc_info.Creds
pe.CTTY = proc_info.CTTY
pe.CWD = proc_info.Cwd
pe.Argv = proc_info.Argv
pe.Env = proc_info.Env
pe.Filename = proc_info.Filename
} else {
s.logger.Errorf("get process info from proc for pid %v: %w", pid, err)
// If process info couldn't be taken from procfs, populate with as much info as
// possible from the event
pe.PIDs.Tgid = uint32(pid)
var intr interface{}
var i int
var ok bool
var parent types.Process
intr, err := ev.Fields.GetValue("process.parent.pid")
if err != nil {
goto out
}
if i, ok = intr.(int); !ok {
goto out
}
pe.PIDs.Ppid = uint32(i)

parent, err = s.db.GetProcess(pe.PIDs.Ppid)
if err != nil {
goto out
}
pe.PIDs.Sid = parent.SessionLeader.PID

intr, err = ev.Fields.GetValue("process.working_directory")
if err != nil {
goto out
}
pe.CWD = intr.(string)
out:
}
s.db.InsertExec(pe)
if err != nil {
return fmt.Errorf("insert exec to db: %w", err)
}
case "exit_group":
pe := types.ProcessExitEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
},
}
s.db.InsertExit(pe)
case "setsid":
intr, err := ev.Fields.GetValue("auditd.result")
if err != nil {
return fmt.Errorf("syscall exit value not found")
}
result, ok := intr.(string)
if !ok {
return fmt.Errorf("\"auditd.result\" not string")
}
if result == "success" {
setsid_ev := types.ProcessSetsidEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Sid: uint32(pid),
},
}
s.db.InsertSetsid(setsid_ev)
}
}
return nil
}
Loading

0 comments on commit 1898d37

Please sign in to comment.