Skip to content

Commit

Permalink
roachtest: export allocator roachperf benchmark
Browse files Browse the repository at this point in the history
The allocator-related roachtests do not currently export benchmark-related
statistics. This patch introduces stat collection for
replicate/up/1to3, replicate/rebalance/3to5, rebalance/by-load/leases and
rebalance/by-load/ranges.

The benchmark metric for the nightly run is the time taken to reach
balance, measured from the start of the test. In addition, cluster
statistics are exported for a detailed view into CPU usage, I/O bandwidth,
QPS and range counts.

Release note: None
  • Loading branch information
kvoli committed Jun 8, 2022
1 parent be73642 commit 3bace94
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 9 deletions.
3 changes: 3 additions & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ go_library(
"quit.go",
"rapid_restart.go",
"rebalance_load.go",
"rebalance_stats.go",
"registry.go",
"replicagc.go",
"reset_quorum.go",
Expand Down Expand Up @@ -160,6 +161,7 @@ go_library(
"//pkg/clusterversion",
"//pkg/cmd/cmpconn",
"//pkg/cmd/roachtest/cluster",
"//pkg/cmd/roachtest/clusterstats",
"//pkg/cmd/roachtest/option",
"//pkg/cmd/roachtest/prometheus",
"//pkg/cmd/roachtest/registry",
Expand Down Expand Up @@ -218,6 +220,7 @@ go_library(
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_kr_pretty//:pretty",
"@com_github_lib_pq//:pq",
"@com_github_montanaflynn_stats//:stats",
"@com_github_prometheus_client_golang//api",
"@com_github_prometheus_client_golang//api/prometheus/v1:prometheus",
"@com_github_prometheus_common//model",
Expand Down
62 changes: 54 additions & 8 deletions pkg/cmd/roachtest/tests/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
Expand All @@ -28,10 +29,16 @@ import (
"github.com/cockroachdb/errors"
)

// allocatorStableSeconds is the number of seconds that must elapse without
// any rebalancing action being taken before we assume the configuration has
// reached a steady state and assess balance.
const allocatorStableSeconds = 3 * 60

func registerAllocator(r registry.Registry) {
runAllocator := func(ctx context.Context, t test.Test, c cluster.Cluster, start int, maxStdDev float64) {
c.Put(ctx, t.Cockroach(), "./cockroach")

// Put away one node to be the stats collector.
nodes := c.Spec().NodeCount - 1
startOpts := option.DefaultStartOpts()
startOpts.RoachprodOpts.ExtraArgs = []string{"--vmodule=store_rebalancer=5,allocator=5,allocator_scorer=5,replicate_queue=5"}
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, start))
Expand All @@ -50,11 +57,18 @@ func registerAllocator(r registry.Registry) {
})
m.Wait()

statCollector, err := clusterstats.NewStatsCollector(ctx, t, c)

if err != nil {
t.Fatal(err)
}
defer statCollector.CleanUp()

// Start the remaining nodes to kick off upreplication/rebalancing.
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(start+1, c.Spec().NodeCount))
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(start+1, nodes))

c.Run(ctx, c.Node(1), `./cockroach workload init kv --drop`)
for node := 1; node <= c.Spec().NodeCount; node++ {
for node := 1; node <= nodes; node++ {
node := node
// TODO(dan): Ideally, the test would fail if this queryload failed,
// but we can't put it in monitor as-is because the test deadlocks.
Expand All @@ -69,26 +83,60 @@ func registerAllocator(r registry.Registry) {
}()
}

// Wait for 3x replication, we record the time taken to achieve this.
var replicateTime time.Time
m = c.NewMonitor(ctx, c.All())
m.Go(func(ctx context.Context) error {
err := WaitFor3XReplication(ctx, t, db)
replicateTime = timeutil.Now()
return err
})
m.Wait()

// Wait for replica count balance, this occurs only following
// up-replication finishing.
m = c.NewMonitor(ctx, c.All())
m.Go(func(ctx context.Context) error {
t.Status("waiting for reblance")
return waitForRebalance(ctx, t.L(), db, maxStdDev)
startTime := timeutil.Now()
err := waitForRebalance(ctx, t.L(), db, maxStdDev)
if err != nil {
return err
}
endTime := timeutil.Now()
err = statCollector.Exporter().Export(
ctx,
c,
t,
startTime, endTime,
joinSummaryQueries(actionsSummary, underReplicatedSummary, requestBalanceSummary, resourceBalanceSummary),
// NB: We record the time taken to reach balance, from when
// up-replication began, until the last rebalance action
// taken.
func(stats map[string]clusterstats.StatSummary) (string, float64) {
return "t-balance", endTime.Sub(startTime).Seconds() - allocatorStableSeconds
},
func(stats map[string]clusterstats.StatSummary) (string, float64) {
return "t-uprepl", replicateTime.Sub(startTime).Seconds() - allocatorStableSeconds
},
)
return err
})
m.Wait()
}

