Skip to content

Commit

Permalink
metric: extend manual histogram to support rotate
Browse files Browse the repository at this point in the history
This commit extends the ManualWindowHistogram to support RecordValue and
Rotate. Previously, it was necessary to maintain duplicate cumulative
histograms in order to batch update the manual histogram. This update
adds a quality of life feature, enabling recording to the
ManualWindowHistogram, then once finished, rotating the batch of
recorded values into the current window for the internal tsdb to query.

Touches: cockroachdb#98266

Release note: None
  • Loading branch information
kvoli committed Apr 3, 2023
1 parent 65d9792 commit 2b9d291
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 45 deletions.
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2757,7 +2757,11 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
Buckets: metric.IOLatencyBuckets,
}),
FlushUtilization: metric.NewGaugeFloat64(metaStorageFlushUtilization),
FsyncLatency: metric.NewManualWindowHistogram(metaStorageFsyncLatency, pebble.FsyncLatencyBuckets),
FsyncLatency: metric.NewManualWindowHistogram(
metaStorageFsyncLatency,
pebble.FsyncLatencyBuckets,
false, /* withRotate */
),

ReplicaReadBatchDroppedLatchesBeforeEval: metric.NewCounter(metaReplicaReadBatchDroppedLatchesBeforeEval),
ReplicaReadBatchWithoutInterleavingIter: metric.NewCounter(metaReplicaReadBatchWithoutInterleavingIter),
Expand Down
23 changes: 4 additions & 19 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3215,7 +3215,10 @@ func (s *Store) ComputeMetricsPeriodically(
if err != nil {
return m, err
}
windowFsyncLatency = subtractPrometheusMetrics(windowFsyncLatency, prevFsync)
metric.SubtractPrometheusHistograms(
windowFsyncLatency.GetHistogram(),
prevFsync.GetHistogram(),
)

s.metrics.FsyncLatency.Update(m.LogWriter.FsyncLatency, windowFsyncLatency.Histogram)
}
Expand Down Expand Up @@ -3250,24 +3253,6 @@ func (s *Store) ComputeMetricsPeriodically(
return m, nil
}

