Skip to content

Commit

Permalink
roachtest: assert on CPU in rebalance/by-load
Browse files Browse the repository at this point in the history
The `rebalance/by-load/replicas` roachtests periodically flake due to a
known limitation, where a cluster with a small number of ranges may not
be properly balanced due to heterogeneous localities (including
multi-store) #88829.

This commit updates the total number of ranges from 1 per-store, to 5
per-store for `rebalance/by-load/*` roachtests.

The roachtest asserted on the lease count, as a proxy for load, assuming
that the load evenly hits each lease for the created ranges. However,
the principal indicator of load in a read heavy workload is CPU.

This commit updates the test assertion to require that every store's CPU
is within 10% of the cluster mean. The test assertion previously
required that the max-min lease count delta was 0, when no outside
splits occurred; or 1 when the number of ranges was greater than the
number of stores.

The logging format is updated for easier debugging:

```
cpu outside bounds mean=23.9 tolerance=10.0% (±2.4) bounds=[21.5, 26.3]
   below  = []
   within = [s1: 22 (-6.7%), s2: 22 (-6.3%)]
   above  = [s3: 26 (+13.0%)]
...
cpu within bounds mean=25.7 tolerance=10.0% (±2.6) bounds=[23.2, 28.3]
   stores=[s1: 24 (-3.0%), s2: 24 (-2.9%), s3: 27 (+5.9%)]
```

resolves: #102801
resolves: #102823

Release note: None
  • Loading branch information
kvoli committed Jun 27, 2023
1 parent 20e2332 commit 8661eed
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 85 deletions.
234 changes: 149 additions & 85 deletions pkg/cmd/roachtest/tests/rebalance_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"sort"
"strings"
"time"

Expand All @@ -25,21 +23,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

const (
// storeToRangeFactor is the number of ranges to create per store in the
// cluster.
storeToRangeFactor = 5
// meanCPUTolerance is the tolerance applied when checking normalized (0-100)
// CPU percent utilization of stores against the mean. In multi-store tests,
// the same CPU utilization will be reported for stores on the same node. The
// acceptable range for CPU w.r.t the mean is:
//
// mean_tolerance = mean * meanCPUTolerance
// [mean - mean_tolerance, mean + mean_tolerance].
meanCPUTolerance = 0.1
// statSamplePeriod is the period at which timeseries stats are sampled.
statSamplePeriod = 10 * time.Second
)

func registerRebalanceLoad(r registry.Registry) {
// This test creates a single table for kv to use and splits the table to
// have one range for every node in the cluster. Because even brand new
// clusters start with 20+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger rebalancing of
// leases in the cluster based on lease count alone. We let kv generate a lot
// of load against the ranges such that we'd expect load-based rebalancing to
// distribute the load evenly across the nodes in the cluster.
// have 5 ranges for every node in the cluster. Because even brand new
// clusters start with 40+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger significant
// rebalancing of leases in the cluster based on lease count alone. We let kv
// generate a lot of load against the ranges such that we'd expect load-based
// rebalancing to distribute the load evenly across the nodes in the cluster.
rebalanceLoadRun := func(
ctx context.Context,
t test.Test,
Expand All @@ -52,12 +65,18 @@ func registerRebalanceLoad(r registry.Registry) {
startOpts := option.DefaultStartOpts()
roachNodes := c.Range(1, c.Spec().NodeCount-1)
appNode := c.Node(c.Spec().NodeCount)
numStores := len(roachNodes)
numNodes := len(roachNodes)
numStores := numNodes
if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
numStores *= c.Spec().SSDs
startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
}
splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store
// We want each store to end up with approximately storeToRangeFactor
// (factor) leases such that the CPU load is evenly spread, e.g.
// (n * factor) -1 splits = factor * n ranges = factor leases per store
// Note that we only assert on the CPU of each store w.r.t the mean, not
// the lease count.
splits := (numStores * storeToRangeFactor) - 1
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
settings := install.MakeClusterSettings()
Expand Down Expand Up @@ -87,8 +106,8 @@ func registerRebalanceLoad(r registry.Registry) {
var m *errgroup.Group // see comment in version.go
m, ctx = errgroup.WithContext(ctx)

// Enable us to exit out of workload early when we achieve the desired
// lease balance. This drastically shortens the duration of the test in the
// Enable us to exit out of workload early when we achieve the desired CPU
// balance. This drastically shortens the duration of the test in the
// common case.
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -100,7 +119,7 @@ func registerRebalanceLoad(r registry.Registry) {
"--duration=%v {pgurl:1-%d}",
concurrency, maxDuration, len(roachNodes)))
if errors.Is(ctx.Err(), context.Canceled) {
// We got canceled either because lease balance was achieved or the
// We got canceled either because CPU balance was achieved or the
// other worker hit an error. In either case, it's not this worker's
// fault.
return nil
Expand All @@ -109,7 +128,7 @@ func registerRebalanceLoad(r registry.Registry) {
})

m.Go(func() error {
t.Status("checking for lease balance")
t.Status("checking for CPU balance")

db := c.Conn(ctx, t.L(), 1)
defer db.Close()
Expand All @@ -125,23 +144,37 @@ func registerRebalanceLoad(r registry.Registry) {
return err
}

for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil {
return err
} else if done {
t.Status("successfully achieved lease balance; waiting for kv to finish running")
cancel()
return nil
}
storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
if err != nil {
return err
}

var reason string
var done bool
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
// Wait out the sample period initially to allow the timeseries to
// populate meaningful information for the test to query.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
case <-time.After(statSamplePeriod):
}

clusterStoresCPU, err := storeCPUFn(ctx)
if err != nil {
t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
}

done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
t.L().Printf("cpu %s", reason)
if done {
t.Status("successfully achieved CPU balance; waiting for kv to finish running")
cancel()
return nil
}
}

return fmt.Errorf("timed out before leases were evenly spread")
return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
})
if err := m.Wait(); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -233,79 +266,110 @@ func registerRebalanceLoad(r registry.Registry) {
)
}