r.Add(registry.TestSpec{
Name: `replicate/up/1to3`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(3),
Cluster: r.MakeClusterSpec(4),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runAllocator(ctx, t, c, 1, 10.0)
},
})
r.Add(registry.TestSpec{
Name: `replicate/rebalance/3to5`,
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(5),
Cluster: r.MakeClusterSpec(6),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runAllocator(ctx, t, c, 3, 42.0)
},
Expand Down Expand Up @@ -223,9 +271,7 @@ func allocatorStats(db *gosql.DB) (s replicationStats, err error) {
func waitForRebalance(
ctx context.Context, l *logger.Logger, db *gosql.DB, maxStdDev float64,
) error {
// const statsInterval = 20 * time.Second
const statsInterval = 2 * time.Second
const stableSeconds = 3 * 60

var statsTimer timeutil.Timer
defer statsTimer.Stop()
Expand All @@ -242,7 +288,7 @@ func waitForRebalance(
}

l.Printf("%v\n", stats)
if stableSeconds <= stats.SecondsSinceLastEvent {
if allocatorStableSeconds <= stats.SecondsSinceLastEvent {
l.Printf("replica count stddev = %f, max allowed stddev = %f\n", stats.ReplicaCountStdDev, maxStdDev)
if stats.ReplicaCountStdDev > maxStdDev {
_ = printRebalanceStats(l, db)
Expand Down
25 changes: 24 additions & 1 deletion pkg/cmd/roachtest/tests/rebalance_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
Expand Down Expand Up @@ -92,6 +93,12 @@ func registerRebalanceLoad(r registry.Registry) {
c.Start(ctx, t.L(), startOpts, settings, roachNodes)
}

statCollector, err := clusterstats.NewStatsCollector(ctx, t, c)
if err != nil {
t.Fatal(err)
}
defer statCollector.CleanUp()

c.Put(ctx, t.DeprecatedWorkload(), "./workload", appNode)
c.Run(ctx, appNode, fmt.Sprintf("./workload init kv --drop --splits=%d {pgurl:1}", splits))

Expand Down Expand Up @@ -136,13 +143,29 @@ func registerRebalanceLoad(r registry.Registry) {
return err
}

startTime := timeutil.Now()
for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= maxDuration; {
if done, err := isLoadEvenlyDistributed(t.L(), db, numStores); err != nil {
return err
} else if done {
t.Status("successfully achieved lease balance; waiting for kv to finish running")
// TODO(kvoli): Support mixed version testing, currently it will attempt to init
if !mixedVersion {
endTime := timeutil.Now()
err = statCollector.Exporter().Export(
ctx,
c,
t,
startTime, endTime,
joinSummaryQueries(actionsSummary, rangeBalanceSummary, requestBalanceSummary, resourceBalanceSummary),
// NB: We record the time taken to reach balance.
func(stats map[string]clusterstats.StatSummary) (string, float64) {
return "t-balance", endTime.Sub(startTime).Seconds()
},
)
}
cancel()
return nil
return err
}

select {
Expand Down
98 changes: 98 additions & 0 deletions pkg/cmd/roachtest/tests/rebalance_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/montanaflynn/stats"
)

// Cluster statistics collected by the allocator and rebalancing roachtests.
// Every statistic uses the "instance" label.
var (
	// Query, request, write and read rates.
	qpsStat  = clusterstats.ClusterStat{LabelName: "instance", Query: "rebalancing_queriespersecond"}
	rqpsStat = clusterstats.ClusterStat{LabelName: "instance", Query: "rebalancing_requestspersecond"}
	wpsStat  = clusterstats.ClusterStat{LabelName: "instance", Query: "rebalancing_writespersecond"}
	rpsStat  = clusterstats.ClusterStat{LabelName: "instance", Query: "rebalancing_readspersecond"}

	// NB: CPU is fractional, but roachperf only takes integers in its
	// detailed view, so it is scaled by 100 here to get a percentage. When
	// aggregating on CPU, normalized measures of distribution are preferred,
	// due to scale.
	cpuStat = clusterstats.ClusterStat{LabelName: "instance", Query: "sys_cpu_combined_percent_normalized * 100"}

	// NB: These are recorded as counters. They are converted to rates, as we
	// are interested in the progression at points in time.
	ioReadStat  = clusterstats.ClusterStat{LabelName: "instance", Query: "rate(sys_host_disk_read_bytes[10m])"}
	ioWriteStat = clusterstats.ClusterStat{LabelName: "instance", Query: "rate(sys_host_disk_write_bytes[10m])"}
	netSendStat = clusterstats.ClusterStat{LabelName: "instance", Query: "rate(sys_host_net_send_bytes[10m])"}
	netRecvStat = clusterstats.ClusterStat{LabelName: "instance", Query: "rate(sys_host_net_recv_bytes[10m])"}

	// Range and replica counts.
	rangeCountStat      = clusterstats.ClusterStat{LabelName: "instance", Query: "ranges"}
	underreplicatedStat = clusterstats.ClusterStat{LabelName: "instance", Query: "ranges_underreplicated"}
	replicasStat        = clusterstats.ClusterStat{LabelName: "instance", Query: "replicas"}

	// Lease transfer, range rebalance and range split counts.
	leaseTransferStat   = clusterstats.ClusterStat{LabelName: "instance", Query: "rebalancing_lease_transfers"}
	rangeRebalancesStat = clusterstats.ClusterStat{LabelName: "instance", Query: "rebalancing_range_rebalances"}
	rangeSplitStat      = clusterstats.ClusterStat{LabelName: "instance", Query: "range_splits"}
)

// Groups of aggregate queries that tests combine with joinSummaryQueries when
// exporting statistics. Each entry aggregates a stat either with an explicit
// query or with distributionAggregate.
var (
	underReplicatedSummary = []clusterstats.AggQuery{
		{
			Stat:  underreplicatedStat,
			Query: "sum(ranges_underreplicated)",
		},
	}

	rangeBalanceSummary = []clusterstats.AggQuery{
		{
			Stat:  rangeCountStat,
			AggFn: distributionAggregate,
		},
	}

	actionsSummary = []clusterstats.AggQuery{
		{
			Stat:  rangeRebalancesStat,
			Query: "sum(rebalancing_range_rebalances)",
		},
		{
			Stat:  leaseTransferStat,
			Query: "sum(rebalancing_lease_transfers)",
		},
		{
			Stat:  rangeSplitStat,
			Query: "sum(range_splits)",
		},
	}

	// NB: cv is an abbreviation for coefficient of variation.
	resourceBalanceSummary = []clusterstats.AggQuery{
		{
			Stat:  cpuStat,
			AggFn: distributionAggregate,
		},
		{
			Stat:  ioWriteStat,
			AggFn: distributionAggregate,
		},
		{
			Stat:  ioReadStat,
			AggFn: distributionAggregate,
		},
	}

	requestBalanceSummary = []clusterstats.AggQuery{
		{
			Stat:  qpsStat,
			AggFn: distributionAggregate,
		},
		{
			Stat:  wpsStat,
			AggFn: distributionAggregate,
		},
		{
			Stat:  rpsStat,
			AggFn: distributionAggregate,
		},
	}
)

// joinSummaryQueries concatenates the given groups of aggregate queries into
// a single slice, preserving the order of the groups and of the queries
// within each group. The returned slice is always non-nil and is backed by a
// freshly allocated array, so appending to it never modifies an input group.
func joinSummaryQueries(queries ...[]clusterstats.AggQuery) []clusterstats.AggQuery {
	// Size the result up front so that the appends below never regrow it.
	total := 0
	for _, q := range queries {
		total += len(q)
	}
	ret := make([]clusterstats.AggQuery, 0, total)
	for _, q := range queries {
		ret = append(ret, q...)
	}
	return ret
}

func distributionAggregate(query string, series [][]float64) (string, []float64) {
return fmt.Sprintf("cv(%s)", query), scale(applyToSeries(series, coefficientOfVariation), 100)
}

// applyToSeries calls aggFn once on every entry of timeseries, in order, and
// returns the results. The returned slice has the same length as timeseries
// and is never nil.
func applyToSeries(timeseries [][]float64, aggFn func(vals []float64) float64) []float64 {
	out := make([]float64, 0, len(timeseries))
	for _, vals := range timeseries {
		out = append(out, aggFn(vals))
	}
	return out
}

// coefficientOfVariation returns the sample standard deviation of vals
// divided by their mean.
//
// The ratio is undefined when there are fewer than two values (there is no
// sample standard deviation), when the mean is zero, or when either statistic
// cannot be computed. In those cases 0 is returned, so that degenerate input
// does not turn into NaN or ±Inf in the exported statistics.
func coefficientOfVariation(vals []float64) float64 {
	if len(vals) < 2 {
		return 0
	}
	mean, err := stats.Mean(vals)
	if err != nil || mean == 0 {
		return 0
	}
	stdev, err := stats.StandardDeviationSample(vals)
	if err != nil {
		return 0
	}
	return stdev / mean
}

// scale returns a new slice in which every element of vals has been
// multiplied by the given factor. vals itself is left untouched, and the
// result is never nil.
func scale(vals []float64, scale float64) []float64 {
	out := make([]float64, 0, len(vals))
	for _, v := range vals {
		out = append(out, v*scale)
	}
	return out
}

0 comments on commit 3bace94

Please sign in to comment.