diff --git a/jobs/stackdriver-nozzle/spec b/jobs/stackdriver-nozzle/spec index b0af480b..d8ac3ce5 100644 --- a/jobs/stackdriver-nozzle/spec +++ b/jobs/stackdriver-nozzle/spec @@ -67,3 +67,7 @@ properties: nozzle.foundation_name: description: Name added as the 'foundation' label to all time series being sent to Stackdriver, useful for differentiating between multiple PCF instances in a project. default: cf + + nozzle.enable_cumulative_counters: + description: Enable reporting counter events as cumulative Stackdriver metrics. This requires all CounterEvent messages for a given metric to be routed to the same nozzle process (which is the case if you run a single copy of the nozzle). + default: false \ No newline at end of file diff --git a/jobs/stackdriver-nozzle/templates/stackdriver-nozzle-ctl.erb b/jobs/stackdriver-nozzle/templates/stackdriver-nozzle-ctl.erb index df276fa3..3a1709e4 100644 --- a/jobs/stackdriver-nozzle/templates/stackdriver-nozzle-ctl.erb +++ b/jobs/stackdriver-nozzle/templates/stackdriver-nozzle-ctl.erb @@ -38,6 +38,7 @@ case $1 in export FOUNDATION_NAME=<%= p('nozzle.foundation_name', 'cf') %> export LOGGING_BATCH_COUNT=<%= p('nozzle.logging_batch_count', '1000') %> export LOGGING_BATCH_DURATION=<%= p('nozzle.logging_batch_duration', '30') %> + export ENABLE_CUMULATIVE_COUNTERS=<%= p('nozzle.enable_cumulative_counters', 'false') %> <% if_p('gcp.project_id') do |prop| %> export GCP_PROJECT_ID=<%= prop %> diff --git a/src/stackdriver-nozzle/app/builder.go b/src/stackdriver-nozzle/app/builder.go index 183c45d6..fdad8fdc 100644 --- a/src/stackdriver-nozzle/app/builder.go +++ b/src/stackdriver-nozzle/app/builder.go @@ -129,7 +129,13 @@ func (a *App) newMetricSink(ctx context.Context, metricAdapter stackdriver.Metri metricBuffer := metrics_pipeline.NewAutoCulledMetricsBuffer(ctx, a.logger, time.Duration(a.c.MetricsBufferDuration)*time.Second, metricAdapter) a.bufferEmpty = metricBuffer.IsEmpty - return nozzle.NewMetricSink(a.logger, a.c.MetricPathPrefix, a.labelMaker, metricBuffer, nozzle.NewUnitParser(), a.c.RuntimeMetricRegex) + var counterTracker *nozzle.CounterTracker + if a.c.EnableCumulativeCounters { + ttl := time.Duration(a.c.CounterTrackerTTL) * time.Second + counterTracker = nozzle.NewCounterTracker(ctx, ttl, a.logger) + } + + return nozzle.NewMetricSink(a.logger, a.c.MetricPathPrefix, a.labelMaker, metricBuffer, counterTracker, nozzle.NewUnitParser(), a.c.RuntimeMetricRegex) } func (a *App) newTelemetryReporter() telemetry.Reporter { diff --git a/src/stackdriver-nozzle/config/config.go b/src/stackdriver-nozzle/config/config.go index 1bc36544..df286897 100644 --- a/src/stackdriver-nozzle/config/config.go +++ b/src/stackdriver-nozzle/config/config.go @@ -74,6 +74,13 @@ type Config struct { DebugNozzle bool `envconfig:"debug_nozzle"` // By default 'origin' label is prepended to metric name, however for runtime metrics (defined here) we add it as a metric label instead. RuntimeMetricRegex string `envconfig:"runtime_metric_regex" default:"^(numCPUS|numGoRoutines|memoryStats\\..*)$"` + // If enabled, CounterEvents will be reported as cumulative Stackdriver metrics instead of two gauges (.delta + // and .total). Reporting cumulative metrics involves nozzle keeping track of internal counter state, and + // requires deterministic routing of CounterEvents to nozzles (i.e. CounterEvent messages for a particular metric MUST + // always be routed to the same nozzle process); the easiest way to achieve that is to run a single copy of the nozzle. + EnableCumulativeCounters bool `envconfig:"enable_cumulative_counters"` + // Expire internal counter state if a given counter has not been seen for this many seconds. + CounterTrackerTTL int `envconfig:"counter_tracker_ttl" default:"130"` } func (c *Config) validate() error { diff --git a/src/stackdriver-nozzle/messages/metric.go b/src/stackdriver-nozzle/messages/metric.go index 197dd13b..80483601 100644 --- a/src/stackdriver-nozzle/messages/metric.go +++ b/src/stackdriver-nozzle/messages/metric.go @@ -2,10 +2,16 @@ package messages import ( "bytes" + "fmt" + "path" "sort" "time" "github.com/cloudfoundry/sonde-go/events" + "github.com/golang/protobuf/ptypes/timestamp" + labelpb "google.golang.org/genproto/googleapis/api/label" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" ) // Metric represents one of the metrics contained in an events.Envelope. @@ -13,25 +19,96 @@ type Metric struct { Name string Labels map[string]string `json:"-"` Value float64 + IntValue int64 EventTime time.Time + StartTime time.Time `json:"-"` Unit string // TODO Should this be "1" if it's empty? Type events.Envelope_EventType `json:"-"` } +func (m *Metric) IsCumulative() bool { + return m.Type == events.Envelope_CounterEvent +} + +func (m *Metric) metricType() string { + return path.Join("custom.googleapis.com", m.Name) +} + +// NeedsMetricDescriptor determines whether a custom metric descriptor needs to be created for this metric in Stackdriver. +// We do that if we need to set a custom unit, or mark metric as a cumulative. +func (m *Metric) NeedsMetricDescriptor() bool { + return m.Unit != "" || m.IsCumulative() +} + +// MetricDescriptor returns a Stackdriver MetricDescriptor proto for this metric. +func (m *Metric) MetricDescriptor(projectName string) *metricpb.MetricDescriptor { + metricType := m.metricType() + + var labelDescriptors []*labelpb.LabelDescriptor + for key := range m.Labels { + labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ + Key: key, + ValueType: labelpb.LabelDescriptor_STRING, + }) + } + + metricKind := metricpb.MetricDescriptor_GAUGE + valueType := metricpb.MetricDescriptor_DOUBLE + if m.IsCumulative() { + metricKind = metricpb.MetricDescriptor_CUMULATIVE + valueType = metricpb.MetricDescriptor_INT64 + } + + return &metricpb.MetricDescriptor{ + Name: path.Join(projectName, "metricDescriptors", metricType), + Type: metricType, + Labels: labelDescriptors, + MetricKind: metricKind, + ValueType: valueType, + Unit: m.Unit, + Description: "stackdriver-nozzle created custom metric.", + DisplayName: m.Name, + } +} + +// TimeSeries returns a Stackdriver TimeSeries proto for this metric value. +func (m *Metric) TimeSeries() *monitoringpb.TimeSeries { + var value *monitoringpb.TypedValue + if m.IsCumulative() { + value = &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{Int64Value: m.IntValue}} + } else { + value = &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: m.Value}} + } + + point := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + EndTime: ×tamp.Timestamp{Seconds: m.EventTime.Unix(), Nanos: int32(m.EventTime.Nanosecond())}, + StartTime: ×tamp.Timestamp{Seconds: m.StartTime.Unix(), Nanos: int32(m.StartTime.Nanosecond())}, + }, + Value: value, + } + return &monitoringpb.TimeSeries{ + Metric: &metricpb.Metric{ + Type: m.metricType(), + Labels: m.Labels, + }, + Points: []*monitoringpb.Point{point}, + } +} + func (m *Metric) Hash() string { var b bytes.Buffer - // Extract keys to a slice and sort it - numKeys := len(m.Labels) + 1 - keys := make([]string, numKeys, numKeys) - keys = append(keys, m.Name) + b.Write([]byte(m.Name)) + + // Extract label keys to a slice and sort it + keys := make([]string, 0, len(m.Labels)) for k := range m.Labels { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { - b.Write([]byte(k)) - b.Write([]byte(m.Labels[k])) + b.Write([]byte(fmt.Sprintf(",%s='%s'", k, m.Labels[k]))) } return b.String() } diff --git a/src/stackdriver-nozzle/nozzle/counter_tracker.go b/src/stackdriver-nozzle/nozzle/counter_tracker.go new file mode 100644 index 00000000..7010fd4a --- /dev/null +++ b/src/stackdriver-nozzle/nozzle/counter_tracker.go @@ -0,0 +1,176 @@ +/* + * Copyright 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nozzle + +import ( + "context" + "expvar" + "math" + "sync" + "time" + + "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry" + "github.com/cloudfoundry/lager" +) + +const maxExpirePeriod = 10 * time.Second + +var countersExpiredCount *telemetry.Counter + +func init() { + countersExpiredCount = telemetry.NewCounter(telemetry.Nozzle, "metrics.counters.expired") +} + +type counterData struct { + startTime time.Time + totalValue *expvar.Int + lastValue uint64 + lastSeenTime time.Time + lastEventTime time.Time +} + +// CounterTracker is used to provide a "start time" for each loggregator counter metric exported by the nozzle. +// +// Stackdriver requires each point for a cumulative metric to include "start time" in addition to the actual event time +// (aka "end time"): https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#point +// Typically start time would correspond to the time when the actual process exporting the metric started. This ensures +// that when a process is restarted (and counter gets reset to 0), start time increases. +// +// Since binaries that export counter events to loggregator only provide event time, the nozzle needs to determine start +// time for each metric itself. To do that, CounterTracker keeps its own counter for each metric, which corresponds to the +// total number of events since the metric was first seen by the nozzle (which is exported as the start time). +// +// As an example, a series of incoming CounterEvents with total values of [100, 110, 115, 150] will be exported by the +// nozzle as [10, 15, 50] (first point seen by the nozzle is discarded, because each point reported to Stackdriver needs +// to cover non-zero time interval between start time and end time). +// +// If CounterTracker detects the total value for a given counter decrease, it will interpret this as a counter reset. This +// will not result in the Stackdriver cumulative metric being reset as well; for example, incoming CounterEvents with total +// values of [100, 110, 115, 10, 17] will be exported by the nozzle as [10, 15, 25, 32]. +// +// CounterTracker will regularly remove internal state for metrics that have not been seen for a while. This is done to +// conserve memory, and also to ensure that old values do not re-surface if a given counter stops being exported for some +// period of time. +type CounterTracker struct { + counters map[string]*counterData + mu *sync.Mutex // protects `counters` + ttl time.Duration + logger lager.Logger + ticker *time.Ticker + ctx context.Context +} + +// NewCounterTracker creates and returns a counter tracker. +func NewCounterTracker(ctx context.Context, ttl time.Duration, logger lager.Logger) *CounterTracker { + expirePeriod := time.Duration(ttl.Nanoseconds() / 2) + if expirePeriod > maxExpirePeriod { + expirePeriod = maxExpirePeriod + } + c := &CounterTracker{ + counters: map[string]*counterData{}, + mu: &sync.Mutex{}, + ttl: ttl, + logger: logger, + ticker: time.NewTicker(expirePeriod), + ctx: ctx, + } + go func() { + for { + select { + case <-c.ticker.C: + c.expire() + case <-c.ctx.Done(): + c.ticker.Stop() + return + } + } + }() + return c +} + +// Update accepts a counter name, event time and a value, and returns the total value for the counter along with its +// start time. Counter name provided needs to uniquely identify the time series (so it needs to include metric name as +// well as all metric label values). +// At least two values need to be observed for a given counter to determine the total value, so for the first observed +// value, 0 will be returned as the total, and end time will be equal to event time. Such points should not be reported +// to Stackdriver, since it expects points covering non-zero time interval. +func (t *CounterTracker) Update(name string, value uint64, eventTime time.Time) (int64, time.Time) { + t.mu.Lock() + defer t.mu.Unlock() + + c, present := t.counters[name] + if !present { + c = t.newCounterData(name, eventTime) + t.counters[name] = c + } else { + var delta uint64 + if c.lastValue > value { + // Counter has been reset. + delta = value + } else { + delta = value - c.lastValue + } + if uint64(c.totalValue.Value())+delta > math.MaxInt64 { + // Accumulated value overflows int64, we need to reset the counter. + c.totalValue.Set(int64(delta)) + c.startTime = c.lastEventTime + } else { + c.totalValue.Add(int64(delta)) + } + } + c.lastValue = value + c.lastSeenTime = time.Now() + c.lastEventTime = eventTime + return c.totalValue.Value(), c.startTime +} + +func (t *CounterTracker) newCounterData(name string, eventTime time.Time) *counterData { + var v *expvar.Int + existing := expvar.Get(name) + if existing != nil { + // There was a previous counter with this name; use it instead, but reset value to 0. + v = existing.(*expvar.Int) + v.Set(0) + } else { + v = expvar.NewInt(name) + } + // Initialize counter state for a new counter. + return &counterData{ + totalValue: v, + startTime: eventTime, + } +} + +func (t *CounterTracker) expire() { + t.mu.Lock() + defer t.mu.Unlock() + + for name, counter := range t.counters { + if time.Now().Sub(counter.lastSeenTime) > t.ttl { + t.logger.Info("CounterTracker", lager.Data{ + "info": "removing expired counter", + "name": name, + "counter": counter, + "value": t.counters[name].totalValue.Value(), + }) + // Reset values to -1 to make expired counters visible in /debug/vars. + t.counters[name].totalValue.Set(-1) + delete(t.counters, name) + countersExpiredCount.Increment() + } + } +} diff --git a/src/stackdriver-nozzle/nozzle/counter_tracker_test.go b/src/stackdriver-nozzle/nozzle/counter_tracker_test.go new file mode 100644 index 00000000..668286e5 --- /dev/null +++ b/src/stackdriver-nozzle/nozzle/counter_tracker_test.go @@ -0,0 +1,106 @@ +/* + * Copyright 2017 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nozzle + +import ( + "context" + "math" + "time" + + "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/mocks" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func testCounterTracker(subject *CounterTracker, name string, baseTime time.Time, incoming []uint64, expected []int64) { + for idx, value := range incoming { + ts := baseTime.Add(time.Duration(idx) * time.Millisecond) + total, st := subject.Update(name, value, ts) + if idx == 0 { + // First seen value initializes the counter. + Expect(total).To(BeNumerically("==", 0)) + } else { + Expect(total).To(BeNumerically("==", expected[idx-1])) + Expect(st).To(BeTemporally("~", baseTime)) + } + } +} + +var _ = Describe("CounterTracker", func() { + var ( + subject *CounterTracker + counterTTL time.Duration + logger *mocks.MockLogger + ) + + BeforeEach(func() { + logger = &mocks.MockLogger{} + counterTTL = time.Duration(50) * time.Millisecond + countersExpiredCount.Set(0) + }) + + It("increments counters and handles counter resets", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + subject = NewCounterTracker(ctx, counterTTL, logger) + + incomingTotals := []uint64{10, 15, 25, 40, 10, 20} + expectedTotals := []int64{5, 15, 30, 40, 50} + testCounterTracker(subject, "metric", time.Now(), incomingTotals, expectedTotals) + }) + + It("expires old counters", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + subject = NewCounterTracker(ctx, counterTTL, logger) + + incomingTotals := []uint64{150, 165, 165, 170, 200, 200} + expectedTotals := []int64{15, 15, 20, 50, 50} + + testCounterTracker(subject, "metric2", time.Now(), incomingTotals, expectedTotals) + Eventually(countersExpiredCount.IntValue).Should(Equal(1)) + testCounterTracker(subject, "metric2", time.Now(), incomingTotals, expectedTotals) + }) + + It("handles int64 overflows", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + subject = NewCounterTracker(ctx, counterTTL, logger) + + baseTime := time.Now() + incoming := []uint64{150, 165, 165, math.MaxInt64, math.MaxInt64 + 400, math.MaxInt64 + 450} + expected := []int64{15, 15, 15 + math.MaxInt64 - 165, 400, 450} + + for idx, value := range incoming { + ts := baseTime.Add(time.Duration(idx) * time.Millisecond) + total, st := subject.Update("metric3", value, ts) + if idx == 0 { + // First seen value initializes the counter. + Expect(total).To(BeNumerically("==", 0)) + continue + } else { + Expect(total).To(BeNumerically("==", expected[idx-1]), "iteration %d", idx) + } + // Value at iteration 4 is more than MaxInt64, so start time gets reset. + if idx < 4 { + Expect(st).To(BeTemporally("~", baseTime), "iteration %d", idx) + } else { + Expect(st).To(BeTemporally("~", baseTime.Add(4*time.Millisecond)), "iteration %d", idx) + } + } + }) +}) diff --git a/src/stackdriver-nozzle/nozzle/metric_sink.go b/src/stackdriver-nozzle/nozzle/metric_sink.go index fb1c5862..f610e5f2 100644 --- a/src/stackdriver-nozzle/nozzle/metric_sink.go +++ b/src/stackdriver-nozzle/nozzle/metric_sink.go @@ -29,7 +29,7 @@ import ( ) // NewLogSink returns a Sink that can receive sonde Events, translate them and send them to a stackdriver.MetricAdapter -func NewMetricSink(logger lager.Logger, pathPrefix string, labelMaker LabelMaker, metricAdapter stackdriver.MetricAdapter, unitParser UnitParser, runtimeMetricRegex string) (Sink, error) { +func NewMetricSink(logger lager.Logger, pathPrefix string, labelMaker LabelMaker, metricAdapter stackdriver.MetricAdapter, ct *CounterTracker, unitParser UnitParser, runtimeMetricRegex string) (Sink, error) { r, err := regexp.Compile(runtimeMetricRegex) if err != nil { return nil, fmt.Errorf("cannot compile runtime metric regex: %v", err) @@ -39,6 +39,7 @@ func NewMetricSink(logger lager.Logger, pathPrefix string, labelMaker LabelMaker labelMaker: labelMaker, metricAdapter: metricAdapter, unitParser: unitParser, + counterTracker: ct, logger: logger, runtimeMetricRe: r, }, nil @@ -49,6 +50,7 @@ type metricSink struct { labelMaker LabelMaker metricAdapter stackdriver.MetricAdapter unitParser UnitParser + counterTracker *CounterTracker logger lager.Logger runtimeMetricRe *regexp.Regexp } @@ -97,34 +99,62 @@ func (ms *metricSink) Receive(envelope *events.Envelope) { Type: eventType, Value: valueMetric.GetValue(), EventTime: eventTime, + StartTime: eventTime, Unit: ms.unitParser.Parse(valueMetric.GetUnit()), }} case events.Envelope_ContainerMetric: containerMetric := envelope.GetContainerMetric() metrics = []*messages.Metric{ - {Name: metricPrefix + "diskBytesQuota", Labels: labels, Type: eventType, Value: float64(containerMetric.GetDiskBytesQuota()), EventTime: eventTime}, - {Name: metricPrefix + "cpuPercentage", Labels: labels, Type: eventType, Value: float64(containerMetric.GetCpuPercentage()), EventTime: eventTime}, - {Name: metricPrefix + "diskBytes", Labels: labels, Type: eventType, Value: float64(containerMetric.GetDiskBytes()), EventTime: eventTime}, - {Name: metricPrefix + "memoryBytes", Labels: labels, Type: eventType, Value: float64(containerMetric.GetMemoryBytes()), EventTime: eventTime}, - {Name: metricPrefix + "memoryBytesQuota", Labels: labels, Type: eventType, Value: float64(containerMetric.GetMemoryBytesQuota()), EventTime: eventTime}, + {Name: metricPrefix + "diskBytesQuota", Value: float64(containerMetric.GetDiskBytesQuota())}, + {Name: metricPrefix + "cpuPercentage", Value: float64(containerMetric.GetCpuPercentage())}, + {Name: metricPrefix + "diskBytes", Value: float64(containerMetric.GetDiskBytes())}, + {Name: metricPrefix + "memoryBytes", Value: float64(containerMetric.GetMemoryBytes())}, + {Name: metricPrefix + "memoryBytesQuota", Value: float64(containerMetric.GetMemoryBytesQuota())}, + } + for _, metric := range metrics { + metric.Labels = labels + metric.Type = eventType + metric.EventTime = eventTime + metric.StartTime = eventTime } case events.Envelope_CounterEvent: counterEvent := envelope.GetCounterEvent() - metrics = []*messages.Metric{ - { - Name: fmt.Sprintf("%s%v.delta", metricPrefix, counterEvent.GetName()), - Labels: labels, - Type: eventType, - Value: float64(counterEvent.GetDelta()), - EventTime: eventTime, - }, - { - Name: fmt.Sprintf("%s%v.total", metricPrefix, counterEvent.GetName()), + if ms.counterTracker == nil { + // When there is no counter tracker, report CounterEvent metrics as two gauges: 'delta' and 'total'. + metrics = []*messages.Metric{ + { + Name: fmt.Sprintf("%s%v.delta", metricPrefix, counterEvent.GetName()), + Labels: labels, + Type: events.Envelope_ValueMetric, + Value: float64(counterEvent.GetDelta()), + EventTime: eventTime, + StartTime: eventTime, + }, + { + Name: fmt.Sprintf("%s%v.total", metricPrefix, counterEvent.GetName()), + Labels: labels, + Type: events.Envelope_ValueMetric, + Value: float64(counterEvent.GetTotal()), + EventTime: eventTime, + StartTime: eventTime, + }, + } + } else { + // Create a partial metric struct (lacking IntValue and StartTime) to allow determining metric.Hash (used as + // the counter name) based on metric name and labels. + metric := &messages.Metric{ + Name: metricPrefix + counterEvent.GetName(), Labels: labels, Type: eventType, - Value: float64(counterEvent.GetTotal()), EventTime: eventTime, - }, + } + total, st := ms.counterTracker.Update(metric.Hash(), counterEvent.GetTotal(), eventTime) + // Stackdriver expects non-zero time intervals, so only add a metric if event time is older than start time. + if eventTime.After(st) { + metric.StartTime = st + metric.IntValue = total + metrics = append(metrics, metric) + } } default: ms.logger.Error("metricSink.Receive", fmt.Errorf("unknown event type: %v", envelope.EventType)) diff --git a/src/stackdriver-nozzle/nozzle/metric_sink_test.go b/src/stackdriver-nozzle/nozzle/metric_sink_test.go index 0f0b86b3..646a68d1 100644 --- a/src/stackdriver-nozzle/nozzle/metric_sink_test.go +++ b/src/stackdriver-nozzle/nozzle/metric_sink_test.go @@ -17,6 +17,7 @@ package nozzle import ( + "context" "errors" "time" @@ -41,21 +42,23 @@ func (m *mockUnitParser) Parse(unit string) string { var _ = Describe("MetricSink", func() { var ( - subject Sink - metricBuffer *mocks.MetricsBuffer - unitParser *mockUnitParser - logger *mocks.MockLogger - err error + subject Sink + metricBuffer *mocks.MetricsBuffer + unitParser *mockUnitParser + logger *mocks.MockLogger + labelMaker LabelMaker + counterTracker *CounterTracker + err error ) BeforeEach(func() { appInfoRepository := &mocks.AppInfoRepository{AppInfoMap: map[string]cloudfoundry.AppInfo{}} - labelMaker := NewLabelMaker(appInfoRepository, "foobar") + labelMaker = NewLabelMaker(appInfoRepository, "foobar") metricBuffer = &mocks.MetricsBuffer{} unitParser = &mockUnitParser{} logger = &mocks.MockLogger{} - subject, err = NewMetricSink(logger, "firehose", labelMaker, metricBuffer, unitParser, "^runtimeMetric\\..*") + subject, err = NewMetricSink(logger, "firehose", labelMaker, metricBuffer, counterTracker, unitParser, "^runtimeMetric\\..*") Expect(err).To(BeNil()) }) @@ -89,7 +92,9 @@ var _ = Describe("MetricSink", func() { "Name": Equal("firehose/origin.valueMetricName"), "Labels": Equal(map[string]string{"foundation": "foobar"}), "Value": Equal(123.456), - "EventTime": Ignore(), + "IntValue": BeNumerically("==", 0), + "EventTime": BeTemporally("~", eventTime), + "StartTime": BeTemporally("~", eventTime), "Unit": Equal("{foo}"), "Type": Equal(eventType), })) @@ -126,7 +131,9 @@ var _ = Describe("MetricSink", func() { "Name": Equal("firehose/runtimeMetric.foobar"), "Labels": Equal(map[string]string{"foundation": "foobar", "origin": "myOrigin"}), "Value": Equal(123.456), - "EventTime": Ignore(), + "IntValue": BeNumerically("==", 0), + "EventTime": BeTemporally("~", eventTime), + "StartTime": BeTemporally("~", eventTime), "Unit": Ignore(), "Type": Ignore(), })) @@ -175,11 +182,11 @@ var _ = Describe("MetricSink", func() { labels := map[string]string{"foundation": "foobar", "instanceIndex": "0"} Expect(metrics).To(MatchAllElements(eventName, Elements{ - "firehose/origin.diskBytesQuota": MatchAllFields(Fields{"Name": Ignore(), "Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(1073741824)), "EventTime": Ignore(), "Unit": Equal("")}), - "firehose/origin.cpuPercentage": MatchAllFields(Fields{"Name": Ignore(), "Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(0.061651273460637)), "EventTime": Ignore(), "Unit": Equal("")}), - "firehose/origin.diskBytes": MatchAllFields(Fields{"Name": Ignore(), "Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(164634624)), "EventTime": Ignore(), "Unit": Equal("")}), - "firehose/origin.memoryBytes": MatchAllFields(Fields{"Name": Ignore(), "Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(16601088)), "EventTime": Ignore(), "Unit": Equal("")}), - "firehose/origin.memoryBytesQuota": MatchAllFields(Fields{"Name": Ignore(), "Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(33554432)), "EventTime": Ignore(), "Unit": Equal("")}), + "firehose/origin.diskBytesQuota": MatchFields(IgnoreExtras, Fields{"Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(1073741824)), "Unit": Equal("")}), + "firehose/origin.cpuPercentage": MatchFields(IgnoreExtras, Fields{"Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(0.061651273460637)), "Unit": Equal("")}), + "firehose/origin.diskBytes": MatchFields(IgnoreExtras, Fields{"Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(164634624)), "Unit": Equal("")}), + "firehose/origin.memoryBytes": MatchFields(IgnoreExtras, Fields{"Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(16601088)), "Unit": Equal("")}), + "firehose/origin.memoryBytesQuota": MatchFields(IgnoreExtras, Fields{"Labels": Equal(labels), "Type": Equal(metricType), "Value": Equal(float64(33554432)), "Unit": Equal("")}), })) }) @@ -217,21 +224,91 @@ var _ = Describe("MetricSink", func() { "Name": Ignore(), "Labels": Equal(map[string]string{"foundation": "foobar"}), "Value": Equal(float64(654321)), - "EventTime": Ignore(), + "IntValue": BeNumerically("==", 0), + "EventTime": BeTemporally("~", eventTime), + "StartTime": BeTemporally("~", eventTime), "Unit": Equal(""), - "Type": Equal(eventType), + "Type": Equal(events.Envelope_ValueMetric), }), "firehose/origin.counterName.total": MatchAllFields(Fields{ "Name": Ignore(), "Labels": Equal(map[string]string{"foundation": "foobar"}), "Value": Equal(float64(123456)), - "EventTime": Ignore(), + "IntValue": BeNumerically("==", 0), + "EventTime": BeTemporally("~", eventTime), + "StartTime": BeTemporally("~", eventTime), "Unit": Equal(""), - "Type": Equal(eventType), + "Type": Equal(events.Envelope_ValueMetric), }), })) }) + Context("with CounterTracker enabled", func() { + BeforeEach(func() { + counterTracker = NewCounterTracker(context.TODO(), time.Duration(5)*time.Second, logger) + subject, err = NewMetricSink(logger, "firehose", labelMaker, metricBuffer, counterTracker, unitParser, "^runtimeMetric\\..*") + Expect(err).To(BeNil()) + }) + + It("creates cumulative metrics for CounterEvent", func() { + eventTime := time.Now() + + eventType := events.Envelope_CounterEvent + origin := "origin" + name := "counterName" + + // List of {delta, total} events to produce. + eventValues := [][]uint64{ + {5, 105}, + {10, 115}, + {10, 125}, + {5, 5}, // counter reset + {20, 25}, + } + + for idx, values := range eventValues { + ts := eventTime.UnixNano() + int64(time.Second)*int64(idx) // Events are 1 second apart. + delta := values[0] + total := values[1] + subject.Receive(&events.Envelope{ + Origin: &origin, + EventType: &eventType, + Timestamp: &ts, + CounterEvent: &events.CounterEvent{ + Name: &name, + Delta: &delta, + Total: &total, + }, + }) + } + + metrics := metricBuffer.PostedMetrics + Expect(metrics).To(HaveLen(4)) + eventName := func(element interface{}) string { + return element.(messages.Metric).Name + } + Expect(metrics).To(MatchElements(eventName, AllowDuplicates, Elements{ + "firehose/origin.counterName": MatchAllFields(Fields{ + "Name": Ignore(), + "Labels": Equal(map[string]string{"foundation": "foobar"}), + "Value": BeNumerically("==", 0), + "IntValue": Ignore(), + "EventTime": Ignore(), + "StartTime": BeTemporally("~", eventTime), + "Unit": Equal(""), + "Type": Equal(eventType), + }), + })) + expectedTotals := []float64{10, 20, 25, 45} + for idx, total := range expectedTotals { + Expect(metrics[idx]).To(MatchFields(IgnoreExtras, Fields{ + "IntValue": BeNumerically("==", total), + "EventTime": BeTemporally("~", eventTime.Add(time.Duration(idx+1)*time.Second)), + })) + } + }) + }) + It("returns error when envelope contains unhandled event type", func() { eventType := events.Envelope_HttpStartStop envelope := &events.Envelope{ diff --git a/src/stackdriver-nozzle/stackdriver/metric_adapter.go b/src/stackdriver-nozzle/stackdriver/metric_adapter.go index 94c8ef9b..577524a8 100644 --- a/src/stackdriver-nozzle/stackdriver/metric_adapter.go +++ b/src/stackdriver-nozzle/stackdriver/metric_adapter.go @@ -22,14 +22,10 @@ import ( "path" "strings" "sync" - "time" "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/messages" "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry" "github.com/cloudfoundry/lager" - "github.com/golang/protobuf/ptypes/timestamp" - labelpb "google.golang.org/genproto/googleapis/api/label" - metricpb "google.golang.org/genproto/googleapis/api/metric" monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3" ) @@ -145,15 +141,7 @@ func (ma *metricAdapter) buildTimeSeries(metrics []*messages.Metric) []*monitori firehoseEventsCount.Increment() timeSeriesCount.Increment() - metricType := path.Join("custom.googleapis.com", metric.Name) - timeSeries := monitoringpb.TimeSeries{ - Metric: &metricpb.Metric{ - Type: metricType, - Labels: metric.Labels, - }, - Points: points(metric.Value, metric.EventTime), - } - timeSerieses = append(timeSerieses, &timeSeries) + timeSerieses = append(timeSerieses, metric.TimeSeries()) } return timeSerieses @@ -161,29 +149,10 @@ func (ma *metricAdapter) buildTimeSeries(metrics []*messages.Metric) []*monitori func (ma *metricAdapter) CreateMetricDescriptor(metric *messages.Metric) error { projectName := path.Join("projects", ma.projectID) - metricType := path.Join("custom.googleapis.com", metric.Name) - metricName := path.Join(projectName, "metricDescriptors", metricType) - - var labelDescriptors []*labelpb.LabelDescriptor - for key := range metric.Labels { - labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{ - Key: key, - ValueType: labelpb.LabelDescriptor_STRING, - }) - } req := &monitoringpb.CreateMetricDescriptorRequest{ - Name: projectName, - MetricDescriptor: &metricpb.MetricDescriptor{ - Name: metricName, - Type: metricType, - Labels: labelDescriptors, - MetricKind: metricpb.MetricDescriptor_GAUGE, - ValueType: metricpb.MetricDescriptor_DOUBLE, - Unit: metric.Unit, - Description: "stackdriver-nozzle created custom metric.", - DisplayName: metric.Name, // TODO - }, + Name: projectName, + MetricDescriptor: metric.MetricDescriptor(projectName), } descriptorReqs.Increment() @@ -213,7 +182,7 @@ func (ma *metricAdapter) fetchMetricDescriptorNames() error { } func (ma *metricAdapter) ensureMetricDescriptor(metric *messages.Metric) error { - if metric.Unit == "" { + if !metric.NeedsMetricDescriptor() { return nil } @@ -231,22 +200,3 @@ func (ma *metricAdapter) ensureMetricDescriptor(metric *messages.Metric) error { ma.descriptors[metric.Name] = struct{}{} return nil } - -func points(value float64, eventTime time.Time) []*monitoringpb.Point { - timeStamp := timestamp.Timestamp{ - Seconds: eventTime.Unix(), - Nanos: int32(eventTime.Nanosecond()), - } - point := &monitoringpb.Point{ - Interval: &monitoringpb.TimeInterval{ - EndTime: &timeStamp, - StartTime: &timeStamp, - }, - Value: &monitoringpb.TypedValue{ - Value: &monitoringpb.TypedValue_DoubleValue{ - DoubleValue: value, - }, - }, - } - return []*monitoringpb.Point{point} -} diff --git a/src/stackdriver-nozzle/stackdriver/metric_adapter_test.go b/src/stackdriver-nozzle/stackdriver/metric_adapter_test.go index 46013730..f162154e 100644 --- a/src/stackdriver-nozzle/stackdriver/metric_adapter_test.go +++ b/src/stackdriver-nozzle/stackdriver/metric_adapter_test.go @@ -26,6 +26,7 @@ import ( "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/messages" "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/mocks" + "github.com/cloudfoundry/sonde-go/events" . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" @@ -146,11 +147,16 @@ var _ = Describe("MetricAdapter", func() { Name: "metricWithoutUnit", Labels: labels, }, + { + Name: "someCounter", + Labels: labels, + Type: events.Envelope_CounterEvent, + }, } subject.PostMetrics(metrics) - Expect(client.DescriptorReqs).To(HaveLen(1)) + Expect(client.DescriptorReqs).To(HaveLen(2)) req := client.DescriptorReqs[0] Expect(req.Name).To(Equal("projects/my-awesome-project")) Expect(req.MetricDescriptor).To(Equal(&metricpb.MetricDescriptor{ @@ -163,6 +169,8 @@ var _ = Describe("MetricAdapter", func() { Description: "stackdriver-nozzle created custom metric.", DisplayName: "metricWithUnit", })) + Expect(client.DescriptorReqs[1].MetricDescriptor.MetricKind).To(Equal(metricpb.MetricDescriptor_CUMULATIVE)) + Expect(client.DescriptorReqs[1].MetricDescriptor.ValueType).To(Equal(metricpb.MetricDescriptor_INT64)) }) It("only creates the same descriptor once", func() {