Skip to content

Commit

Permalink
Add cgroup.cpuacct percentages (elastic#25057)
Browse files Browse the repository at this point in the history
* initial commit of cgroup cpuacct percentages

* add in percpu flag

* fix up pct function

* formatting, name cleanup

* remove old code
  • Loading branch information
fearful-symmetry authored Apr 14, 2021
1 parent a9279cd commit b2f22fa
Show file tree
Hide file tree
Showing 9 changed files with 452 additions and 114 deletions.
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func SetupMetrics(name string) error {
beatProcessStats = &process.Stats{
Procs: []string{name},
EnvWhitelist: nil,
CpuTicks: true,
CPUTicks: true,
CacheCmdLine: true,
IncludeTop: process.IncludeTopConfig{},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,31 @@ import (

// cgroupStatsToMap returns a MapStr containing the data from the stats object.
// If stats is nil then nil is returned.
func cgroupStatsToMap(stats *cgroup.Stats, perCPU bool) common.MapStr {
if stats == nil {
func cgroupStatsToMap(stats *Process) common.MapStr {
if stats == nil || stats.RawStats == nil {
return nil
}

cgroup := common.MapStr{}

// id and path are only available when all subsystems share a common path.
if stats.ID != "" {
cgroup["id"] = stats.ID
if stats.RawStats.ID != "" {
cgroup["id"] = stats.RawStats.ID
}
if stats.Path != "" {
cgroup["path"] = stats.Path
if stats.RawStats.Path != "" {
cgroup["path"] = stats.RawStats.Path
}

if cpu := cgroupCPUToMapStr(stats.CPU); cpu != nil {
if cpu := cgroupCPUToMapStr(stats.RawStats.CPU); cpu != nil {
cgroup["cpu"] = cpu
}
if cpuacct := cgroupCPUAccountingToMapStr(stats.CPUAccounting, perCPU); cpuacct != nil {
if cpuacct := cgroupCPUAccountingToMapStr(stats); cpuacct != nil {
cgroup["cpuacct"] = cpuacct
}
if memory := cgroupMemoryToMapStr(stats.Memory); memory != nil {
if memory := cgroupMemoryToMapStr(stats.RawStats.Memory); memory != nil {
cgroup["memory"] = memory
}
if blkio := cgroupBlockIOToMapStr(stats.BlockIO); blkio != nil {
if blkio := cgroupBlockIOToMapStr(stats.RawStats.BlockIO); blkio != nil {
cgroup["blkio"] = blkio
}

Expand Down Expand Up @@ -97,7 +97,8 @@ func cgroupCPUToMapStr(cpu *cgroup.CPUSubsystem) common.MapStr {
// cgroupCPUAccountingToMapStr returns a MapStr containing
// CPUAccountingSubsystem data. If the cpuacct parameter is nil then nil is
// returned.
func cgroupCPUAccountingToMapStr(cpuacct *cgroup.CPUAccountingSubsystem, perCPU bool) common.MapStr {
func cgroupCPUAccountingToMapStr(process *Process) common.MapStr {
cpuacct := process.RawStats.CPUAccounting
if cpuacct == nil {
return nil
}
Expand All @@ -106,25 +107,35 @@ func cgroupCPUAccountingToMapStr(cpuacct *cgroup.CPUAccountingSubsystem, perCPU
"id": cpuacct.ID,
"path": cpuacct.Path,
"total": common.MapStr{
"ns": cpuacct.TotalNanos,
"ns": cpuacct.TotalNanos,
"pct": process.PctStats.CPUTotalPct,
"norm": common.MapStr{
"pct": process.PctStats.CPUTotalPctNorm,
},
},
"stats": common.MapStr{
"system": common.MapStr{
"ns": cpuacct.Stats.SystemNanos,
"ns": cpuacct.Stats.SystemNanos,
"pct": process.PctStats.CPUSystemPct,
"norm": common.MapStr{
"pct": process.PctStats.CPUSystemPctNorm,
},
},
"user": common.MapStr{
"ns": cpuacct.Stats.UserNanos,
"ns": cpuacct.Stats.UserNanos,
"pct": process.PctStats.CPUUserPct,
"norm": common.MapStr{
"pct": process.PctStats.CPUUserPctNorm,
},
},
},
}

if perCPU {
perCPUUsage := common.MapStr{}
for i, usage := range cpuacct.UsagePerCPU {
perCPUUsage[strconv.Itoa(i+1)] = usage
}
event["percpu"] = perCPUUsage
perCPUUsage := common.MapStr{}
for i, usage := range cpuacct.UsagePerCPU {
perCPUUsage[strconv.Itoa(i+1)] = usage
}
event["percpu"] = perCPUUsage

return event
}
Expand Down
157 changes: 124 additions & 33 deletions libbeat/metric/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/metric/system/memory"
sigar "github.com/elastic/gosigar"
"github.com/elastic/gosigar/cgroup"
)

// ProcsMap is a map where the keys are the names of processes and the value is the Process with that name
Expand All @@ -42,39 +43,57 @@ type ProcsMap map[int]*Process
// Process is the structure which holds the information of a process running on the host.
// It includes pid, gid and it interacts with gosigar to fetch process data from the host.
type Process struct {
Pid int `json:"pid"`
Ppid int `json:"ppid"`
Pgid int `json:"pgid"`
Name string `json:"name"`
Username string `json:"username"`
State string `json:"state"`
Args []string `json:"args"`
CmdLine string `json:"cmdline"`
Cwd string `json:"cwd"`
Executable string `json:"executable"`
Mem sigar.ProcMem
Cpu sigar.ProcTime
SampleTime time.Time
FD sigar.ProcFDUsage
Env common.MapStr
Pid int `json:"pid"`
Ppid int `json:"ppid"`
Pgid int `json:"pgid"`
Name string `json:"name"`
Username string `json:"username"`
State string `json:"state"`
Args []string `json:"args"`
CmdLine string `json:"cmdline"`
Cwd string `json:"cwd"`
Executable string `json:"executable"`
Mem sigar.ProcMem
CPU sigar.ProcTime
SampleTime time.Time
FD sigar.ProcFDUsage
Env common.MapStr

//cpu stats
cpuSinceStart float64
cpuTotalPct float64
cpuTotalPctNorm float64

// cgroup stats
RawStats *cgroup.Stats
PctStats CgroupPctStats
}

// CgroupPctStats stores rendered percent values from cgroup CPU data
type CgroupPctStats struct {
CPUTotalPct float64
CPUTotalPctNorm float64
CPUUserPct float64
CPUUserPctNorm float64
CPUSystemPct float64
CPUSystemPctNorm float64
}

// Stats stores the stats of processes on the host.
type Stats struct {
Procs []string
ProcsMap ProcsMap
CpuTicks bool
EnvWhitelist []string
CacheCmdLine bool
IncludeTop IncludeTopConfig
Procs []string
ProcsMap ProcsMap
CPUTicks bool
EnvWhitelist []string
CacheCmdLine bool
IncludeTop IncludeTopConfig
CgroupOpts cgroup.ReaderOptions
EnableCgroups bool

procRegexps []match.Matcher // List of regular expressions used to whitelist processes.
envRegexps []match.Matcher // List of regular expressions used to whitelist env vars.

logger *logp.Logger
cgroups *cgroup.Reader
logger *logp.Logger
}

// Ticks of CPU for a process
Expand Down Expand Up @@ -127,8 +146,8 @@ func (proc *Process) getDetails(envPredicate func(string) bool) error {
return fmt.Errorf("error getting process mem for pid=%d: %v", proc.Pid, err)
}

proc.Cpu = sigar.ProcTime{}
if err := proc.Cpu.Get(proc.Pid); err != nil {
proc.CPU = sigar.ProcTime{}
if err := proc.CPU.Get(proc.Pid); err != nil {
return fmt.Errorf("error getting process cpu time for pid=%d: %v", proc.Pid, err)
}

Expand Down Expand Up @@ -322,13 +341,13 @@ func (procStats *Stats) getProcessEvent(process *Process) common.MapStr {
"pct": process.cpuTotalPctNorm,
},
},
"start_time": unixTimeMsToTime(process.Cpu.StartTime),
"start_time": unixTimeMsToTime(process.CPU.StartTime),
}

if procStats.CpuTicks {
proc.Put("cpu.user.ticks", process.Cpu.User)
proc.Put("cpu.system.ticks", process.Cpu.Sys)
proc.Put("cpu.total.ticks", process.Cpu.Total)
if procStats.CPUTicks {
proc.Put("cpu.user.ticks", process.CPU.User)
proc.Put("cpu.system.ticks", process.CPU.Sys)
proc.Put("cpu.total.ticks", process.CPU.Total)
}

if process.FD != (sigar.ProcFDUsage{}) {
Expand All @@ -341,6 +360,12 @@ func (procStats *Stats) getProcessEvent(process *Process) common.MapStr {
}
}

if procStats.EnableCgroups {
if statsMap := cgroupStatsToMap(process); statsMap != nil {
proc["cgroup"] = statsMap
}
}

return proc
}

Expand All @@ -359,18 +384,63 @@ func GetProcCPUPercentage(s0, s1 *Process) (normalizedPct, pct, totalPct float64
if s0 != nil && s1 != nil {
timeDelta := s1.SampleTime.Sub(s0.SampleTime)
timeDeltaMillis := timeDelta / time.Millisecond
totalCPUDeltaMillis := int64(s1.Cpu.Total - s0.Cpu.Total)
totalCPUDeltaMillis := int64(s1.CPU.Total - s0.CPU.Total)

pct := float64(totalCPUDeltaMillis) / float64(timeDeltaMillis)
normalizedPct := pct / float64(runtime.NumCPU())

return common.Round(normalizedPct, common.DefaultDecimalPlacesCount),
common.Round(pct, common.DefaultDecimalPlacesCount),
common.Round(float64(s1.Cpu.Total), common.DefaultDecimalPlacesCount)
common.Round(float64(s1.CPU.Total), common.DefaultDecimalPlacesCount)
}
return 0, 0, 0
}

// GetCgroupPercentage returns CPU usage percentages for a given cgroup
// see GetProcCPUPercentage for implementation details, as the two are conceptually similar.
// Note that the cgroup controller reports system and user times in USER_HZ, while
// totals are reported in nanoseconds. Because of this, any math that mixes the two might be slightly off,
// as USER_HZ is less precise value that will get rounded up to nanseconds.
// Because of that, `user` and `system` metrics reflect a precentage of overall CPU time, but can't be compared to the total pct values.
func GetCgroupPercentage(s0, s1 *Process) CgroupPctStats {
if s0 == nil || s1 == nil || s0.RawStats == nil || s1.RawStats == nil || s0.RawStats.CPUAccounting == nil || s1.RawStats.CPUAccounting == nil {
return CgroupPctStats{}
}
timeDelta := s1.SampleTime.Sub(s0.SampleTime)
timeDeltaNanos := timeDelta / time.Nanosecond
totalCPUDeltaNanos := int64(s1.RawStats.CPUAccounting.TotalNanos - s0.RawStats.CPUAccounting.TotalNanos)

pct := float64(totalCPUDeltaNanos) / float64(timeDeltaNanos)
// Avoid using NumCPU unless we need to; the values in UsagePerCPU are more likely to reflect the running conditions of the cgroup
// NumCPU can vary based on the conditions of the running metricbeat process, as it uses Affinity Masks, not hardware data.
var cpuCount int
if len(s1.RawStats.CPUAccounting.UsagePerCPU) > 0 {
cpuCount = len(s1.RawStats.CPUAccounting.UsagePerCPU)
} else {
cpuCount = runtime.NumCPU()
}

// if you look at the raw cgroup stats, the following normalized value is literally an average of per-cpu numbers.
normalizedPct := pct / float64(cpuCount)
userCPUDeltaMillis := int64(s1.RawStats.CPUAccounting.Stats.UserNanos - s0.RawStats.CPUAccounting.Stats.UserNanos)
systemCPUDeltaMillis := int64(s1.RawStats.CPUAccounting.Stats.SystemNanos - s0.RawStats.CPUAccounting.Stats.SystemNanos)

userPct := float64(userCPUDeltaMillis) / float64(timeDeltaNanos)
systemPct := float64(systemCPUDeltaMillis) / float64(timeDeltaNanos)

normalizedUser := userPct / float64(cpuCount)
normalizedSystem := systemPct / float64(cpuCount)

pctValues := CgroupPctStats{
CPUTotalPct: common.Round(pct, common.DefaultDecimalPlacesCount),
CPUTotalPctNorm: common.Round(normalizedPct, common.DefaultDecimalPlacesCount),
CPUUserPct: common.Round(userPct, common.DefaultDecimalPlacesCount),
CPUUserPctNorm: common.Round(normalizedUser, common.DefaultDecimalPlacesCount),
CPUSystemPct: common.Round(systemPct, common.DefaultDecimalPlacesCount),
CPUSystemPctNorm: common.Round(normalizedSystem, common.DefaultDecimalPlacesCount),
}
return pctValues
}

// matchProcess checks if the provided process name matches any of the process regexes
func (procStats *Stats) matchProcess(name string) bool {
for _, reg := range procStats.procRegexps {
Expand Down Expand Up @@ -409,6 +479,16 @@ func (procStats *Stats) Init() error {
procStats.envRegexps = append(procStats.envRegexps, reg)
}

if procStats.EnableCgroups {
cgReader, err := cgroup.NewReaderOptions(procStats.CgroupOpts)
if err == cgroup.ErrCgroupsMissing {
logp.Warn("cgroup data collection will be disabled: %v", err)
} else if err != nil {
return errors.Wrap(err, "error initializing cgroup reader")
}
procStats.cgroups = cgReader
}

return nil
}

Expand Down Expand Up @@ -492,6 +572,17 @@ func (procStats *Stats) getSingleProcess(pid int, newProcs ProcsMap) *Process {
return nil
}

if procStats.EnableCgroups {
cgStats, err := procStats.cgroups.GetStatsForProcess(pid)
if err != nil {
procStats.logger.Debug("Error fetching cgroup data for process %s with pid=%d: %v", process.Name, process.Pid, err)
return nil
}
process.RawStats = cgStats
last := procStats.ProcsMap[process.Pid]
process.PctStats = GetCgroupPercentage(last, process)
}

newProcs[process.Pid] = process
last := procStats.ProcsMap[process.Pid]
process.cpuTotalPctNorm, process.cpuTotalPct, process.cpuSinceStart = GetProcCPUPercentage(last, process)
Expand Down
12 changes: 6 additions & 6 deletions libbeat/metric/system/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func TestGetProcess(t *testing.T) {
assert.True(t, (process.Mem.Share >= 0))

// CPU Checks
assert.True(t, (process.Cpu.StartTime > 0))
assert.True(t, (process.Cpu.Total >= 0))
assert.True(t, (process.Cpu.User >= 0))
assert.True(t, (process.Cpu.Sys >= 0))
assert.True(t, (process.CPU.StartTime > 0))
assert.True(t, (process.CPU.Total >= 0))
assert.True(t, (process.CPU.User >= 0))
assert.True(t, (process.CPU.Sys >= 0))

assert.True(t, (process.SampleTime.Unix() <= time.Now().Unix()))

Expand Down Expand Up @@ -143,7 +143,7 @@ func TestProcMemPercentage(t *testing.T) {

func TestProcCpuPercentage(t *testing.T) {
p1 := &Process{
Cpu: gosigar.ProcTime{
CPU: gosigar.ProcTime{
User: 11345,
Sys: 37,
Total: 11382,
Expand All @@ -152,7 +152,7 @@ func TestProcCpuPercentage(t *testing.T) {
}

p2 := &Process{
Cpu: gosigar.ProcTime{
CPU: gosigar.ProcTime{
User: 14794,
Sys: 47,
Total: 14841,
Expand Down
Loading

0 comments on commit b2f22fa

Please sign in to comment.