Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: assert on CPU in rebalance/by-load and reduce noise #105616

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 161 additions & 99 deletions pkg/cmd/roachtest/tests/rebalance_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"sort"
"strings"
"time"

Expand All @@ -25,21 +23,39 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

const (
// storeToRangeFactor is the number of ranges to create per store in the
// cluster.
storeToRangeFactor = 5
// meanCPUTolerance is the tolerance applied when checking normalized (0-100)
// CPU percent utilization of stores against the mean. In multi-store tests,
// the same CPU utilization will be reported for stores on the same node. The
// acceptable range for CPU w.r.t the mean is:
//
// mean_tolerance = mean * meanCPUTolerance
// [mean - mean_tolerance, mean + mean_tolerance].
//
// The store rebalancer watches the replica CPU load and balances within
// +-10% of the mean. To reduce noise, add a buffer (5%) ontop.
meanCPUTolerance = 0.15
// statSamplePeriod is the period at which timeseries stats are sampled.
statSamplePeriod = 10 * time.Second
)

func registerRebalanceLoad(r registry.Registry) {
// This test creates a single table for kv to use and splits the table to
// have one range for every node in the cluster. Because even brand new
// clusters start with 20+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger rebalancing of
// leases in the cluster based on lease count alone. We let kv generate a lot
// of load against the ranges such that we'd expect load-based rebalancing to
// distribute the load evenly across the nodes in the cluster.
// have 5 ranges for every node in the cluster. Because even brand new
// clusters start with 40+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger significant
// rebalancing of leases in the cluster based on lease count alone. We let kv
// generate a lot of load against the ranges such that we'd expect load-based
// rebalancing to distribute the load evenly across the nodes in the cluster.
rebalanceLoadRun := func(
ctx context.Context,
t test.Test,
Expand All @@ -52,12 +68,18 @@ func registerRebalanceLoad(r registry.Registry) {
startOpts := option.DefaultStartOpts()
roachNodes := c.Range(1, c.Spec().NodeCount-1)
appNode := c.Node(c.Spec().NodeCount)
numStores := len(roachNodes)
numNodes := len(roachNodes)
numStores := numNodes
if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
numStores *= c.Spec().SSDs
startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
}
splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store
// We want each store to end up with approximately storeToRangeFactor
// (factor) leases such that the CPU load is evenly spread, e.g.
// (n * factor) -1 splits = factor * n ranges = factor leases per store
// Note that we only assert on the CPU of each store w.r.t the mean, not
// the lease count.
splits := (numStores * storeToRangeFactor) - 1
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
settings := install.MakeClusterSettings()
Expand All @@ -84,23 +106,32 @@ func registerRebalanceLoad(r registry.Registry) {
c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode)
c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits))

db := c.Conn(ctx, t.L(), 1)
defer db.Close()

require.NoError(t, WaitFor3XReplication(ctx, t, db))
t.Status("disable load based splitting")
require.NoError(t, disableLoadBasedSplitting(ctx, db))
t.Status(fmt.Sprintf("setting rebalance mode to %s", rebalanceMode))
_, err := db.ExecContext(ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode)
require.NoError(t, err)

var m *errgroup.Group // see comment in version.go
m, ctx = errgroup.WithContext(ctx)

// Enable us to exit out of workload early when we achieve the desired
// lease balance. This drastically shortens the duration of the test in the
// Enable us to exit out of workload early when we achieve the desired CPU
// balance. This drastically shortens the duration of the test in the
// common case.
ctx, cancel := context.WithCancel(ctx)

m.Go(func() error {
t.L().Printf("starting load generator\n")

err := c.RunE(ctx, appNode, fmt.Sprintf(
"./workload run kv --read-percent=95 --tolerate-errors --concurrency=%d "+
"--duration=%v {pgurl:1-%d}",
concurrency, maxDuration, len(roachNodes)))
if errors.Is(ctx.Err(), context.Canceled) {
// We got canceled either because lease balance was achieved or the
// We got canceled either because CPU balance was achieved or the
// other worker hit an error. In either case, it's not this worker's
// fault.
return nil
Expand All @@ -109,39 +140,39 @@ func registerRebalanceLoad(r registry.Registry) {
})

m.Go(func() error {
t.Status("checking for lease balance")

db := c.Conn(ctx, t.L(), 1)
defer db.Close()

t.Status("disable load based splitting")
if err := disableLoadBasedSplitting(ctx, db); err != nil {
return err
}
t.Status("checking for CPU balance")

if _, err := db.ExecContext(
ctx, `SET CLUSTER SETTING kv.allocator.load_based_rebalancing=$1::string`, rebalanceMode,
); err != nil {
storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
if err != nil {
return err
}

var reason string
var done bool
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil {
return err
} else if done {
t.Status("successfully achieved lease balance; waiting for kv to finish running")
cancel()
return nil
}

// Wait out the sample period initially to allow the timeseries to
// populate meaningful information for the test to query.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
case <-time.After(statSamplePeriod):
}

clusterStoresCPU, err := storeCPUFn(ctx)
if err != nil {
t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
}

done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
t.L().Printf("cpu %s", reason)
if done {
t.Status("successfully achieved CPU balance; waiting for kv to finish running")
cancel()
return nil
}
}

return fmt.Errorf("timed out before leases were evenly spread")
return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
})
if err := m.Wait(); err != nil {
t.Fatal(err)
Expand All @@ -160,7 +191,7 @@ func registerRebalanceLoad(r registry.Registry) {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, false /* mixedVersion */)
rebalanceLoadRun(ctx, t, c, "leases", 5*time.Minute, concurrency, false /* mixedVersion */)
},
},
)
Expand All @@ -174,7 +205,7 @@ func registerRebalanceLoad(r registry.Registry) {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
rebalanceLoadRun(ctx, t, c, "leases", 3*time.Minute, concurrency, true /* mixedVersion */)
rebalanceLoadRun(ctx, t, c, "leases", 5*time.Minute, concurrency, true /* mixedVersion */)
},
},
)
Expand Down Expand Up @@ -233,79 +264,110 @@ func registerRebalanceLoad(r registry.Registry) {
)
}

