Skip to content

Commit

Permalink
[Auditbeat] Report process errors (#9693)
Browse files Browse the repository at this point in the history
Changes the process metricset to keep iterating through processes even when an unexpected error occurs. The error will be stored in the Process object and sent to Elasticsearch as well as logged as a warning. This only happens the first time the error is encountered for a process, not on subsequent collection cycles.
  • Loading branch information
Christoph Wurm authored Jan 2, 2019
1 parent 033b021 commit 2cd7c42
Showing 1 changed file with 123 additions and 71 deletions.
194 changes: 123 additions & 71 deletions x-pack/auditbeat/module/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

eventTypeState = "state"
eventTypeEvent = "event"
eventTypeError = "error"
)

type eventAction uint8
Expand All @@ -44,6 +45,7 @@ const (
eventActionExistingProcess eventAction = iota
eventActionProcessStarted
eventActionProcessStopped
eventActionProcessError
)

func (action eventAction) String() string {
Expand All @@ -54,6 +56,8 @@ func (action eventAction) String() string {
return "process_started"
case eventActionProcessStopped:
return "process_stopped"
case eventActionProcessError:
return "process_error"
default:
return ""
}
Expand All @@ -78,29 +82,30 @@ type MetricSet struct {
suppressPermissionWarnings bool
}

// ProcessInfo wraps the process information and implements cache.Cacheable.
type ProcessInfo struct {
types.ProcessInfo
// Process represents information about a process.
type Process struct {
Info types.ProcessInfo
Error error
}

// Hash creates a hash for ProcessInfo.
func (pInfo ProcessInfo) Hash() uint64 {
// Hash creates a hash for Process.
func (p Process) Hash() uint64 {
h := xxhash.New64()
h.WriteString(strconv.Itoa(pInfo.PID))
h.WriteString(pInfo.StartTime.String())
h.WriteString(strconv.Itoa(p.Info.PID))
h.WriteString(p.Info.StartTime.String())
return h.Sum64()
}

func (pInfo ProcessInfo) toMapStr() common.MapStr {
func (p Process) toMapStr() common.MapStr {
return common.MapStr{
// https://github.com/elastic/ecs#-process-fields
"name": pInfo.Name,
"args": pInfo.Args,
"pid": pInfo.PID,
"ppid": pInfo.PPID,
"working_directory": pInfo.CWD,
"executable": pInfo.Exe,
"start": pInfo.StartTime,
"name": p.Info.Name,
"args": p.Info.Args,
"pid": p.Info.PID,
"ppid": p.Info.PPID,
"working_directory": p.Info.CWD,
"executable": p.Info.Exe,
"start": p.Info.StartTime,
}
}

Expand Down Expand Up @@ -142,6 +147,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ms.log.Debug("No state timestamp found")
}

if os.Geteuid() != 0 {
ms.log.Warn("Running as non-root user, will likely not report all processes.")
}

return ms, nil
}

Expand Down Expand Up @@ -181,25 +190,30 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error {
ms.lastState = time.Now()
}

processInfos, err := ms.getProcessInfos()
processes, err := ms.getProcesses()
if err != nil {
return errors.Wrap(err, "failed to get process infos")
}
ms.log.Debugf("Found %v processes", len(processInfos))
ms.log.Debugf("Found %v processes", len(processes))

stateID, err := uuid.NewV4()
if err != nil {
return errors.Wrap(err, "error generating state ID")
}
for _, pInfo := range processInfos {
event := processEvent(pInfo, eventTypeState, eventActionExistingProcess)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
for _, p := range processes {
if p.Error == nil {
event := processEvent(p, eventTypeState, eventActionExistingProcess)
event.RootFields.Put("event.id", stateID.String())
report.Event(event)
} else {
ms.log.Warn(p.Error)
report.Event(processEvent(p, eventTypeError, eventActionProcessError))
}
}

if ms.cache != nil {
// This will initialize the cache with the current processes
ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos))
ms.cache.DiffAndUpdateCache(convertToCacheable(processes))
}

// Save time so we know when to send the state again (config.StatePeriod)
Expand All @@ -217,39 +231,60 @@ func (ms *MetricSet) reportState(report mb.ReporterV2) error {

// reportChanges detects and reports any changes to processes on this system since the last call.
func (ms *MetricSet) reportChanges(report mb.ReporterV2) error {
processInfos, err := ms.getProcessInfos()
processes, err := ms.getProcesses()
if err != nil {
return errors.Wrap(err, "failed to get process infos")
return errors.Wrap(err, "failed to get processes")
}
ms.log.Debugf("Found %v processes", len(processInfos))
ms.log.Debugf("Found %v processes", len(processes))

started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos))
started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processes))

for _, pInfo := range started {
report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStarted))
for _, cacheValue := range started {
p := cacheValue.(*Process)

if p.Error == nil {
report.Event(processEvent(p, eventTypeEvent, eventActionProcessStarted))
} else {
ms.log.Warn(p.Error)
report.Event(processEvent(p, eventTypeError, eventActionProcessError))
}
}

