Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of metricbeat process-gathering metrics and system/process #30076

Merged
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b4d6724
start work on process refactoring
fearful-symmetry Nov 15, 2021
3c3e6b0
finish new pid fetcher
fearful-symmetry Nov 16, 2021
11acfe7
continue work on linux process metrics refactoring
fearful-symmetry Nov 30, 2021
2d846f5
continue work on linux implementation
fearful-symmetry Dec 1, 2021
057da2f
complete first draft of linux support
fearful-symmetry Jan 14, 2022
46ee831
Merge remote-tracking branch 'upstream/master' into system-process-ne…
fearful-symmetry Jan 14, 2022
ddecc2d
finish linux, start MacOS
fearful-symmetry Jan 20, 2022
49816b6
fix tests
fearful-symmetry Jan 20, 2022
765f5f4
fix darwin calls
fearful-symmetry Jan 20, 2022
f9e5dd6
finish first pass at darwin
fearful-symmetry Jan 24, 2022
784a7a9
add aix, start cleanup
fearful-symmetry Jan 26, 2022
d9f8771
add new benchmark
fearful-symmetry Jan 26, 2022
578babf
start cleanup, fix misc. config values
fearful-symmetry Jan 27, 2022
88b3858
fix darwin caching
fearful-symmetry Jan 27, 2022
1bb007e
format
fearful-symmetry Jan 27, 2022
7ca6436
clean up code, fix some tests
fearful-symmetry Jan 28, 2022
ebe1a9c
Merge remote-tracking branch 'upstream/master' into system-process-ne…
fearful-symmetry Jan 28, 2022
efcdc66
move over root event logic, fix a ton of little bugs
fearful-symmetry Jan 28, 2022
3a96cf7
fix tests, add default hostfs to init
fearful-symmetry Jan 28, 2022
c53f86e
fix cgroup field
fearful-symmetry Jan 31, 2022
8e8b1ac
try to fix process tests
fearful-symmetry Feb 1, 2022
aeb77f2
no idea, just trying to fix windows
fearful-symmetry Feb 1, 2022
ee56545
remove mb import from libbeat
fearful-symmetry Feb 1, 2022
789570c
blindly attempt to fix windows unit tests
fearful-symmetry Feb 1, 2022
ab10589
fix log typo
fearful-symmetry Feb 1, 2022
c51f113
skip broken windows tests, at least for now
fearful-symmetry Feb 2, 2022
41be095
code cleanup
fearful-symmetry Feb 2, 2022
1a63b58
fix windows process states
fearful-symmetry Feb 2, 2022
1f07cae
yet another attempt at fixing windows tests
fearful-symmetry Feb 3, 2022
4bd598a
continued attempts at making windows tests less flaky
fearful-symmetry Feb 3, 2022
de6e3ea
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 3, 2022
9d968eb
try to fix formatting
fearful-symmetry Feb 3, 2022
712acb2
give up, disable flaky windows tests
fearful-symmetry Feb 4, 2022
0dcf56a
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 7, 2022
f98d3c2
remove old code
fearful-symmetry Feb 7, 2022
ffac2a9
another shot at playing with windows tests
fearful-symmetry Feb 8, 2022
aa69465
clean up now-working tests
fearful-symmetry Feb 8, 2022
b542e35
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 9, 2022
8e73a09
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 9, 2022
2f1b1ed
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 9, 2022
85c99f1
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 10, 2022
09051eb
try to give python unit tests a little more room
fearful-symmetry Feb 10, 2022
2857e08
fix conditionals in tests
fearful-symmetry Feb 10, 2022
45859c4
try to tune python tests a little
fearful-symmetry Feb 10, 2022
96b8917
just rewrite test
fearful-symmetry Feb 11, 2022
4cbce53
Merge remote-tracking branch 'upstream/main' into system-process-new-…
fearful-symmetry Feb 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 11 additions & 101 deletions libbeat/cmd/instance/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
package metrics

