Skip to content

Commit

Permalink
metric: use cumulative count instead of windowed count in tsdb
Browse files Browse the repository at this point in the history
Previously, we were writing the windowed count of a histogram into tsdb.
This meant that on each tick, the count reset. This is useful for
calculating averages and quantiles using windowed histograms, but makes
little sense to record for `count`.

This patch now uses the cumulative count in tsdb.

This patch also adds a `-sum` field to maintain a record of the
cumulative sum along with the cumulative counts.

Fixes #98745

Release note (bug fix): Timeseries metric counts will now show
cumulative counts for a histogram rather than the windowed count. A
`-sum` timeseries is also exported to keep track of the cumulative sum
of all samples in the histogram.
  • Loading branch information
aadityasondhi committed May 11, 2023
1 parent 9af73d5 commit b5bac44
Show file tree
Hide file tree
Showing 15 changed files with 142 additions and 94 deletions.
13 changes: 8 additions & 5 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
// Assert existing calls.
require.Equal(t, 1, dialSQLServerCount)
require.Equal(t, 1, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
count, _ := c.DialTenantLatency.Total()
require.Equal(t, count, int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))

// Invoke dial tenant with a failure to ReportFailure. Final error
Expand All @@ -453,7 +454,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
// Assert existing calls.
require.Equal(t, 2, dialSQLServerCount)
require.Equal(t, 2, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(2))
count, _ = c.DialTenantLatency.Total()
require.Equal(t, count, int64(2))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))
})

Expand All @@ -478,8 +480,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
conn, err := c.dialTenantCluster(ctx, nil /* requester */)
require.EqualError(t, err, "baz")
require.Nil(t, conn)

require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
count, _ := c.DialTenantLatency.Total()
require.Equal(t, count, int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(0))
})

Expand Down Expand Up @@ -551,7 +553,8 @@ func TestConnector_dialTenantCluster(t *testing.T) {
require.Equal(t, 3, addrLookupFnCount)
require.Equal(t, 2, dialSQLServerCount)
require.Equal(t, 1, reportFailureFnCount)
require.Equal(t, c.DialTenantLatency.TotalCount(), int64(1))
count, _ := c.DialTenantLatency.Total()
require.Equal(t, count, int64(1))
require.Equal(t, c.DialTenantRetries.Count(), int64(2))
})

Expand Down
28 changes: 16 additions & 12 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,8 @@ func TestProxyAgainstSecureCRDB(t *testing.T) {
})
}
require.Equal(t, int64(4), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(4), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(4), count)
require.Equal(t, int64(2), s.metrics.AuthFailedCount.Count())
require.Equal(t, int64(1), s.metrics.RoutingErrCount.Count())
}
Expand Down Expand Up @@ -611,7 +612,8 @@ func TestProxyTLSClose(t *testing.T) {
_ = conn.Close(ctx)

require.Equal(t, int64(1), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(1), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(1), count)
require.Equal(t, int64(0), s.metrics.AuthFailedCount.Count())
}

Expand Down Expand Up @@ -718,7 +720,8 @@ func TestInsecureProxy(t *testing.T) {
})
require.Equal(t, int64(1), s.metrics.AuthFailedCount.Count())
require.Equal(t, int64(1), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(1), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(1), count)
}