for _, pInfo := range stopped {
report.Event(processEvent(pInfo.(*ProcessInfo), eventTypeEvent, eventActionProcessStopped))
for _, cacheValue := range stopped {
p := cacheValue.(*Process)

if p.Error == nil {
report.Event(processEvent(p, eventTypeEvent, eventActionProcessStopped))
}
}

return nil
}

func processEvent(pInfo *ProcessInfo, eventType string, action eventAction) mb.Event {
return mb.Event{
func processEvent(process *Process, eventType string, action eventAction) mb.Event {
event := mb.Event{
RootFields: common.MapStr{
"event": common.MapStr{
"kind": eventType,
"action": action.String(),
},
"process": pInfo.toMapStr(),
"message": processMessage(pInfo, action),
"process": process.toMapStr(),
"message": processMessage(process, action),
},
}

if process.Error != nil {
event.RootFields.Put("error.message", process.Error.Error())
}

return event
}

func processMessage(pInfo *ProcessInfo, action eventAction) string {
func processMessage(process *Process, action eventAction) string {
if process.Error != nil {
return fmt.Sprintf("ERROR for PID %d: %v", process.Info.PID, process.Error)
}

var actionString string
switch action {
case eventActionProcessStarted:
Expand All @@ -261,77 +296,94 @@ func processMessage(pInfo *ProcessInfo, action eventAction) string {
}

return fmt.Sprintf("Process %v (PID: %d) %v",
pInfo.Name, pInfo.PID, actionString)
process.Info.Name, process.Info.PID, actionString)
}

func convertToCacheable(processInfos []*ProcessInfo) []cache.Cacheable {
c := make([]cache.Cacheable, 0, len(processInfos))
func convertToCacheable(processes []*Process) []cache.Cacheable {
c := make([]cache.Cacheable, 0, len(processes))

for _, p := range processInfos {
for _, p := range processes {
c = append(c, p)
}

return c
}

func (ms *MetricSet) getProcessInfos() ([]*ProcessInfo, error) {
func (ms *MetricSet) getProcesses() ([]*Process, error) {
// TODO: Implement Processes() in go-sysinfo
// e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41
pids, err := process.Pids()
if err != nil {
return nil, errors.Wrap(err, "failed to fetch the list of PIDs")
}

var processInfos []*ProcessInfo

var processes []*Process
for _, pid := range pids {
process, err := sysinfo.Process(pid)
var process *Process

sysinfoProc, err := sysinfo.Process(pid)
if err != nil {
if os.IsNotExist(err) {
// Skip - process probably just terminated since our call
// to Pids()
continue
}
return nil, errors.Wrap(err, "failed to load process")
}

pInfo, err := process.Info()
if err != nil {
if os.IsNotExist(err) {
// Skip - process probably just terminated since our call
// to Pids()
continue
// Record what we can and continue
process = &Process{
Info: types.ProcessInfo{
PID: pid,
},
Error: errors.Wrapf(err, "failed to load process with PID %d", pid),
}
} else {
pInfo, err := sysinfoProc.Info()
if err == nil {
process = &Process{
Info: pInfo,
}
} else {
if os.IsNotExist(err) {
// Skip - process probably just terminated since our call
// to Pids()
continue
}

if os.Geteuid() != 0 {
if os.IsPermission(err) || runtime.GOOS == "darwin" {
/*
Running as non-root, permission issues when trying to access other user's private
process information are expected.
if os.Geteuid() != 0 {
if os.IsPermission(err) || runtime.GOOS == "darwin" {
/*
Running as non-root, permission issues when trying to access other user's private
process information are expected.
Unfortunately, for darwin os.IsPermission() does not
work because it is a custom error created using errors.New() in
getProcTaskAllInfo() in go-sysinfo/providers/darwin/process_darwin_amd64.go
Unfortunately, for darwin os.IsPermission() does not
work because it is a custom error created using errors.New() in
getProcTaskAllInfo() in go-sysinfo/providers/darwin/process_darwin_amd64.go
TODO: Fix go-sysinfo to have better error for darwin.
*/
if !ms.suppressPermissionWarnings {
ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+
"Will suppress further errors of this kind. Error: %v", pid, err)

TODO: Fix go-sysinfo to have better error for darwin.
*/
if !ms.suppressPermissionWarnings {
ms.log.Warnf("Failed to load process information for PID %d as non-root user. "+
"Will suppress further errors of this kind. Error: %v", pid, err)
// Only warn once at the start of Auditbeat.
ms.suppressPermissionWarnings = true
}

// Only warn once at the start of Auditbeat.
ms.suppressPermissionWarnings = true
//continue
}
}

continue
// Record what we can and continue
process = &Process{
Info: pInfo,
Error: errors.Wrapf(err, "failed to load process information for PID %d", pid),
}
process.Info.PID = pid // in case pInfo did not contain it
}

return nil, errors.Wrap(err, "failed to load process information")
}

processInfos = append(processInfos, &ProcessInfo{pInfo})
processes = append(processes, process)
}

return processInfos, nil
return processes, nil
}

0 comments on commit 2cd7c42

Please sign in to comment.