diff --git a/pkg/cmd/roachtest/tests/split.go b/pkg/cmd/roachtest/tests/split.go
index 3a90f717399c..d4a7ec5941f3 100644
--- a/pkg/cmd/roachtest/tests/split.go
+++ b/pkg/cmd/roachtest/tests/split.go
@@ -50,13 +50,15 @@ type kvSplitLoad struct {
     spanPercent int
     // sequential indicates the kv workload will use a sequential distribution.
     sequential bool
+    // blockSize controls the size of writes to the kv table.
+    blockSize int
     // waitDuration is the duration the workload should run for.
     waitDuration time.Duration
 }
 
 func (ksl kvSplitLoad) init(ctx context.Context, t test.Test, c cluster.Cluster) error {
     t.Status("running uniform kv workload")
-    return c.RunE(ctx, c.Node(1), fmt.Sprintf("./workload init kv {pgurl:1-%d}", c.Spec().NodeCount))
+    return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf("./workload init kv {pgurl:1-%d}", c.Spec().NodeCount-1))
 }
 
 func (ksl kvSplitLoad) rangeCount(db *gosql.DB) (int, error) {
@@ -66,14 +68,60 @@ func (ksl kvSplitLoad) rangeCount(db *gosql.DB) (int, error) {
 func (ksl kvSplitLoad) run(ctx context.Context, t test.Test, c cluster.Cluster) error {
     var extraFlags string
     if ksl.sequential {
-        extraFlags += "--sequential"
+        extraFlags += "--sequential "
     }
-    return c.RunE(ctx, c.Node(1), fmt.Sprintf("./workload run kv "+
+    if ksl.blockSize != 0 {
+        extraFlags += fmt.Sprintf("--min-block-bytes=%d --max-block-bytes=%d ",
+            ksl.blockSize, ksl.blockSize)
+    }
+    return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf("./workload run kv "+
         "--init --concurrency=%d --read-percent=%d --span-percent=%d %s {pgurl:1-%d} --duration='%s'",
-        ksl.concurrency, ksl.readPercent, ksl.spanPercent, extraFlags, c.Spec().NodeCount,
+        ksl.concurrency, ksl.readPercent, ksl.spanPercent, extraFlags, c.Spec().NodeCount-1,
         ksl.waitDuration.String()))
 }
 
+type ycsbSplitLoad struct {
+    // workload is the YCSB workload letter e.g. a, b, ..., f.
+    workload string
+    // concurrency is the number of concurrent workers.
+    concurrency int
+    // hashed determines whether the inserted keys are hashed.
+    hashed bool
+    // insertCount is the number of records to pre-load into the user table.
+    insertCount int
+    // waitDuration is the duration the workload should run for.
+    waitDuration time.Duration
+}
+
+func (ysl ycsbSplitLoad) init(ctx context.Context, t test.Test, c cluster.Cluster) error {
+    t.Status("running ycsb workload ", ysl.workload)
+    extraArgs := ""
+    if ysl.hashed {
+        extraArgs += "--insert-hash"
+    }
+
+    return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf(
+        "./workload init ycsb --insert-count=%d --workload=%s %s {pgurl:1-%d}",
+        ysl.insertCount, ysl.workload, extraArgs, c.Spec().NodeCount-1))
+}
+
+func (ysl ycsbSplitLoad) rangeCount(db *gosql.DB) (int, error) {
+    return rangeCountFrom("ycsb.usertable", db)
+}
+
+func (ysl ycsbSplitLoad) run(ctx context.Context, t test.Test, c cluster.Cluster) error {
+    extraArgs := ""
+    if ysl.hashed {
+        extraArgs += "--insert-hash"
+    }
+
+    return c.RunE(ctx, c.Node(c.Spec().NodeCount), fmt.Sprintf(
+        "./workload run ycsb --record-count=%d --workload=%s --concurrency=%d "+
+            "--duration='%s' %s {pgurl:1-%d}",
+        ysl.insertCount, ysl.workload, ysl.concurrency,
+        ysl.waitDuration.String(), extraArgs, c.Spec().NodeCount-1))
+}
+
 func rangeCountFrom(from string, db *gosql.DB) (int, error) {
     var ranges int
     q := fmt.Sprintf("SELECT count(*) FROM [SHOW RANGES FROM TABLE %s]",
@@ -85,16 +133,18 @@ func rangeCountFrom(from string, db *gosql.DB) (int, error) {
 }
 
 type splitParams struct {
-    load          splitLoad
-    maxSize       int           // The maximum size a range is allowed to be.
-    qpsThreshold  int           // QPS Threshold for load based splitting.
-    cpuThreshold  time.Duration // CPU Threshold for load based splitting.
-    minimumRanges int           // Minimum number of ranges expected at the end.
-    maximumRanges int           // Maximum number of ranges expected at the end.
+    load              splitLoad
+    maxSize           int           // The maximum size a range is allowed to be.
+    qpsThreshold      int           // QPS Threshold for load based splitting.
+    cpuThreshold      time.Duration // CPU Threshold for load based splitting.
+    minimumRanges     int           // Minimum number of ranges expected at the end.
+    maximumRanges     int           // Maximum number of ranges expected at the end.
+    initialRangeCount int           // Initial range count expected after initialization.
 }
 
 func registerLoadSplits(r registry.Registry) {
-    const numNodes = 3
+    // Use the 4th node as the workload runner.
+    const numNodes = 4
 
     r.Add(registry.TestSpec{
         Name:    fmt.Sprintf("splits/load/uniform/nodes=%d", numNodes),
@@ -141,18 +191,37 @@ func registerLoadSplits(r registry.Registry) {
             }})
         },
     })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/uniform/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize:      10 << 30,               // 10 GB
+                cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
+                // There should be at least 15 splits; in practice there are on
+                // average 20.
+                minimumRanges: 15,
+                maximumRanges: 25,
+                load: kvSplitLoad{
+                    concurrency:  64, // 64 concurrent workers
+                    readPercent:  95, // 95% reads
+                    waitDuration: 10 * time.Minute,
+                }})
+        },
+    })
     r.Add(registry.TestSpec{
         Name:    fmt.Sprintf("splits/load/sequential/nodes=%d", numNodes),
         Owner:   registry.OwnerKV,
         Cluster: r.MakeClusterSpec(numNodes),
         Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
             runLoadSplits(ctx, t, c, splitParams{
-                maxSize:       10 << 30, // 10 GB
-                qpsThreshold:  100,      // 100 queries per second
-                minimumRanges: 1,        // We expect no splits so require only 1 range.
+                maxSize:      10 << 30, // 10 GB
+                qpsThreshold: 100,      // 100 queries per second
+                // We expect no splits so require only 1 range. However, in practice we
+                // sometimes see a split or two early on, presumably when the sampling
+                // gets lucky.
+                minimumRanges: 1,
                 maximumRanges: 3,
                 load: kvSplitLoad{
                     concurrency: 64, // 64 concurrent workers
@@ -162,6 +231,30 @@ func registerLoadSplits(r registry.Registry) {
             }})
         },
     })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/sequential/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize:      10 << 30,               // 10 GB
+                cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
+                // We expect no splits, so require only 1 range; however, in practice a
+                // split may slip in. The reason we don't expect splits for a
+                // sequential pattern is that existing split samples should only have
+                // the right counter incremented as new requests come in to the right.
+                // Any sample we keep should always be the rightmost request we have
+                // seen so far.
+                minimumRanges: 1,
+                maximumRanges: 5,
+                load: kvSplitLoad{
+                    concurrency:  64,
+                    readPercent:  0,
+                    sequential:   true,
+                    waitDuration: 60 * time.Second,
+                }})
+        },
+    })
     r.Add(registry.TestSpec{
         Name:    fmt.Sprintf("splits/load/spanning/nodes=%d", numNodes),
         Owner:   registry.OwnerKV,
@@ -180,6 +273,116 @@ func registerLoadSplits(r registry.Registry) {
             }})
         },
     })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/spanning/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize:      10 << 30,               // 10 GB
+                cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
+                // We expect 1-4 splits. CPU doesn't have the same containment
+                // requirement as QPS; instead we want the CPU to be distributed over
+                // the ranges. Splitting a range based on QPS when there are only
+                // scans amplifies the original QPS, effectively doubling it, whereas
+                // for CPU the resulting LHS and RHS post-split should still add up to
+                // approximately the original range's CPU, ignoring fixed overhead.
+                minimumRanges: 2,
+                maximumRanges: 5,
+                load: kvSplitLoad{
+                    concurrency:  64, // 64 concurrent workers
+                    readPercent:  0,  // 0% reads
+                    spanPercent:  95, // 95% spanning queries
+                    waitDuration: 60 * time.Second,
+                }})
+        },
+    })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/ycsb/a/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize:      10 << 30,               // 10 GB
+                cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
+                // YCSB/A has a zipfian distribution with 50% inserts and 50% updates.
+                // The number of splits should be between 20-30 after 10 minutes with a
+                // 100ms threshold on 8vCPU machines.
+                minimumRanges:     20,
+                maximumRanges:     30,
+                initialRangeCount: 2,
+                load: ycsbSplitLoad{
+                    workload:     "a",
+                    concurrency:  64,
+                    insertCount:  1e4, // 10k
+                    waitDuration: 10 * time.Minute,
+                }})
+        },
+    })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/ycsb/b/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize: 10 << 30, // 10 GB
+                // YCSB/B has a zipfian distribution with 95% reads and 5% updates.
+                // The number of splits should be similar to YCSB/A.
+                cpuThreshold:      100 * time.Millisecond, // 1/10th of a CPU per second.
+                minimumRanges:     20,
+                maximumRanges:     30,
+                initialRangeCount: 2,
+                load: ycsbSplitLoad{
+                    workload:     "b",
+                    concurrency:  64,
+                    insertCount:  1e4, // 10k
+                    waitDuration: 10 * time.Minute,
+                }})
+        },
+    })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/ycsb/d/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize:      10 << 30,               // 10 GB
+                cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
+                // YCSB/D has a latest distribution, i.e. a moving hot key. The inserts
+                // are hashed, which will lead to many hotspots over the keyspace that
+                // move. Expect a few fewer splits than A and B.
+                minimumRanges:     15,
+                maximumRanges:     25,
+                initialRangeCount: 2,
+                load: ycsbSplitLoad{
+                    workload:     "d",
+                    concurrency:  64,
+                    insertCount:  1e4, // 10k
+                    waitDuration: 10 * time.Minute,
+                }})
+        },
+    })
+    r.Add(registry.TestSpec{
+        Name:    fmt.Sprintf("splits/load/ycsb/e/nodes=%d/obj=cpu", numNodes),
+        Owner:   registry.OwnerKV,
+        Cluster: r.MakeClusterSpec(numNodes),
+        Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+            runLoadSplits(ctx, t, c, splitParams{
+                maxSize:      10 << 30,               // 10 GB
+                cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
+                // YCSB/E has a zipfian distribution with 95% scans (limit 1k) and 5%
+                // inserts.
+                minimumRanges:     8,
+                maximumRanges:     15,
+                initialRangeCount: 2,
+                load: ycsbSplitLoad{
+                    workload:     "e",
+                    concurrency:  64,
+                    insertCount:  1e4, // 10k
+                    waitDuration: 10 * time.Minute,
+                }})
+        },
+    })
 }
 
 // runLoadSplits tests behavior of load based splitting under
@@ -187,8 +390,12 @@ func registerLoadSplits(r registry.Registry) {
 // splits occur in different workload scenarios.
 func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params splitParams) {
     c.Put(ctx, t.Cockroach(), "./cockroach", c.All())
-    c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(1))
-    c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.All())
+    c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(4))
+    startOpts := option.DefaultStartOptsNoBackups()
+    startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
+        "--vmodule=split_queue=2,store_rebalancer=2,allocator=2,replicate_queue=2",
+    )
+    c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.All())
 
     m := c.NewMonitor(ctx, c.All())
     m.Go(func(ctx context.Context) error {
@@ -200,9 +407,6 @@ func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params s
             return err
         }
 
-        // TODO(kvoli): Add load split tests which use CPU, similar to the current
-        // QPS ones. Tracked by #97540.
-        //
         // Set the objective to QPS or CPU and update the load split threshold
        // appropriately.
        if params.qpsThreshold > 0 {
@@ -237,8 +441,8 @@ func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params s
                t.Fatalf("failed to set range_max_bytes: %v", err)
             }
         }
-        // Set the range size to a huge size so we don't get splits that occur
-        // as a result of size thresholds. The kv table will thus be in a single
+        // Set the range size to a huge value so we don't get splits that occur as a
+        // result of size thresholds. The workload table will thus be in a single
         // range unless split by load.
         setRangeMaxBytes(params.maxSize)
 
@@ -248,8 +452,12 @@ func runLoadSplits(ctx context.Context, t test.Test, c cluster.Cluster, params s
         }
 
         t.Status("checking initial range count")
-        if rc, _ := params.load.rangeCount(db); rc != 1 {
-            return errors.Errorf("kv.kv table split over multiple ranges.")
+        expectedInitialRangeCount := params.initialRangeCount
+        if expectedInitialRangeCount == 0 {
+            expectedInitialRangeCount = 1
+        }
+        if rc, _ := params.load.rangeCount(db); rc != expectedInitialRangeCount {
+            return errors.Errorf("expected %d initial ranges, found %d", expectedInitialRangeCount, rc)
         }
 
         t.Status("enable load based splitting")
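
Aside (editor's note, not part of the patch): every range-count assertion above funnels through rangeCountFrom, which is just a count over SHOW RANGES. A minimal standalone sketch of the same check for running by hand against a cluster; the connection URL is an assumption, and ycsb.usertable is the table ycsbSplitLoad.rangeCount queries in the patch.

// Standalone sketch: count the ranges backing ycsb.usertable the same way
// rangeCountFrom does above. The connection URL is illustrative; point it at
// any node of the test cluster.
package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
    db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Same query shape as rangeCountFrom("ycsb.usertable", db).
    var ranges int
    q := "SELECT count(*) FROM [SHOW RANGES FROM TABLE ycsb.usertable]"
    if err := db.QueryRow(q).Scan(&ranges); err != nil {
        log.Fatal(err)
    }
    fmt.Printf("ycsb.usertable is backed by %d range(s)\n", ranges)
}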