import (
"fmt"
"os"
"runtime"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/metric/system/cgroup"
"github.com/elastic/beats/v7/libbeat/metric/system/cpu"
Expand Down Expand Up @@ -118,129 +116,45 @@ func reportMemStats(m monitoring.Mode, V monitoring.Visitor) {
}

func getRSSSize() (uint64, error) {
state, err := getBeatProcessState()
state, err := beatProcessStats.GetSelf()
if err != nil {
return 0, err
}

iRss, err := state.GetValue("memory.rss.bytes")
if err != nil {
return 0, fmt.Errorf("error getting Resident Set Size: %v", err)
}

rss, ok := iRss.(uint64)
if !ok {
return 0, fmt.Errorf("error converting Resident Set Size to uint64: %v", iRss)
}
return rss, nil
}

func getBeatProcessState() (common.MapStr, error) {
pid, err := process.GetSelfPid()
if err != nil {
return nil, fmt.Errorf("error getting PID for self process: %v", err)
}

state, err := beatProcessStats.GetOne(pid)
if err != nil {
return nil, fmt.Errorf("error retrieving process stats: %v", err)
}

return state, nil
return state.Memory.Rss.Bytes.ValueOr(0), nil
}

func reportBeatCPU(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
defer V.OnRegistryFinished()

totalCPUUsage, cpuTicks, err := getCPUUsage()
state, err := beatProcessStats.GetSelf()
if err != nil {
logp.Err("Error retrieving CPU percentages: %v", err)
return
}

userTime, systemTime, err := process.GetOwnResourceUsageTimeInMillis()
if err != nil {
logp.Err("Error retrieving CPU usage time: %v", err)
return
}

monitoring.ReportNamespace(V, "user", func() {
monitoring.ReportInt(V, "ticks", int64(cpuTicks.User))
monitoring.ReportInt(V, "ticks", int64(state.CPU.User.Ticks.ValueOr(0)))
monitoring.ReportNamespace(V, "time", func() {
monitoring.ReportInt(V, "ms", userTime)
monitoring.ReportInt(V, "ms", int64(state.CPU.User.Ticks.ValueOr(0)))
})
})
monitoring.ReportNamespace(V, "system", func() {
monitoring.ReportInt(V, "ticks", int64(cpuTicks.System))
monitoring.ReportInt(V, "ticks", int64(state.CPU.System.Ticks.ValueOr(0)))
monitoring.ReportNamespace(V, "time", func() {
monitoring.ReportInt(V, "ms", systemTime)
monitoring.ReportInt(V, "ms", int64(state.CPU.System.Ticks.ValueOr(0)))
})
})
monitoring.ReportNamespace(V, "total", func() {
monitoring.ReportFloat(V, "value", totalCPUUsage)
monitoring.ReportInt(V, "ticks", int64(cpuTicks.Total))
monitoring.ReportFloat(V, "value", state.CPU.Total.Value.ValueOr(0))
monitoring.ReportInt(V, "ticks", int64(state.CPU.Total.Ticks.ValueOr(0)))
monitoring.ReportNamespace(V, "time", func() {
monitoring.ReportInt(V, "ms", userTime+systemTime)
monitoring.ReportInt(V, "ms", int64(state.CPU.Total.Ticks.ValueOr(0)))
})
})
}

func getCPUUsage() (float64, *process.Ticks, error) {
state, err := getBeatProcessState()
if err != nil {
return 0.0, nil, err
}

iTotalCPUUsage, err := state.GetValue("cpu.total.value")
if err != nil {
return 0.0, nil, fmt.Errorf("error getting total CPU since start: %v", err)
}

totalCPUUsage, ok := iTotalCPUUsage.(float64)
if !ok {
return 0.0, nil, fmt.Errorf("error converting value of CPU usage since start to float64: %v", iTotalCPUUsage)
}

iTotalCPUUserTicks, err := state.GetValue("cpu.user.ticks")
if err != nil {
return 0.0, nil, fmt.Errorf("error getting number of user CPU ticks since start: %v", err)
}

totalCPUUserTicks, ok := iTotalCPUUserTicks.(uint64)
if !ok {
return 0.0, nil, fmt.Errorf("error converting value of user CPU ticks since start to uint64: %v", iTotalCPUUserTicks)
}

iTotalCPUSystemTicks, err := state.GetValue("cpu.system.ticks")
if err != nil {
return 0.0, nil, fmt.Errorf("error getting number of system CPU ticks since start: %v", err)
}

totalCPUSystemTicks, ok := iTotalCPUSystemTicks.(uint64)
if !ok {
return 0.0, nil, fmt.Errorf("error converting value of system CPU ticks since start to uint64: %v", iTotalCPUSystemTicks)
}

iTotalCPUTicks, err := state.GetValue("cpu.total.ticks")
if err != nil {
return 0.0, nil, fmt.Errorf("error getting total number of CPU ticks since start: %v", err)
}

totalCPUTicks, ok := iTotalCPUTicks.(uint64)
if !ok {
return 0.0, nil, fmt.Errorf("error converting total value of CPU ticks since start to uint64: %v", iTotalCPUTicks)
}

p := process.Ticks{
User: totalCPUUserTicks,
System: totalCPUSystemTicks,
Total: totalCPUTicks,
}

return totalCPUUsage, &p, nil
}

func reportSystemLoadAverage(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
defer V.OnRegistryFinished()
Expand Down Expand Up @@ -281,11 +195,7 @@ func reportBeatCgroups(_ monitoring.Mode, V monitoring.Visitor) {
V.OnRegistryStart()
defer V.OnRegistryFinished()

pid, err := process.GetSelfPid()
if err != nil {
logp.Err("error getting PID for self process: %v", err)
return
}
pid := os.Getpid()

cgroups, err := cgroup.NewReaderOptions(cgroup.ReaderOptions{
RootfsMountpoint: resolve.NewTestResolver("/"),
Expand Down
35 changes: 6 additions & 29 deletions libbeat/cmd/instance/metrics/metrics_file_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package metrics

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
Expand Down Expand Up @@ -49,40 +49,17 @@ func reportFDUsage(_ monitoring.Mode, V monitoring.Visitor) {
}

func getFDUsage() (open, hardLimit, softLimit uint64, err error) {
state, err := getBeatProcessState()
if err != nil {
return 0, 0, 0, err
}

iOpen, err := state.GetValue("fd.open")
state, err := beatProcessStats.GetSelf()
if err != nil {
return 0, 0, 0, fmt.Errorf("error getting number of open FD: %v", err)
return 0, 0, 0, errors.Wrap(err, "error fetching self process")
}

open, ok := iOpen.(uint64)
if !ok {
return 0, 0, 0, fmt.Errorf("error converting value of open FDs to uint64: %v", iOpen)
}
open = state.FD.Open.ValueOr(0)

iHardLimit, err := state.GetValue("fd.limit.hard")
if err != nil {
return 0, 0, 0, fmt.Errorf("error getting FD hard limit: %v", err)
}
hardLimit = state.FD.Limit.Hard.ValueOr(0)

hardLimit, ok = iHardLimit.(uint64)
if !ok {
return 0, 0, 0, fmt.Errorf("error converting values of FD hard limit: %v", iHardLimit)
}

iSoftLimit, err := state.GetValue("fd.limit.soft")
if err != nil {
return 0, 0, 0, fmt.Errorf("error getting FD hard limit: %v", err)
}

softLimit, ok = iSoftLimit.(uint64)
if !ok {
return 0, 0, 0, fmt.Errorf("error converting values of FD hard limit: %v", iSoftLimit)
}
softLimit = state.FD.Limit.Soft.ValueOr(0)

return open, hardLimit, softLimit, nil
}
94 changes: 94 additions & 0 deletions libbeat/metric/system/process/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package process

import (
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/metric/system/numcpu"
"github.com/elastic/beats/v7/libbeat/opt"
)

// unixTimeMsToTime converts a unix time given in milliseconds since Unix epoch
// to a common.Time value.
func unixTimeMsToTime(unixTimeMs uint64) string {
return common.Time(time.Unix(0, int64(unixTimeMs*1000000))).String()
}

func stripNullByte(buf []byte) string {
return string(buf[0 : len(buf)-1])
}

func stripNullByteRaw(buf []byte) []byte {
return buf[0 : len(buf)-1]
}

// GetProcMemPercentage returns process memory usage as a percent of total memory usage
func GetProcMemPercentage(proc ProcState, totalPhyMem uint64) opt.Float {
if totalPhyMem == 0 {
return opt.NewFloatNone()
}

perc := (float64(proc.Memory.Rss.Bytes.ValueOr(0)) / float64(totalPhyMem))

return opt.FloatWith(common.Round(perc, 4))
}

// isProcessInSlice looks up proc in the processes slice and returns if
// found or not
func isProcessInSlice(processes []ProcState, proc *ProcState) bool {
for _, p := range processes {
if p.Pid == proc.Pid {
return true
}
}
return false
}

// GetProcCPUPercentage returns the percentage of total CPU time consumed by
// the process during the period between the given samples. Two percentages are
// returned (these must be multiplied by 100). The first is a normalized based
// on the number of cores such that the value ranges on [0, 1]. The second is
// not normalized and the value ranges on [0, number_of_cores].
//
// Implementation note: The total system CPU time (including idle) is not
// provided so this method will resort to using the difference in wall-clock
// time multiplied by the number of cores as the total amount of CPU time
// available between samples. This could result in incorrect percentages if the
// wall-clock is adjusted (prior to Go 1.9) or the machine is suspended.
func GetProcCPUPercentage(s0, s1 ProcState) ProcState {
// Skip if we're missing the total ticks
if s0.CPU.Total.Ticks.IsZero() || s1.CPU.Total.Ticks.IsZero() {
return s1
}

timeDelta := s1.SampleTime.Sub(s0.SampleTime)
timeDeltaMillis := timeDelta / time.Millisecond
totalCPUDeltaMillis := int64(s1.CPU.Total.Ticks.ValueOr(0) - s0.CPU.Total.Ticks.ValueOr(0))
cmacknz marked this conversation as resolved.
Show resolved Hide resolved

pct := float64(totalCPUDeltaMillis) / float64(timeDeltaMillis)
normalizedPct := pct / float64(numcpu.NumCPU())

s1.CPU.Total.Norm.Pct = opt.FloatWith(common.Round(normalizedPct, common.DefaultDecimalPlacesCount))
s1.CPU.Total.Pct = opt.FloatWith(common.Round(pct, common.DefaultDecimalPlacesCount))
s1.CPU.Total.Value = opt.FloatWith(common.Round(float64(s1.CPU.Total.Ticks.ValueOr(0)), common.DefaultDecimalPlacesCount))

return s1

}
Loading