roachtest: add cpu load split tests
There were no roachtests which used the new CPU load-based splitter.
This commit adds tests similar to those that exist for QPS:

- spanning
- uniform
- sequential

In addition, it adds select YCSB tests, which have a column-family
schema and zipfian/latest distributions. It is a known issue (#102136)
that the YCSB workloads will commonly return a start key due to column
families when splitting a range with a single hot row. The YCSB
workloads added are:

- YCSB/A
- YCSB/B
- YCSB/D
- YCSB/E

Both the minimum and maximum number of ranges after some time are
asserted on. For YCSB and KV uniform, the period is 10 minutes; for
KV spanning and sequential, the period is 60 seconds.
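
As a rough sketch, the check after each workload finishes amounts to
the following (the helper name and exact shape are illustrative only,
not taken from this commit):

package tests_sketch // illustrative only

import "fmt"

// assertRangeCount is a hypothetical helper showing the shape of the
// range-count assertion described above: the observed range count must
// land in [minimumRanges, maximumRanges].
func assertRangeCount(rangeCount, minimumRanges, maximumRanges int) error {
	if rangeCount < minimumRanges || rangeCount > maximumRanges {
		return fmt.Errorf("expected between %d and %d ranges, found %d",
			minimumRanges, maximumRanges, rangeCount)
	}
	return nil
}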

Unlike QPS splitting, which will not attempt to split ranges dominated
by spanning requests, CPU splitting will attempt to split any range
whose CPU exceeds the threshold. Splitting such a range does not
amplify the signal the way it does under QPS, where each split of a
span-request-heavy range doubles the measured QPS. For example, a range
serving 100 spanning queries per second splits into two ranges that
each still see all 100 queries (200 QPS in total), whereas the CPU of
the two halves still sums to roughly the CPU of the original range.
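
A minimal sketch of how a test flips to the CPU objective, assuming the
split objective and threshold are controlled by the
kv.allocator.load_based_rebalancing.objective and
kv.range_split.load_cpu_threshold cluster settings (the setting names
are an assumption; check the test code for the exact settings used):

package tests_sketch // illustrative only

import (
	"context"
	gosql "database/sql"
	"fmt"
	"time"
)

// setCPUSplitObjective is a hypothetical helper showing how a test could
// switch load-based splitting to the CPU objective and set its threshold.
// The cluster setting names are assumptions, not taken from this commit.
func setCPUSplitObjective(ctx context.Context, db *gosql.DB, threshold time.Duration) error {
	if _, err := db.ExecContext(ctx,
		"SET CLUSTER SETTING kv.allocator.load_based_rebalancing.objective = 'cpu'",
	); err != nil {
		return err
	}
	_, err := db.ExecContext(ctx, fmt.Sprintf(
		"SET CLUSTER SETTING kv.range_split.load_cpu_threshold = '%s'", threshold))
	return err
}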

Resolves: #97540

Release note: None
kvoli committed Apr 24, 2023
1 parent e9effbb commit 5df42dc
Showing 1 changed file with 231 additions and 23 deletions.
254 changes: 231 additions & 23 deletions pkg/cmd/roachtest/tests/split.go
@@ -50,13 +50,15 @@ type kvSplitLoad struct {
spanPercent int
// sequential indicates the kv workload will use a sequential distribution.
sequential bool
// blockSize controls the size of writes to the kv table.
blockSize int
// waitDuration is the duration the workload should run for.
waitDuration time.Duration
}

func (ksl kvSplitLoad) init(ctx context.Context, t test.Test, c cluster.Cluster) error {
t.Status("running uniform kv workload")
return c.RunE(ctx, c.Node(1), fmt.Sprintf("./workload init kv {pgurl:1-%d}", c.Spec().NodeCount))
return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf("./workload init kv {pgurl:1-%d}", c.Spec().NodeCount-1))
}

func (ksl kvSplitLoad) rangeCount(db *gosql.DB) (int, error) {
@@ -66,14 +68,60 @@ func (ksl kvSplitLoad) rangeCount(db *gosql.DB) (int, error) {
func (ksl kvSplitLoad) run(ctx context.Context, t test.Test, c cluster.Cluster) error {
var extraFlags string
if ksl.sequential {
extraFlags += "--sequential"
extraFlags += "--sequential "
}
return c.RunE(ctx, c.Node(1), fmt.Sprintf("./workload run kv "+
if ksl.blockSize != 0 {
extraFlags += fmt.Sprintf("--min-block-bytes=%d --max-block-bytes=%d ",
ksl.blockSize, ksl.blockSize)
}
return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf("./workload run kv "+
"--init --concurrency=%d --read-percent=%d --span-percent=%d %s {pgurl:1-%d} --duration='%s'",
ksl.concurrency, ksl.readPercent, ksl.spanPercent, extraFlags, c.Spec().NodeCount,
ksl.concurrency, ksl.readPercent, ksl.spanPercent, extraFlags, c.Spec().NodeCount-1,
ksl.waitDuration.String()))
}

type ycsbSplitLoad struct {
// workload is the YCSB workload letter e.g. a, b, ..., f.
workload string
// concurrency is the number of concurrent workers.
concurrency int
// hashed determines whether the inserted keys are hashed.
hashed bool
// insertCount is the number of records to pre-load into the user table.
insertCount int
// waitDuration is the duration the workload should run for.
waitDuration time.Duration
}

func (ysl ycsbSplitLoad) init(ctx context.Context, t test.Test, c cluster.Cluster) error {
t.Status("running ycsb workload ", ysl.workload)
extraArgs := ""
if ysl.hashed {
extraArgs += "--insert-hash"
}

return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf(
"./workload init ycsb --insert-count=%d --workload=%s %s {pgurl:1-%d}",
ysl.insertCount, ysl.workload, extraArgs, c.Spec().NodeCount-1))
}

func (ysl ycsbSplitLoad) rangeCount(db *gosql.DB) (int, error) {
return rangeCountFrom("ycsb.usertable", db)
}

func (ysl ycsbSplitLoad) run(ctx context.Context, t test.Test, c cluster.Cluster) error {
extraArgs := ""
if ysl.hashed {
extraArgs += "--insert-hash"
}

return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf(
"./workload run ycsb --record-count=%d --workload=%s --concurrency=%d "+
"--duration='%s' %s {pgurl:1-%d}",
ysl.insertCount, ysl.workload, ysl.concurrency,
ysl.waitDuration.String(), extraArgs, c.Spec().NodeCount-1))
}

func rangeCountFrom(from string, db *gosql.DB) (int, error) {
var ranges int
q := fmt.Sprintf("SELECT count(*) FROM [SHOW RANGES FROM TABLE %s]",
@@ -85,16 +133,18 @@ func rangeCountFrom(from string, db *gosql.DB) (int, error) {
}

type splitParams struct {
load splitLoad
maxSize int // The maximum size a range is allowed to be.
qpsThreshold int // QPS Threshold for load based splitting.
cpuThreshold time.Duration // CPU Threshold for load based splitting.
minimumRanges int // Minimum number of ranges expected at the end.
maximumRanges int // Maximum number of ranges expected at the end.
load splitLoad
maxSize int // The maximum size a range is allowed to be.
qpsThreshold int // QPS Threshold for load based splitting.
cpuThreshold time.Duration // CPU Threshold for load based splitting.
minimumRanges int // Minimum number of ranges expected at the end.
maximumRanges int // Maximum number of ranges expected at the end.
initialRangeCount int // Initial range count expected after initialization.
}

func registerLoadSplits(r registry.Registry) {
const numNodes = 3
// Use the 4th node as the workload runner.
const numNodes = 4

r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/uniform/nodes=%d", numNodes),
@@ -141,18 +191,37 @@ func registerLoadSplits(r registry.Registry) {
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/uniform/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
// There should be at least 15 splits; in practice there are 20 on
// average.
minimumRanges: 15,
maximumRanges: 25,
load: kvSplitLoad{
concurrency: 64, // 64 concurrent workers
readPercent: 95, // 95% reads
waitDuration: 10 * time.Minute,
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/sequential/nodes=%d", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
qpsThreshold: 100, // 100 queries per second
minimumRanges: 1, // We expect no splits so require only 1 range.
maxSize: 10 << 30, // 10 GB
qpsThreshold: 100, // 100 queries per second
// We expect no splits so require only 1 range. However, in practice we
// sometimes see a split or two early on, presumably when the sampling
// gets lucky.
minimumRanges: 1,
maximumRanges: 3,
load: kvSplitLoad{
concurrency: 64, // 64 concurrent workers
@@ -162,6 +231,30 @@ func registerLoadSplits(r registry.Registry) {
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/sequential/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
// We expect no splits so require only 1 range; however, in practice a
// split may slip in. The reason we don't expect splits for a
// sequential pattern is that existing split samples should only have
// the right counter incremented as new requests come in to the right.
// Any sample we keep should always be the rightmost request we have
// seen so far.
minimumRanges: 1,
maximumRanges: 5,
load: kvSplitLoad{
concurrency: 64,
readPercent: 0,
sequential: true,
waitDuration: 60 * time.Second,
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/spanning/nodes=%d", numNodes),
Owner: registry.OwnerKV,
@@ -180,15 +273,129 @@ func registerLoadSplits(r registry.Registry) {
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/spanning/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
// We expect 1-4 splits. CPU splitting doesn't have the same requirement
// for containment as QPS; instead we want the CPU to be distributed
// over the ranges. That is, splitting a range based on QPS when there
// are only scans amplifies the original QPS, effectively doubling it,
// whereas for CPU, the resulting lhs and rhs post-split should still
// add up to approximately the original range's CPU, ignoring fixed
// overhead.
minimumRanges: 2,
maximumRanges: 5,
load: kvSplitLoad{
concurrency: 64, // 64 concurrent workers
readPercent: 0, // 0% reads
spanPercent: 95, // 95% spanning queries
waitDuration: 60 * time.Second,
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/ycsb/a/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
// YCSB/A has a zipfian distribution with 50% reads and 50% updates.
// The number of splits should be between 20-30 after 10 minutes with
// 100ms threshold on 8vCPU machines.
minimumRanges: 20,
maximumRanges: 30,
initialRangeCount: 2,
load: ycsbSplitLoad{
workload: "a",
concurrency: 64,
insertCount: 1e4, // 10k
waitDuration: 10 * time.Minute,
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/ycsb/b/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
// YCSB/B has a zipfian distribution with 95% reads and 5% updates.
// The number of splits should be similar to YCSB/A.
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
minimumRanges: 20,
maximumRanges: 30,
initialRangeCount: 2,
load: ycsbSplitLoad{
workload: "b",
concurrency: 64,
insertCount: 1e4, // 10k
waitDuration: 10 * time.Minute,
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/ycsb/d/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
// YCSB/D has a latest distribution, i.e. a moving hot key. The inserts
// are hashed - this will lead to many hotspots over the keyspace that
// move. Expect slightly fewer splits than A and B.
minimumRanges: 15,
maximumRanges: 25,
initialRangeCount: 2,
load: ycsbSplitLoad{
workload: "d",
concurrency: 64,
insertCount: 1e4, // 10k
waitDuration: 10 * time.Minute,
}})
},
})
r.Add(registry.TestSpec{
Name: fmt.Sprintf("splits/load/ycsb/e/nodes=%d/obj=cpu", numNodes),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(numNodes),
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runLoadSplits(ctx, t, c, splitParams{
maxSize: 10 << 30, // 10 GB
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
// YCSB/E has a zipfian distribution with 95% scans (limit 1k) and 5%
// inserts.
minimumRanges: 8,
maximumRanges: 15,
initialRangeCount: 2,
load: ycsbSplitLoad{
workload: "e",
concurrency: 64,
insertCount: 1e4, // 10k
waitDuration: 10 * time.Minute,
}})
},
})
}

// runLoadSplits tests the behavior of load-based splitting under
// conditions defined by the params. It checks whether a certain number
// of splits occurs in different workload scenarios.
func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params splitParams) {
c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))
c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(4))
startOpts := option.DefaultStartOptsNoBackups()
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=split_queue=2,store_rebalancer=2,allocator=2,replicate_queue=2",
)
c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.All())

m := c.NewMonitor(ctx, c.All())
m.Go(func(ctx context.Context) error {
@@ -200,9 +407,6 @@ func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params s
return err
}

// TODO(kvoli): Add load split tests which use CPU, similar to the current
// QPS ones. Tracked by #97540.
//
// Set the objective to QPS or CPU and update the load split threshold
// appropriately.
if params.qpsThreshold > 0 {
@@ -237,8 +441,8 @@ func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params s
t.Fatalf("failed to set range_max_bytes: %v", err)
}
}
// Set the range size to a huge size so we don't get splits that occur
// as a result of size thresholds. The kv table will thus be in a single
// Set the range size to a huge size so we don't get splits that occur as a
// result of size thresholds. The workload table will thus be in a single
// range unless split by load.
setRangeMaxBytes(params.maxSize)

@@ -248,8 +452,12 @@ func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params s
}

t.Status("checking initial range count")
if rc, _ := params.load.rangeCount(db); rc != 1 {
return errors.Errorf("kv.kv table split over multiple ranges.")
expectedInitialRangeCount := params.initialRangeCount
if expectedInitialRangeCount == 0 {
expectedInitialRangeCount = 1
}
if rc, _ := params.load.rangeCount(db); rc != expectedInitialRangeCount {
return errors.Errorf("table split over multiple ranges (%d)", rc)
}

t.Status("enable load based splitting")
