Skip to content

Commit

Permalink
Add support for CgroupsV2 in beats, migrate away from gosigar (#27242)
Browse files Browse the repository at this point in the history
* finish first round of cgroupsv2 support

* fix tests

* remove old code, fix PctOpt

* fix imports

* go mod tidy

* try to fix make notice again

* try again

* remove unneeded test files

* fix test paths

* fix crossbuild issues

* fix tests, remove debug statement

* change metadata, fight with mapping defs

* fix v1 test, remove debug line

* somewhat hacky fix for fields issues

* fix v1 fetch, update fields again

* fix omitempty issue

* make update

* remove older test

* fix tests, cgv1 logic

* remove old debug statement

* fix issue with how ubuntu mixes cgroups

* clean up error handling in libbeat

* changes based on feedback, increased error verbosity to try to fix baffling CI errors

* fix fields, add more error messages for weird CI bug

* I give up, add tons of debug statements

* fix issue with docker containers running under hybrid cgroups

* fix hostfs state check

* fix more broken tests

* fix names, log levels

* more changes, docs, test

* still making the mapping checks happy

* fix libbeat code I broke
  • Loading branch information
fearful-symmetry authored Aug 12, 2021
1 parent 2650f38 commit d898533
Show file tree
Hide file tree
Showing 277 changed files with 5,970 additions and 695 deletions.
71 changes: 61 additions & 10 deletions libbeat/cmd/instance/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/metric/system/cgroup"
"github.com/elastic/beats/v7/libbeat/metric/system/cpu"
"github.com/elastic/beats/v7/libbeat/metric/system/process"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/paths"
"github.com/elastic/gosigar/cgroup"
)

var (
Expand Down Expand Up @@ -298,11 +298,26 @@ func reportBeatCgroups(_ monitoring.Mode, V monitoring.Visitor) {
}
return
}
selfStats, err := cgroups.GetStatsForProcess(pid)

cgv, err := cgroups.CgroupsVersion(pid)
if err != nil {
logp.Err("error getting cgroup stats: %v", err)
logp.Err("error determining cgroups version: %v", err)
return
}

if cgv == cgroup.CgroupsV1 {
reportMetricsCGV1(pid, cgroups, V)
} else {
reportMetricsCGV2(pid, cgroups, V)
}

}

func reportMetricsCGV1(pid int, cgroups *cgroup.Reader, V monitoring.Visitor) {
selfStats, err := cgroups.GetV1StatsForProcess(pid)
if err != nil {
logp.Err("error getting cgroup stats: %v", err)
}
// GetStatsForProcess returns a nil selfStats and no error when there's no stats
if selfStats == nil {
return
Expand All @@ -315,17 +330,17 @@ func reportBeatCgroups(_ monitoring.Mode, V monitoring.Visitor) {
}
monitoring.ReportNamespace(V, "cfs", func() {
monitoring.ReportNamespace(V, "period", func() {
monitoring.ReportInt(V, "us", int64(cpu.CFS.PeriodMicros))
monitoring.ReportInt(V, "us", int64(cpu.CFS.PeriodMicros.Us))
})
monitoring.ReportNamespace(V, "quota", func() {
monitoring.ReportInt(V, "us", int64(cpu.CFS.QuotaMicros))
monitoring.ReportInt(V, "us", int64(cpu.CFS.QuotaMicros.Us))
})
})
monitoring.ReportNamespace(V, "stats", func() {
monitoring.ReportInt(V, "periods", int64(cpu.Stats.Periods))
monitoring.ReportNamespace(V, "throttled", func() {
monitoring.ReportInt(V, "periods", int64(cpu.Stats.ThrottledPeriods))
monitoring.ReportInt(V, "ns", int64(cpu.Stats.ThrottledTimeNanos))
monitoring.ReportInt(V, "periods", int64(cpu.Stats.Throttled.Periods))
monitoring.ReportInt(V, "ns", int64(cpu.Stats.Throttled.Us))
})
})
})
Expand All @@ -337,7 +352,7 @@ func reportBeatCgroups(_ monitoring.Mode, V monitoring.Visitor) {
monitoring.ReportString(V, "id", cpuacct.ID)
}
monitoring.ReportNamespace(V, "total", func() {
monitoring.ReportInt(V, "ns", int64(cpuacct.TotalNanos))
monitoring.ReportInt(V, "ns", int64(cpuacct.Total.NS))
})
})
}
Expand All @@ -349,12 +364,48 @@ func reportBeatCgroups(_ monitoring.Mode, V monitoring.Visitor) {
}
monitoring.ReportNamespace(V, "mem", func() {
monitoring.ReportNamespace(V, "limit", func() {
monitoring.ReportInt(V, "bytes", int64(memory.Mem.Limit))
monitoring.ReportInt(V, "bytes", int64(memory.Mem.Limit.Bytes))
})
monitoring.ReportNamespace(V, "usage", func() {
monitoring.ReportInt(V, "bytes", int64(memory.Mem.Usage.Bytes))
})
})
})
}
}

