Skip to content

Commit

Permalink
[receiver/hostmetrics] Migrate disk scraper to the new metrics builder
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Dec 20, 2021
1 parent 4854bf4 commit 58c4fb2
Show file tree
Hide file tree
Showing 14 changed files with 569 additions and 393 deletions.
2 changes: 1 addition & 1 deletion receiver/hostmetricsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestLoadConfig(t *testing.T) {
},
Scrapers: map[string]internal.Config{
cpuscraper.TypeStr: (&cpuscraper.Factory{}).CreateDefaultConfig(),
diskscraper.TypeStr: &diskscraper.Config{},
diskscraper.TypeStr: (&diskscraper.Factory{}).CreateDefaultConfig(),
loadscraper.TypeStr: &loadscraper.Config{},
filesystemscraper.TypeStr: &filesystemscraper.Config{},
memoryscraper.TypeStr: &memoryscraper.Config{},
Expand Down
2 changes: 1 addition & 1 deletion receiver/hostmetricsreceiver/hostmetrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestGatherMetrics_EndToEnd(t *testing.T) {
},
Scrapers: map[string]internal.Config{
cpuscraper.TypeStr: scraperFactories[cpuscraper.TypeStr].CreateDefaultConfig(),
diskscraper.TypeStr: &diskscraper.Config{},
diskscraper.TypeStr: scraperFactories[diskscraper.TypeStr].CreateDefaultConfig(),
filesystemscraper.TypeStr: &filesystemscraper.Config{},
loadscraper.TypeStr: &loadscraper.Config{},
memoryscraper.TypeStr: &memoryscraper.Config{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
//go:build !windows
// +build !windows

//go:generate mdatagen metadata.yaml
//go:generate mdatagen --experimental-gen metadata.yaml

package diskscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/diskscraper"
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ package diskscraper // import "github.com/open-telemetry/opentelemetry-collector
import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/diskscraper/internal/metadata"
)

// Config relating to Disk Metric Scraper.
type Config struct {
internal.ConfigSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Metrics allows to customize scraped metrics representation.
Metrics metadata.MetricsSettings `mapstructure:"metrics"`

// Include specifies a filter on the devices that should be included from the generated metrics.
// Exclude specifies a filter on the devices that should be excluded from the generated metrics.
// If neither `include` or `exclude` are set, metrics will be generated for all devices.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
type scraper struct {
config *Config
startTime pdata.Timestamp
mb *metadata.MetricsBuilder
includeFS filterset.FilterSet
excludeFS filterset.FilterSet

Expand Down Expand Up @@ -79,6 +80,7 @@ func (s *scraper) start(context.Context, component.Host) error {
}

s.startTime = pdata.Timestamp(bootTime * 1e9)
s.mb = metadata.NewMetricsBuilder(s.config.Metrics, metadata.WithStartTime(s.startTime))
return nil
}

Expand All @@ -97,81 +99,51 @@ func (s *scraper) scrape(_ context.Context) (pdata.Metrics, error) {

if len(ioCounters) > 0 {
metrics.EnsureCapacity(metricsLen)
initializeDiskIOMetric(metrics.AppendEmpty(), s.startTime, now, ioCounters)
initializeDiskOperationsMetric(metrics.AppendEmpty(), s.startTime, now, ioCounters)
initializeDiskIOTimeMetric(metrics.AppendEmpty(), s.startTime, now, ioCounters)
initializeDiskOperationTimeMetric(metrics.AppendEmpty(), s.startTime, now, ioCounters)
initializeDiskPendingOperationsMetric(metrics.AppendEmpty(), now, ioCounters)
appendSystemSpecificMetrics(metrics, s.startTime, now, ioCounters)
s.recordDiskIOMetric(now, ioCounters)
s.recordDiskOperationsMetric(now, ioCounters)
s.recordDiskIOTimeMetric(now, ioCounters)
s.recordDiskOperationTimeMetric(now, ioCounters)
s.recordDiskPendingOperationsMetric(now, ioCounters)
s.recordSystemSpecificDataPoints(now, ioCounters)
s.mb.Emit(metrics)
}

return md, nil
}

func initializeDiskIOMetric(metric pdata.Metric, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskIo.Init(metric)

idps := metric.Sum().DataPoints()
idps.EnsureCapacity(2 * len(ioCounters))

func (s *scraper) recordDiskIOMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeNumberDataPointAsInt(idps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Read, int64(ioCounter.ReadBytes))
initializeNumberDataPointAsInt(idps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Write, int64(ioCounter.WriteBytes))
s.mb.RecordSystemDiskIoDataPoint(now, int64(ioCounter.ReadBytes), device, metadata.AttributeDirection.Read)
s.mb.RecordSystemDiskIoDataPoint(now, int64(ioCounter.WriteBytes), device, metadata.AttributeDirection.Write)
}
}

func initializeDiskOperationsMetric(metric pdata.Metric, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskOperations.Init(metric)

idps := metric.Sum().DataPoints()
idps.EnsureCapacity(2 * len(ioCounters))

func (s *scraper) recordDiskOperationsMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeNumberDataPointAsInt(idps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Read, int64(ioCounter.ReadCount))
initializeNumberDataPointAsInt(idps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Write, int64(ioCounter.WriteCount))
s.mb.RecordSystemDiskOperationsDataPoint(now, int64(ioCounter.ReadCount), device, metadata.AttributeDirection.Read)
s.mb.RecordSystemDiskOperationsDataPoint(now, int64(ioCounter.WriteCount), device, metadata.AttributeDirection.Write)
}
}

func initializeDiskIOTimeMetric(metric pdata.Metric, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskIoTime.Init(metric)

ddps := metric.Sum().DataPoints()
ddps.EnsureCapacity(len(ioCounters))

func (s *scraper) recordDiskIOTimeMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeNumberDataPointAsDouble(ddps.AppendEmpty(), startTime, now, device, "", float64(ioCounter.IoTime)/1e3)
s.mb.RecordSystemDiskIoTimeDataPoint(now, float64(ioCounter.IoTime)/1e3, device)
}
}

