Merge pull request #162 from knyar/pr-counters
Report CounterEvents as cumulative metrics to Stackdriver
johnsonj authored Dec 12, 2017
2 parents 2d702e1 + ae143de commit 5baad64
Showing 11 changed files with 540 additions and 98 deletions.
4 changes: 4 additions & 0 deletions jobs/stackdriver-nozzle/spec
@@ -67,3 +67,7 @@ properties:
nozzle.foundation_name:
description: Name added as the 'foundation' label to all time series being sent to Stackdriver, useful for differentiating between multiple PCF instances in a project.
default: cf

nozzle.enable_cumulative_counters:
description: Enable reporting counter events as cumulative Stackdriver metrics. This requires all CounterEvent messages for a given metric to be routed to the same nozzle process (which is the case if you run a single copy of the nozzle).
default: false
@@ -38,6 +38,7 @@ case $1 in
export FOUNDATION_NAME=<%= p('nozzle.foundation_name', 'cf') %>
export LOGGING_BATCH_COUNT=<%= p('nozzle.logging_batch_count', '1000') %>
export LOGGING_BATCH_DURATION=<%= p('nozzle.logging_batch_duration', '30') %>
export ENABLE_CUMULATIVE_COUNTERS=<%= p('nozzle.enable_cumulative_counters', 'false') %>

<% if_p('gcp.project_id') do |prop| %>
export GCP_PROJECT_ID=<%= prop %>
8 changes: 7 additions & 1 deletion src/stackdriver-nozzle/app/builder.go
@@ -129,7 +129,13 @@ func (a *App) newMetricSink(ctx context.Context, metricAdapter stackdriver.Metri
metricBuffer := metrics_pipeline.NewAutoCulledMetricsBuffer(ctx, a.logger, time.Duration(a.c.MetricsBufferDuration)*time.Second, metricAdapter)
a.bufferEmpty = metricBuffer.IsEmpty

var counterTracker *nozzle.CounterTracker
if a.c.EnableCumulativeCounters {
ttl := time.Duration(a.c.CounterTrackerTTL) * time.Second
counterTracker = nozzle.NewCounterTracker(ctx, ttl, a.logger)
}

return nozzle.NewMetricSink(a.logger, a.c.MetricPathPrefix, a.labelMaker, metricBuffer, counterTracker, nozzle.NewUnitParser(), a.c.RuntimeMetricRegex)
}

func (a *App) newTelemetryReporter() telemetry.Reporter {
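The metric sink is among the 11 changed files but its diff is not shown in this excerpt. Purely as a hypothetical sketch (the sink struct and the emitGauges/emitCumulative helpers are illustrative names, not the repository's actual code), passing a nil tracker would let the sink fall back to the existing gauge behaviour with a simple guard:

// Hypothetical illustration only; names other than CounterTracker.Update are assumed.
func (s *sink) handleCounterEvent(name string, total uint64, eventTime time.Time) {
    if s.counterTracker == nil {
        // Cumulative counters disabled: keep emitting <metric>.delta and <metric>.total gauges.
        s.emitGauges(name, total, eventTime)
        return
    }
    // Cumulative counters enabled: the tracker supplies the running total and start time.
    value, startTime := s.counterTracker.Update(name, total, eventTime)
    if startTime.Equal(eventTime) {
        return // first observation: zero-length interval, not reported to Stackdriver
    }
    s.emitCumulative(name, value, startTime, eventTime)
}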
7 changes: 7 additions & 0 deletions src/stackdriver-nozzle/config/config.go
@@ -74,6 +74,13 @@ type Config struct {
DebugNozzle bool `envconfig:"debug_nozzle"`
// By default, the 'origin' label is prepended to the metric name; however, for runtime metrics (defined here) we add it as a metric label instead.
RuntimeMetricRegex string `envconfig:"runtime_metric_regex" default:"^(numCPUS|numGoRoutines|memoryStats\\..*)$"`
// If enabled, CounterEvents will be reported as cumulative Stackdriver metrics instead of two gauges (<metric>.delta
// and <metric>.total). Reporting cumulative metrics involves the nozzle keeping track of internal counter state, and
// requires deterministic routing of CounterEvents to nozzles (i.e. CounterEvent messages for a particular metric MUST
// always be routed to the same nozzle process); the easiest way to achieve that is to run a single copy of the nozzle.
EnableCumulativeCounters bool `envconfig:"enable_cumulative_counters"`
// Expire internal counter state if a given counter has not been seen for this many seconds.
CounterTrackerTTL int `envconfig:"counter_tracker_ttl" default:"130"`
}

func (c *Config) validate() error {
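The envconfig struct tags above, together with the ENABLE_CUMULATIVE_COUNTERS export added to the BOSH ctl template, suggest these settings are read from the environment at startup. A minimal sketch of how the two new settings would be parsed, assuming the kelseyhightower/envconfig library implied by the tags (the actual config-loading code is not shown in this excerpt):

package main

import (
    "fmt"
    "os"

    "github.com/kelseyhightower/envconfig" // assumed library, implied by the struct tags
)

// Trimmed copy of the two new Config fields from config.go.
type Config struct {
    EnableCumulativeCounters bool `envconfig:"enable_cumulative_counters"`
    CounterTrackerTTL        int  `envconfig:"counter_tracker_ttl" default:"130"`
}

func main() {
    // Mirrors the export added to the BOSH ctl template above.
    os.Setenv("ENABLE_CUMULATIVE_COUNTERS", "true")

    var c Config
    if err := envconfig.Process("", &c); err != nil {
        panic(err)
    }
    // COUNTER_TRACKER_TTL is unset, so the default of 130 seconds applies.
    fmt.Println(c.EnableCumulativeCounters, c.CounterTrackerTTL) // true 130
}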
89 changes: 83 additions & 6 deletions src/stackdriver-nozzle/messages/metric.go
@@ -2,36 +2,113 @@ package messages

import (
"bytes"
"fmt"
"path"
"sort"
"time"

"github.com/cloudfoundry/sonde-go/events"
"github.com/golang/protobuf/ptypes/timestamp"
labelpb "google.golang.org/genproto/googleapis/api/label"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

// Metric represents one of the metrics contained in an events.Envelope.
type Metric struct {
Name string
Labels map[string]string `json:"-"`
Value float64
IntValue int64
EventTime time.Time
StartTime time.Time `json:"-"`
Unit string // TODO Should this be "1" if it's empty?
Type events.Envelope_EventType `json:"-"`
}

func (m *Metric) IsCumulative() bool {
return m.Type == events.Envelope_CounterEvent
}

func (m *Metric) metricType() string {
return path.Join("custom.googleapis.com", m.Name)
}

// NeedsMetricDescriptor determines whether a custom metric descriptor needs to be created for this metric in Stackdriver.
// We do that if we need to set a custom unit, or mark the metric as cumulative.
func (m *Metric) NeedsMetricDescriptor() bool {
return m.Unit != "" || m.IsCumulative()
}

// MetricDescriptor returns a Stackdriver MetricDescriptor proto for this metric.
func (m *Metric) MetricDescriptor(projectName string) *metricpb.MetricDescriptor {
metricType := m.metricType()

var labelDescriptors []*labelpb.LabelDescriptor
for key := range m.Labels {
labelDescriptors = append(labelDescriptors, &labelpb.LabelDescriptor{
Key: key,
ValueType: labelpb.LabelDescriptor_STRING,
})
}

metricKind := metricpb.MetricDescriptor_GAUGE
valueType := metricpb.MetricDescriptor_DOUBLE
if m.IsCumulative() {
metricKind = metricpb.MetricDescriptor_CUMULATIVE
valueType = metricpb.MetricDescriptor_INT64
}

return &metricpb.MetricDescriptor{
Name: path.Join(projectName, "metricDescriptors", metricType),
Type: metricType,
Labels: labelDescriptors,
MetricKind: metricKind,
ValueType: valueType,
Unit: m.Unit,
Description: "stackdriver-nozzle created custom metric.",
DisplayName: m.Name,
}
}

// TimeSeries returns a Stackdriver TimeSeries proto for this metric value.
func (m *Metric) TimeSeries() *monitoringpb.TimeSeries {
var value *monitoringpb.TypedValue
if m.IsCumulative() {
value = &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{Int64Value: m.IntValue}}
} else {
value = &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: m.Value}}
}

point := &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
EndTime: &timestamp.Timestamp{Seconds: m.EventTime.Unix(), Nanos: int32(m.EventTime.Nanosecond())},
StartTime: &timestamp.Timestamp{Seconds: m.StartTime.Unix(), Nanos: int32(m.StartTime.Nanosecond())},
},
Value: value,
}
return &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: m.metricType(),
Labels: m.Labels,
},
Points: []*monitoringpb.Point{point},
}
}

func (m *Metric) Hash() string {
var b bytes.Buffer

b.Write([]byte(m.Name))

// Extract label keys to a slice and sort it
keys := make([]string, 0, len(m.Labels))
for k := range m.Labels {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
b.Write([]byte(fmt.Sprintf(",%s='%s'", k, m.Labels[k])))
}
return b.String()
}
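Tying the pieces of metric.go together, a short usage sketch (import paths assumed, since the full module path is not shown in this hunk): a CounterEvent-derived Metric reports its running total as an INT64 cumulative point over the [StartTime, EventTime] interval, and Hash() yields a stable key built from the name plus sorted ",key='value'" label pairs.

package main

import (
    "fmt"
    "time"

    // Import path assumed by analogy with the telemetry import in counter_tracker.go.
    "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/messages"
    "github.com/cloudfoundry/sonde-go/events"
)

func main() {
    m := &messages.Metric{
        Name:      "gorouter.total_requests",
        Labels:    map[string]string{"job": "router", "index": "0"},
        IntValue:  1500,                              // running total from the CounterTracker
        StartTime: time.Now().Add(-10 * time.Minute), // when the counter was first seen
        EventTime: time.Now(),
        Type:      events.Envelope_CounterEvent,      // makes IsCumulative() true
    }

    fmt.Println(m.NeedsMetricDescriptor()) // true: cumulative metrics need a descriptor
    ts := m.TimeSeries()                   // INT64 point over [StartTime, EventTime]
    fmt.Println(ts.Metric.Type)            // custom.googleapis.com/gorouter.total_requests
    fmt.Println(m.Hash())                  // gorouter.total_requests,index='0',job='router'
}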
176 changes: 176 additions & 0 deletions src/stackdriver-nozzle/nozzle/counter_tracker.go
@@ -0,0 +1,176 @@
/*
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nozzle

import (
"context"
"expvar"
"math"
"sync"
"time"

"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry"
"github.com/cloudfoundry/lager"
)

const maxExpirePeriod = 10 * time.Second

var countersExpiredCount *telemetry.Counter

func init() {
countersExpiredCount = telemetry.NewCounter(telemetry.Nozzle, "metrics.counters.expired")
}

type counterData struct {
startTime time.Time
totalValue *expvar.Int
lastValue uint64
lastSeenTime time.Time
lastEventTime time.Time
}

// CounterTracker is used to provide a "start time" for each loggregator counter metric exported by the nozzle.
//
// Stackdriver requires each point for a cumulative metric to include "start time" in addition to the actual event time
// (aka "end time"): https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#point
// Typically start time would correspond to the time when the actual process exporting the metric started. This ensures
// that when a process is restarted (and counter gets reset to 0), start time increases.
//
// Since binaries that export counter events to loggregator only provide event time, the nozzle needs to determine start
// time for each metric itself. To do that, CounterTracker keeps its own counter for each metric, tracking the total
// increase observed since the metric was first seen by the nozzle; the time it was first seen is exported as the start time.
//
// As an example, a series of incoming CounterEvents with total values of [100, 110, 115, 150] will be exported by the
// nozzle as [10, 15, 50] (the first point seen by the nozzle is discarded, because each point reported to Stackdriver
// needs to cover a non-zero time interval between start time and end time).
//
// If CounterTracker detects that the total value for a given counter has decreased, it interprets this as a counter reset. This
// will not result in the Stackdriver cumulative metric being reset as well; for example, incoming CounterEvents with total
// values of [100, 110, 115, 10, 17] will be exported by the nozzle as [10, 15, 25, 32].
//
// CounterTracker will regularly remove internal state for metrics that have not been seen for a while. This is done to
// conserve memory, and also to ensure that old values do not re-surface if a given counter stops being exported for some
// period of time.
type CounterTracker struct {
counters map[string]*counterData
mu *sync.Mutex // protects `counters`
ttl time.Duration
logger lager.Logger
ticker *time.Ticker
ctx context.Context
}

// NewCounterTracker creates and returns a counter tracker.
func NewCounterTracker(ctx context.Context, ttl time.Duration, logger lager.Logger) *CounterTracker {
expirePeriod := time.Duration(ttl.Nanoseconds() / 2)
if expirePeriod > maxExpirePeriod {
expirePeriod = maxExpirePeriod
}
c := &CounterTracker{
counters: map[string]*counterData{},
mu: &sync.Mutex{},
ttl: ttl,
logger: logger,
ticker: time.NewTicker(expirePeriod),
ctx: ctx,
}
go func() {
for {
select {
case <-c.ticker.C:
c.expire()
case <-c.ctx.Done():
c.ticker.Stop()
return
}
}
}()
return c
}

// Update accepts a counter name, event time and a value, and returns the total value for the counter along with its
// start time. The counter name provided needs to uniquely identify the time series (so it needs to include the metric
// name as well as all metric label values).
// At least two values need to be observed for a given counter to determine the total value, so for the first observed
// value, 0 will be returned as the total and the start time will be equal to the event time. Such points should not be
// reported to Stackdriver, since it expects points to cover a non-zero time interval.
func (t *CounterTracker) Update(name string, value uint64, eventTime time.Time) (int64, time.Time) {
t.mu.Lock()
defer t.mu.Unlock()

c, present := t.counters[name]
if !present {
c = t.newCounterData(name, eventTime)
t.counters[name] = c
} else {
var delta uint64
if c.lastValue > value {
// Counter has been reset.
delta = value
} else {
delta = value - c.lastValue
}
if uint64(c.totalValue.Value())+delta > math.MaxInt64 {
// Accumulated value overflows int64, so we need to reset the counter.
c.totalValue.Set(int64(delta))
c.startTime = c.lastEventTime
} else {
c.totalValue.Add(int64(delta))
}
}
c.lastValue = value
c.lastSeenTime = time.Now()
c.lastEventTime = eventTime
return c.totalValue.Value(), c.startTime
}

func (t *CounterTracker) newCounterData(name string, eventTime time.Time) *counterData {
var v *expvar.Int
existing := expvar.Get(name)
if existing != nil {
// There was a previous counter with this name; use it instead, but reset value to 0.
v = existing.(*expvar.Int)
v.Set(0)
} else {
v = expvar.NewInt(name)
}
// Initialize counter state for a new counter.
return &counterData{
totalValue: v,
startTime: eventTime,
}
}

func (t *CounterTracker) expire() {
t.mu.Lock()
defer t.mu.Unlock()

for name, counter := range t.counters {
if time.Now().Sub(counter.lastSeenTime) > t.ttl {
t.logger.Info("CounterTracker", lager.Data{
"info": "removing expired counter",
"name": name,
"counter": counter,
"value": t.counters[name].totalValue.Value(),
})
// Reset values to -1 to make expired counters visible in /debug/vars.
t.counters[name].totalValue.Set(-1)
delete(t.counters, name)
countersExpiredCount.Increment()
}
}
}
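A small usage sketch (import paths assumed, by analogy with the telemetry import above) reproducing the example in the CounterTracker comment: incoming counter totals [100, 110, 115, 150] come back from Update as running totals [10, 15, 50] after the first observation, which is returned as 0 and would be discarded, all sharing the same start time.

package main

import (
    "context"
    "fmt"
    "time"

    // Import path assumed from the telemetry import in counter_tracker.go.
    "github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/nozzle"
    "github.com/cloudfoundry/lager"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // stops the tracker's expiry goroutine

    tracker := nozzle.NewCounterTracker(ctx, 130*time.Second, lager.NewLogger("example"))

    start := time.Now()
    for i, total := range []uint64{100, 110, 115, 150} {
        eventTime := start.Add(time.Duration(i) * time.Second)
        value, startTime := tracker.Update("gorouter.total_requests,job='router'", total, eventTime)
        // Prints 0, 10, 15, 50; the start time stays at the time of the first observation.
        fmt.Println(value, startTime.Equal(start))
    }
}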