Commit

normalize sums
dashpole committed Feb 11, 2022
1 parent 1a8c2c3 commit 1086209
Showing 4 changed files with 392 additions and 26 deletions.
96 changes: 96 additions & 0 deletions exporter/collector/internal/datapointstorage/datapointcache.go
@@ -0,0 +1,96 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datapointstorage

import (
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/model/pdata"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
)

const gcInterval = 20 * time.Minute

type Cache map[string]usedPoint

type usedPoint struct {
point *pdata.NumberDataPoint
used bool
}

// NewCache instantiates a cache and starts a background goroutine that
// garbage collects stale entries until the shutdown channel is closed.
func NewCache(shutdown <-chan struct{}) Cache {
	c := make(Cache)
	go func() {
		ticker := time.NewTicker(gcInterval)
		defer ticker.Stop()
		for {
			select {
			case <-shutdown:
				return
			default:
				// Block in gc until the next tick or shutdown.
				c.gc(shutdown, ticker.C)
			}
		}
	}()
	return c
}

// Get retrieves the point associated with the identifier, and whether
// or not it was found
func (c Cache) Get(identifier string) (*pdata.NumberDataPoint, bool) {
point, found := c[identifier]
if found {
point.used = true
c[identifier] = point
}
return point.point, found
}

// Set assigns the point to the identifier in the cache
func (c Cache) Set(identifier string, point *pdata.NumberDataPoint) {
c[identifier] = usedPoint{point, true}
}

// gc garbage collects the cache after the ticker ticks
func (c Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) {
select {
case <-shutdown:
return
case <-tickerCh:
// garbage collect the cache
for id, point := range c {
if point.used {
// for points that have been used, mark them as unused
point.used = false
c[id] = point
} else {
// for points that have not been used, delete points
delete(c, id)
}
}
}
}

// Identifier returns the unique string identifier for a metric
func Identifier(resource *monitoredrespb.MonitoredResource, metric pdata.Metric, labels pdata.AttributeMap) string {
var b strings.Builder

// Resource identifiers
fmt.Fprintf(&b, "%v", resource.GetLabels())

// Metric identifiers
fmt.Fprintf(&b, " - %s", metric.Name())
labels.Sort().Range(func(k string, v pdata.AttributeValue) bool {
fmt.Fprintf(&b, " %s=%s", k, v.AsString())
return true
})
return b.String()
}
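
For context, here is a minimal sketch (not part of this commit) of how code in the exporter package might combine Identifier, Get, and Set to remember the reset point of a cumulative series; the exporter's normalizeMetric below follows this pattern. The helper name and parameters are illustrative, and imports of the datapointstorage, pdata, and monitoredres packages shown above are assumed:

// sketchUseCache is a hypothetical helper showing the intended call pattern.
func sketchUseCache(
	cache datapointstorage.Cache,
	resource *monitoredrespb.MonitoredResource,
	metric pdata.Metric,
	point pdata.NumberDataPoint,
) {
	id := datapointstorage.Identifier(resource, metric, point.Attributes())
	if start, ok := cache.Get(id); ok {
		// A reset point was recorded earlier; the caller can subtract its value
		// and reuse its start timestamp when emitting this point.
		_ = start
	} else {
		// First sighting of this series: remember it as the reset point.
		cache.Set(id, &point)
	}
}
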
110 changes: 110 additions & 0 deletions exporter/collector/internal/datapointstorage/datapointcache_test.go
@@ -0,0 +1,110 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datapointstorage

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/model/pdata"
)

func TestSetAndGet(t *testing.T) {
c := make(Cache)
c.Set("foo", nil)
point, found := c.Get("foo")
assert.Nil(t, point)
assert.True(t, found)

point, found = c.Get("bar")
assert.Nil(t, point)
assert.False(t, found)

setPoint := pdata.NewNumberDataPoint()
c.Set("bar", &setPoint)

point, found = c.Get("bar")
assert.Equal(t, point, &setPoint)
assert.True(t, found)
}

func TestShutdown(t *testing.T) {
shutdown := make(chan struct{})
c := make(Cache)
close(shutdown)
// gc should return after shutdown is closed
c.gc(shutdown, make(chan time.Time))
}

func TestGC(t *testing.T) {
shutdown := make(chan struct{})
c := make(Cache)
fakeTicker := make(chan time.Time)

c.Set("bar", nil)

// bar exists since we just set it
usedPoint, found := c["bar"]
assert.True(t, usedPoint.used)
assert.True(t, found)

// first gc tick marks bar stale
go func() {
fakeTicker <- time.Now()
}()
c.gc(shutdown, fakeTicker)
usedPoint, found = c["bar"]
assert.False(t, usedPoint.used)
assert.True(t, found)

// second gc tick removes bar
go func() {
fakeTicker <- time.Now()
}()
c.gc(shutdown, fakeTicker)
_, found = c["bar"]
assert.False(t, found)
}

func TestGetPreventsGC(t *testing.T) {
shutdown := make(chan struct{})
c := make(Cache)
fakeTicker := make(chan time.Time)

setPoint := pdata.NewNumberDataPoint()
c.Set("bar", &setPoint)

// bar exists since we just set it
_, found := c["bar"]
assert.True(t, found)

// first gc tick marks bar stale
go func() {
fakeTicker <- time.Now()
}()
c.gc(shutdown, fakeTicker)
// calling Get() marks it fresh again.
_, found = c.Get("bar")
assert.True(t, found)

// second gc tick does not remove bar
go func() {
fakeTicker <- time.Now()
}()
c.gc(shutdown, fakeTicker)
_, found = c["bar"]
assert.True(t, found)
}
81 changes: 69 additions & 12 deletions exporter/collector/metricsexporter.go
@@ -43,6 +43,8 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/datapointstorage"
)

// self-observability reporting meters/tracers/loggers.
@@ -78,8 +80,9 @@ type MetricsExporter struct {
// metricMapper is the part that transforms metrics. It is separate from MetricsExporter
// since it contains only pure functions.
type metricMapper struct {
obs selfObservability
cfg Config
obs selfObservability
cfg Config
sumCache datapointstorage.Cache
}

// Constants we use when translating summary metrics into GCP.
@@ -152,19 +155,24 @@ func NewGoogleCloudMetricsExporter(
return nil, err
}
obs := selfObservability{log: log}
shutdown := make(chan struct{})
mExp := &MetricsExporter{
cfg: cfg,
client: client,
obs: obs,
mapper: metricMapper{obs, cfg},
mapper: metricMapper{
obs,
cfg,
datapointstorage.NewCache(shutdown),
},
// We create a buffered channel for metric descriptors.
// MetricDescriptors are sent asynchronously and optimistically.
// We only get Unit/Description/Display name from them, so it's ok
// to drop them and conserve resources for sending timeseries.
metricDescriptorC: make(chan *metricpb.MetricDescriptor, cfg.MetricConfig.CreateMetricDescriptorBufferSize),
mdCache: make(map[string]*metricpb.MetricDescriptor),
timeSeriesC: make(chan *monitoringpb.TimeSeries),
shutdownC: make(chan struct{}),
shutdownC: shutdown,
}

// Fire up the metric descriptor exporter.
@@ -351,7 +359,7 @@ func (m *metricMapper) metricToTimeSeries(
points := sum.DataPoints()
for i := 0; i < points.Len(); i++ {
ts := m.sumPointToTimeSeries(resource, extraLabels, metric, sum, points.At(i))
timeSeries = append(timeSeries, ts)
timeSeries = append(timeSeries, ts...)
}
case pdata.MetricDataTypeGauge:
gauge := metric.Gauge()
@@ -677,16 +685,27 @@ func (m *metricMapper) sumPointToTimeSeries(
metric pdata.Metric,
sum pdata.Sum,
point pdata.NumberDataPoint,
) *monitoringpb.TimeSeries {
) []*monitoringpb.TimeSeries {
metricKind := metricpb.MetricDescriptor_CUMULATIVE
startTime := timestamppb.New(point.StartTimestamp().AsTime())
if !sum.IsMonotonic() {
var normalizationPoint *pdata.NumberDataPoint
if sum.IsMonotonic() {
metricIdentifier := datapointstorage.Identifier(resource, metric, point.Attributes())
var keep bool
normalizationPoint, keep = m.normalizeMetric(point, metricIdentifier)
if !keep {
return nil
}
if normalizationPoint != nil {
startTime = timestamppb.New(normalizationPoint.StartTimestamp().AsTime())
}
} else {
metricKind = metricpb.MetricDescriptor_GAUGE
startTime = nil
}
value, valueType := numberDataPointToValue(point)
value, valueType := numberDataPointToValue(point, normalizationPoint)

return &monitoringpb.TimeSeries{
return []*monitoringpb.TimeSeries{{
Resource: resource,
Unit: metric.Unit(),
MetricKind: metricKind,
@@ -705,7 +724,36 @@ func (m *metricMapper) sumPointToTimeSeries(
extraLabels,
),
},
}}
}

// normalizeMetric returns the point that a metric should be normalized against,
// and whether or not the point should be kept
func (m *metricMapper) normalizeMetric(point pdata.NumberDataPoint, metricIdentifier string) (*pdata.NumberDataPoint, bool) {
var normalizationPoint *pdata.NumberDataPoint
start, ok := m.sumCache.Get(metricIdentifier)
if ok {
if !start.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) {
// We found a cached start timestamp that wouldn't produce a valid point.
// Drop it and log.
m.obs.log.Info(
"data point being processed older than last recorded reset, will not be emitted",
zap.String("lastRecordedReset", start.Timestamp().String()),
zap.String("dataPoint", point.Timestamp().String()),
)
return nil, false
}
normalizationPoint = start
}
if (!ok && point.StartTimestamp().AsTime().IsZero()) || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) {
// This is the first time we've seen this metric, or we received
// an explicit reset point as described in
// https://github.com/open-telemetry/opentelemetry-specification/blob/9555f9594c7ffe5dc333b53da5e0f880026cead1/specification/metrics/datamodel.md#resets-and-gaps
// Record it in history and drop the point.
m.sumCache.Set(metricIdentifier, &point)
return nil, false
}
return normalizationPoint, true
}

func (m *metricMapper) gaugePointToTimeSeries(
@@ -716,7 +764,7 @@ func (m *metricMapper) gaugePointToTimeSeries(
point pdata.NumberDataPoint,
) *monitoringpb.TimeSeries {
metricKind := metricpb.MetricDescriptor_GAUGE
value, valueType := numberDataPointToValue(point)
value, valueType := numberDataPointToValue(point, nil)

return &monitoringpb.TimeSeries{
Resource: resource,
Expand Down Expand Up @@ -756,15 +804,24 @@ func (m *metricMapper) metricNameToType(name string) string {

func numberDataPointToValue(
point pdata.NumberDataPoint,
normalizationPoint *pdata.NumberDataPoint,
) (*monitoringpb.TypedValue, metricpb.MetricDescriptor_ValueType) {
if point.Type() == pdata.MetricValueTypeInt {
val := point.IntVal()
if normalizationPoint != nil {
val -= normalizationPoint.IntVal()
}
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: point.IntVal(),
Int64Value: val,
}},
metricpb.MetricDescriptor_INT64
}
val := point.DoubleVal()
if normalizationPoint != nil {
val -= normalizationPoint.DoubleVal()
}
return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
DoubleValue: point.DoubleVal(),
DoubleValue: val,
}},
metricpb.MetricDescriptor_DOUBLE
}
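
To make the arithmetic concrete, here is a small sketch (not part of this commit) of the value emitted for a monotonic sum once a reset point has been cached. It assumes it lives in this package so it can reach the unexported numberDataPointToValue helper, and that pdata and fmt are imported; the function name and values are illustrative:

// sketchNormalizedValue shows the subtraction performed above: the cached
// reset point reported 5, the current point reports 12, so 7 is emitted.
func sketchNormalizedValue() {
	reset := pdata.NewNumberDataPoint()
	reset.SetIntVal(5)

	point := pdata.NewNumberDataPoint()
	point.SetIntVal(12)

	value, valueType := numberDataPointToValue(point, &reset)
	fmt.Println(value.GetInt64Value(), valueType) // prints: 7 INT64
}

When normalizeMetric finds no cached reset and the point carries a valid start timestamp, normalizationPoint is nil and the value passes through unchanged.
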
