Skip to content

Commit

Permalink
schedulerlatency: export Go scheduling latency metric
Browse files Browse the repository at this point in the history
And record data into CRDB's internal time-series database. Informs
\#82743 and #87823. To export scheduling latencies to prometheus, we
choose an exponential bucketing scheme with base multiple of 1.1, and
the output range bounded to [50us, 100ms). This makes for ~70 buckets.
It's worth noting that the default histogram buckets used in Go are
not fit for our purposes. If we care about improving it, we could
consider patching the runtime.

  bucket[  0] width=0s boundary=[-Inf, 0s)
  bucket[  1] width=1ns boundary=[0s, 1ns)
  bucket[  2] width=1ns boundary=[1ns, 2ns)
  bucket[  3] width=1ns boundary=[2ns, 3ns)
  bucket[  4] width=1ns boundary=[3ns, 4ns)
  ...
  bucket[270] width=16.384µs boundary=[737.28µs, 753.664µs)
  bucket[271] width=16.384µs boundary=[753.664µs, 770.048µs)
  bucket[272] width=278.528µs boundary=[770.048µs, 1.048576ms)
  bucket[273] width=32.768µs boundary=[1.048576ms, 1.081344ms)
  bucket[274] width=32.768µs boundary=[1.081344ms, 1.114112ms)
  ...
  bucket[717] width=1h13m18.046511104s boundary=[53h45m14.046488576s, 54h58m32.09299968s)
  bucket[718] width=1h13m18.046511104s boundary=[54h58m32.09299968s, 56h11m50.139510784s)
  bucket[719] width=1h13m18.046511104s boundary=[56h11m50.139510784s, 57h25m8.186021888s)
  bucket[720] width=57h25m8.186021888s boundary=[57h25m8.186021888s, +Inf)

Release note: None
  • Loading branch information
irfansharif committed Sep 12, 2022
1 parent a441ed7 commit 11a8072
Show file tree
Hide file tree
Showing 12 changed files with 1,686 additions and 87 deletions.
6 changes: 4 additions & 2 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,9 @@ func (s *Server) PreStart(ctx context.Context) error {
}
})

if err := schedulerlatency.StartSampler(ctx, s.st, s.stopper); err != nil {
if err := schedulerlatency.StartSampler(
ctx, s.st, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
); err != nil {
return err
}

Expand Down Expand Up @@ -1469,7 +1471,7 @@ func (s *Server) PreStart(ctx context.Context) error {
})

