diff --git a/x-pack/auditbeat/processors/sessionmd/procfs/mock.go b/x-pack/auditbeat/processors/sessionmd/procfs/mock.go index e6ea547b2b02..658d80925854 100644 --- a/x-pack/auditbeat/processors/sessionmd/procfs/mock.go +++ b/x-pack/auditbeat/processors/sessionmd/procfs/mock.go @@ -8,23 +8,30 @@ package procfs import ( "fmt" + "sync" ) type MockReader struct { entries map[uint32]ProcessInfo + mut *sync.Mutex } func NewMockReader() *MockReader { return &MockReader{ entries: make(map[uint32]ProcessInfo), + mut: &sync.Mutex{}, } } func (r *MockReader) AddEntry(pid uint32, entry ProcessInfo) { + r.mut.Lock() + defer r.mut.Unlock() r.entries[pid] = entry } func (r *MockReader) GetProcess(pid uint32) (ProcessInfo, error) { + r.mut.Lock() + defer r.mut.Unlock() entry, ok := r.entries[pid] if !ok { return ProcessInfo{}, fmt.Errorf("not found") @@ -33,12 +40,16 @@ func (r *MockReader) GetProcess(pid uint32) (ProcessInfo, error) { } func (r *MockReader) ProcessExists(pid uint32) bool { + r.mut.Lock() + defer r.mut.Unlock() _, ok := r.entries[pid] return ok } func (r *MockReader) GetAllProcesses() ([]ProcessInfo, error) { + r.mut.Lock() + defer r.mut.Unlock() ret := make([]ProcessInfo, 0, len(r.entries)) for _, entry := range r.entries { diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go b/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go index 4f2ecdf864aa..23eab7b89add 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfsprovider/procfsprovider_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" @@ -27,6 +28,71 @@ var ( timestamp = time.Now() ) +func constructEvt(tgid uint32, syscall string) *beat.Event { + evt := &beat.Event{Fields: mapstr.M{}} + evt.Fields.Put("process.pid", tgid) + evt.Fields.Put(syscallField, syscall) + return evt +} + +func assertRegistryUint(t require.TestingT, reg *monitoring.Registry, key string, expected uint64, message string) { + entry := reg.Get(key) + assert.NotNil(t, entry) + + value, ok := reg.Get(key).(*monitoring.Uint) + assert.True(t, ok) + assert.Equal(t, expected, value.Get(), message) +} + +func loadDB(t *testing.T, count uint32, procHandler procfs.MockReader, prov prvdr) { + for i := uint32(1); i < count; i++ { + evt := constructEvt(i, "execve") + procHandler.AddEntry(i, procfs.ProcessInfo{PIDs: types.PIDInfo{Tgid: i, Ppid: 1234}}) + + prov.Sync(evt, i) + + // verify that we got the process + found, err := prov.db.GetProcess(i) + require.NoError(t, err) + require.NotNil(t, found) + + // now insert the exit + exitEvt := constructEvt(i, "exit_group") + prov.Sync(exitEvt, i) + + } +} + +func TestProviderLoadMetrics(t *testing.T) { + testReg := monitoring.NewRegistry() + testProc := procfs.NewMockReader() + + procDB, err := processdb.NewDB(testReg, testProc, *logp.L(), time.Second*2, true) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) + defer cancel() + testProvider, err := NewProvider(ctx, logp.L(), procDB, testProc, "process.pid") + require.NoError(t, err) + rawPrvdr := testProvider.(prvdr) + + events := 100_000 + loadDB(t, uint32(events), *testProc, rawPrvdr) + + // wait for the maps to empty to the correct amount as the reaper runs + require.EventuallyWithT(t, func(collect *assert.CollectT) { + assertRegistryUint(collect, testReg, "processdb.processes", 0, "processdb.processes") + assertRegistryUint(collect, testReg, "processdb.exit_events", 0, "processdb.exit_events") + }, time.Minute*5, time.Second*10) + + // ensure processes are getting resolved properly + assertRegistryUint(t, testReg, "processdb.resolved_orphan_exits", 0, "resolved_orphan_exits") + assertRegistryUint(t, testReg, "processdb.reaped_orphan_exits", 0, "reaped_orphan_exits") + assertRegistryUint(t, testReg, "processdb.failed_process_lookup_count", 0, "processdb.failed_process_lookup_count") + assertRegistryUint(t, testReg, "processdb.procfs_lookup_fail", 0, "processdb.procfs_lookup_fail") + +} + func TestExecveEvent(t *testing.T) { var pid uint32 = 100 event := beat.Event{