func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) {
rows, err := db.Query(
`select lease_holder, count(*) ` +
`from [show ranges from table kv.kv] ` +
`group by lease_holder;`)
if err != nil {
// TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or
// earlier.
if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") {
rows, err = db.Query(
`select lease_holder, count(*) ` +
`from [show experimental_ranges from table kv.kv] ` +
`group by lease_holder;`)
}
}
// makeStoreCPUFn returns a function which can be called to gather the CPU of
// the cluster stores. When there are multiple stores per node, stores on the
// same node will report identical CPU.
func makeStoreCPUFn(
octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int,
) (func(ctx context.Context) ([]float64, error), error) {
adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1))
if err != nil {
return false, err
return nil, err
}
defer rows.Close()
leaseCounts := make(map[int]int)
var rangeCount int
for rows.Next() {
var storeID, leaseCount int
if err := rows.Scan(&storeID, &leaseCount); err != nil {
return false, err
url := adminURLs[0]
startTime := timeutil.Now()
tsQueries := make([]tsQuery, numNodes)
for i := range tsQueries {
tsQueries[i] = tsQuery{
name: "cr.node.sys.cpu.combined.percent-normalized",
queryType: total,
sources: []string{fmt.Sprintf("%d", i+1)},
}
leaseCounts[storeID] = leaseCount
rangeCount += leaseCount
}

if len(leaseCounts) < numStores {
l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts))
return false, nil
}
return func(ctx context.Context) ([]float64, error) {
now := timeutil.Now()
resp, err := getMetricsWithSamplePeriod(
url, startTime, now, statSamplePeriod, tsQueries)
if err != nil {
return nil, err
}

// The simple case is when ranges haven't split. We can require that every
// store has one lease.
if rangeCount == numStores {
for _, leaseCount := range leaseCounts {
if leaseCount != 1 {
l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts))
return false, nil
// Assume that stores on the same node will have sequential store IDs e.g.
// when the stores per node is 2:
// node 1 = store 1, store 2 ... node N = store 2N-1, store 2N
storesPerNode := numStores / numNodes
storeCPUs := make([]float64, numStores)
for node, result := range resp.Results {
// Take the latest CPU data point only.
cpu := result.Datapoints[len(result.Datapoints)-1].Value
nodeIdx := node * storesPerNode
for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ {
// The values will be a normalized float in [0,1.0], scale to a
// percentage [0,100].
storeCPUs[nodeIdx+storeOffset] = cpu * 100
}
}
l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
}
return storeCPUs, nil
}, nil
}

// For completeness, if leases have split, verify the leases per store don't
// differ by any more than 1.
leases := make([]int, 0, numStores)
for _, leaseCount := range leaseCounts {
leases = append(leases, leaseCount)
// isLoadEvenlyDistributed checks whether the load for the stores given are
// within tolerance of the mean. If the store loads are, true is returned as
// well as reason, otherwise false. The function expects the loads to be
// indexed to store IDs, see makeStoreCPUFn for example format.
func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) {
mean := arithmeticMean(loads)
// If the mean is zero, there's nothing meaningful to assert on. Return early
// that the load isn't evenly distributed.
if mean == 0 {
return false, "no load: mean=0"
}
sort.Ints(leases)
if leases[0]+1 < leases[len(leases)-1] {
l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts))
return false, nil

meanTolerance := mean * tolerance
lb := mean - meanTolerance
ub := mean + meanTolerance

// Partiton the loads into above, below and within the tolerance bounds of
// the load mean.
above, below, within := []int{}, []int{}, []int{}
for i, load := range loads {
storeID := i + 1
if load > ub {
above = append(above, storeID)
} else if load < lb {
below = append(below, storeID)
} else {
within = append(within, storeID)
}
}

l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]",
mean, 100*tolerance, meanTolerance, lb, ub)
if len(below) > 0 || len(above) > 0 {
ok = false
reason = fmt.Sprintf(
"outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n",
boundsStr,
formatLoads(below, loads, mean),
formatLoads(within, loads, mean),
formatLoads(above, loads, mean),
)
} else {
ok = true
reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n",
boundsStr, formatLoads(within, loads, mean))
}
return
}

func formatLeaseCounts(counts map[int]int) string {
storeIDs := make([]int, 0, len(counts))
for storeID := range counts {
storeIDs = append(storeIDs, storeID)
}
sort.Ints(storeIDs)
strs := make([]string, 0, len(counts))
for _, storeID := range storeIDs {
strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID]))
func formatLoads(storeIDs []int, loads []float64, mean float64) string {
fmtLoads := make([]string, len(storeIDs))
for i, storeID := range storeIDs {
load := loads[storeID-1]
fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)",
storeID, int(load), (load-mean)/mean*100,
)
}
return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", "))
}
10 changes: 10 additions & 0 deletions pkg/cmd/roachtest/tests/rebalance_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ func coefficientOfVariation(vals []float64) float64 {
return stdev / mean
}

// arithmeticMean is an average measure where avg = sum / count. This fn is
// included to return 0 instead of NaN like the stats pkg used underneath.
func arithmeticMean(vals []float64) float64 {
if len(vals) == 0 {
return 0
}
mean, _ := stats.Mean(vals)
return mean
}

func scale(vals []float64, scale float64) []float64 {
scaled := make([]float64, len(vals))
for i := range vals {
Expand Down
Loading