func TestErroneousFrontend(t *testing.T) {
Expand Down Expand Up @@ -827,7 +830,8 @@ func TestProxyRefuseConn(t *testing.T) {
_ = te.TestConnectErr(ctx, t, url, codeProxyRefusedConnection, "too many attempts")
require.Equal(t, int64(1), s.metrics.RefusedConnCount.Count())
require.Equal(t, int64(0), s.metrics.SuccessfulConnCount.Count())
require.Equal(t, int64(0), s.metrics.ConnectionLatency.TotalCount())
count, _ := s.metrics.ConnectionLatency.Total()
require.Equal(t, int64(0), count)
require.Equal(t, int64(0), s.metrics.AuthFailedCount.Count())
}

Expand Down Expand Up @@ -1643,10 +1647,10 @@ func TestConnectionMigration(t *testing.T) {
proxy.metrics.ConnMigrationErrorRecoverableCount.Count() +
proxy.metrics.ConnMigrationErrorFatalCount.Count()
require.Equal(t, totalAttempts, proxy.metrics.ConnMigrationAttemptedCount.Count())
require.Equal(t, totalAttempts,
proxy.metrics.ConnMigrationAttemptedLatency.TotalCount())
require.Equal(t, totalAttempts,
proxy.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
count, _ := proxy.metrics.ConnMigrationAttemptedLatency.Total()
require.Equal(t, totalAttempts, count)
count, _ = proxy.metrics.ConnMigrationTransferResponseMessageSize.Total()
require.Equal(t, totalAttempts, count)
}

transferConnWithRetries := func(t *testing.T, f *forwarder) error {
Expand Down Expand Up @@ -1966,12 +1970,12 @@ func TestConnectionMigration(t *testing.T) {
f.metrics.ConnMigrationErrorRecoverableCount.Count() +
f.metrics.ConnMigrationErrorFatalCount.Count()
require.Equal(t, totalAttempts, f.metrics.ConnMigrationAttemptedCount.Count())
require.Equal(t, totalAttempts,
f.metrics.ConnMigrationAttemptedLatency.TotalCount())
count, _ := f.metrics.ConnMigrationAttemptedLatency.Total()
require.Equal(t, totalAttempts, count)
// Here, we get a transfer timeout in response, so the message size
// should not be recorded.
require.Equal(t, totalAttempts-1,
f.metrics.ConnMigrationTransferResponseMessageSize.TotalCount())
count, _ = f.metrics.ConnMigrationTransferResponseMessageSize.Total()
require.Equal(t, totalAttempts-1, count)
})

// All connections should eventually be terminated.
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3613,7 +3613,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
require.Equal(t, exp.expClientRefreshFailure, metrics.ClientRefreshFail.Count() != 0, "TxnMetrics.ClientRefreshFail")
require.Equal(t, exp.expClientAutoRetryAfterRefresh, metrics.ClientRefreshAutoRetries.Count() != 0, "TxnMetrics.ClientRefreshAutoRetries")
require.Equal(t, exp.expServerRefresh, metrics.ServerRefreshSuccess.Count() != 0, "TxnMetrics.ServerRefreshSuccess")
require.Equal(t, exp.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts")
_, restartsSum := metrics.Restarts.Total()
require.Equal(t, exp.expClientRestart, restartsSum != 0, "TxnMetrics.Restarts")
require.Equal(t, exp.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC")
}
for _, tc := range testCases {
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,15 +1149,17 @@ func checkTxnMetrics(
func checkTxnMetricsOnce(
metrics kvcoord.TxnMetrics, name string, commits, commits1PC, aborts, restarts int64,
) error {
durationCounts, _ := metrics.Durations.Total()
restartsCounts, _ := metrics.Restarts.Total()
testcases := []struct {
name string
a, e int64
}{
{"commits", metrics.Commits.Count(), commits},
{"commits1PC", metrics.Commits1PC.Count(), commits1PC},
{"aborts", metrics.Aborts.Count(), aborts},
{"durations", metrics.Durations.TotalCount(), commits + aborts},
{"restarts", metrics.Restarts.TotalCount(), restarts},
{"durations", durationCounts, commits + aborts},
{"restarts", restartsCounts, restarts},
}

for _, tc := range testcases {
Expand Down Expand Up @@ -1374,7 +1376,8 @@ func TestTxnDurations(t *testing.T) {
// introducing spurious errors or being overly lax.
//
// TODO(cdo): look into cause of variance.
if a, e := hist.TotalCount(), int64(puts); a != e {
count, _ := hist.Total()
if a, e := count, int64(puts); a != e {
t.Fatalf("durations %d != expected %d", a, e)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ func TestSchedulerLoop(t *testing.T) {
return nil
})

require.Equal(t, int64(3), m.RaftSchedulerLatency.TotalCount())
count, _ := m.RaftSchedulerLatency.Total()
require.Equal(t, int64(3), count)
}

// Verify that when we enqueue the same range multiple times for the same
Expand Down
9 changes: 6 additions & 3 deletions pkg/server/status/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,12 @@ type registryRecorder struct {
func extractValue(name string, mtr interface{}, fn func(string, float64)) error {
switch mtr := mtr.(type) {
case metric.WindowedHistogram:
n := float64(mtr.TotalCountWindowed())
fn(name+"-count", n)
avg := mtr.TotalSumWindowed() / n
// Use cumulative stats here
count, sum := mtr.Total()
fn(name+"-count", float64(count))
fn(name+"-sum", sum)
// Use windowed stats for avg and quantiles
avg := mtr.MeanWindowed()
if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) {
avg = 0
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/status/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func TestMetricsRecorder(t *testing.T) {
addExpected(reg.prefix, data.name+q.Suffix, reg.source, 100, 10, reg.isNode)
}
addExpected(reg.prefix, data.name+"-count", reg.source, 100, 1, reg.isNode)
addExpected(reg.prefix, data.name+"-sum", reg.source, 100, 9, reg.isNode)
addExpected(reg.prefix, data.name+"-avg", reg.source, 100, 9, reg.isNode)
default:
t.Fatalf("unexpected: %+v", data)
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/asciitsdb/asciitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ func (t *TSDB) Scrape(ctx context.Context) {

switch mtr := val.(type) {
case metric.WindowedHistogram:
n := float64(mtr.TotalCountWindowed())
if _, ok := t.mu.points[name]; !ok {
return
}
t.mu.points[name+"-count"] = append(t.mu.points[name+"-count"], n)
avg := mtr.TotalSumWindowed() / n
count, _ := mtr.Total()
t.mu.points[name+"-count"] = append(t.mu.points[name+"-count"], float64(count))
avg := mtr.MeanWindowed()
if math.IsNaN(avg) || math.IsInf(avg, +1) || math.IsInf(avg, -1) {
avg = 0
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/util/metric/aggmetric/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,24 @@ func (a *AggHistogram) GetMetadata() metric.Metadata { return a.h.GetMetadata()
// Inspect is part of the metric.Iterable interface.
func (a *AggHistogram) Inspect(f func(interface{})) { f(a) }

// TotalCountWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) TotalCountWindowed() int64 {
return a.h.TotalCountWindowed()
// TotalWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) TotalWindowed() (int64, float64) {
return a.h.TotalWindowed()
}

// TotalSumWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) TotalSumWindowed() float64 {
return a.h.TotalSumWindowed()
// Total is part of the metric.WindowedHistogram interface
func (a *AggHistogram) Total() (int64, float64) {
return a.h.Total()
}

// MeanWindowed is part of the metric.WindowedHistogram interface
func (a *AggHistogram) MeanWindowed() float64 {
return a.h.MeanWindowed()
}

// Mean is part of the metric.WindowedHistogram interface
func (a *AggHistogram) Mean() float64 {
return a.h.Mean()
}

// ValueAtQuantileWindowed is part of the metric.WindowedHistogram interface
Expand Down
27 changes: 13 additions & 14 deletions pkg/util/metric/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,12 @@ func (h *HdrHistogram) RecordValue(v int64) {
}
}

// TotalCount returns the (cumulative) number of samples.
func (h *HdrHistogram) TotalCount() int64 {
// Total returns the (cumulative) number of samples and sum of samples.
func (h *HdrHistogram) Total() (int64, float64) {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.cumulative.TotalCount()
totalSum := float64(h.mu.cumulative.TotalCount()) * h.mu.cumulative.Mean()
return h.mu.cumulative.TotalCount(), totalSum
}

// Min returns the minimum.
Expand Down Expand Up @@ -168,14 +169,10 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {
}
}

// TotalCountWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalCountWindowed() int64 {
return int64(h.ToPrometheusMetricWindowed().Histogram.GetSampleCount())
}

// TotalSumWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalSumWindowed() float64 {
return h.ToPrometheusMetricWindowed().Histogram.GetSampleSum()
// TotalWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
Expand Down Expand Up @@ -240,7 +237,9 @@ func (h *HdrHistogram) Mean() float64 {
return h.mu.cumulative.Mean()
}

// TotalSum returns the (cumulative) sum of samples.
func (h *HdrHistogram) TotalSum() float64 {
return h.ToPrometheusMetric().Histogram.GetSampleSum()
func (h *HdrHistogram) MeanWindowed() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.sliding.Current.Mean()
}
71 changes: 41 additions & 30 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,16 @@ type PrometheusIterable interface {
// and values at specific quantiles from "windowed" histograms and record that
// data directly. These windows could be arbitrary and overlapping.
type WindowedHistogram interface {
// TotalCountWindowed returns the number of samples in the current window.
TotalCountWindowed() int64
// TotalSumWindowed returns the number of samples in the current window.
TotalSumWindowed() float64
// TotalWindowed returns the number of samples and their sum (respectively)
// in the current window.
TotalWindowed() (int64, float64)
// Total returns the number of samples and their sum (respectively) in the
// cumulative histogram.
Total() (int64, float64)
// MeanWindowed returns the average of the samples in the current window.
MeanWindowed() float64
// Mean returns the average of the sample in teh cumulative histogram.
Mean() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
ValueAtQuantileWindowed(q float64) float64
Expand Down Expand Up @@ -313,10 +319,7 @@ type IHistogram interface {
WindowedHistogram

RecordValue(n int64)
TotalCount() int64
TotalSum() float64
TotalCountWindowed() int64
TotalSumWindowed() float64
Total() (int64, float64)
Mean() float64
}

Expand Down Expand Up @@ -390,24 +393,16 @@ func (h *Histogram) Inspect(f func(interface{})) {
f(h)
}

// TotalCount returns the (cumulative) number of samples.
func (h *Histogram) TotalCount() int64 {
return int64(h.ToPrometheusMetric().Histogram.GetSampleCount())
}

// TotalCountWindowed implements the WindowedHistogram interface.
func (h *Histogram) TotalCountWindowed() int64 {
return int64(h.ToPrometheusMetricWindowed().Histogram.GetSampleCount())
// Total returns the (cumulative) number of samples and the sum of all samples.
func (h *Histogram) Total() (int64, float64) {
pHist := h.ToPrometheusMetric().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// TotalSum returns the (cumulative) sum of samples.
func (h *Histogram) TotalSum() float64 {
return h.ToPrometheusMetric().Histogram.GetSampleSum()
}

// TotalSumWindowed implements the WindowedHistogram interface.
func (h *Histogram) TotalSumWindowed() float64 {
return h.ToPrometheusMetricWindowed().Histogram.GetSampleSum()
// TotalWindowed implements the WindowedHistogram interface.
func (h *Histogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// Mean returns the (cumulative) mean of samples.
Expand All @@ -416,6 +411,12 @@ func (h *Histogram) Mean() float64 {
return pm.Histogram.GetSampleSum() / float64(pm.Histogram.GetSampleCount())
}

// MeanWindowed implements the WindowedHistogram interface.
func (h *Histogram) MeanWindowed() float64 {
pHist := h.ToPrometheusMetricWindowed().Histogram
return pHist.GetSampleSum() / float64(pHist.GetSampleCount())
}

// ValueAtQuantileWindowed implements the WindowedHistogram interface.
//
// https://github.com/prometheus/prometheus/blob/d9162189/promql/quantile.go#L75
Expand Down Expand Up @@ -591,18 +592,28 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// TotalCountWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalCountWindowed() int64 {
// TotalWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return int64(mwh.mu.cur.GetSampleCount())
return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum()
}

// Total implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) Total() (int64, float64) {
h := mwh.ToPrometheusMetric().Histogram
return int64(h.GetSampleCount()), h.GetSampleSum()
}

// TotalSumWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalSumWindowed() float64 {
func (mwh *ManualWindowHistogram) MeanWindowed() float64 {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return mwh.mu.cur.GetSampleSum()
return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount())
}

func (mwh *ManualWindowHistogram) Mean() float64 {
h := mwh.ToPrometheusMetric().Histogram
return h.GetSampleSum() / float64(h.GetSampleCount())
}

// ValueAtQuantileWindowed implements the WindowedHistogram interface.
Expand Down
Loading

0 comments on commit b5bac44

Please sign in to comment.