func reportMetricsCGV2(pid int, cgroups *cgroup.Reader, V monitoring.Visitor) {
selfStats, err := cgroups.GetV2StatsForProcess(pid)
if err != nil {
logp.Err("error getting cgroup stats: %v", err)
return
}
if cpu := selfStats.CPU; cpu != nil {
monitoring.ReportNamespace(V, "cpu", func() {
if cpu.ID != "" {
monitoring.ReportString(V, "id", cpu.ID)
}
monitoring.ReportNamespace(V, "stats", func() {
monitoring.ReportInt(V, "periods", int64(cpu.Stats.Periods.ValueOr(0)))
monitoring.ReportNamespace(V, "throttled", func() {
monitoring.ReportInt(V, "periods", int64(cpu.Stats.Throttled.Periods.ValueOr(0)))
monitoring.ReportInt(V, "ns", int64(cpu.Stats.Throttled.Us.ValueOr(0)))
})
})
})
}

if memory := selfStats.Memory; memory != nil {
monitoring.ReportNamespace(V, "memory", func() {
if memory.ID != "" {
monitoring.ReportString(V, "id", memory.ID)
}
monitoring.ReportNamespace(V, "mem", func() {
monitoring.ReportNamespace(V, "usage", func() {
monitoring.ReportInt(V, "bytes", int64(memory.Mem.Usage))
monitoring.ReportInt(V, "bytes", int64(memory.Mem.Usage.Bytes))
})
})
})
}

}
88 changes: 88 additions & 0 deletions libbeat/metric/system/cgroup/cgcommon/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cgcommon

import (
"bufio"
"fmt"
"os"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/opt"
)

// CPUUsage wraps the CPU usage time values for the CPU controller metrics.
// The Pct and Norm fields are optional and are omitted from serialized output
// when empty (see the omitempty tags).
type CPUUsage struct {
// NS is the raw usage counter, serialized as "ns".
// NOTE(review): presumably nanoseconds, going by the tag name — confirm against the reader.
NS uint64 `json:"ns" struct:"ns"`
// Pct is an optional usage percentage, serialized as "pct".
Pct opt.Float `json:"pct,omitempty" struct:"pct,omitempty"`
// Norm is an optional percentage group, serialized as "norm".
// NOTE(review): presumably usage normalized by CPU count — confirm against the code that fills it.
Norm opt.PctOpt `json:"norm,omitempty" struct:"norm,omitempty"`
}

// Pressure contains load metrics for a controller,
// Broken apart into 10, 60, and 300 second samples,
// as well as a total time in US
// One Pressure value corresponds to one line of a *.pressure file as parsed by GetPressure.
type Pressure struct {
// Ten is filled from the "avg10=" field of a pressure line.
Ten opt.Pct `json:"10,omitempty" struct:"10,omitempty"`
// Sixty is filled from the "avg60=" field of a pressure line.
Sixty opt.Pct `json:"60,omitempty" struct:"60,omitempty"`
// ThreeHundred is filled from the "avg300=" field of a pressure line.
ThreeHundred opt.Pct `json:"300,omitempty" struct:"300,omitempty"`
// Total is filled from the "total=" field of a pressure line; it is an optional
// value so that its absence can signal "no pressure metrics" (see IsZero).
Total opt.Uint `json:"total,omitempty" struct:"total,omitempty"`
}

// IsZero implements the IsZero interface for Pressure
// This is "all or nothing", as pressure stats don't exist on certain systems
// If `total` doesn't exist, that means there's no pressure metrics.
// Only the Total field is consulted; the averaged samples are not inspected.
func (p Pressure) IsZero() bool {
// Delegate to the optional Total value: an unset total marks the whole struct as empty.
return p.Total.IsZero()
}

