From 43f16c92eed71a8130585fb768a13bdcfb1b309f Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Sat, 24 Sep 2022 19:28:41 -0400
Subject: [PATCH] changefeedccl: Uniformly distribute work during export.

By default, a changefeed distributes work to nodes based on which node
is the leaseholder for each range. This makes sense since running a
rangefeed against the local node is more efficient.

In a cluster where ranges are assigned almost uniformly to each node,
running a changefeed export is efficient: all nodes stay busy until
they are done.

The KV server is responsible for making sure that ranges are more or
less uniformly distributed across the cluster; however, this
determination is based on the set of all ranges in the cluster, not on
any particular table. As a result, it is possible for a table's ranges
to be distributed non-uniformly across the nodes. When this happens, a
changefeed export takes a long time due to the long tail: as each node
completes its assigned ranges, it sits idle until the changefeed
completes.

This PR introduces a change (controlled via the
`changefeed.balance_range_distribution.enable` setting) where the
changefeed tries to produce a more balanced assignment, so that in a
cluster of N nodes each node is responsible for roughly 1/Nth of the
work.

Release note (enterprise change): Changefeed exports are up to 25%
faster due to uniform work assignment.
---
 .../settings/settings-for-tenants.txt    |   1 +
 docs/generated/settings/settings.html    |   1 +
 pkg/ccl/changefeedccl/BUILD.bazel        |   2 +
 pkg/ccl/changefeedccl/changefeed_dist.go | 134 ++++++++++++++++--
 pkg/ccl/changefeedccl/changefeed_test.go |  81 +++++++++++
 pkg/ccl/changefeedccl/kvfeed/scanner.go  |   5 +-
 6 files changed, 212 insertions(+), 12 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index fe6df50d8a6a..cabf950b7f1d 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,7 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
 bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
 bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
 bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
+changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
 changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
 changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
 cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 858b99217e83..1085adbec6a3 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -16,6 +16,7 @@ bulkio.backup.read_timeoutduration5m0samount of time after which a read attempt is considered timed out, which causes the backup to fail bulkio.backup.read_with_priority_afterduration1m0samount of time
since the read-as-of time above which a BACKUP should use priority when retrying reads bulkio.stream_ingestion.minimum_flush_intervalduration5sthe minimum timestamp between flushes; flushes may still occur if internal buffers fill up +changefeed.balance_range_distribution.enablebooleanfalseif enabled, the ranges are balanced equally among all nodes changefeed.node_throttle_configstringspecifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_afterduration1m0sretry with high priority if we were not able to read descriptors for too long; 0 disables cloudstorage.http.custom_castringcustom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 964e9d280fee..a033ffb67af3 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
         "//pkg/jobs/jobsprotectedts",
         "//pkg/keys",
         "//pkg/kv",
+        "//pkg/kv/kvclient/kvcoord",
         "//pkg/kv/kvserver",
         "//pkg/kv/kvserver/closedts",
         "//pkg/kv/kvserver/protectedts",
@@ -96,6 +97,7 @@ go_library(
         "//pkg/sql/sessiondatapb",
         "//pkg/sql/sqlutil",
         "//pkg/sql/types",
+        "//pkg/util",
         "//pkg/util/bitarray",
         "//pkg/util/bufalloc",
         "//pkg/util/cache",
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index 91210a4e10a9..3952e4729121 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,15 @@ package changefeedccl
 
 import (
 	"context"
+	"sort"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql"
@@ -29,6 +32,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/errors"
 )
@@ -307,6 +311,14 @@ func startDistChangefeed(
 	return err
 }
 
+var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
+	settings.TenantWritable,
+	"changefeed.balance_range_distribution.enable",
+	"if enabled, the ranges are balanced equally among all nodes",
+	util.ConstantWithMetamorphicTestBool(
+		"changefeed.balance_range_distribution.enable", false),
+).WithPublic()
+
 func makePlan(
 	execCtx sql.JobExecContext,
 	jobID jobspb.JobID,
@@ -316,24 +328,41 @@ func makePlan(
 	trackedSpans []roachpb.Span,
 	selectClause string,
 ) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
-
 	return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
 		var blankTxn *kv.Txn
 
+		distMode := sql.DistributionTypeAlways
+		if details.SinkURI == `` {
+			// Sinkless feeds get one ChangeAggregator on this node.
+			distMode = sql.DistributionTypeNone
+		}
+
 		planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
-			sql.DistributionTypeAlways)
+			sql.DistributionType(distMode))
+		spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
+		if err != nil {
+			return nil, nil, err
+		}
 
-		var spanPartitions []sql.SpanPartition
-		if details.SinkURI == `` {
-			// Sinkless feeds get one ChangeAggregator on the gateway.
-			spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}}
-		} else {
-			// All other feeds get a ChangeAggregator local on the leaseholder.
-			var err error
-			spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans)
+		sv := &execCtx.ExecCfg().Settings.SV
+		if enableBalancedRangeDistribution.Get(sv) {
+			scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
 			if err != nil {
 				return nil, nil, err
 			}
+
+			// Currently, balanced range distribution is supported only in export mode.
+			// TODO(yevgeniy): Consider lifting this restriction.
+			if scanType == changefeedbase.OnlyInitialScan {
+				sender := execCtx.ExecCfg().DB.NonTransactionalSender()
+				distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
+
+				spanPartitions, err = rebalanceSpanPartitions(
+					ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
+				if err != nil {
+					return nil, nil, err
+				}
+			}
 		}
 
 		// Use the same checkpoint for all aggregators; each aggregator will only look at
@@ -442,3 +471,88 @@ func (w *changefeedResultWriter) SetError(err error) {
 func (w *changefeedResultWriter) Err() error {
 	return w.err
 }
+
+var rebalanceThreshold = settings.RegisterFloatSetting(
+	settings.TenantWritable,
+	"changefeed.balance_range_distribution.sensitivity",
+	"rebalance if the number of ranges on a node exceeds the average by this fraction",
+	0.05,
+	settings.PositiveFloat,
+)
+
+type rangeResolver interface {
+	getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
+}
+
+type distResolver struct {
+	*kvcoord.DistSender
+}
+
+func (r *distResolver) getRangesForSpans(
+	ctx context.Context, spans []roachpb.Span,
+) ([]roachpb.Span, error) {
+	return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
+}
+
+func rebalanceSpanPartitions(
+	ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
+) ([]sql.SpanPartition, error) {
+	if len(p) <= 1 {
+		return p, nil
+	}
+
+	// Explode set of spans into set of ranges.
+	// TODO(yevgeniy): This might not be great if the tables are huge.
+	numRanges := 0
+	for i := range p {
+		spans, err := r.getRangesForSpans(ctx, p[i].Spans)
+		if err != nil {
+			return nil, err
+		}
+		p[i].Spans = spans
+		numRanges += len(spans)
+	}
+
+	// Sort descending based on the number of ranges.
+	sort.Slice(p, func(i, j int) bool {
+		return len(p[i].Spans) > len(p[j].Spans)
+	})
+
+	targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))
+
+	for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
+		from, to := i, j
+
+		// Figure out how many ranges we can move.
+		numToMove := len(p[from].Spans) - targetRanges
+		canMove := targetRanges - len(p[to].Spans)
+		if numToMove <= canMove {
+			i++
+		}
+		if canMove <= numToMove {
+			numToMove = canMove
+			j--
+		}
+		if numToMove == 0 {
+			break
+		}
+
+		// Move numToMove spans from 'from' to 'to'.
+		idx := len(p[from].Spans) - numToMove
+		p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
+		p[from].Spans = p[from].Spans[:idx]
+	}
+
+	// Collapse ranges into a nice set of contiguous spans.
+	for i := range p {
+		var g roachpb.SpanGroup
+		g.Add(p[i].Spans...)
+		p[i].Spans = g.Slice()
+	}
+
+	// Finally, re-sort based on the node id.
+	sort.Slice(p, func(i, j int) bool {
+		return p[i].SQLInstanceID < p[j].SQLInstanceID
+	})
+	return p, nil
+}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index d09f22fc8958..7059c3312e14 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -7450,3 +7450,84 @@ func TestRedactedSchemaRegistry(t *testing.T) {
 	// kafka supports the confluent_schema_registry option.
 	cdcTest(t, testFn, feedTestForceSink("kafka"))
 }
+
+type echoResolver struct {
+	result []roachpb.Spans
+	pos    int
+}
+
+func (r *echoResolver) getRangesForSpans(
+	_ context.Context, _ []roachpb.Span,
+) (spans []roachpb.Span, _ error) {
+	spans = r.result[r.pos]
+	r.pos++
+	return spans, nil
+}
+
+func TestPartitionSpans(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	partitions := func(p ...sql.SpanPartition) []sql.SpanPartition {
+		return p
+	}
+	mkPart := func(n base.SQLInstanceID, spans ...roachpb.Span) sql.SpanPartition {
+		return sql.SpanPartition{SQLInstanceID: n, Spans: spans}
+	}
+	mkSpan := func(start, end string) roachpb.Span {
+		return roachpb.Span{Key: []byte(start), EndKey: []byte(end)}
+	}
+	spans := func(s ...roachpb.Span) roachpb.Spans {
+		return s
+	}
+	const sensitivity = 0.01
+
+	for i, tc := range []struct {
+		input   []sql.SpanPartition
+		resolve []roachpb.Spans
+		expect  []sql.SpanPartition
+	}{
+		{
+			input: partitions(
+				mkPart(1, mkSpan("a", "j")),
+				mkPart(2, mkSpan("j", "q")),
+				mkPart(3, mkSpan("q", "z")),
+			),
+			// 6 total ranges, 2 per node.
+			resolve: []roachpb.Spans{
+				spans(mkSpan("a", "c"), mkSpan("c", "e"), mkSpan("e", "j")),
+				spans(mkSpan("j", "q")),
+				spans(mkSpan("q", "y"), mkSpan("y", "z")),
+			},
+			expect: partitions(
+				mkPart(1, mkSpan("a", "e")),
+				mkPart(2, mkSpan("e", "q")),
+				mkPart(3, mkSpan("q", "z")),
+			),
+		},
+		{
+			input: partitions(
+				mkPart(1, mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
+				mkPart(2),
+				mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
+			),
+			// 5 total ranges -- on 2 nodes; target should be 1 per node.
+			resolve: []roachpb.Spans{
+				spans(mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
+				spans(),
+				spans(mkSpan("c", "e"), mkSpan("p", "r")),
+			},
+			expect: partitions(
+				mkPart(1, mkSpan("a", "c"), mkSpan("e", "p")),
+				mkPart(2, mkSpan("r", "z")),
+				mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
+			),
+		},
+	} {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			sp, err := rebalanceSpanPartitions(context.Background(),
+				&echoResolver{result: tc.resolve}, sensitivity, tc.input)
+			require.NoError(t, err)
+			require.Equal(t, tc.expect, sp)
+		})
+	}
+}
diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go
index 36c3f6887c72..e8f5b3ee0294 100644
--- a/pkg/ccl/changefeedccl/kvfeed/scanner.go
+++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -187,7 +187,7 @@ func (p *scanRequestScanner) exportSpan(
 func getSpansToProcess(
 	ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
 ) ([]roachpb.Span, error) {
-	ranges, err := allRangeSpans(ctx, ds, targetSpans)
+	ranges, err := AllRangeSpans(ctx, ds, targetSpans)
 	if err != nil {
 		return nil, err
 	}
@@ -254,7 +254,8 @@ func slurpScanResponse(
 	return nil
 }
 
-func allRangeSpans(
+// AllRangeSpans returns the list of all ranges for the specified list of spans.
+func AllRangeSpans(
 	ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
 ) ([]roachpb.Span, error) {
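
The rebalancing pass added to changefeed_dist.go resolves each partition's spans into ranges, sorts the partitions by range count, and then greedily moves surplus ranges from the most loaded nodes toward the least loaded ones until every node holds at most roughly (1 + sensitivity) * total/N ranges. The standalone Go sketch below illustrates that idea on plain per-node range counts; the rebalance helper and its integer-count representation are illustrative only and are not part of the patch.

package main

import (
	"fmt"
	"sort"
)

// rebalance is a simplified sketch of the idea behind rebalanceSpanPartitions,
// operating on per-node range counts instead of sql.SpanPartition values:
// sort descending, compute a target of roughly (1+sensitivity)*total/N ranges
// per node, then shift surplus ranges from the most loaded node toward the
// least loaded one.
func rebalance(counts []int, sensitivity float64) []int {
	if len(counts) <= 1 {
		return counts
	}
	total := 0
	for _, c := range counts {
		total += c
	}
	sort.Sort(sort.Reverse(sort.IntSlice(counts)))
	target := int((1 + sensitivity) * float64(total) / float64(len(counts)))

	for i, j := 0, len(counts)-1; i < j && counts[i] > target && counts[j] < target; {
		move := counts[i] - target // surplus on the most loaded node
		if room := target - counts[j]; move > room {
			move = room // don't push the receiver past the target
		}
		counts[i] -= move
		counts[j] += move
		if counts[i] <= target {
			i++ // donor is now within the target
		}
		if counts[j] >= target {
			j-- // receiver is now at the target
		}
	}
	return counts
}

func main() {
	// 60 ranges concentrated on two of four nodes; with 5% sensitivity the
	// per-node target is int(1.05 * 15) = 15.
	fmt.Println(rebalance([]int{40, 20, 0, 0}, 0.05)) // prints [15 15 15 15]
}

In the actual change the moved items are spans rather than counts; after the move loop they are collapsed back into contiguous spans via roachpb.SpanGroup and the partitions are re-sorted by SQLInstanceID, but the target computation and the two-pointer move loop have the same shape as above.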