Skip to content
This repository has been archived by the owner on Jun 25, 2020. It is now read-only.

Report CounterEvents as cumulative metrics to Stackdriver #162

Merged
merged 1 commit into from
Dec 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions jobs/stackdriver-nozzle/spec
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ properties:
nozzle.foundation_name:
description: Name added as the 'foundation' label to all time series being sent to Stackdriver, useful for differentiating between multiple PCF instances in a project.
default: cf

nozzle.enable_cumulative_counters:
description: Enable reporting counter events as cumulative Stackdriver metrics. This requires all CounterEvent messages for a given metric to be routed to the same nozzle process (which is the case if you run a single copy of the nozzle).
default: false
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ case $1 in
export FOUNDATION_NAME=<%= p('nozzle.foundation_name', 'cf') %>
export LOGGING_BATCH_COUNT=<%= p('nozzle.logging_batch_count', '1000') %>
export LOGGING_BATCH_DURATION=<%= p('nozzle.logging_batch_duration', '30') %>
export ENABLE_CUMULATIVE_COUNTERS=<%= p('nozzle.enable_cumulative_counters', 'false') %>

<% if_p('gcp.project_id') do |prop| %>
export GCP_PROJECT_ID=<%= prop %>
Expand Down
8 changes: 7 additions & 1 deletion src/stackdriver-nozzle/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ func (a *App) newMetricSink(ctx context.Context, metricAdapter stackdriver.Metri
metricBuffer := metrics_pipeline.NewAutoCulledMetricsBuffer(ctx, a.logger, time.Duration(a.c.MetricsBufferDuration)*time.Second, metricAdapter)
a.bufferEmpty = metricBuffer.IsEmpty

return nozzle.NewMetricSink(a.logger, a.c.MetricPathPrefix, a.labelMaker, metricBuffer, nozzle.NewUnitParser(), a.c.RuntimeMetricRegex)
var counterTracker *nozzle.CounterTracker
if a.c.EnableCumulativeCounters {
ttl := time.Duration(a.c.CounterTrackerTTL) * time.Second
counterTracker = nozzle.NewCounterTracker(ctx, ttl, a.logger)
}

return nozzle.NewMetricSink(a.logger, a.c.MetricPathPrefix, a.labelMaker, metricBuffer, counterTracker, nozzle.NewUnitParser(), a.c.RuntimeMetricRegex)
}

