diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index fe6df50d8a6a..cabf950b7f1d 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,7 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
+changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 858b99217e83..1085adbec6a3 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -16,6 +16,7 @@
bulkio.backup.read_timeout | duration | 5m0s | amount of time after which a read attempt is considered timed out, which causes the backup to fail |
bulkio.backup.read_with_priority_after | duration | 1m0s | amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads |
bulkio.stream_ingestion.minimum_flush_interval | duration | 5s | the minimum timestamp between flushes; flushes may still occur if internal buffers fill up |
+changefeed.balance_range_distribution.enable | boolean | false | if enabled, the ranges are balanced equally among all nodes |
changefeed.node_throttle_config | string |
| specifies node level throttling configuration for all changefeeeds |
changefeed.schema_feed.read_with_priority_after | duration | 1m0s | retry with high priority if we were not able to read descriptors for too long; 0 disables |
cloudstorage.http.custom_ca | string |
| custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage |
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 964e9d280fee..a033ffb67af3 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
+ "//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/protectedts",
@@ -96,6 +97,7 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
+ "//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index 91210a4e10a9..3952e4729121 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,15 @@ package changefeedccl
import (
"context"
+ "sort"
"time"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
+ "github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
@@ -307,6 +311,14 @@ func startDistChangefeed(
return err
}
+var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
+ settings.TenantWritable,
+ "changefeed.balance_range_distribution.enable",
+ "if enabled, the ranges are balanced equally among all nodes",
+ util.ConstantWithMetamorphicTestBool(
+ "changefeed.balance_range_distribution.enable", false),
+).WithPublic()
+
func makePlan(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
@@ -316,24 +328,41 @@ func makePlan(
trackedSpans []roachpb.Span,
selectClause string,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
-
return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
var blankTxn *kv.Txn
+ distMode := sql.DistributionTypeAlways
+ if details.SinkURI == `` {
+ // Sinkless feeds get one ChangeAggregator on this node.
+ distMode = sql.DistributionTypeNone
+ }
+
planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
- sql.DistributionTypeAlways)
+ sql.DistributionType(distMode))
+ spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
+ if err != nil {
+ return nil, nil, err
+ }
- var spanPartitions []sql.SpanPartition
- if details.SinkURI == `` {
- // Sinkless feeds get one ChangeAggregator on the gateway.
- spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}}
- } else {
- // All other feeds get a ChangeAggregator local on the leaseholder.
- var err error
- spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans)
+ sv := &execCtx.ExecCfg().Settings.SV
+ if enableBalancedRangeDistribution.Get(sv) {
+ scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
if err != nil {
return nil, nil, err
}
+
+			// Currently, balanced range distribution is supported only in export mode.
+ // TODO(yevgeniy): Consider lifting this restriction.
+ if scanType == changefeedbase.OnlyInitialScan {
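+				// Unwrap the non-transactional sender to reach the DistSender,
+				// which can enumerate the range boundaries for the tracked spans.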
+ sender := execCtx.ExecCfg().DB.NonTransactionalSender()
+ distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
+
+ spanPartitions, err = rebalanceSpanPartitions(
+ ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
}
// Use the same checkpoint for all aggregators; each aggregator will only look at
@@ -442,3 +471,88 @@ func (w *changefeedResultWriter) SetError(err error) {
func (w *changefeedResultWriter) Err() error {
return w.err
}
+
+var rebalanceThreshold = settings.RegisterFloatSetting(
+ settings.TenantWritable,
+ "changefeed.balance_range_distribution.sensitivity",
+ "rebalance if the number of ranges on a node exceeds the average by this fraction",
+ 0.05,
+ settings.PositiveFloat,
+)
+
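+// rangeResolver resolves a list of spans into the spans of the
+// individual ranges that make them up.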
+type rangeResolver interface {
+ getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
+}
+
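+// distResolver implements rangeResolver using a kvcoord.DistSender.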
+type distResolver struct {
+ *kvcoord.DistSender
+}
+
+func (r *distResolver) getRangesForSpans(
+ ctx context.Context, spans []roachpb.Span,
+) ([]roachpb.Span, error) {
+ return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
+}
+
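+// rebalanceSpanPartitions redistributes ranges among the partitions so that
+// each partition ends up with roughly the same number of ranges, within the
+// given sensitivity.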
+func rebalanceSpanPartitions(
+ ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
+) ([]sql.SpanPartition, error) {
+ if len(p) <= 1 {
+ return p, nil
+ }
+
+ // Explode set of spans into set of ranges.
+ // TODO(yevgeniy): This might not be great if the tables are huge.
+ numRanges := 0
+ for i := range p {
+ spans, err := r.getRangesForSpans(ctx, p[i].Spans)
+ if err != nil {
+ return nil, err
+ }
+ p[i].Spans = spans
+ numRanges += len(spans)
+ }
+
+ // Sort descending based on the number of ranges.
+ sort.Slice(p, func(i, j int) bool {
+ return len(p[i].Spans) > len(p[j].Spans)
+ })
+
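+	// Each partition should own roughly the even share of ranges,
+	// padded by the sensitivity fraction.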
+ targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))
+
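+	// Walk inward from both ends of the descending sort: i points at overfull
+	// partitions and advances once one is drained down to the target, j points
+	// at underfull partitions and retreats once one is filled up to it.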
+ for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
+ from, to := i, j
+
+ // Figure out how many ranges we can move.
+ numToMove := len(p[from].Spans) - targetRanges
+ canMove := targetRanges - len(p[to].Spans)
+ if numToMove <= canMove {
+ i++
+ }
+ if canMove <= numToMove {
+ numToMove = canMove
+ j--
+ }
+ if numToMove == 0 {
+ break
+ }
+
+ // Move numToMove spans from 'from' to 'to'.
+ idx := len(p[from].Spans) - numToMove
+ p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
+ p[from].Spans = p[from].Spans[:idx]
+ }
+
+	// Collapse the per-range spans back into a minimal set of contiguous spans.
+ for i := range p {
+ var g roachpb.SpanGroup
+ g.Add(p[i].Spans...)
+ p[i].Spans = g.Slice()
+ }
+
+ // Finally, re-sort based on the node id.
+ sort.Slice(p, func(i, j int) bool {
+ return p[i].SQLInstanceID < p[j].SQLInstanceID
+ })
+ return p, nil
+}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index d09f22fc8958..7059c3312e14 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -7450,3 +7450,84 @@ func TestRedactedSchemaRegistry(t *testing.T) {
// kafka supports the confluent_schema_registry option.
cdcTest(t, testFn, feedTestForceSink("kafka"))
}
+
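+// echoResolver returns its canned lists of range spans, one list per call,
+// ignoring the input spans.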
+type echoResolver struct {
+ result []roachpb.Spans
+ pos int
+}
+
+func (r *echoResolver) getRangesForSpans(
+ _ context.Context, _ []roachpb.Span,
+) (spans []roachpb.Span, _ error) {
+ spans = r.result[r.pos]
+ r.pos++
+ return spans, nil
+}
+
+func TestPartitionSpans(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ partitions := func(p ...sql.SpanPartition) []sql.SpanPartition {
+ return p
+ }
+ mkPart := func(n base.SQLInstanceID, spans ...roachpb.Span) sql.SpanPartition {
+ return sql.SpanPartition{SQLInstanceID: n, Spans: spans}
+ }
+ mkSpan := func(start, end string) roachpb.Span {
+ return roachpb.Span{Key: []byte(start), EndKey: []byte(end)}
+ }
+ spans := func(s ...roachpb.Span) roachpb.Spans {
+ return s
+ }
+ const sensitivity = 0.01
+
+ for i, tc := range []struct {
+ input []sql.SpanPartition
+ resolve []roachpb.Spans
+ expect []sql.SpanPartition
+ }{
+ {
+ input: partitions(
+ mkPart(1, mkSpan("a", "j")),
+ mkPart(2, mkSpan("j", "q")),
+ mkPart(3, mkSpan("q", "z")),
+ ),
+ // 6 total ranges, 2 per node.
+ resolve: []roachpb.Spans{
+ spans(mkSpan("a", "c"), mkSpan("c", "e"), mkSpan("e", "j")),
+ spans(mkSpan("j", "q")),
+ spans(mkSpan("q", "y"), mkSpan("y", "z")),
+ },
+ expect: partitions(
+ mkPart(1, mkSpan("a", "e")),
+ mkPart(2, mkSpan("e", "q")),
+ mkPart(3, mkSpan("q", "z")),
+ ),
+ },
+ {
+ input: partitions(
+ mkPart(1, mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
+ mkPart(2),
+ mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
+ ),
+		// 5 total ranges, currently spread across 2 nodes; the rebalance target is 1 range per node.
+ resolve: []roachpb.Spans{
+ spans(mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
+ spans(),
+ spans(mkSpan("c", "e"), mkSpan("p", "r")),
+ },
+ expect: partitions(
+ mkPart(1, mkSpan("a", "c"), mkSpan("e", "p")),
+ mkPart(2, mkSpan("r", "z")),
+ mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
+ ),
+ },
+ } {
+ t.Run(strconv.Itoa(i), func(t *testing.T) {
+ sp, err := rebalanceSpanPartitions(context.Background(),
+ &echoResolver{result: tc.resolve}, sensitivity, tc.input)
+ require.NoError(t, err)
+ require.Equal(t, tc.expect, sp)
+ })
+ }
+}
diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go
index 36c3f6887c72..e8f5b3ee0294 100644
--- a/pkg/ccl/changefeedccl/kvfeed/scanner.go
+++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -187,7 +187,7 @@ func (p *scanRequestScanner) exportSpan(
func getSpansToProcess(
ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
) ([]roachpb.Span, error) {
- ranges, err := allRangeSpans(ctx, ds, targetSpans)
+ ranges, err := AllRangeSpans(ctx, ds, targetSpans)
if err != nil {
return nil, err
}
@@ -254,7 +254,8 @@ func slurpScanResponse(
return nil
}
-func allRangeSpans(
+// AllRangeSpans returns the spans of all ranges that make up the specified list of spans.
+func AllRangeSpans(
ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
) ([]roachpb.Span, error) {