func subtractPrometheusMetrics(
curFsync *prometheusgo.Metric, prevFsync prometheusgo.Metric,
) *prometheusgo.Metric {
prevBuckets := prevFsync.Histogram.GetBucket()
curBuckets := curFsync.Histogram.GetBucket()

*curFsync.Histogram.SampleCount -= prevFsync.Histogram.GetSampleCount()
*curFsync.Histogram.SampleSum -= prevFsync.Histogram.GetSampleSum()

for idx, v := range prevBuckets {
if *curBuckets[idx].UpperBound != *v.UpperBound {
panic("Bucket Upperbounds don't match")
}
*curBuckets[idx].CumulativeCount -= *v.CumulativeCount
}
return curFsync
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
Expand Down
128 changes: 103 additions & 25 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,45 +436,123 @@ var _ WindowedHistogram = (*ManualWindowHistogram)(nil)

// NewManualWindowHistogram is a prometheus-backed histogram. Depending on the
// value of the buckets parameter, this is suitable for recording any kind of
// quantity. The histogram is very similar to Histogram produced by NewHistogram
// with the main difference being that Histogram supports collecting values over
// time using the Histogram.RecordValue whereas this histogram does not support
// collecting values. Instead, this histogram supports replacing the cumulative
// and windowed histogram. This means that it is the responsibility of the
// creator of this histogram to replace the values by calling
// ManualWindowHistogram.Update.
func NewManualWindowHistogram(meta Metadata, buckets []float64) *ManualWindowHistogram {
// quantity. The histogram is very similar to Histogram produced by
// NewHistogram with the main difference being that Histogram supports
// collecting values over time using the Histogram.RecordValue whereas this
// histogram provides limited support RecordValue, the caller is responsible
// for calling Rotate, after recording is complete or manually providing the
// cumulative and current windowed histogram via Update. This means that it is
// the responsibility of the creator of this histogram to replace the values by
// either calling ManualWindowHistogram.Update or
// ManualWindowHistogram.RecordValue and ManualWindowHistogram.Rotate. If
// NewManualWindowHistogram is called withRotate as true, only the RecordValue
// and Rotate method may be used; withRotate as false, only Update may be used.
// TODO(kvoli,aadityasondhi): The two ways to use this histogram is a hack and
// "temporary", rationalize the interface. Tracked in #98622.
func NewManualWindowHistogram(
meta Metadata, buckets []float64, withRotate bool,
) *ManualWindowHistogram {
opts := prometheus.HistogramOpts{
Buckets: buckets,
}
cum := prometheus.NewHistogram(opts)
prev := &prometheusgo.Metric{}
if err := cum.Write(prev); err != nil {
panic(err.Error())
}

h := &ManualWindowHistogram{
Metadata: meta,
cum: cum,
windowedHistogram: nil,
Metadata: meta,
rotating: withRotate,
cum: cum,
prev: prev.GetHistogram(),
cur: nil,
}
return h
}

// ManualWindowHistogram is a prometheus-backed histogram. Internally there are
// two sets of histograms: one is the cumulative set (i.e. data is never
// evicted) which is a prometheus.Histogram and the other is the windowed set
// (which keeps only recently collected samples) which is a
// prometheusgo.Histogram. Both histograms must be updated by the client by
// calling ManualWindowHistogram.Update.
// three sets of histograms: one is the cumulative set (i.e. data is never
// evicted) which is a prometheus.Histogram, the cumulative histogram value
// when last rotated and the current histogram, which is windowed. Both the
// previous and current histograms are prometheusgo.Histograms. Both histograms
// must be updated by the client by calling either ManualWindowHistogram.Update
// or ManualWindowHistogram.RecordValue and subsequently Rotate.
type ManualWindowHistogram struct {
Metadata
syncutil.RWMutex
cum prometheus.Histogram
windowedHistogram *prometheusgo.Histogram
rotating bool
cum prometheus.Histogram
prev, cur *prometheusgo.Histogram
}

// Update replaces the cumulative and window histograms.
func (mwh *ManualWindowHistogram) Update(cum prometheus.Histogram, window *prometheusgo.Histogram) {
// Update replaces the cumulative and current windowed histograms.
func (mwh *ManualWindowHistogram) Update(cum prometheus.Histogram, cur *prometheusgo.Histogram) {
mwh.Lock()
defer mwh.Unlock()
if mwh.rotating {
panic("Unexpected call to Update with rotate enabled")
}

mwh.cum = cum
mwh.windowedHistogram = window
mwh.cur = cur
}

// RecordValue records a value to the cumulative histogram. The value is only
// added to the current window histogram once Rotate is called.
func (mwh *ManualWindowHistogram) RecordValue(val float64) {
mwh.Lock()
defer mwh.Unlock()
if !mwh.rotating {
panic("Unexpected call to RecordValue with rotate disabled")
}
mwh.cum.Observe(val)
}

// SubtractPrometheusHistograms subtracts the prev histogram from the cur
// histogram, in place modifying the cur histogram. The bucket boundaries must
// be identical for both prev and cur.
func SubtractPrometheusHistograms(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) {
prevBuckets := prev.GetBucket()
curBuckets := cur.GetBucket()

*cur.SampleCount -= prev.GetSampleCount()
*cur.SampleSum -= prev.GetSampleSum()

for idx, v := range prevBuckets {
if *curBuckets[idx].UpperBound != *v.UpperBound {
panic("Bucket Upperbounds don't match")
}
*curBuckets[idx].CumulativeCount -= *v.CumulativeCount
}
}

// Rotate sets the current windowed histogram (cur) to be the delta of the
// cumulative histogram at the last rotation (prev) and the cumulative
// histogram currently (cum).
func (mwh *ManualWindowHistogram) Rotate() error {
mwh.Lock()
defer mwh.Unlock()

if !mwh.rotating {
panic("Unexpected call to RecordValue with rotate disabled")
}

cur := &prometheusgo.Metric{}
if err := mwh.cum.Write(cur); err != nil {
return err
}

SubtractPrometheusHistograms(cur.GetHistogram(), mwh.prev)
mwh.cur = cur.GetHistogram()
prev := &prometheusgo.Metric{}

if err := mwh.cum.Write(prev); err != nil {
return err
}
mwh.prev = prev.GetHistogram()

return nil
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand Down Expand Up @@ -506,14 +584,14 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
func (mwh *ManualWindowHistogram) TotalCountWindowed() int64 {
mwh.RLock()
defer mwh.RUnlock()
return int64(mwh.windowedHistogram.GetSampleCount())
return int64(mwh.cur.GetSampleCount())
}

// TotalSumWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalSumWindowed() float64 {
mwh.RLock()
defer mwh.RUnlock()
return mwh.windowedHistogram.GetSampleSum()
return mwh.cur.GetSampleSum()
}

// ValueAtQuantileWindowed implements the WindowedHistogram interface.
Expand All @@ -523,10 +601,10 @@ func (mwh *ManualWindowHistogram) TotalSumWindowed() float64 {
func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 {
mwh.RLock()
defer mwh.RUnlock()
if mwh.windowedHistogram == nil {
if mwh.cur == nil {
return 0
}
return ValueAtQuantileWindowed(mwh.windowedHistogram, q)
return ValueAtQuantileWindowed(mwh.cur, q)
}

// A Counter holds a single mutable atomic value.
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func TestManualWindowHistogram(t *testing.T) {
h := NewManualWindowHistogram(
Metadata{},
buckets,
false, /* withRotate */
)

// should return 0 if no observations are made
Expand Down Expand Up @@ -254,6 +255,11 @@ func TestManualWindowHistogram(t *testing.T) {
t.Fatalf("expected differs from actual: %s", pretty.Diff(exp, act))
}

// Rotate and RecordValue are not supported when using Update. See comment on
// NewManualWindowHistogram.
require.Panics(t, func() { h.RecordValue(0) })
require.Panics(t, func() { _ = h.Rotate() })

require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
require.Equal(t, 1.0, h.ValueAtQuantileWindowed(10))
require.Equal(t, 17.5, h.ValueAtQuantileWindowed(50))
Expand Down

0 comments on commit 2b9d291

Please sign in to comment.