func (a *App) newTelemetryReporter() telemetry.Reporter {
Expand Down
7 changes: 7 additions & 0 deletions src/stackdriver-nozzle/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ type Config struct {
DebugNozzle bool `envconfig:"debug_nozzle"`
// By default 'origin' label is prepended to metric name, however for runtime metrics (defined here) we add it as a metric label instead.
RuntimeMetricRegex string `envconfig:"runtime_metric_regex" default:"^(numCPUS|numGoRoutines|memoryStats\\..*)$"`
// If enabled, CounterEvents will be reported as cumulative Stackdriver metrics instead of two gauges (<metric>.delta
// and <metric>.total). Reporting cumulative metrics involves nozzle keeping track of internal counter state, and
// requires deterministic routing of CounterEvents to nozzles (i.e. CounterEvent messages for a particular metric MUST
// always be routed to the same nozzle process); the easiest way to achieve that is to run a single copy of the nozzle.
EnableCumulativeCounters bool `envconfig:"enable_cumulative_counters"`
// Expire internal counter state if a given counter has not been seen for this many seconds.
CounterTrackerTTL int `envconfig:"counter_tracker_ttl" default:"130"`
}

func (c *Config) validate() error {
Expand Down
89 changes: 83 additions & 6 deletions src/stackdriver-nozzle/messages/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,113 @@ package messages

import (
"bytes"
"fmt"
"path"
"sort"
"time"

"github.com/cloudfoundry/sonde-go/events"
"github.com/golang/protobuf/ptypes/timestamp"
labelpb "google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

// Metric represents one of the metrics contained in an events.Envelope.
type Metric struct {
Name string
Labels map[string]string `json:"-"`
Value float64
IntValue int64
EventTime time.Time
StartTime time.Time `json:"-"`
Unit string // TODO Should this be "1" if it's empty?
Type events.Envelope_EventType `json:"-"`
}

func (m *Metric) IsCumulative() bool {
return m.Type == events.Envelope_CounterEvent
}

func (m *Metric) metricType() string {
return path.Join("custom.googleapis.com", m.Name)
}

// NeedsMetricDescriptor determines whether a custom metric descriptor needs to be created for this metric in Stackdriver.
// We do that if we need to set a custom unit, or mark metric as a cumulative.
func (m *Metric) NeedsMetricDescriptor() bool {
return m.Unit != "" || m.IsCumulative()
}

// MetricDescriptor returns a Stackdriver MetricDescriptor proto for this metric.
func (m *Metric) MetricDescriptor(projectName string) *metricpb.MetricDescriptor {
metricType := m.metricType()

var labelDescriptors []*labelpb.LabelDescriptor
for key := range m.Labels {
labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{
Key: key,
ValueType: labelpb.LabelDescriptor_STRING,
})
}

metricKind := metricpb.MetricDescriptor_GAUGE
valueType := metricpb.MetricDescriptor_DOUBLE
if m.IsCumulative() {
metricKind = metricpb.MetricDescriptor_CUMULATIVE
valueType = metricpb.MetricDescriptor_INT64
}

return &metricpb.MetricDescriptor{
Name: path.Join(projectName, "metricDescriptors", metricType),
Type: metricType,
Labels: labelDescriptors,
MetricKind: metricKind,
ValueType: valueType,
Unit: m.Unit,
Description: "stackdriver-nozzle created custom metric.",
DisplayName: m.Name,
}
}

// TimeSeries returns a Stackdriver TimeSeries proto for this metric value.
func (m *Metric) TimeSeries() *monitoringpb.TimeSeries {
var value *monitoringpb.TypedValue
if m.IsCumulative() {
value = &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{Int64Value: m.IntValue}}
} else {
value = &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: m.Value}}
}

point := &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
EndTime: &timestamp.Timestamp{Seconds: m.EventTime.Unix(), Nanos: int32(m.EventTime.Nanosecond())},
StartTime: &timestamp.Timestamp{Seconds: m.StartTime.Unix(), Nanos: int32(m.StartTime.Nanosecond())},
},
Value: value,
}
return &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: m.metricType(),
Labels: m.Labels,
},
Points: []*monitoringpb.Point{point},
}
}

func (m *Metric) Hash() string {
var b bytes.Buffer

// Extract keys to a slice and sort it
numKeys := len(m.Labels) + 1
keys := make([]string, numKeys, numKeys)
keys = append(keys, m.Name)
b.Write([]byte(m.Name))

// Extract label keys to a slice and sort it
keys := make([]string, 0, len(m.Labels))
for k := range m.Labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
b.Write([]byte(k))
b.Write([]byte(m.Labels[k]))
b.Write([]byte(fmt.Sprintf(",%s='%s'", k, m.Labels[k])))
}
return b.String()
}
176 changes: 176 additions & 0 deletions src/stackdriver-nozzle/nozzle/counter_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nozzle

import (
"context"
"expvar"
"math"
"sync"
"time"

"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry"
"github.com/cloudfoundry/lager"
)

const maxExpirePeriod = 10 * time.Second

var countersExpiredCount *telemetry.Counter

func init() {
countersExpiredCount = telemetry.NewCounter(telemetry.Nozzle, "metrics.counters.expired")
}

type counterData struct {
startTime time.Time
totalValue *expvar.Int
lastValue uint64
lastSeenTime time.Time
lastEventTime time.Time
}