func initializeDiskOperationTimeMetric(metric pdata.Metric, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskOperationTime.Init(metric)

ddps := metric.Sum().DataPoints()
ddps.EnsureCapacity(2 * len(ioCounters))

func (s *scraper) recordDiskOperationTimeMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeNumberDataPointAsDouble(ddps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Read, float64(ioCounter.ReadTime)/1e3)
initializeNumberDataPointAsDouble(ddps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Write, float64(ioCounter.WriteTime)/1e3)
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(ioCounter.ReadTime)/1e3, device, metadata.AttributeDirection.Read)
s.mb.RecordSystemDiskOperationTimeDataPoint(now, float64(ioCounter.WriteTime)/1e3, device, metadata.AttributeDirection.Write)
}
}

func initializeDiskPendingOperationsMetric(metric pdata.Metric, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskPendingOperations.Init(metric)

idps := metric.Sum().DataPoints()
idps.EnsureCapacity(len(ioCounters))

func (s *scraper) recordDiskPendingOperationsMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeDiskPendingDataPoint(idps.AppendEmpty(), now, device, int64(ioCounter.IopsInProgress))
s.mb.RecordSystemDiskPendingOperationsDataPoint(now, int64(ioCounter.IopsInProgress), device)
}
}

func initializeDiskPendingDataPoint(dataPoint pdata.NumberDataPoint, now pdata.Timestamp, deviceLabel string, value int64) {
dataPoint.Attributes().InsertString(metadata.Attributes.Device, deviceLabel)
dataPoint.SetTimestamp(now)
dataPoint.SetIntVal(value)
}

