roachtest: assert on CPU in rebalance/by-load #102824

Merged · 3 commits · May 9, 2023

Changes from all commits:
239 changes: 153 additions & 86 deletions pkg/cmd/roachtest/tests/rebalance_load.go
@@ -12,11 +12,9 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"runtime"
"sort"
"strings"
"time"

@@ -27,22 +25,37 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

const (
// storeToRangeFactor is the number of ranges to create per store in the
// cluster.
storeToRangeFactor = 5
// meanCPUTolerance is the tolerance applied when checking normalized (0-100)
// CPU percent utilization of stores against the mean. In multi-store tests,
// the same CPU utilization will be reported for stores on the same node. The
// acceptable range for CPU w.r.t the mean is:
//
// mean_tolerance = mean * meanCPUTolerance
// [mean - mean_tolerance, mean + mean_tolerance].
meanCPUTolerance = 0.1
// statSamplePeriod is the period at which timeseries stats are sampled.
statSamplePeriod = 10 * time.Second
)
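For concreteness, a small sketch of the acceptance band this tolerance implies; the helper name and the mean value are hypothetical, not part of the diff:

	// cpuBounds shows the acceptance band implied by meanCPUTolerance: a store
	// passes the check when its CPU percentage lands in [lower, upper].
	func cpuBounds(mean float64) (lower, upper float64) {
		band := mean * meanCPUTolerance
		return mean - band, mean + band
	}
	// e.g. cpuBounds(60.0) == (54.0, 66.0) with meanCPUTolerance = 0.1.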

func registerRebalanceLoad(r registry.Registry) {
// This test creates a single table for kv to use and splits the table to
// have one range for every node in the cluster. Because even brand new
// clusters start with 20+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger rebalancing of
// leases in the cluster based on lease count alone. We let kv generate a lot
// of load against the ranges such that we'd expect load-based rebalancing to
// distribute the load evenly across the nodes in the cluster.
// have 5 ranges for every node in the cluster. Because even brand new
// clusters start with 40+ ranges in them, the number of new ranges in kv's
// table is small enough that it typically won't trigger significant
// rebalancing of leases in the cluster based on lease count alone. We let kv
// generate a lot of load against the ranges such that we'd expect load-based
// rebalancing to distribute the load evenly across the nodes in the cluster.
rebalanceLoadRun := func(
ctx context.Context,
t test.Test,
@@ -52,15 +65,24 @@ func registerRebalanceLoad(r registry.Registry) {
concurrency int,
mixedVersion bool,
) {
startOpts := option.DefaultStartOpts()
// This test asserts on the distribution of CPU utilization between nodes
// in the cluster; having backups running as well could lead to unrelated
// flakes, so disable the backup schedule.
startOpts := option.DefaultStartOptsNoBackups()
roachNodes := c.Range(1, c.Spec().NodeCount-1)
appNode := c.Node(c.Spec().NodeCount)
numStores := len(roachNodes)
numNodes := len(roachNodes)
numStores := numNodes
if c.Spec().SSDs > 1 && !c.Spec().RAID0 {
numStores *= c.Spec().SSDs
startOpts.RoachprodOpts.StoreCount = c.Spec().SSDs
}
splits := numStores - 1 // n-1 splits => n ranges => 1 lease per store
// We want each store to end up with approximately storeToRangeFactor
// (factor) leases such that the CPU load is evenly spread, e.g.
// (n * factor) -1 splits = factor * n ranges = factor leases per store
// Note that we only assert on the CPU of each store w.r.t the mean, not
// the lease count.
splits := (numStores * storeToRangeFactor) - 1
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5")
settings := install.MakeClusterSettings()
@@ -87,8 +109,8 @@ func registerRebalanceLoad(r registry.Registry) {
var m *errgroup.Group // see comment in version.go
m, ctx = errgroup.WithContext(ctx)

// Enable us to exit out of workload early when we achieve the desired
// lease balance. This drastically shortens the duration of the test in the
// Enable us to exit out of workload early when we achieve the desired CPU
// balance. This drastically shortens the duration of the test in the
// common case.
ctx, cancel := context.WithCancel(ctx)

@@ -100,7 +122,7 @@ func registerRebalanceLoad(r registry.Registry) {
"--duration=%v {pgurl:1-%d}",
concurrency, maxDuration, len(roachNodes)))
if errors.Is(ctx.Err(), context.Canceled) {
// We got canceled either because lease balance was achieved or the
// We got canceled either because CPU balance was achieved or the
// other worker hit an error. In either case, it's not this worker's
// fault.
return nil
@@ -109,7 +131,7 @@ func registerRebalanceLoad(r registry.Registry) {
})

m.Go(func() error {
t.Status("checking for lease balance")
t.Status("checking for CPU balance")

db := c.Conn(ctx, t.L(), 1)
defer db.Close()
@@ -125,23 +147,37 @@ func registerRebalanceLoad(r registry.Registry) {
return err
}

for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil {
return err
} else if done {
t.Status("successfully achieved lease balance; waiting for kv to finish running")
cancel()
return nil
}
storeCPUFn, err := makeStoreCPUFn(ctx, c, t, numNodes, numStores)
if err != nil {
return err
}

var reason string
var done bool
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
// Wait out the sample period initially to allow the timeseries to
// populate meaningful information for the test to query.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
case <-time.After(statSamplePeriod):
}

clusterStoresCPU, err := storeCPUFn(ctx)
if err != nil {
t.L().Printf("unable to get the cluster stores CPU %s\n", err.Error())
}

done, reason = isLoadEvenlyDistributed(clusterStoresCPU, meanCPUTolerance)
t.L().Printf("cpu %s", reason)
if done {
t.Status("successfully achieved CPU balance; waiting for kv to finish running")
cancel()
return nil
}
}

return fmt.Errorf("timed out before leases were evenly spread")
return errors.Errorf("CPU not evenly balanced after timeout: %s", reason)
})
if err := m.Wait(); err != nil {
t.Fatal(err)
@@ -236,79 +272,110 @@ func registerRebalanceLoad(r registry.Registry) {
)
}
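To make the splits arithmetic in rebalanceLoadRun above concrete, here is a worked instance under an assumed cluster shape of 3 nodes with 2 SSDs each:

	splits = numStores*storeToRangeFactor - 1 = 6*5 - 1 = 29, i.e. 30 ranges in
	kv.kv, or roughly storeToRangeFactor = 5 ranges (and leases) per store once
	load-based rebalancing has spread them evenly.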

func isLoadEvenlyDistributed(l *logger.Logger, db *gosql.DB, numStores int) (bool, error) {
rows, err := db.Query(
`SELECT lease_holder, count(*) ` +
`FROM [SHOW RANGES FROM TABLE kv.kv WITH DETAILS] ` +
`GROUP BY lease_holder;`)
if err != nil {
// TODO(rafi): Remove experimental_ranges query once we stop testing 19.1 or
// earlier.
if strings.Contains(err.Error(), "syntax error at or near \"ranges\"") {
rows, err = db.Query(
`SELECT lease_holder, count(*) ` +
`FROM [SHOW RANGES FROM TABLE kv.kv WITH DETAILS] ` +
`GROUP BY lease_holder;`)
}
}
// makeStoreCPUFn returns a function which can be called to gather the CPU of
// the cluster stores. When there are multiple stores per node, stores on the
// same node will report identical CPU.
func makeStoreCPUFn(
octx context.Context, c cluster.Cluster, t test.Test, numNodes, numStores int,
) (func(ctx context.Context) ([]float64, error), error) {
adminURLs, err := c.ExternalAdminUIAddr(octx, t.L(), c.Node(1))
if err != nil {
return false, err
return nil, err
}
defer rows.Close()
leaseCounts := make(map[int]int)
var rangeCount int
for rows.Next() {
var storeID, leaseCount int
if err := rows.Scan(&storeID, &leaseCount); err != nil {
return false, err
url := adminURLs[0]
startTime := timeutil.Now()
tsQueries := make([]tsQuery, numNodes)
for i := range tsQueries {
tsQueries[i] = tsQuery{
name: "cr.node.sys.cpu.combined.percent-normalized",
queryType: total,
sources: []string{fmt.Sprintf("%d", i+1)},
}
leaseCounts[storeID] = leaseCount
rangeCount += leaseCount
}

if len(leaseCounts) < numStores {
l.Printf("not all stores have a lease yet: %v\n", formatLeaseCounts(leaseCounts))
return false, nil
}
return func(ctx context.Context) ([]float64, error) {
now := timeutil.Now()
resp, err := getMetricsWithSamplePeriod(
url, startTime, now, statSamplePeriod, tsQueries)
if err != nil {
return nil, err
}

// The simple case is when ranges haven't split. We can require that every
// store has one lease.
if rangeCount == numStores {
for _, leaseCount := range leaseCounts {
if leaseCount != 1 {
l.Printf("uneven lease distribution: %s\n", formatLeaseCounts(leaseCounts))
return false, nil
// Assume that stores on the same node will have sequential store IDs e.g.
// when the stores per node is 2:
// node 1 = store 1, store 2 ... node N = store 2N-1, store 2N
storesPerNode := numStores / numNodes
storeCPUs := make([]float64, numStores)
for node, result := range resp.Results {
// Take the latest CPU data point only.
cpu := result.Datapoints[len(result.Datapoints)-1].Value
nodeIdx := node * storesPerNode
for storeOffset := 0; storeOffset < storesPerNode; storeOffset++ {
// The values will be a normalized float in [0,1.0], scale to a
// percentage [0,100].
storeCPUs[nodeIdx+storeOffset] = cpu * 100
}
}
l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
}
return storeCPUs, nil
}, nil
}
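The mapping from per-node CPU to per-store slots above relies on sequential store IDs; a minimal standalone sketch of that expansion (the helper name is hypothetical):

	// expandNodeCPUToStores mirrors the loop above: each node's normalized CPU
	// reading is copied into the slots of its stores and scaled to a percent.
	// Store IDs are assumed sequential per node: node 1 -> stores 1..k,
	// node 2 -> stores k+1..2k, and so on.
	func expandNodeCPUToStores(nodeCPU []float64, storesPerNode int) []float64 {
		storeCPU := make([]float64, len(nodeCPU)*storesPerNode)
		for node, cpu := range nodeCPU {
			for offset := 0; offset < storesPerNode; offset++ {
				storeCPU[node*storesPerNode+offset] = cpu * 100
			}
		}
		return storeCPU
	}
	// e.g. expandNodeCPUToStores([]float64{0.52, 0.55}, 2) == [52, 52, 55, 55]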

// For completeness, if leases have split, verify the leases per store don't
// differ by any more than 1.
leases := make([]int, 0, numStores)
for _, leaseCount := range leaseCounts {
leases = append(leases, leaseCount)
// isLoadEvenlyDistributed checks whether the load for the given stores is
// within tolerance of the mean. If so, it returns true along with a reason
// string, otherwise false. The function expects the loads to be indexed by
// store ID; see makeStoreCPUFn for the expected format.
func isLoadEvenlyDistributed(loads []float64, tolerance float64) (ok bool, reason string) {
mean := arithmeticMean(loads)
// If the mean is zero, there's nothing meaningful to assert on. Return early
// that the load isn't evenly distributed.
if mean == 0 {
return false, "no load: mean=0"
}
sort.Ints(leases)
if leases[0]+1 < leases[len(leases)-1] {
l.Printf("leases per store differ by more than one: %s\n", formatLeaseCounts(leaseCounts))
return false, nil

meanTolerance := mean * tolerance
lb := mean - meanTolerance
ub := mean + meanTolerance

// Partition the loads into above, below and within the tolerance bounds of
// the load mean.
above, below, within := []int{}, []int{}, []int{}
for i, load := range loads {
storeID := i + 1
if load > ub {
above = append(above, storeID)
} else if load < lb {
below = append(below, storeID)
} else {
within = append(within, storeID)
}
}

l.Printf("leases successfully distributed: %s\n", formatLeaseCounts(leaseCounts))
return true, nil
boundsStr := fmt.Sprintf("mean=%.1f tolerance=%.1f%% (±%.1f) bounds=[%.1f, %.1f]",
mean, 100*tolerance, meanTolerance, lb, ub)
if len(below) > 0 || len(above) > 0 {
ok = false
reason = fmt.Sprintf(
"outside bounds %s\n\tbelow = %s\n\twithin = %s\n\tabove = %s\n",
boundsStr,
formatLoads(below, loads, mean),
formatLoads(within, loads, mean),
formatLoads(above, loads, mean),
)
} else {
ok = true
reason = fmt.Sprintf("within bounds %s\n\tstores=%s\n",
boundsStr, formatLoads(within, loads, mean))
}
return
}

func formatLeaseCounts(counts map[int]int) string {
storeIDs := make([]int, 0, len(counts))
for storeID := range counts {
storeIDs = append(storeIDs, storeID)
}
sort.Ints(storeIDs)
strs := make([]string, 0, len(counts))
for _, storeID := range storeIDs {
strs = append(strs, fmt.Sprintf("s%d: %d", storeID, counts[storeID]))
func formatLoads(storeIDs []int, loads []float64, mean float64) string {
fmtLoads := make([]string, len(storeIDs))
for i, storeID := range storeIDs {
load := loads[storeID-1]
fmtLoads[i] = fmt.Sprintf("s%d: %d (%+3.1f%%)",
storeID, int(load), (load-mean)/mean*100,
)
}
return fmt.Sprintf("[%s]", strings.Join(strs, ", "))
return fmt.Sprintf("[%s]", strings.Join(fmtLoads, ", "))
}
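A usage sketch of the new check with hypothetical load values; the expected outcome is worked out in the comments:

	loads := []float64{52, 55, 71} // CPU % for stores s1, s2, s3
	ok, reason := isLoadEvenlyDistributed(loads, meanCPUTolerance)
	// mean ~= 59.3, with a 10% tolerance the bounds are ~[53.4, 65.3].
	// ok == false: s1 (52) falls below the band and s3 (71) falls above it, so
	// reason reports the stores partitioned into below/within/above along with
	// each store's percentage deviation from the mean.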
14 changes: 13 additions & 1 deletion pkg/cmd/roachtest/tests/ts_util.go
@@ -38,9 +38,13 @@ const (
rate
)

// defaultSamplePeriod is the default sampling period for getMetrics.
const defaultSamplePeriod = time.Minute

type tsQuery struct {
name string
queryType tsQueryType
sources []string
}

func mustGetMetrics(
@@ -55,6 +59,12 @@ func mustGetMetrics(

func getMetrics(
adminURL string, start, end time.Time, tsQueries []tsQuery,
) (tspb.TimeSeriesQueryResponse, error) {
return getMetricsWithSamplePeriod(adminURL, start, end, defaultSamplePeriod, tsQueries)
}

func getMetricsWithSamplePeriod(
adminURL string, start, end time.Time, samplePeriod time.Duration, tsQueries []tsQuery,
) (tspb.TimeSeriesQueryResponse, error) {
url := "http://" + adminURL + "/ts/query"
queries := make([]tspb.Query, len(tsQueries))
@@ -65,13 +75,15 @@
Name: tsQueries[i].name,
Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(),
SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
Sources: tsQueries[i].sources,
}
case rate:
queries[i] = tspb.Query{
Name: tsQueries[i].name,
Downsampler: tspb.TimeSeriesQueryAggregator_AVG.Enum(),
SourceAggregator: tspb.TimeSeriesQueryAggregator_SUM.Enum(),
Derivative: tspb.TimeSeriesQueryDerivative_NON_NEGATIVE_DERIVATIVE.Enum(),
Sources: tsQueries[i].sources,
}
default:
panic("unexpected")
@@ -83,7 +95,7 @@
// Ask for one minute intervals. We can't just ask for the whole hour
// because the time series query system does not support downsampling
// offsets.
SampleNanos: (1 * time.Minute).Nanoseconds(),
SampleNanos: samplePeriod.Nanoseconds(),
Queries: queries,
}
var response tspb.TimeSeriesQueryResponse
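A hypothetical caller of the new helper, using only names that appear in this diff; adminURL and start are assumed to already be in scope:

	// Sample node 1's normalized CPU at a 10 second resolution.
	queries := []tsQuery{{
		name:      "cr.node.sys.cpu.combined.percent-normalized",
		queryType: total,
		sources:   []string{"1"},
	}}
	resp, err := getMetricsWithSamplePeriod(adminURL, start, timeutil.Now(), 10*time.Second, queries)
	if err != nil {
		return err
	}
	// Each result carries one datapoint per sample period; take the latest.
	latest := resp.Results[0].Datapoints[len(resp.Results[0].Datapoints)-1].Value
	_ = latest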