// CounterTracker is used to provide a "start time" for each loggregator counter metric exported by the nozzle.
//
// Stackdriver requires each point for a cumulative metric to include "start time" in addition to the actual event time
// (aka "end time"): https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#point
// Typically start time would correspond to the time when the actual process exporting the metric started. This ensures
// that when a process is restarted (and counter gets reset to 0), start time increases.
//
// Since binaries that export counter events to loggregator only provide event time, the nozzle needs to determine start
// time for each metric itself. To do that, CounterTracker keeps its own counter for each metric, which corresponds to the
// total number of events since the metric was first seen by the nozzle (which is exported as the start time).
//
// As an example, a series of incoming CounterEvents with total values of [100, 110, 115, 150] will be exported by the
// nozzle as [10, 15, 50] (first point seen by the nozzle is discarded, because each point reported to Stackdriver needs
// to cover non-zero time interval between start time and end time).
//
// If CounterTracker detects the total value for a given counter decrease, it will interpret this as a counter reset. This
// will not result in the Stackdriver cumulative metric being reset as well; for example, incoming CounterEvents with total
// values of [100, 110, 115, 10, 17] will be exported by the nozzle as [10, 15, 25, 32].
//
// CounterTracker will regularly remove internal state for metrics that have not been seen for a while. This is done to
// conserve memory, and also to ensure that old values do not re-surface if a given counter stops being exported for some
// period of time.
type CounterTracker struct {
counters map[string]*counterData
mu *sync.Mutex // protects `counters`
ttl time.Duration
logger lager.Logger
ticker *time.Ticker
ctx context.Context
}

// NewCounterTracker creates and returns a counter tracker.
func NewCounterTracker(ctx context.Context, ttl time.Duration, logger lager.Logger) *CounterTracker {
expirePeriod := time.Duration(ttl.Nanoseconds() / 2)
if expirePeriod > maxExpirePeriod {
expirePeriod = maxExpirePeriod
}
c := &CounterTracker{
counters: map[string]*counterData{},
mu: &sync.Mutex{},
ttl: ttl,
logger: logger,
ticker: time.NewTicker(expirePeriod),
ctx: ctx,
}
go func() {
for {
select {
case <-c.ticker.C:
c.expire()
case <-c.ctx.Done():
c.ticker.Stop()
return
}
}
}()
return c
}

// Update accepts a counter name, event time and a value, and returns the total value for the counter along with its
// start time. Counter name provided needs to uniquely identify the time series (so it needs to include metric name as
// well as all metric label values).
// At least two values need to be observed for a given counter to determine the total value, so for the first observed
// value, 0 will be returned as the total, and end time will be equal to event time. Such points should not be reported
// to Stackdriver, since it expects points covering non-zero time interval.
func (t *CounterTracker) Update(name string, value uint64, eventTime time.Time) (int64, time.Time) {
t.mu.Lock()
defer t.mu.Unlock()

c, present := t.counters[name]
if !present {
c = t.newCounterData(name, eventTime)
t.counters[name] = c
} else {
var delta uint64
if c.lastValue > value {
// Counter has been reset.
delta = value
} else {
delta = value - c.lastValue
}
if uint64(c.totalValue.Value())+delta > math.MaxInt64 {
// Accumulated value overflows int64, we need to reset the counter.
c.totalValue.Set(int64(delta))
c.startTime = c.lastEventTime
} else {
c.totalValue.Add(int64(delta))
}
}
c.lastValue = value
c.lastSeenTime = time.Now()
c.lastEventTime = eventTime
return c.totalValue.Value(), c.startTime
}

func (t *CounterTracker) newCounterData(name string, eventTime time.Time) *counterData {
var v *expvar.Int
existing := expvar.Get(name)
if existing != nil {
// There was a previous counter with this name; use it instead, but reset value to 0.
v = existing.(*expvar.Int)
v.Set(0)
} else {
v = expvar.NewInt(name)
}
// Initialize counter state for a new counter.
return &counterData{
totalValue: v,
startTime: eventTime,
}
}

func (t *CounterTracker) expire() {
t.mu.Lock()
defer t.mu.Unlock()

for name, counter := range t.counters {
if time.Now().Sub(counter.lastSeenTime) > t.ttl {
t.logger.Info("CounterTracker", lager.Data{
"info": "removing expired counter",
"name": name,
"counter": counter,
"value": t.counters[name].totalValue.Value(),
})
// Reset values to -1 to make expired counters visible in /debug/vars.
t.counters[name].totalValue.Set(-1)
delete(t.counters, name)
countersExpiredCount.Increment()
}
}
}
Loading