func (s *scraper) filterByDevice(ioCounters map[string]disk.IOCountersStat) map[string]disk.IOCountersStat {
if s.includeFS == nil && s.excludeFS == nil {
return ioCounters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ import (

const systemSpecificMetricsLen = 0

func appendSystemSpecificMetrics(metrics pdata.MetricSlice, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
func (s *scraper) recordSystemSpecificDataPoints(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,20 @@ import (

const systemSpecificMetricsLen = 2

func appendSystemSpecificMetrics(metrics pdata.MetricSlice, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
initializeDiskWeightedIOTimeMetric(metrics.AppendEmpty(), startTime, now, ioCounters)
initializeDiskMergedMetric(metrics.AppendEmpty(), startTime, now, ioCounters)
func (s *scraper) recordSystemSpecificDataPoints(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
s.recordDiskWeightedIOTimeMetric(now, ioCounters)
s.recordDiskMergedMetric(now, ioCounters)
}

func initializeDiskWeightedIOTimeMetric(metric pdata.Metric, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskWeightedIoTime.Init(metric)

ddps := metric.Sum().DataPoints()
ddps.EnsureCapacity(len(ioCounters))

func (s *scraper) recordDiskWeightedIOTimeMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeNumberDataPointAsDouble(ddps.AppendEmpty(), startTime, now, device, "", float64(ioCounter.WeightedIO)/1e3)
s.mb.RecordSystemDiskWeightedIoTimeDataPoint(now, float64(ioCounter.WeightedIO)/1e3, device)
}
}

func initializeDiskMergedMetric(metric pdata.Metric, startTime, now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
metadata.Metrics.SystemDiskMerged.Init(metric)

idps := metric.Sum().DataPoints()
idps.EnsureCapacity(2 * len(ioCounters))

func (s *scraper) recordDiskMergedMetric(now pdata.Timestamp, ioCounters map[string]disk.IOCountersStat) {
for device, ioCounter := range ioCounters {
initializeNumberDataPointAsInt(idps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Read, int64(ioCounter.MergedReadCount))
initializeNumberDataPointAsInt(idps.AppendEmpty(), startTime, now, device, metadata.AttributeDirection.Write, int64(ioCounter.MergedWriteCount))
s.mb.RecordSystemDiskMergedDataPoint(now, int64(ioCounter.MergedReadCount), device, metadata.AttributeDirection.Read)
s.mb.RecordSystemDiskMergedDataPoint(now, int64(ioCounter.MergedWriteCount), device, metadata.AttributeDirection.Write)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package diskscraper
import (
"context"
"errors"
"runtime"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -37,41 +36,63 @@ func TestScrape(t *testing.T) {
bootTimeFunc func() (uint64, error)
newErrRegex string
initializationErr string
expectMetrics bool
expectMetrics int
expectedStartTime pdata.Timestamp
}

testCases := []testCase{
{
name: "Standard",
expectMetrics: true,
config: Config{Metrics: metadata.DefaultMetricsSettings()},
expectMetrics: metricsLen,
},
{
name: "Validate Start Time",
config: Config{Metrics: metadata.DefaultMetricsSettings()},
bootTimeFunc: func() (uint64, error) { return 100, nil },
expectMetrics: true,
expectMetrics: metricsLen,
expectedStartTime: 100 * 1e9,
},
{
name: "Boot Time Error",
config: Config{Metrics: metadata.DefaultMetricsSettings()},
bootTimeFunc: func() (uint64, error) { return 0, errors.New("err1") },
initializationErr: "err1",
expectMetrics: metricsLen,
},
{
name: "Include Filter that matches nothing",
config: Config{Include: MatchConfig{filterset.Config{MatchType: "strict"}, []string{"@*^#&*$^#)"}}},
expectMetrics: false,
name: "Include Filter that matches nothing",
config: Config{
Metrics: metadata.DefaultMetricsSettings(),
Include: MatchConfig{filterset.Config{MatchType: "strict"}, []string{"@*^#&*$^#)"}},
},
expectMetrics: 0,
},
{
name: "Invalid Include Filter",
config: Config{Include: MatchConfig{Devices: []string{"test"}}},
name: "Invalid Include Filter",
config: Config{
Metrics: metadata.DefaultMetricsSettings(),
Include: MatchConfig{Devices: []string{"test"}},
},
newErrRegex: "^error creating device include filters:",
},
{
name: "Invalid Exclude Filter",
config: Config{Exclude: MatchConfig{Devices: []string{"test"}}},
name: "Invalid Exclude Filter",
config: Config{
Metrics: metadata.DefaultMetricsSettings(),
Exclude: MatchConfig{Devices: []string{"test"}},
},
newErrRegex: "^error creating device exclude filters:",
},
{
name: "Disable one metric",
config: (func() Config {
config := Config{Metrics: metadata.DefaultMetricsSettings()}
config.Metrics.SystemDiskIo.Enabled = false
return config
})(),
expectMetrics: metricsLen - 1,
},
}

for _, test := range testCases {
Expand All @@ -98,32 +119,45 @@ func TestScrape(t *testing.T) {
md, err := scraper.scrape(context.Background())
require.NoError(t, err, "Failed to scrape metrics: %v", err)

if !test.expectMetrics {
assert.Equal(t, 0, md.MetricCount())
return
}

assert.Equal(t, test.expectMetrics, md.MetricCount())
metrics := md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics()
assert.Equal(t, metricsLen, metrics.Len())

assertInt64DiskMetricValid(t, metrics.At(0), metadata.Metrics.SystemDiskIo.New(), test.expectedStartTime)
assertInt64DiskMetricValid(t, metrics.At(1), metadata.Metrics.SystemDiskOperations.New(), test.expectedStartTime)
assertDoubleDiskMetricValid(t, metrics.At(2), metadata.Metrics.SystemDiskIoTime.New(), false, test.expectedStartTime)
assertDoubleDiskMetricValid(t, metrics.At(3), metadata.Metrics.SystemDiskOperationTime.New(), true, test.expectedStartTime)
assertDiskPendingOperationsMetricValid(t, metrics.At(4))

if runtime.GOOS == "linux" {
assertDoubleDiskMetricValid(t, metrics.At(5), metadata.Metrics.SystemDiskWeightedIoTime.New(), false, test.expectedStartTime)
assertInt64DiskMetricValid(t, metrics.At(6), metadata.Metrics.SystemDiskMerged.New(), test.expectedStartTime)
assert.Equal(t, test.expectMetrics, metrics.Len())

reportedMetricsCount := map[string]int{}
for i := 0; i < metrics.Len(); i++ {
metric := metrics.At(i)
reportedMetricsCount[metric.Name()]++
switch metric.Name() {
case "system.disk.io":
assertInt64DiskMetricValid(t, metric, test.expectedStartTime)
case "system.disk.io_time":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.operation_time":
assertDoubleDiskMetricValid(t, metric, true, test.expectedStartTime)
case "system.disk.operations":
assertInt64DiskMetricValid(t, metric, test.expectedStartTime)
case "system.disk.weighted.io.time":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
case "system.disk.merged":
assertInt64DiskMetricValid(t, metric, test.expectedStartTime)
case "system.disk.pending_operations":
assertDiskPendingOperationsMetricValid(t, metric)
case "system.disk.weighted_io_time":
assertDoubleDiskMetricValid(t, metric, false, test.expectedStartTime)
default:
assert.Failf(t, "unexpected-metric", "metric %q is not expected", metric.Name())
}
}
for m, c := range reportedMetricsCount {
assert.Equal(t, 1, c, "metric %q reported %d times", m, c)
}

internal.AssertSameTimeStampForAllMetrics(t, metrics)
})
}
}

func assertInt64DiskMetricValid(t *testing.T, metric pdata.Metric, expectedDescriptor pdata.Metric, startTime pdata.Timestamp) {
internal.AssertDescriptorEqual(t, expectedDescriptor, metric)
func assertInt64DiskMetricValid(t *testing.T, metric pdata.Metric, startTime pdata.Timestamp) {
if startTime != 0 {
internal.AssertSumMetricStartTimeEquals(t, metric, startTime)
}
Expand All @@ -135,8 +169,7 @@ func assertInt64DiskMetricValid(t *testing.T, metric pdata.Metric, expectedDescr
internal.AssertSumMetricHasAttributeValue(t, metric, 1, "direction", pdata.NewAttributeValueString(metadata.AttributeDirection.Write))
}

func assertDoubleDiskMetricValid(t *testing.T, metric pdata.Metric, expectedDescriptor pdata.Metric, expectDirectionLabels bool, startTime pdata.Timestamp) {
internal.AssertDescriptorEqual(t, expectedDescriptor, metric)
func assertDoubleDiskMetricValid(t *testing.T, metric pdata.Metric, expectDirectionLabels bool, startTime pdata.Timestamp) {
if startTime != 0 {
internal.AssertSumMetricStartTimeEquals(t, metric, startTime)
}
Expand All @@ -155,7 +188,6 @@ func assertDoubleDiskMetricValid(t *testing.T, metric pdata.Metric, expectedDesc
}

func assertDiskPendingOperationsMetricValid(t *testing.T, metric pdata.Metric) {
internal.AssertDescriptorEqual(t, metadata.Metrics.SystemDiskPendingOperations.New(), metric)
assert.GreaterOrEqual(t, metric.Sum().DataPoints().Len(), 1)
internal.AssertSumMetricHasAttribute(t, metric, 0, "device")
}
Loading

0 comments on commit 58c4fb2

Please sign in to comment.