Drop data points with no value set #339

Merged · 1 commit · Mar 31, 2022
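This change makes the exporter drop OTLP data points that carry the NO_RECORDED_VALUE flag (for example, Prometheus staleness markers) instead of exporting them as zero-valued points. The core of the change is a guard added at the top of each point converter in exporter/collector/metrics.go; a minimal sketch of that guard, using the pdata names that appear in the diff below:

```go
// Skip points that carry no recorded value (e.g. Prometheus staleness markers).
if point.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
	return nil
}
// ...otherwise build the TimeSeries as before.
```

To make returning nil possible, the gauge, histogram, and exponential-histogram converters now return `[]*monitoringpb.TimeSeries` rather than a single `*monitoringpb.TimeSeries`, and callers append with `ts...`, so a dropped point contributes nothing.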
@@ -332,7 +332,7 @@
}
],
"timeUnixNano":"1639079496964000000",
"asDouble":0
"flags":1
}
]
}
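In the updated input fixture above, the data point replaces `"asDouble":0` with `"flags":1`, the OTLP JSON encoding of the NO_RECORDED_VALUE data point flag, so the expected Cloud Monitoring output fixtures below lose the corresponding zero-valued time series. A minimal sketch of building such a point with the pdata API used elsewhere in this PR (the timestamp is the one from the fixture; the surrounding gauge setup is illustrative):

```go
// A gauge point that carries only a staleness marker: no value is set, and the
// NoRecordedValue flag serializes to "flags":1 in OTLP JSON.
metric := pdata.NewMetric()
metric.SetDataType(pdata.MetricDataTypeGauge)
point := metric.Gauge().DataPoints().AppendEmpty()
point.SetTimestamp(pdata.NewTimestampFromTime(time.Unix(0, 1639079496964000000)))
point.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
```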
@@ -366,36 +366,6 @@
}
]
},
{
"metric": {
"type": "workload.googleapis.com/rabbitmq_channel_acks_uncommitted",
"labels": {
"job": "default/rabbitmq/0"
}
},
"resource": {
"type": "k8s_container",
"labels": {
"cluster_name": "rabbitmq-test-dev",
"container_name": "rabbitmq",
"location": "us-central1-c",
"namespace_name": "default",
"pod_name": "rabbitmq-server-0"
}
},
"metricKind": "GAUGE",
"valueType": "DOUBLE",
"points": [
{
"interval": {
"endTime": "1970-01-01T00:00:00Z"
},
"value": {
"doubleValue": 0
}
}
]
},
{
"metric": {
"type": "workload.googleapis.com/rabbitmq_consumer_prefetch",
@@ -6326,11 +6296,7 @@
}
}
]
}
]
},
{
"timeSeries": [
},
{
"metric": {
"type": "workload.googleapis.com/erlang_vm_msacc_emulator_seconds_total",
@@ -6363,7 +6329,11 @@
}
}
]
},
}
]
},
{
"timeSeries": [
{
"metric": {
"type": "workload.googleapis.com/erlang_vm_msacc_emulator_seconds_total",
@@ -6454,7 +6424,7 @@
"startTime": "1970-01-01T00:00:00Z"
},
"value": {
"int64Value": "203"
"int64Value": "202"
}
}
]
45 changes: 33 additions & 12 deletions exporter/collector/metrics.go
@@ -349,7 +349,7 @@ func (m *metricMapper) metricToTimeSeries(
points := gauge.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.gaugePointToTimeSeries(resource, extraLabels, metric, gauge, points.At(i))
timeSeries = append(timeSeries, ts)
timeSeries = append(timeSeries, ts...)
}
case pdata.MetricDataTypeSummary:
summary := metric.Summary()
@@ -363,14 +363,14 @@
points := hist.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.histogramToTimeSeries(resource, extraLabels, metric, hist, points.At(i))
timeSeries = append(timeSeries, ts)
timeSeries = append(timeSeries, ts...)
}
case pdata.MetricDataTypeExponentialHistogram:
eh := metric.ExponentialHistogram()
points := eh.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.exponentialHistogramToTimeSeries(resource, extraLabels, metric, eh, points.At(i))
timeSeries = append(timeSeries, ts)
timeSeries = append(timeSeries, ts...)
}
default:
m.obs.log.Error("Unsupported metric data type", zap.Any("data_type", metric.DataType()))
@@ -386,6 +386,10 @@ func (m *metricMapper) summaryPointToTimeSeries(
sum pdata.Summary,
point pdata.SummaryDataPoint,
) []*monitoringpb.TimeSeries {
if point.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
// Drop points without a value.
return nil
}
sumName, countName := summaryMetricNames(metric.Name())
startTime := timestamppb.New(point.StartTimestamp().AsTime())
endTime := timestamppb.New(point.Timestamp().AsTime())
@@ -599,13 +603,17 @@ func (m *metricMapper) histogramToTimeSeries(
metric pdata.Metric,
_ pdata.Histogram,
point pdata.HistogramDataPoint,
) *monitoringpb.TimeSeries {
) []*monitoringpb.TimeSeries {
if point.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
// Drop points without a value.
return nil
}
// We treat deltas as cumulatives w/ resets.
metricKind := metricpb.MetricDescriptor_CUMULATIVE
startTime := timestamppb.New(point.StartTimestamp().AsTime())
endTime := timestamppb.New(point.Timestamp().AsTime())
value := m.histogramPoint(point)
return &monitoringpb.TimeSeries{
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
@@ -624,7 +632,7 @@ func (m *metricMapper) histogramToTimeSeries(
extraLabels,
),
},
}
}}
}

func (m *metricMapper) exponentialHistogramToTimeSeries(
@@ -633,13 +641,17 @@ func (m *metricMapper) exponentialHistogramToTimeSeries(
metric pdata.Metric,
_ pdata.ExponentialHistogram,
point pdata.ExponentialHistogramDataPoint,
) *monitoringpb.TimeSeries {
) []*monitoringpb.TimeSeries {
if point.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
// Drop points without a value.
return nil
}
// We treat deltas as cumulatives w/ resets.
metricKind := metricpb.MetricDescriptor_CUMULATIVE
startTime := timestamppb.New(point.StartTimestamp().AsTime())
endTime := timestamppb.New(point.Timestamp().AsTime())
value := m.exponentialHistogramPoint(point)
return &monitoringpb.TimeSeries{
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
@@ -658,7 +670,7 @@ func (m *metricMapper) exponentialHistogramToTimeSeries(
extraLabels,
),
},
}
}}
}

func (m *metricMapper) sumPointToTimeSeries(
@@ -670,6 +682,11 @@
) []*monitoringpb.TimeSeries {
metricKind := metricpb.MetricDescriptor_CUMULATIVE
var startTime *timestamppb.Timestamp
if point.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
// Drop points without a value. This may be a staleness marker from
// prometheus.
return nil
}
if sum.IsMonotonic() {
metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes())
normalizedPoint := m.normalizeNumberDataPoint(point, metricIdentifier)
@@ -754,11 +771,15 @@ func (m *metricMapper) gaugePointToTimeSeries(
metric pdata.Metric,
gauge pdata.Gauge,
point pdata.NumberDataPoint,
) *monitoringpb.TimeSeries {
) []*monitoringpb.TimeSeries {
if point.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
// Drop points without a value.
return nil
}
metricKind := metricpb.MetricDescriptor_GAUGE
value, valueType := numberDataPointToValue(point)

return &monitoringpb.TimeSeries{
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
@@ -776,7 +797,7 @@
extraLabels,
),
},
}
}}
}

// Returns any configured prefix to add to unknown metric name.
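The gauge, histogram, and exponential-histogram converters above changed from returning a single `*monitoringpb.TimeSeries` to a slice so that a flagged point can map to nil; the sum and summary converters already returned slices and simply gained the same guard. A hedged, test-style sketch of the resulting behavior, reusing the `mapper`, `mr`, and `labels` test helpers that appear in metrics_test.go below (assumed to run inside a testing.T test):

```go
metric := pdata.NewMetric()
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
point := gauge.DataPoints().AppendEmpty()
point.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))

// The flagged point produces no TimeSeries at all, so nothing is exported for it.
tsl := mapper.gaugePointToTimeSeries(mr, labels{}, metric, gauge, point)
assert.Empty(t, tsl)
```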
91 changes: 85 additions & 6 deletions exporter/collector/metrics_test.go
@@ -158,6 +158,38 @@ func TestMetricToTimeSeries(t *testing.T) {
require.Equal(t, ts[1].Points[0].Value.GetDoubleValue(), 1.0, "Should normalize the point after the reset")
})

t.Run("Sum with no value", func(t *testing.T) {
mapper, shutdown := newTestMetricMapper()
defer shutdown()
metric := pdata.NewMetric()
metric.SetDataType(pdata.MetricDataTypeSum)
sum := metric.Sum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
startTs := pdata.NewTimestampFromTime(start)
endTs := pdata.NewTimestampFromTime(start.Add(time.Hour))
// Add three points
point := sum.DataPoints().AppendEmpty()
point.SetDoubleVal(10)
point.SetStartTimestamp(startTs)
point.SetTimestamp(endTs)
point = sum.DataPoints().AppendEmpty()
point.SetDoubleVal(15)
point.SetStartTimestamp(startTs)
point.SetTimestamp(endTs)
// The last point has no value
point = sum.DataPoints().AppendEmpty()
point.SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))

ts := mapper.metricToTimeSeries(
mr,
labels{},
metric,
)
require.Len(t, ts, 2, "Should create one timeseries for each sum point, but omit the stale point")
require.Same(t, ts[0].Resource, mr, "Should assign the passed in monitored resource")
})

t.Run("Gauge", func(t *testing.T) {
mapper, shutdown := newTestMetricMapper()
defer shutdown()
@@ -177,6 +209,27 @@
require.Len(t, ts, 3, "Should create one timeseries for each gauge point")
require.Same(t, ts[0].Resource, mr, "Should assign the passed in monitored resource")
})

t.Run("Gauge with no value", func(t *testing.T) {
mapper, shutdown := newTestMetricMapper()
defer shutdown()
metric := pdata.NewMetric()
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
// Add three points
gauge.DataPoints().AppendEmpty().SetIntVal(10)
gauge.DataPoints().AppendEmpty().SetIntVal(15)
// The last point has no value
gauge.DataPoints().AppendEmpty().SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))

ts := mapper.metricToTimeSeries(
mr,
labels{},
metric,
)
require.Len(t, ts, 2, "Should create one timeseries for each gauge point, except the point without a value")
require.Same(t, ts[0].Resource, mr, "Should assign the passed in monitored resource")
})
}

func TestMergeLabels(t *testing.T) {
@@ -223,7 +276,12 @@ func TestHistogramPointToTimeSeries(t *testing.T) {
exemplar.SetSpanID(pdata.NewSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}))
exemplar.FilteredAttributes().InsertString("test", "extra")

ts := mapper.histogramToTimeSeries(mr, labels{}, metric, hist, point)
// Add a second point with no value
hist.DataPoints().AppendEmpty().SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))

tsl := mapper.histogramToTimeSeries(mr, labels{}, metric, hist, point)
assert.Len(t, tsl, 1)
ts := tsl[0]
// Verify aspects
assert.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
assert.Equal(t, metricpb.MetricDescriptor_DISTRIBUTION, ts.ValueType)
@@ -289,7 +347,12 @@ func TestExponentialHistogramPointToTimeSeries(t *testing.T) {
exemplar.SetSpanID(pdata.NewSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}))
exemplar.FilteredAttributes().InsertString("test", "extra")

ts := mapper.exponentialHistogramToTimeSeries(mr, labels{}, metric, hist, point)
// Add a second point with no value
hist.DataPoints().AppendEmpty().SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))

tsl := mapper.exponentialHistogramToTimeSeries(mr, labels{}, metric, hist, point)
assert.Len(t, tsl, 1)
ts := tsl[0]
// Verify aspects
assert.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
assert.Equal(t, metricpb.MetricDescriptor_DISTRIBUTION, ts.ValueType)
@@ -396,6 +459,8 @@ func TestSumPointToTimeSeries(t *testing.T) {
metric.SetDataType(pdata.MetricDataTypeSum)
sum := metric.Sum()
point := sum.DataPoints().AppendEmpty()
// Add a second point with no value
sum.DataPoints().AppendEmpty().SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
return metric, sum, point
}

@@ -522,6 +587,8 @@ func TestGaugePointToTimeSeries(t *testing.T) {
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
point := gauge.DataPoints().AppendEmpty()
// Add a second point with no value
gauge.DataPoints().AppendEmpty().SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))
return metric, gauge, point
}

@@ -534,7 +601,9 @@
end := start.Add(time.Hour)
point.SetTimestamp(pdata.NewTimestampFromTime(end))

ts := mapper.gaugePointToTimeSeries(mr, labels{}, metric, gauge, point)
tsl := mapper.gaugePointToTimeSeries(mr, labels{}, metric, gauge, point)
assert.Len(t, tsl, 1)
ts := tsl[0]
assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_GAUGE)
assert.Equal(t, ts.ValueType, metricpb.MetricDescriptor_INT64)
assert.Equal(t, ts.Unit, unit)
Expand All @@ -553,19 +622,25 @@ func TestGaugePointToTimeSeries(t *testing.T) {

// Test double as well
point.SetDoubleVal(float64(value))
ts = mapper.gaugePointToTimeSeries(mr, labels{}, metric, gauge, point)
tsl = mapper.gaugePointToTimeSeries(mr, labels{}, metric, gauge, point)
assert.Len(t, tsl, 1)
ts = tsl[0]
assert.Equal(t, ts.MetricKind, metricpb.MetricDescriptor_GAUGE)
assert.Equal(t, ts.ValueType, metricpb.MetricDescriptor_DOUBLE)
assert.Equal(t, ts.Points[0].Value.GetDoubleValue(), float64(value))

// Add extra labels
extraLabels := map[string]string{"foo": "bar"}
ts = mapper.gaugePointToTimeSeries(mr, labels(extraLabels), metric, gauge, point)
tsl = mapper.gaugePointToTimeSeries(mr, labels(extraLabels), metric, gauge, point)
assert.Len(t, tsl, 1)
ts = tsl[0]
assert.Equal(t, ts.Metric.Labels, extraLabels)

// Full set of labels
point.Attributes().InsertString("baz", "bar")
ts = mapper.gaugePointToTimeSeries(mr, labels(extraLabels), metric, gauge, point)
tsl = mapper.gaugePointToTimeSeries(mr, labels(extraLabels), metric, gauge, point)
assert.Len(t, tsl, 1)
ts = tsl[0]
assert.Equal(t, ts.Metric.Labels, map[string]string{"foo": "bar", "baz": "bar"})
}

@@ -592,6 +667,10 @@ func TestSummaryPointToTimeSeries(t *testing.T) {
end := start.Add(time.Hour)
point.SetStartTimestamp(pdata.NewTimestampFromTime(start))
point.SetTimestamp(pdata.NewTimestampFromTime(end))

// Add a second point with no value
summary.DataPoints().AppendEmpty().SetFlags(pdata.MetricDataPointFlags(pdata.MetricDataPointFlagNoRecordedValue))

ts := mapper.metricToTimeSeries(mr, labels{}, metric)
assert.Len(t, ts, 3)
sumResult := ts[0]