// We can now add the node registry.
s.recorder.AddNode(
s.recorder.AddNode( // XXX: Has to occur before
s.registry,
s.node.Descriptor,
s.node.startedAt,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ type registryRecorder struct {

func extractValue(name string, mtr interface{}, fn func(string, float64)) error {
switch mtr := mtr.(type) {
case *metric.Histogram:
case metric.WindowedHistogram:
n := float64(mtr.TotalCountWindowed())
fn(name+"-count", n)
avg := mtr.TotalSumWindowed() / n
Expand Down
6 changes: 6 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3515,6 +3515,12 @@ var charts = []sectionDescription{
"admission.scheduler_latency_listener.p99_nanos",
},
},
{
Title: "Scheduler Latency",
Metrics: []string{
"go.scheduler_latency",
},
},
{
Title: "Elastic CPU Durations",
Metrics: []string{
Expand Down
4 changes: 0 additions & 4 deletions pkg/util/admission/scheduler_latency_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@ import (

var _ metric.Struct = &schedulerLatencyListenerMetrics{}

// TODO(irfansharif): There’s some discrepancy between what this struct observes
// as p99 scheduling latencies and what prometheus/client_golang computes. Worth
// investigating.

type schedulerLatencyListener struct {
ctx context.Context
elasticCPULimiter elasticCPULimiter
Expand Down
152 changes: 93 additions & 59 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
prometheusgo "github.com/prometheus/client_model/go"
metrics "github.com/rcrowley/go-metrics"
"github.com/rcrowley/go-metrics"
)

const (
Expand Down Expand Up @@ -68,8 +68,8 @@ type PrometheusExportable interface {
}

// PrometheusIterable is an extension of PrometheusExportable to indicate that
// this metric is comprised of children metrics which augment the parent's
// label values.
// this metric comprises children metrics which augment the parent's label
// values.
//
// The motivating use-case for this interface is the existence of tenants. We'd
// like to capture per-tenant metrics and expose them to prometheus while not
Expand All @@ -82,6 +82,22 @@ type PrometheusIterable interface {
Each([]*prometheusgo.LabelPair, func(metric *prometheusgo.Metric))
}

// WindowedHistogram represents a histogram with data over recent window of
// time. It's used primarily to record histogram data into CRDB's internal
// time-series database, which does not know how to encode cumulative
// histograms. What it does instead is scrape off sample count, sum of values,
// and values at specific quantiles from "windowed" histograms and record that
// data directly. These windows could be arbitrary and overlapping.
type WindowedHistogram interface {
// TotalCountWindowed returns the number of samples in the current window.
TotalCountWindowed() int64
// TotalSumWindowed returns the number of samples in the current window.
TotalSumWindowed() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
ValueAtQuantileWindowed(q float64) float64
}

// GetName returns the metric's name.
func (m *Metadata) GetName() string {
return m.Name
Expand Down Expand Up @@ -160,10 +176,6 @@ func maybeTick(m periodic) {
}
}

// TODO(irfansharif): Figure out how to export runtime scheduler latencies as a
// prometheus histogram? Can we use a functional histogram? And maintain deltas
// underneath? What does prometheus/client_golang do?

// NewHistogram is a prometheus-backed histogram. Depending on the value of
// opts.Buckets, this is suitable for recording any kind of quantity. Common
// sensible choices are {IO,Network}LatencyBuckets.
Expand Down Expand Up @@ -192,6 +204,7 @@ func NewHistogram(meta Metadata, windowDuration time.Duration, buckets []float64

var _ periodic = (*Histogram)(nil)
var _ PrometheusExportable = (*Histogram)(nil)
var _ WindowedHistogram = (*Histogram)(nil)

// Histogram is a prometheus-backed histogram. It collects observed values by
// keeping bucketed counts. For convenience, internally two sets of buckets are
Expand Down Expand Up @@ -296,7 +309,7 @@ func (h *Histogram) TotalCount() int64 {
return int64(h.ToPrometheusMetric().Histogram.GetSampleCount())
}

// TotalCountWindowed returns the number of samples in the current window.
// TotalCountWindowed implements the WindowedHistogram interface.
func (h *Histogram) TotalCountWindowed() int64 {
return int64(h.ToPrometheusMetricWindowed().Histogram.GetSampleCount())
}
Expand All @@ -306,7 +319,7 @@ func (h *Histogram) TotalSum() float64 {
return h.ToPrometheusMetric().Histogram.GetSampleSum()
}

// TotalSumWindowed returns the number of samples in the current window.
// TotalSumWindowed implements the WindowedHistogram interface.
func (h *Histogram) TotalSumWindowed() float64 {
return h.ToPrometheusMetricWindowed().Histogram.GetSampleSum()
}
Expand All @@ -316,10 +329,9 @@ func (h *Histogram) Mean() float64 {
return h.TotalSum() / float64(h.TotalCount())
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
// ValueAtQuantileWindowed implements the WindowedHistogram interface.
//
// https://github.com/prometheus/prometheus/blob/d91621890a2ccb3191a6d74812cc1827dd4093bf/promql/quantile.go#L75
// https://github.com/prometheus/prometheus/blob/d9162189/promql/quantile.go#L75
// This function is mostly taken from a prometheus internal function that
// does the same thing. There are a few differences for our use case:
// 1. As a user of the prometheus go client library, we don't have access
Expand All @@ -328,40 +340,7 @@ func (h *Histogram) Mean() float64 {
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 {
m := h.ToPrometheusMetricWindowed()

buckets := m.Histogram.Bucket
n := float64(*m.Histogram.SampleCount)
if n == 0 {
return 0
}

rank := uint64(((q / 100) * n) + 0.5)
b := sort.Search(len(buckets)-1, func(i int) bool { return *buckets[i].CumulativeCount >= rank })

var (
bucketStart float64
bucketEnd = *buckets[b].UpperBound
count = *buckets[b].CumulativeCount
)

// Calculate the linearly interpolated value within the bucket
if b > 0 {
bucketStart = *buckets[b-1].UpperBound
count -= *buckets[b-1].CumulativeCount
rank -= *buckets[b-1].CumulativeCount
}
val := bucketStart + (bucketEnd-bucketStart)*(float64(rank)/float64(count))
if math.IsNaN(val) || math.IsInf(val, -1) {
return 0
}

// should not extrapolate past the upper bound of the largest bucket
if val > *buckets[len(buckets)-1].UpperBound {
return *buckets[len(buckets)-1].UpperBound
}

return val
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q)
}

// A Counter holds a single mutable atomic value.
Expand Down Expand Up @@ -493,20 +472,11 @@ func (g *Gauge) GetMetadata() Metadata {
type GaugeFloat64 struct {
Metadata
bits *uint64
fn func() float64
}

// NewGaugeFloat64 creates a GaugeFloat64.
func NewGaugeFloat64(metadata Metadata) *GaugeFloat64 {
return &GaugeFloat64{metadata, new(uint64), nil}
}

// NewFunctionalGaugeFloat64 creates a GaugeFloat64 metric whose value is
// determined when asked for by calling the provided function.
// Note that Update, Inc, and Dec should NOT be called on a Gauge returned
// from NewFunctionalGaugeFloat64.
func NewFunctionalGaugeFloat64(metadata Metadata, f func() float64) *GaugeFloat64 {
return &GaugeFloat64{metadata, nil, f}
return &GaugeFloat64{metadata, new(uint64)}
}

// Snapshot returns a read-only copy of the gauge.
Expand All @@ -521,9 +491,6 @@ func (g *GaugeFloat64) Update(v float64) {

// Value returns the gauge's current value.
func (g *GaugeFloat64) Value() float64 {
if g.fn != nil {
return g.fn()
}
return math.Float64frombits(atomic.LoadUint64(g.bits))
}

Expand Down Expand Up @@ -570,3 +537,70 @@ func (g *GaugeFloat64) GetMetadata() Metadata {
baseMetadata.MetricType = prometheusgo.MetricType_GAUGE
return baseMetadata
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the given histogram.
func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 {
buckets := histogram.Bucket
n := float64(*histogram.SampleCount)
if n == 0 {
return 0
}

// NB: The 0.5 is added for rounding purposes; it helps in cases where
// SampleCount is small.
rank := uint64(((q / 100) * n) + 0.5)

// Since we are missing the +Inf bucket, CumulativeCounts may never exceed
// rank. By omitting the highest bucket we have from the search, the failed
// search will land on that last bucket and we don't have to do any special
// checks regarding landing on a non-existent bucket.
b := sort.Search(len(buckets)-1, func(i int) bool { return *buckets[i].CumulativeCount >= rank })

var (
bucketStart float64 // defaults to 0, which we assume is the lower bound of the smallest bucket
bucketEnd = *buckets[b].UpperBound
count = *buckets[b].CumulativeCount
)

// Calculate the linearly interpolated value within the bucket.
if b > 0 {
bucketStart = *buckets[b-1].UpperBound
count -= *buckets[b-1].CumulativeCount
rank -= *buckets[b-1].CumulativeCount
}
val := bucketStart + (bucketEnd-bucketStart)*(float64(rank)/float64(count))
if math.IsNaN(val) || math.IsInf(val, -1) {
return 0
}

// Should not extrapolate past the upper bound of the largest bucket.
//
// NB: SampleCount includes the implicit +Inf bucket but the
// buckets[len(buckets)-1].UpperBound refers to the largest bucket defined
// by us -- the client library doesn't give us access to the +Inf bucket
// which Prometheus uses under the hood. With a high enough quantile, the
// val computed further below surpasses the upper bound of the largest
// bucket. Using that interpolated value feels wrong since we'd be
// extrapolating. Also, for specific metrics if we see our q99 values to be
// hitting the top-most bucket boundary, that's an indication for us to
// choose better buckets for more accuracy. It's also worth noting that the
// prometheus client library does the same thing when the resulting value is
// in the +Inf bucket, whereby they return the upper bound of the second
// last bucket -- see [1].
//
// [1]: https://github.com/prometheus/prometheus/blob/d9162189/promql/quantile.go#L103.
// the buckets to provide a more accurate histogram. FWIW the Prometheus client
// library does the same when the resulting value is in the +Inf bucket and
// returns the upper bound of the second last bucket:
// It is cleaner/easier for them since they have access to the +Inf bucket
// internally.
// The 0.5 was added for rounding purposes. I went back and forth on whether to
// have it at all but thought it made sense for smaller SampleCount cases like
// in this test:
if val > *buckets[len(buckets)-1].UpperBound {
return *buckets[len(buckets)-1].UpperBound
}

return val
}
12 changes: 11 additions & 1 deletion pkg/util/schedulerlatency/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,40 @@ go_library(
name = "schedulerlatency",
srcs = [
"callbacks.go",
"histogram.go",
"sampler.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/util/schedulerlatency",
visibility = ["//visibility:public"],
deps = [
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/metric",
"//pkg/util/ring",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_gogo_protobuf//proto",
"@com_github_prometheus_client_model//go",
],
)

go_test(
name = "schedulerlatency_test",
srcs = ["scheduler_latency_test.go"],
srcs = [
"histogram_test.go",
"scheduler_latency_test.go",
],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":schedulerlatency"],
deps = [
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/skip",
"//pkg/util/metric",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)
Expand Down
Loading

0 comments on commit 11a8072

Please sign in to comment.