// GetPressure takes the path of a *.pressure file and returns a
// map of the pressure (IO contention) stats for the cgroup,
// keyed by the first token of each line.
// On CPU controllers, the only key will be "some".
// On IO controllers, the keys will be "some" and "full".
// See https://github.com/torvalds/linux/blob/master/Documentation/accounting/psi.rst
//
// Each line is expected to have the form
//
//	<key> avg10=<float> avg60=<float> avg300=<float> total=<uint>
//
// Errors from opening the file are returned unmodified so callers can test
// them with os.IsNotExist and friends. A line that does not match the expected
// format, or a failure while reading the file, yields a wrapped error that
// names the file. In every error case the map returned contains whatever
// lines were successfully parsed before the failure.
func GetPressure(path string) (map[string]Pressure, error) {
	pressureData := make(map[string]Pressure)

	f, err := os.Open(path)
	// pass along any OS open errors directly
	if err != nil {
		return pressureData, err
	}
	defer f.Close()

	sc := bufio.NewScanner(f)
	for sc.Scan() {
		var (
			stallTime string
			total     uint64
			data      Pressure
		)
		matched, err := fmt.Sscanf(sc.Text(), "%s avg10=%f avg60=%f avg300=%f total=%d", &stallTime, &data.Ten.Pct, &data.Sixty.Pct, &data.ThreeHundred.Pct, &total)
		if err != nil {
			return pressureData, errors.Wrapf(err, "error scanning file: %s", path)
		}
		// Assume that if we didn't match at least three numbers, something has gone wrong
		if matched < 3 {
			return pressureData, fmt.Errorf("Error: only matched %d fields from file %s", matched, path)
		}
		// total is scanned into a plain uint64 and wrapped afterwards so that
		// the optional Total field is only marked as set for parsed lines.
		data.Total = opt.UintWith(total)
		pressureData[stallTime] = data
	}
	// Scan() returning false can mean EOF or a read failure; without this
	// check a truncated read would be reported as a successful, partial result.
	if err := sc.Err(); err != nil {
		return pressureData, errors.Wrapf(err, "error reading file: %s", path)
	}

	return pressureData, nil
}
51 changes: 51 additions & 0 deletions libbeat/metric/system/cgroup/cgcommon/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cgcommon

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/opt"
)

func TestPressure(t *testing.T) {
v2Path := "../testdata/docker/sys/fs/cgroup/system.slice/docker-1c8fa019edd4b9d4b2856f4932c55929c5c118c808ed5faee9a135ca6e84b039.scope"

pressureData, err := GetPressure(filepath.Join(v2Path, "io.pressure"))
assert.NoError(t, err, "error in getPressure")

goodP := map[string]Pressure{
"some": {
Ten: opt.Pct{Pct: 3.00},
Sixty: opt.Pct{Pct: 2.10},
ThreeHundred: opt.Pct{Pct: 4.00},
Total: opt.UintWith(1154482),
},
"full": {
Ten: opt.Pct{Pct: 10},
Sixty: opt.Pct{Pct: 30},
ThreeHundred: opt.Pct{Pct: 0.5},
Total: opt.UintWith(1154482),
},
}

assert.Equal(t, goodP, pressureData, "pressure stats not equal")
}
84 changes: 84 additions & 0 deletions libbeat/metric/system/cgroup/cgcommon/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cgcommon

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"strings"
)

// ErrInvalidFormat is the sentinel error reported when a line cannot be split
// into exactly one key and one value.
var ErrInvalidFormat = errors.New("error invalid key/value format")

// ParseUintFromFile joins the given path elements, reads the resulting file
// and parses its contents as a single unsigned integer via ParseUint.
//
// A file that does not exist is not treated as an error: the function returns
// (0, nil), because not every feature is implemented or enabled on each OS.
// Any other read error is returned unchanged.
func ParseUintFromFile(path ...string) (uint64, error) {
	contents, err := ioutil.ReadFile(filepath.Join(path...))
	switch {
	case err == nil:
		return ParseUint(contents)
	case os.IsNotExist(err):
		// Not all features are implemented/enabled by each OS.
		return 0, nil
	default:
		return 0, err
	}
}

// ParseUint parses a single base-10 unsigned integer. Surrounding whitespace
// is trimmed before parsing. Negative values, including ones too small to fit
// in an int64, are clamped to 0 rather than reported as errors. For any other
// unparsable input the error from strconv.ParseUint is returned.
func ParseUint(value []byte) (uint64, error) {
	trimmed := string(bytes.TrimSpace(value))

	parsed, err := strconv.ParseUint(trimmed, 10, 64)
	if err == nil {
		return parsed, nil
	}

	// Not a valid unsigned number: check whether it is a negative one, in
	// which case the value is munged to 0.
	signed, signedErr := strconv.ParseInt(trimmed, 10, 64)
	if signed < 0 {
		if signedErr == nil {
			return 0, nil
		}
		// On underflow ParseInt still returns the (negative) minimum int64
		// together with a range error.
		if numErr, ok := signedErr.(*strconv.NumError); ok && numErr.Err == strconv.ErrRange {
			return 0, nil
		}
	}

	return 0, err
}

// ParseCgroupParamKeyValue splits a whitespace-separated "key value" line and
// returns the key together with the value parsed by ParseUint.
//
// It returns ErrInvalidFormat when the line does not consist of exactly two
// fields, and a descriptive error when the value cannot be converted.
func ParseCgroupParamKeyValue(t string) (string, uint64, error) {
	fields := strings.Fields(t)
	if len(fields) != 2 {
		return "", 0, ErrInvalidFormat
	}
	key, rawValue := fields[0], fields[1]

	value, err := ParseUint([]byte(rawValue))
	if err != nil {
		return "", 0, fmt.Errorf("unable to convert param value (%q) to uint64: %v", rawValue, err)
	}

	return key, value, nil
}
Loading

0 comments on commit d898533

Please sign in to comment.