Skip to content

Commit

Permalink
Change host metrics receiver to use the new metrics internal structs
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 31, 2020
1 parent 52c9e7f commit db86c70
Show file tree
Hide file tree
Showing 55 changed files with 679 additions and 704 deletions.
24 changes: 20 additions & 4 deletions consumer/pdata/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,9 @@ func (mdt MetricDataType) String() string {
return ""
}

// Type returns the type of the data for this Metric.
// DataType returns the type of the data for this Metric.
// Calling this function on zero-initialized Metric will cause a panic.
func (ms Metric) DataType() MetricDataType {
if *ms.orig == nil || (*ms.orig).Data == nil {
return MetricDataTypeNone
}
switch (*ms.orig).Data.(type) {
case *otlpmetrics.Metric_IntGauge:
return MetricDataTypeIntGauge
Expand All @@ -99,6 +96,25 @@ func (ms Metric) DataType() MetricDataType {
return MetricDataTypeNone
}

// SetDataType clears any existing data and initialize it with an empty data of the given type.
// Calling this function on zero-initialized Metric will cause a panic.
func (ms Metric) SetDataType(ty MetricDataType) {
switch ty {
case MetricDataTypeIntGauge:
(*ms.orig).Data = &otlpmetrics.Metric_IntGauge{}
case MetricDataTypeDoubleGauge:
(*ms.orig).Data = &otlpmetrics.Metric_DoubleGauge{}
case MetricDataTypeIntSum:
(*ms.orig).Data = &otlpmetrics.Metric_IntSum{}
case MetricDataTypeDoubleSum:
(*ms.orig).Data = &otlpmetrics.Metric_DoubleSum{}
case MetricDataTypeIntHistogram:
(*ms.orig).Data = &otlpmetrics.Metric_IntHistogram{}
case MetricDataTypeDoubleHistogram:
(*ms.orig).Data = &otlpmetrics.Metric_DoubleHistogram{}
}
}

// IntGauge returns the data as IntGauge. This should be called iff DataType() == MetricDataTypeIntGauge.
// Calling this function on zero-initialized Metric will cause a panic.
func (ms Metric) IntGauge() IntGauge {
Expand Down
10 changes: 5 additions & 5 deletions receiver/hostmetricsreceiver/hostmetrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/dataold"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

Expand Down Expand Up @@ -162,7 +162,7 @@ func (hmr *receiver) scrapeMetrics(ctx context.Context) {
defer span.End()

var errors []error
metricData := dataold.NewMetricData()
metricData := data.NewMetricData()

if err := hmr.scrapeAndAppendHostMetrics(ctx, metricData); err != nil {
errors = append(errors, err)
Expand All @@ -176,13 +176,13 @@ func (hmr *receiver) scrapeMetrics(ctx context.Context) {
span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Error(s) when scraping metrics: %v", componenterror.CombineErrors(errors))})
}

if err := hmr.consumer.ConsumeMetrics(ctx, pdatautil.MetricsFromOldInternalMetrics(metricData)); err != nil {
if err := hmr.consumer.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(metricData)); err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Unable to process metrics: %v", err)})
return
}
}

func (hmr *receiver) scrapeAndAppendHostMetrics(ctx context.Context, metricData dataold.MetricData) error {
func (hmr *receiver) scrapeAndAppendHostMetrics(ctx context.Context, metricData data.MetricData) error {
if len(hmr.hostMetricScrapers) == 0 {
return nil
}
Expand All @@ -202,7 +202,7 @@ func (hmr *receiver) scrapeAndAppendHostMetrics(ctx context.Context, metricData
return componenterror.CombineErrors(errors)
}

func (hmr *receiver) scrapeAndAppendResourceMetrics(ctx context.Context, metricData dataold.MetricData) error {
func (hmr *receiver) scrapeAndAppendResourceMetrics(ctx context.Context, metricData data.MetricData) error {
if len(hmr.resourceMetricScrapers) == 0 {
return nil
}
Expand Down
33 changes: 16 additions & 17 deletions receiver/hostmetricsreceiver/hostmetrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/dataold"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/diskscraper"
Expand Down Expand Up @@ -141,7 +140,7 @@ func TestGatherMetrics_EndToEnd(t *testing.T) {
}

func assertIncludesStandardMetrics(t *testing.T, got pdata.Metrics) {
md := pdatautil.MetricsToOldInternalMetrics(got)
md := pdatautil.MetricsToInternalMetrics(got)

// get the first ResourceMetrics object
rms := md.ResourceMetrics()
Expand All @@ -165,7 +164,7 @@ func assertIncludesResourceMetrics(t *testing.T, got pdata.Metrics) {
return
}

md := pdatautil.MetricsToOldInternalMetrics(got)
md := pdatautil.MetricsToInternalMetrics(got)

// get the superset of metrics returned by all resource metrics (excluding the first)
returnedMetrics := make(map[string]struct{})
Expand All @@ -183,16 +182,16 @@ func assertIncludesResourceMetrics(t *testing.T, got pdata.Metrics) {
}
}

func getMetricSlice(t *testing.T, rm dataold.ResourceMetrics) dataold.MetricSlice {
func getMetricSlice(t *testing.T, rm pdata.ResourceMetrics) pdata.MetricSlice {
ilms := rm.InstrumentationLibraryMetrics()
require.Equal(t, 1, ilms.Len())
return ilms.At(0).Metrics()
}

func getReturnedMetricNames(metrics dataold.MetricSlice) map[string]struct{} {
func getReturnedMetricNames(metrics pdata.MetricSlice) map[string]struct{} {
metricNames := make(map[string]struct{})
for i := 0; i < metrics.Len(); i++ {
metricNames[metrics.At(i).MetricDescriptor().Name()] = struct{}{}
metricNames[metrics.At(i).Name()] = struct{}{}
}
return metricNames
}
Expand All @@ -212,30 +211,30 @@ type mockFactory struct{ mock.Mock }
type mockScraper struct{ mock.Mock }

func (m *mockFactory) CreateDefaultConfig() internal.Config { return &mockConfig{} }
func (m *mockFactory) CreateMetricsScraper(ctx context.Context, logger *zap.Logger, cfg internal.Config) (internal.Scraper, error) {
func (m *mockFactory) CreateMetricsScraper(context.Context, *zap.Logger, internal.Config) (internal.Scraper, error) {
args := m.MethodCalled("CreateMetricsScraper")
return args.Get(0).(internal.Scraper), args.Error(1)
}

func (m *mockScraper) Initialize(ctx context.Context) error { return nil }
func (m *mockScraper) Close(ctx context.Context) error { return nil }
func (m *mockScraper) ScrapeMetrics(ctx context.Context) (dataold.MetricSlice, error) {
return dataold.NewMetricSlice(), errors.New("err1")
func (m *mockScraper) Initialize(context.Context) error { return nil }
func (m *mockScraper) Close(context.Context) error { return nil }
func (m *mockScraper) ScrapeMetrics(context.Context) (pdata.MetricSlice, error) {
return pdata.NewMetricSlice(), errors.New("err1")
}

type mockResourceFactory struct{ mock.Mock }
type mockResourceScraper struct{ mock.Mock }

func (m *mockResourceFactory) CreateDefaultConfig() internal.Config { return &mockConfig{} }
func (m *mockResourceFactory) CreateMetricsScraper(ctx context.Context, logger *zap.Logger, cfg internal.Config) (internal.ResourceScraper, error) {
func (m *mockResourceFactory) CreateMetricsScraper(context.Context, *zap.Logger, internal.Config) (internal.ResourceScraper, error) {
args := m.MethodCalled("CreateMetricsScraper")
return args.Get(0).(internal.ResourceScraper), args.Error(1)
}

func (m *mockResourceScraper) Initialize(ctx context.Context) error { return nil }
func (m *mockResourceScraper) Close(ctx context.Context) error { return nil }
func (m *mockResourceScraper) ScrapeMetrics(ctx context.Context) (dataold.ResourceMetricsSlice, error) {
return dataold.NewResourceMetricsSlice(), errors.New("err2")
func (m *mockResourceScraper) Initialize(context.Context) error { return nil }
func (m *mockResourceScraper) Close(context.Context) error { return nil }
func (m *mockResourceScraper) ScrapeMetrics(context.Context) (pdata.ResourceMetricsSlice, error) {
return pdata.NewResourceMetricsSlice(), errors.New("err2")
}

func TestGatherMetrics_ScraperKeyConfigError(t *testing.T) {
Expand Down Expand Up @@ -301,7 +300,7 @@ func TestGatherMetrics_Error(t *testing.T) {

// expect to get one empty resource metrics entry
require.Equal(t, 1, len(got))
rm := pdatautil.MetricsToOldInternalMetrics(got[0]).ResourceMetrics()
rm := pdatautil.MetricsToInternalMetrics(got[0]).ResourceMetrics()
require.Equal(t, 1, rm.Len())
ilm := rm.At(0).InstrumentationLibraryMetrics()
require.Equal(t, 1, ilm.Len())
Expand Down
6 changes: 3 additions & 3 deletions receiver/hostmetricsreceiver/internal/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

"go.uber.org/zap"

"go.opentelemetry.io/collector/internal/dataold"
"go.opentelemetry.io/collector/consumer/pdata"
)

// BaseScraper gathers metrics from the host machine.
Expand All @@ -45,7 +45,7 @@ type Scraper interface {
// ScrapeMetrics returns relevant scraped metrics. If errors occur
// scraping some metrics, an error should be returned, but any
// metrics that were successfully scraped should still be returned.
ScrapeMetrics(ctx context.Context) (dataold.MetricSlice, error)
ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error)
}

// ScraperFactory can create a MetricScraper.
Expand All @@ -66,7 +66,7 @@ type ResourceScraper interface {
// If errors occur scraping some metrics, an error should be
// returned, but any metrics that were successfully scraped
// should still be returned.
ScrapeMetrics(ctx context.Context) (dataold.ResourceMetricsSlice, error)
ScrapeMetrics(ctx context.Context) (pdata.ResourceMetricsSlice, error)
}

// ResourceScraperFactory can create a ResourceScraper.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package cpuscraper

import (
"go.opentelemetry.io/collector/internal/dataold"
"go.opentelemetry.io/collector/consumer/pdata"
)

// labels
Expand All @@ -40,12 +40,16 @@ const (

// descriptors

var cpuTimeDescriptor = func() dataold.MetricDescriptor {
descriptor := dataold.NewMetricDescriptor()
descriptor.InitEmpty()
descriptor.SetName("system.cpu.time")
descriptor.SetDescription("Total CPU seconds broken down by different states.")
descriptor.SetUnit("s")
descriptor.SetType(dataold.MetricTypeMonotonicDouble)
return descriptor
var cpuTimeDescriptor = func() pdata.Metric {
metric := pdata.NewMetric()
metric.InitEmpty()
metric.SetName("system.cpu.time")
metric.SetDescription("Total CPU seconds broken down by different states.")
metric.SetUnit("s")
metric.SetDataType(pdata.MetricDataTypeDoubleSum)
sum := metric.DoubleSum()
sum.InitEmpty()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pdata.AggregationTemporalityCumulative)
return metric
}()
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/shirou/gopsutil/host"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/dataold"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

Expand Down Expand Up @@ -58,8 +57,8 @@ func (s *scraper) Close(_ context.Context) error {
}

// ScrapeMetrics
func (s *scraper) ScrapeMetrics(_ context.Context) (dataold.MetricSlice, error) {
metrics := dataold.NewMetricSlice()
func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) {
metrics := pdata.NewMetricSlice()

now := internal.TimeToUnixNano(time.Now())
cpuTimes, err := s.times( /*percpu=*/ true)
Expand All @@ -72,10 +71,10 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (dataold.MetricSlice, error)
return metrics, nil
}

func initializeCPUTimeMetric(metric dataold.Metric, startTime, now pdata.TimestampUnixNano, cpuTimes []cpu.TimesStat) {
cpuTimeDescriptor.CopyTo(metric.MetricDescriptor())
func initializeCPUTimeMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, cpuTimes []cpu.TimesStat) {
cpuTimeDescriptor.CopyTo(metric)

ddps := metric.DoubleDataPoints()
ddps := metric.DoubleSum().DataPoints()
ddps.Resize(len(cpuTimes) * cpuStatesLen)
for i, cpuTime := range cpuTimes {
appendCPUTimeStateDataPoints(ddps, i*cpuStatesLen, startTime, now, cpuTime)
Expand All @@ -84,7 +83,7 @@ func initializeCPUTimeMetric(metric dataold.Metric, startTime, now pdata.Timesta

const gopsCPUTotal string = "cpu-total"

func initializeCPUTimeDataPoint(dataPoint dataold.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, cpuLabel string, stateLabel string, value float64) {
func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, cpuLabel string, stateLabel string, value float64) {
labelsMap := dataPoint.LabelsMap()
// ignore cpu label if reporting "total" cpu usage
if cpuLabel != gopsCPUTotal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"github.com/shirou/gopsutil/cpu"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/dataold"
)

const cpuStatesLen = 8

func appendCPUTimeStateDataPoints(ddps dataold.DoubleDataPointSlice, startIdx int, startTime, now pdata.TimestampUnixNano, cpuTime cpu.TimesStat) {
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startIdx int, startTime, now pdata.TimestampUnixNano, cpuTime cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(startIdx+0), startTime, now, cpuTime.CPU, userStateLabelValue, cpuTime.User)
initializeCPUTimeDataPoint(ddps.At(startIdx+1), startTime, now, cpuTime.CPU, systemStateLabelValue, cpuTime.System)
initializeCPUTimeDataPoint(ddps.At(startIdx+2), startTime, now, cpuTime.CPU, idleStateLabelValue, cpuTime.Idle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"github.com/shirou/gopsutil/cpu"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/dataold"
)

const cpuStatesLen = 4

func appendCPUTimeStateDataPoints(ddps dataold.DoubleDataPointSlice, startIdx int, startTime, now pdata.TimestampUnixNano, cpuTime cpu.TimesStat) {
func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startIdx int, startTime, now pdata.TimestampUnixNano, cpuTime cpu.TimesStat) {
initializeCPUTimeDataPoint(ddps.At(startIdx+0), startTime, now, cpuTime.CPU, userStateLabelValue, cpuTime.User)
initializeCPUTimeDataPoint(ddps.At(startIdx+1), startTime, now, cpuTime.CPU, systemStateLabelValue, cpuTime.System)
initializeCPUTimeDataPoint(ddps.At(startIdx+2), startTime, now, cpuTime.CPU, idleStateLabelValue, cpuTime.Idle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/dataold"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
)

Expand Down Expand Up @@ -98,22 +97,22 @@ func TestScrapeMetrics(t *testing.T) {
}
}

func assertCPUMetricValid(t *testing.T, metric dataold.Metric, descriptor dataold.MetricDescriptor, startTime pdata.TimestampUnixNano) {
internal.AssertDescriptorEqual(t, descriptor, metric.MetricDescriptor())
func assertCPUMetricValid(t *testing.T, metric pdata.Metric, descriptor pdata.Metric, startTime pdata.TimestampUnixNano) {
internal.AssertDescriptorEqual(t, descriptor, metric)
if startTime != 0 {
internal.AssertDoubleMetricStartTimeEquals(t, metric, startTime)
internal.AssertDoubleSumMetricStartTimeEquals(t, metric, startTime)
}
assert.GreaterOrEqual(t, metric.DoubleDataPoints().Len(), 4*runtime.NumCPU())
internal.AssertDoubleMetricLabelExists(t, metric, 0, cpuLabelName)
internal.AssertDoubleMetricLabelHasValue(t, metric, 0, stateLabelName, userStateLabelValue)
internal.AssertDoubleMetricLabelHasValue(t, metric, 1, stateLabelName, systemStateLabelValue)
internal.AssertDoubleMetricLabelHasValue(t, metric, 2, stateLabelName, idleStateLabelValue)
internal.AssertDoubleMetricLabelHasValue(t, metric, 3, stateLabelName, interruptStateLabelValue)
assert.GreaterOrEqual(t, metric.DoubleSum().DataPoints().Len(), 4*runtime.NumCPU())
internal.AssertDoubleSumMetricLabelExists(t, metric, 0, cpuLabelName)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 0, stateLabelName, userStateLabelValue)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 1, stateLabelName, systemStateLabelValue)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 2, stateLabelName, idleStateLabelValue)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 3, stateLabelName, interruptStateLabelValue)
}

func assertCPUMetricHasLinuxSpecificStateLabels(t *testing.T, metric dataold.Metric) {
internal.AssertDoubleMetricLabelHasValue(t, metric, 4, stateLabelName, niceStateLabelValue)
internal.AssertDoubleMetricLabelHasValue(t, metric, 5, stateLabelName, softIRQStateLabelValue)
internal.AssertDoubleMetricLabelHasValue(t, metric, 6, stateLabelName, stealStateLabelValue)
internal.AssertDoubleMetricLabelHasValue(t, metric, 7, stateLabelName, waitStateLabelValue)
func assertCPUMetricHasLinuxSpecificStateLabels(t *testing.T, metric pdata.Metric) {
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 4, stateLabelName, niceStateLabelValue)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 5, stateLabelName, softIRQStateLabelValue)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 6, stateLabelName, stealStateLabelValue)
internal.AssertDoubleSumMetricLabelHasValue(t, metric, 7, stateLabelName, waitStateLabelValue)
}
Loading

0 comments on commit db86c70

Please sign in to comment.