This repository has been archived by the owner on Jun 25, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathcounter_tracker.go
174 lines (160 loc) · 5.8 KB
/
counter_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/*
* Copyright 2017 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nozzle
import (
"context"
"expvar"
"math"
"sync"
"time"
"github.com/cloudfoundry-community/stackdriver-tools/src/stackdriver-nozzle/telemetry"
"github.com/cloudfoundry/lager"
)
// maxExpirePeriod is the upper bound on the interval between expiration sweeps. NewCounterTracker sweeps every half
// TTL, but never less often than this.
const maxExpirePeriod = 10 * time.Second
// countersExpiredCount is incremented once for every counter that counterTracker.expire removes.
var countersExpiredCount *telemetry.Counter
// init creates the "metrics.counters.expired" telemetry counter before any tracker can run.
func init() {
countersExpiredCount = telemetry.NewCounter(telemetry.Nozzle, "metrics.counters.expired")
}
// counterData is the per-counter state kept by counterTracker.
type counterData struct {
// startTime is the start time returned by Update. It is the event time of the first observation, and is moved
// forward to the previous event time whenever the accumulated value is reset to avoid overflowing int64.
startTime time.Time
// totalValue accumulates the deltas between consecutive observations; expire sets it to -1 before dropping the
// counter.
totalValue *expvar.Int
// lastValue is the raw value passed to the most recent Update call.
lastValue uint64
// lastSeenTime is the wall-clock time (time.Now) of the most recent Update call; expire compares it to the TTL.
lastSeenTime time.Time
// lastEventTime is the event time passed to the most recent Update call.
lastEventTime time.Time
}
// counterTracker is used to provide a "start time" for each loggregator counter metric exported by the nozzle.
//
// Stackdriver requires each point for a cumulative metric to include "start time" in addition to the actual event time
// (aka "end time"): https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#point
// Typically start time would correspond to the time when the actual process exporting the metric started. This ensures
// that when a process is restarted (and counter gets reset to 0), start time increases.
//
// Since binaries that export counter events to loggregator only provide event time, the nozzle needs to determine start
// time for each metric itself. To do that, counterTracker keeps its own counter for each metric, which corresponds to the
// total number of events since the metric was first seen by the nozzle (which is exported as the start time).
//
// As an example, a series of incoming CounterEvents with total values of [100, 110, 115, 150] will be exported by the
// nozzle as [10, 15, 50] (first point seen by the nozzle is discarded, because each point reported to Stackdriver needs
// to cover non-zero time interval between start time and end time).
//
// If counterTracker detects the total value for a given counter decrease, it will interpret this as a counter reset. This
// will not result in the Stackdriver cumulative metric being reset as well; for example, incoming CounterEvents with total
// values of [100, 110, 115, 10, 17] will be exported by the nozzle as [10, 15, 25, 32].
//
// counterTracker will regularly remove internal state for metrics that have not been seen for a while. This is done to
// conserve memory, and also to ensure that old values do not re-surface if a given counter stops being exported for some
// period of time.
type counterTracker struct {
// counters holds the state of every live counter, keyed by the name passed to Update.
counters map[string]*counterData
mu *sync.Mutex // protects `counters`
// ttl is how long a counter may go without an Update call before expire removes it.
ttl time.Duration
// logger receives an info entry for every counter that expires.
logger lager.Logger
// ticker drives the periodic expire sweep started by NewCounterTracker.
ticker *time.Ticker
// ctx stops the expiration goroutine (and the ticker) once it is done.
ctx context.Context
}
// NewCounterTracker creates and returns a counter tracker.
//
// ttl is how long a counter may go without updates before its state is dropped. A background goroutine sweeps for
// expired counters every ttl/2, capped at maxExpirePeriod, and exits (stopping its ticker) once ctx is done.
//
// time.NewTicker panics on a non-positive interval, so when ttl is too small to halve (zero, negative, or 1ns) the
// sweep falls back to running every maxExpirePeriod instead of crashing the caller.
func NewCounterTracker(ctx context.Context, ttl time.Duration, logger lager.Logger) *counterTracker {
	expirePeriod := ttl / 2
	if expirePeriod <= 0 || expirePeriod > maxExpirePeriod {
		expirePeriod = maxExpirePeriod
	}

	c := &counterTracker{
		counters: map[string]*counterData{},
		mu:       &sync.Mutex{},
		ttl:      ttl,
		logger:   logger,
		ticker:   time.NewTicker(expirePeriod),
		ctx:      ctx,
	}

	go func() {
		// Release the ticker's resources however the loop ends.
		defer c.ticker.Stop()
		for {
			select {
			case <-c.ticker.C:
				c.expire()
			case <-c.ctx.Done():
				return
			}
		}
	}()

	return c
}
// Update accepts a counter name, event time and a value, and returns the total value for the counter along with its
// start time. Counter name provided needs to uniquely identify the time series (so it needs to include metric name as
// well as all metric label values).
// At least two values need to be observed for a given counter to determine the total value, so for the first observed
// value, 0 will be returned as a value. It should not be reported to Stackdriver, since points should cover non-zero
// time interval.
//
// The returned total never goes negative: a single increment larger than math.MaxInt64 is saturated to
// math.MaxInt64, and when adding an increment would overflow int64 the total restarts from that increment with the
// start time moved to the previous event time.
func (t *counterTracker) Update(name string, value uint64, eventTime time.Time) (int64, time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()

	c, present := t.counters[name]
	if !present {
		c = t.newCounterData(name, eventTime)
		t.counters[name] = c
	} else {
		// A decrease means the source counter has been reset, so everything it reports now is new.
		delta := value
		if value >= c.lastValue {
			delta = value - c.lastValue
		}
		// The total is an int64; an increment that does not fit would turn negative when converted.
		if delta > math.MaxInt64 {
			delta = math.MaxInt64
		}

		// Compare against the remaining headroom rather than computing total+delta, which can itself wrap around
		// in uint64 arithmetic and hide the overflow.
		total := c.totalValue.Value()
		if total < 0 || delta > uint64(math.MaxInt64-total) {
			// Accumulated value overflows int64, we need to reset the counter.
			c.totalValue.Set(int64(delta))
			c.startTime = c.lastEventTime
		} else {
			c.totalValue.Add(int64(delta))
		}
	}

	c.lastValue = value
	c.lastSeenTime = time.Now()
	c.lastEventTime = eventTime
	return c.totalValue.Value(), c.startTime
}
// newCounterData builds the initial state for a counter that is not currently tracked. The total starts at 0 and the
// start time is the supplied event time.
//
// The total is published as an expvar.Int under the counter name. If an expvar.Int with that name already exists
// (for example, left behind by a counter that has since expired) it is reused and reset to 0, because expvar does
// not allow a name to be published twice. If the name is already taken by a variable of another type (expvar itself
// publishes "cmdline" and "memstats"), the total is kept in an unpublished expvar.Int instead of panicking on the
// type assertion.
func (t *counterTracker) newCounterData(name string, eventTime time.Time) *counterData {
	var v *expvar.Int
	switch existing := expvar.Get(name).(type) {
	case nil:
		v = expvar.NewInt(name)
	case *expvar.Int:
		// There was a previous counter with this name; use it instead, but reset value to 0.
		v = existing
		v.Set(0)
	default:
		// The name belongs to an unrelated variable; track the value without publishing it.
		v = new(expvar.Int)
	}

	// Initialize counter state for a new counter.
	return &counterData{
		totalValue: v,
		startTime:  eventTime,
	}
}
// expire removes every counter that has not been updated for longer than the tracker's TTL. For each such counter
// it sets the published total to -1, logs the removal, deletes the tracker's state for it and bumps the
// expired-counters telemetry counter. The whole sweep runs with the tracker's mutex held.
func (t *counterTracker) expire() {
	t.mu.Lock()
	defer t.mu.Unlock()

	for key, data := range t.counters {
		if time.Since(data.lastSeenTime) <= t.ttl {
			// Seen recently enough; keep it.
			continue
		}

		data.totalValue.Set(-1)
		t.logger.Info("counterTracker", lager.Data{
			"info":    "removing expired counter",
			"name":    key,
			"counter": data,
		})
		delete(t.counters, key)
		countersExpiredCount.Increment()
	}
}