From 9816f30fcd3564c64ffaf7636936d4e59f59dfd3 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 25 Jul 2023 16:54:51 +0200 Subject: [PATCH 1/6] kvserver: make setupClusterForClosedTSTesting more flexible Allow passing in a custom side transport interval. This will be needed to adopt it in TestClosedTimestampFrozenAfterSubsumption. --- pkg/kv/kvserver/client_rangefeed_test.go | 2 +- pkg/kv/kvserver/closed_timestamp_test.go | 25 +++++++++++++---------- pkg/kv/kvserver/replica_rangefeed_test.go | 2 +- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go index b4e068695a0c..d8214a035999 100644 --- a/pkg/kv/kvserver/client_rangefeed_test.go +++ b/pkg/kv/kvserver/client_rangefeed_test.go @@ -227,7 +227,7 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) { clusterArgs.ReplicationMode = base.ReplicationManual // NB: setupClusterForClosedTSTesting sets a low closed timestamp target // duration. - tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, "cttest", "kv") + tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) tc.AddNonVotersOrFatal(t, desc.StartKey.AsRawKey(), tc.Target(1)) diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 8bebfbbb082b..466696c8f7e1 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -85,7 +85,7 @@ func TestClosedTimestampCanServe(t *testing.T) { // Disable the replicateQueue so that it doesn't interfere with replica // membership ranges. clusterArgs.ReplicationMode = base.ReplicationManual - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName) + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -156,7 +156,7 @@ func TestClosedTimestampCanServeOnVoterIncoming(t *testing.T) { clusterArgs.ReplicationMode = base.ReplicationManual knobs, ltk := makeReplicationTestKnobs() clusterArgs.ServerArgs.Knobs = knobs - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, clusterArgs, dbName, tableName) + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, clusterArgs, dbName, tableName) defer tc.Stopper().Stop(ctx) if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { @@ -192,7 +192,7 @@ func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc) @@ -270,7 +270,7 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) { defer txnwait.TestingOverrideTxnLivenessThreshold(time.Hour)() ctx := context.Background() - tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, _, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, 
aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc) ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) @@ -377,7 +377,7 @@ func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") repls := replsForRange(ctx, t, tc, desc) // Disable the automatic merging. if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil { @@ -457,7 +457,7 @@ func TestClosedTimestampCantServeBasedOnUncertaintyLimit(t *testing.T) { ctx := context.Background() // Set up the target duration to be very long and rely on lease transfers to // drive MaxClosed. - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc) @@ -490,7 +490,7 @@ func TestClosedTimestampCanServeForWritingTransaction(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc) @@ -537,7 +537,7 @@ func TestClosedTimestampCantServeForNonTransactionalReadRequest(t *testing.T) { skip.UnderRace(t) ctx := context.Background() - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc) @@ -579,7 +579,7 @@ func TestClosedTimestampCantServeForNonTransactionalBatch(t *testing.T) { testutils.RunTrueAndFalse(t, "tsFromServer", func(t *testing.T, tsFromServer bool) { ctx := context.Background() - tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") + tc, db0, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv") defer tc.Stopper().Stop(ctx) repls := replsForRange(ctx, t, tc, desc) @@ -1195,11 +1195,14 @@ func aggressiveResolvedTimestampPushKnobs() *kvserver.StoreTestingKnobs { func setupClusterForClosedTSTesting( ctx context.Context, t *testing.T, - targetDuration time.Duration, + targetDuration, sideTransportInterval time.Duration, clusterArgs base.TestClusterArgs, dbName, tableName string, ) (tc serverutils.TestClusterInterface, db0 *gosql.DB, kvTableDesc roachpb.RangeDescriptor) { const numNodes = 3 + if sideTransportInterval == 0 { + sideTransportInterval = targetDuration / 4 + } tc, desc := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes) sqlRunner := sqlutils.MakeSQLRunner(tc.ServerConn(0)) sqlRunner.ExecMultiple(t, 
strings.Split(fmt.Sprintf(`
@@ -1207,7 +1210,7 @@ SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';
 SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '%s';
 SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
 SET CLUSTER SETTING kv.allocator.load_based_rebalancing = 'off';
-`, targetDuration, targetDuration/4),
+`, targetDuration, sideTransportInterval),
 	";")...)
 
 	// Disable replicate queues to avoid errant lease transfers.
diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go
index 0c85ec4016e2..41724a7d5e7e 100644
--- a/pkg/kv/kvserver/replica_rangefeed_test.go
+++ b/pkg/kv/kvserver/replica_rangefeed_test.go
@@ -1116,7 +1116,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	ctx := context.Background()
-	tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
+	tc, db, desc := setupClusterForClosedTSTesting(ctx, t, testingTargetDuration, 0, aggressiveResolvedTimestampClusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	repls := replsForRange(ctx, t, tc, desc)
 

From 208a7eb6cc061b0f358aab76eaf78371c3f1de65 Mon Sep 17 00:00:00 2001
From: Tobias Grieger
Date: Tue, 25 Jul 2023 16:55:38 +0200
Subject: [PATCH 2/6] kvserver: improve TestClosedTimestampFrozenAfterSubsumption

Adopt setupClusterForClosedTSTesting which wraps setupTestClusterWithDummyRange
with goodies such as #107531.

---
 pkg/kv/kvserver/closed_timestamp_test.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go
index 466696c8f7e1..ecbec6715e13 100644
--- a/pkg/kv/kvserver/closed_timestamp_test.go
+++ b/pkg/kv/kvserver/closed_timestamp_test.go
@@ -722,8 +722,7 @@ func TestClosedTimestampFrozenAfterSubsumption(t *testing.T) {
 	// Set up the closed timestamp timing such that, when we block a merge and
 	// transfer the RHS lease, the closed timestamp advances over the LHS
 	// lease but not over the RHS lease.
-	const numNodes = 3
-	tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, "cttest" /* dbName */, "kv" /* tableName */, numNodes)
+	tc, _, _ := setupClusterForClosedTSTesting(ctx, t, 5*time.Second, 100*time.Millisecond, clusterArgs, "cttest", "kv")
 	defer tc.Stopper().Stop(ctx)
 	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
 	sqlDB.ExecMultiple(t, strings.Split(fmt.Sprintf(`

From b89e48d7c892469e28b7e1269386f9a92467350f Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Mon, 24 Jul 2023 13:23:11 +0000
Subject: [PATCH 3/6] rangefeed: add `BenchmarkRangefeed`

This patch adds a basic benchmark for rangefeed event throughput.
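As a rough sketch of how the harness is meant to be extended (the variant below
is hypothetical and not part of this change; it only reuses the
benchmarkRangefeedOpts struct and runBenchmarkRangefeed helper introduced
here), further variants construct an options value and delegate to the shared
runner, which is what the follow-up commits in this series do:

    // Hypothetical extra variant, shown for illustration only; it would live
    // in the rangefeed package next to the benchmarks added by this patch.
    func BenchmarkRangefeedSingleRegistration(b *testing.B) {
    	runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
    		numRegistrations: 1,
    	})
    }
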
Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + pkg/kv/kvserver/rangefeed/bench_test.go | 140 ++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 pkg/kv/kvserver/rangefeed/bench_test.go diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index c5a683ec6f49..1d421e146057 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -45,6 +45,7 @@ go_test( name = "rangefeed_test", size = "large", srcs = [ + "bench_test.go", "budget_test.go", "catchup_scan_bench_test.go", "catchup_scan_test.go", diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go new file mode 100644 index 000000000000..fa83cc643ff3 --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -0,0 +1,140 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "encoding/binary" + "fmt" + "math" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/future" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +type benchmarkRangefeedOpts struct { + numRegistrations int +} + +// BenchmarkRangefeed benchmarks the processor and registrations, by submitting +// a set of events and waiting until they are all emitted. +func BenchmarkRangefeed(b *testing.B) { + for _, numRegistrations := range []int{1, 10, 100} { + name := fmt.Sprintf("numRegs=%d", numRegistrations) + b.Run(name, func(b *testing.B) { + runBenchmarkRangefeed(b, benchmarkRangefeedOpts{ + numRegistrations: numRegistrations, + }) + }) + } +} + +// runBenchmarkRangefeed runs a rangefeed benchmark, emitting b.N events across +// a rangefeed. +func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { + defer log.Scope(b).Close(b) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")} + + // Set up processor. + p := NewProcessor(Config{ + AmbientContext: log.MakeTestingAmbientContext(nil), + Clock: hlc.NewClockForTesting(nil), + Metrics: NewMetrics(), + Span: span, + MemBudget: newTestBudget(math.MaxInt64), + EventChanCap: b.N, + EventChanTimeout: time.Hour, + }) + require.NoError(b, p.Start(stopper, nil)) + + // Add registrations. + streams := make([]*noopStream, opts.numRegistrations) + futures := make([]*future.ErrorFuture, opts.numRegistrations) + for i := 0; i < opts.numRegistrations; i++ { + // withDiff does not matter for these benchmarks, since the previous value + // is fetched and populated during Raft application. 
+ const withDiff = false + streams[i] = &noopStream{ctx: ctx} + futures[i] = &future.ErrorFuture{} + ok, _ := p.Register(span, hlc.MinTimestamp, nil, withDiff, streams[i], nil, futures[i]) + require.True(b, ok) + } + + // Construct b.N events beforehand -- we don't want to measure this cost. + prefix := roachpb.Key("key-") + value := roachpb.MakeValueFromString("a few bytes of data").RawBytes + ops := make([]enginepb.MVCCLogicalOp, 0, b.N) + + for i := 0; i < b.N; i++ { + key := append(prefix, make([]byte, 4)...) + binary.BigEndian.PutUint32(key[len(prefix):], uint32(i)) + ts := hlc.Timestamp{WallTime: int64(i + 1)} + ops = append(ops, makeLogicalOp(&enginepb.MVCCWriteValueOp{ + Key: key, + Timestamp: ts, + Value: value, + })) + } + + // Wait for catchup scans and flush checkpoint events. + p.syncEventAndRegistrations() + + // Run the benchmark. We accounted for b.N when constructing events. + b.ResetTimer() + + for _, op := range ops { + if !p.ConsumeLogicalOps(ctx, op) { + b.Fatal("failed to submit op") + } + } + p.syncEventAndRegistrations() + + // Check that all registrations ended successfully, and emitted the expected + // number of events. + b.StopTimer() + p.Stop() + + for i, f := range futures { + regErr, err := future.Wait(ctx, f) + require.NoError(b, err) + require.NoError(b, regErr) + require.Equal(b, b.N, streams[i].events-1) // ignore checkpoint after catchup + } +} + +// noopStream is a stream that does nothing, except count events. +type noopStream struct { + ctx context.Context + events int +} + +func (s *noopStream) Context() context.Context { + return s.ctx +} + +func (s *noopStream) Send(*kvpb.RangeFeedEvent) error { + s.events++ + return nil +} From 0f3f1467e8627f822ce82f78b82e9a6ad828556d Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 25 Jul 2023 10:49:43 +0000 Subject: [PATCH 4/6] rangefeed: migrate `BenchmarkProcessorWithBudget` to new benchmarks Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/bench_test.go | 26 ++++++++- pkg/kv/kvserver/rangefeed/processor_test.go | 58 --------------------- 2 files changed, 25 insertions(+), 59 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index fa83cc643ff3..e6db218ea125 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -30,6 +30,7 @@ import ( type benchmarkRangefeedOpts struct { numRegistrations int + budget int64 } // BenchmarkRangefeed benchmarks the processor and registrations, by submitting @@ -40,6 +41,25 @@ func BenchmarkRangefeed(b *testing.B) { b.Run(name, func(b *testing.B) { runBenchmarkRangefeed(b, benchmarkRangefeedOpts{ numRegistrations: numRegistrations, + budget: math.MaxInt64, + }) + }) + } +} + +// BenchmarkRangefeedBudget benchmarks the effect of enabling/disabling the +// processor budget. It sets up a single processor and registration, and +// processes a set of events. 
+func BenchmarkRangefeedBudget(b *testing.B) { + for _, budget := range []bool{false, true} { + b.Run(fmt.Sprintf("budget=%t", budget), func(b *testing.B) { + var budgetSize int64 + if budget { + budgetSize = math.MaxInt64 + } + runBenchmarkRangefeed(b, benchmarkRangefeedOpts{ + numRegistrations: 1, + budget: budgetSize, }) }) } @@ -55,6 +75,10 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { stopper := stop.NewStopper() defer stopper.Stop(ctx) + var budget *FeedBudget + if opts.budget > 0 { + budget = newTestBudget(opts.budget) + } span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")} // Set up processor. @@ -63,7 +87,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { Clock: hlc.NewClockForTesting(nil), Metrics: NewMetrics(), Span: span, - MemBudget: newTestBudget(math.MaxInt64), + MemBudget: budget, EventChanCap: b.N, EventChanTimeout: time.Hour, }) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 19e1c0030e21..da4e279d3998 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1451,64 +1451,6 @@ func (c *consumer) Consumed() int { return int(atomic.LoadInt32(&c.sentValues)) } -func BenchmarkProcessorWithBudget(b *testing.B) { - benchmarkEvents := 1 - - var budget *FeedBudget - if false { - budget = newTestBudget(math.MaxInt64) - } - - stopper := stop.NewStopper() - var pushTxnInterval, pushTxnAge time.Duration = 0, 0 // disable - p := NewProcessor(Config{ - AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: hlc.NewClockForTesting(nil), - Span: roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}, - PushTxnsInterval: pushTxnInterval, - PushTxnsAge: pushTxnAge, - EventChanCap: benchmarkEvents * b.N, - Metrics: NewMetrics(), - MemBudget: budget, - EventChanTimeout: time.Minute, - }) - require.NoError(b, p.Start(stopper, nil)) - ctx := context.Background() - defer stopper.Stop(ctx) - - // Add a registration. - r1Stream := newTestStream() - - var r1Done future.ErrorFuture - _, _ = p.Register( - roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("m")}, - hlc.Timestamp{WallTime: 1}, - nil, /* catchUpIter */ - false, /* withDiff */ - r1Stream, - func() {}, - &r1Done, - ) - p.syncEventAndRegistrations() - - b.ResetTimer() - for bi := 0; bi < b.N; bi++ { - for i := 0; i < benchmarkEvents; i++ { - p.ConsumeLogicalOps(ctx, writeValueOpWithKV( - roachpb.Key("k"), - hlc.Timestamp{WallTime: int64(bi*benchmarkEvents + i + 2)}, - []byte("this is value"))) - } - } - - p.syncEventAndRegistrations() - - // Sanity check that subscription was not dropped. - if p.reg.Len() == 0 { - require.NoError(b, waitErrorFuture(&r1Done)) - } -} - // TestSizeOfEvent tests the size of the event struct. It is fine if this struct // changes in size, as long as this is done consciously. 
func TestSizeOfEvent(t *testing.T) { From 35ceb1ab49f85a24b97f64ed15c7136d6d0ed367 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 25 Jul 2023 11:26:07 +0000 Subject: [PATCH 5/6] rangefeed: add checkpoint variants of `BenchmarkRangefeed` Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/bench_test.go | 78 +++++++++++++++++-------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index e6db218ea125..8d22f936d57d 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -29,21 +29,32 @@ import ( ) type benchmarkRangefeedOpts struct { + opType opType numRegistrations int budget int64 } +type opType string + +const ( + writeOpType opType = "write" + closedTSOpType opType = "closedts" +) + // BenchmarkRangefeed benchmarks the processor and registrations, by submitting // a set of events and waiting until they are all emitted. func BenchmarkRangefeed(b *testing.B) { - for _, numRegistrations := range []int{1, 10, 100} { - name := fmt.Sprintf("numRegs=%d", numRegistrations) - b.Run(name, func(b *testing.B) { - runBenchmarkRangefeed(b, benchmarkRangefeedOpts{ - numRegistrations: numRegistrations, - budget: math.MaxInt64, + for _, opType := range []opType{writeOpType, closedTSOpType} { + for _, numRegistrations := range []int{1, 10, 100} { + name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations) + b.Run(name, func(b *testing.B) { + runBenchmarkRangefeed(b, benchmarkRangefeedOpts{ + opType: opType, + numRegistrations: numRegistrations, + budget: math.MaxInt64, + }) }) - }) + } } } @@ -58,6 +69,7 @@ func BenchmarkRangefeedBudget(b *testing.B) { budgetSize = math.MaxInt64 } runBenchmarkRangefeed(b, benchmarkRangefeedOpts{ + opType: writeOpType, numRegistrations: 1, budget: budgetSize, }) @@ -107,19 +119,34 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { } // Construct b.N events beforehand -- we don't want to measure this cost. - prefix := roachpb.Key("key-") - value := roachpb.MakeValueFromString("a few bytes of data").RawBytes - ops := make([]enginepb.MVCCLogicalOp, 0, b.N) - - for i := 0; i < b.N; i++ { - key := append(prefix, make([]byte, 4)...) - binary.BigEndian.PutUint32(key[len(prefix):], uint32(i)) - ts := hlc.Timestamp{WallTime: int64(i + 1)} - ops = append(ops, makeLogicalOp(&enginepb.MVCCWriteValueOp{ - Key: key, - Timestamp: ts, - Value: value, - })) + var ( + logicalOps []enginepb.MVCCLogicalOp + closedTimestamps []hlc.Timestamp + prefix = roachpb.Key("key-") + value = roachpb.MakeValueFromString("a few bytes of data").RawBytes + ) + switch opts.opType { + case writeOpType: + logicalOps = make([]enginepb.MVCCLogicalOp, 0, b.N) + for i := 0; i < b.N; i++ { + key := append(prefix, make([]byte, 4)...) + binary.BigEndian.PutUint32(key[len(prefix):], uint32(i)) + ts := hlc.Timestamp{WallTime: int64(i + 1)} + logicalOps = append(logicalOps, makeLogicalOp(&enginepb.MVCCWriteValueOp{ + Key: key, + Timestamp: ts, + Value: value, + })) + } + + case closedTSOpType: + closedTimestamps = make([]hlc.Timestamp, 0, b.N) + for i := 0; i < b.N; i++ { + closedTimestamps = append(closedTimestamps, hlc.Timestamp{WallTime: int64(i + 1)}) + } + + default: + b.Fatalf("unknown operation type %q", opts.opType) } // Wait for catchup scans and flush checkpoint events. @@ -128,9 +155,14 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { // Run the benchmark. 
We accounted for b.N when constructing events. b.ResetTimer() - for _, op := range ops { - if !p.ConsumeLogicalOps(ctx, op) { - b.Fatal("failed to submit op") + for _, logicalOp := range logicalOps { + if !p.ConsumeLogicalOps(ctx, logicalOp) { + b.Fatal("failed to submit logical operation") + } + } + for _, closedTS := range closedTimestamps { + if !p.ForwardClosedTS(ctx, closedTS) { + b.Fatal("failed to forward closed timestamp") } } p.syncEventAndRegistrations() From 11f8799101454dffda9621ad34efcfb965a745b5 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 25 Jul 2023 11:55:37 +0000 Subject: [PATCH 6/6] rangefeed: add commits variant to `BenchmarkRangefeed` Epic: none Release note: None --- pkg/kv/kvserver/rangefeed/bench_test.go | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go index 8d22f936d57d..637381f271a2 100644 --- a/pkg/kv/kvserver/rangefeed/bench_test.go +++ b/pkg/kv/kvserver/rangefeed/bench_test.go @@ -19,12 +19,14 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/future" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -37,14 +39,15 @@ type benchmarkRangefeedOpts struct { type opType string const ( - writeOpType opType = "write" + writeOpType opType = "write" // individual 1PC writes + commitOpType opType = "commit" // intent + commit writes, 1 per txn closedTSOpType opType = "closedts" ) // BenchmarkRangefeed benchmarks the processor and registrations, by submitting // a set of events and waiting until they are all emitted. func BenchmarkRangefeed(b *testing.B) { - for _, opType := range []opType{writeOpType, closedTSOpType} { + for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} { for _, numRegistrations := range []int{1, 10, 100} { name := fmt.Sprintf("opType=%s/numRegs=%d", opType, numRegistrations) b.Run(name, func(b *testing.B) { @@ -139,6 +142,21 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) { })) } + case commitOpType: + logicalOps = make([]enginepb.MVCCLogicalOp, 2*b.N) + // Write all intents first, then all commits. Txns are tracked in an + // internal heap, and we want to cover some of this cost, even though we + // write and commit them incrementally. + for i := 0; i < b.N; i++ { + var txnID uuid.UUID + txnID.DeterministicV4(uint64(i), uint64(b.N)) + key := append(prefix, make([]byte, 4)...) + binary.BigEndian.PutUint32(key[len(prefix):], uint32(i)) + ts := hlc.Timestamp{WallTime: int64(i + 1)} + logicalOps[i] = writeIntentOpWithKey(txnID, key, isolation.Serializable, ts) + logicalOps[b.N+i] = commitIntentOpWithKV(txnID, key, ts, value) + } + case closedTSOpType: closedTimestamps = make([]hlc.Timestamp, 0, b.N) for i := 0; i < b.N; i++ {