func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) {
rows, err := db.Query(
`select lease_holder, count(*) ` +
`from [show ranges from table kv.kv] ` +
`group by lease_holder;`)
if err != nil {
// TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or
// earlier.
if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") {
rows, err = db.Query(
`select lease_holder, count(*) ` +
`from [show experimental_ranges from table kv.kv] ` +
`group by lease_holder;`)
}
}
// makeStoreCPUFn returns a function which can be called to gather the CPU of
// the cluster stores. When there are multiple stores per node, stores on the
// same node will report identical CPU.
func makeStoreCPUFn(
octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int,
) (func(ctx context.Context) ([]float64, error), error) {
adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1))
if err != nil {
return false, err
return nil, err
}
defer rows.Close()
leaseCounts := make(map[int]int)
var rangeCount int
for rows.Next() {
var storeID, leaseCount int
if err := rows.Scan(&storeID, &leaseCount); err != nil {
return false, err
url := adminURLs[0]
startTime := timeutil.Now()
tsQueries := make([]tsQuery, numNodes)
for i := range tsQueries {
tsQueries[i] = tsQuery{
name: "cr.node.sys.cpu.combined.percent-normalized",
queryType: total,
sources: []string{fmt.Sprintf("%d", i+1)},
}
leaseCounts[storeID] = leaseCount
rangeCount += leaseCount
}

if len(leaseCounts) < numStores {
l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts))
return false, nil
}
return func(ctx context.Context) ([]float64, error) {
now := timeutil.Now()
resp, err := getMetricsWithSamplePeriod(
url, startTime, now, statSamplePeriod, tsQueries)
if err != nil {
return nil, err
}

// The simple case is when ranges haven't split. We can require that every
// store has one lease.
if rangeCount == numStores {
for _, leaseCount := range leaseCounts {
if leaseCount != 1 {
l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts))
return false, nil
// Assume that stores on the same node will have sequential store IDs e.g.
// when the stores per node is 2:
// node 1 = store 1, store 2 ... node N = store 2N-1, store 2N
storesPerNode := numStores / numNodes
storeCPUs := make([]float64, numStores)
for node, result := range resp.Results {
// Take the latest CPU data point only.
cpu := result.Datapoints[len(result.Datapoints)-1].Value
nodeIdx := node * storesPerNode
for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ {
// The values will be a normalized float in [0,1.0], scale to a
// percentage [0,100].
storeCPUs[nodeIdx+storeOffset] = cpu * 100
}
}
l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
}
return storeCPUs, nil
}, nil
}

// For completeness, if leases have split, verify the leases per store don't
// differ by any more than 1.
leases := make([]int, 0, numStores)
for _, leaseCount := range leaseCounts {
leases = append(leases, leaseCount)
// isLoadEvenlyDistributed checks whether the load for the stores given are
// within tolerance of the mean. If the store loads are, true is returned as
// well as reason, otherwise false. The function expects the loads to be
// indexed to store IDs, see makeStoreCPUFn for example format.
func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) {
mean := arithmeticMean(loads)
// If the mean is zero, there's nothing meaningful to assert on. Return early
// that the load isn't evenly distributed.
if mean == 0 {
return false, "no load: mean=0"
}
sort.Ints(leases)
if leases[0]+1 < leases[len(leases)-1] {
l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts))
return false, nil

meanTolerance := mean * tolerance
lb := mean - meanTolerance
ub := mean + meanTolerance

// Partiton the loads into above, below and within the tolerance bounds of
// the load mean.
above, below, within := []int{}, []int{}, []int{}
for i, load := range loads {
storeID := i + 1
if load > ub {
above = append(above, storeID)
} else if load < lb {
below = append(below, storeID)
} else {
within = append(within, storeID)
}
}

l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]",
mean, 100*tolerance, meanTolerance, lb, ub)
if len(below) > 0 || len(above) > 0 {
ok = false
reason = fmt.Sprintf(
"outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n",
boundsStr,
formatLoads(below, loads, mean),
formatLoads(within, loads, mean),
formatLoads(above, loads, mean),
)
} else {
ok = true
reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n",
boundsStr, formatLoads(within, loads, mean))
}
return
}

func formatLeaseCounts(counts map[int]int) string {
storeIDs := make([]int, 0, len(counts))
for storeID := range counts {
storeIDs = append(storeIDs, storeID)
}
sort.Ints(storeIDs)
strs := make([]string, 0, len(counts))
for _, storeID := range storeIDs {
strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID]))
func formatLoads(storeIDs []int, loads []float64, mean float64) string {
fmtLoads := make([]string, len(storeIDs))
for i, storeID := range storeIDs {
load := loads[storeID-1]
fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)",
storeID, int(load), (load-mean)/mean*100,
)
}
return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", "))
}
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/tests/rebalance_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ func coefficientOfVariation(vals []float64) float64 {
return stdev / mean
}

// arithmeticMean is an average measure where avg = sum / count. This fn is
// included to return 0 instead of NaN like the stats pkg used underneath.
func arithmeticMean(vals []float64) float64 {
if len(vals) == 0 {
return 0
}
mean, _ := stats.Mean(vals)
return mean
}

func scale(vals []float64, scale float64) []float64 {
scaled := make([]float64, len(vals))
for i := range vals {
Expand Down

0 comments on commit 8661eed

Please sign in to comment.