changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans) #115375

Merged (3 commits) on Mar 18, 2024
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -299,6 +299,7 @@ go_test(
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
221 changes: 162 additions & 59 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,14 @@ package changefeedccl

import (
"context"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,7 +37,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
@@ -390,7 +394,7 @@ func makePlan(
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "spans returned by DistSQL: %s", spanPartitions)
log.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
}
switch {
case distMode == sql.LocalDistribution || rangeDistribution == int64(defaultDistribution):
@@ -400,14 +404,14 @@
}
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

ri := kvcoord.MakeRangeIterator(distSender)
spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
ctx, &ri, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "spans after balanced simple distribution rebalancing: %s", spanPartitions)
log.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
}
default:
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
@@ -440,7 +444,7 @@ func makePlan(
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
for i, sp := range spanPartitions {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "watched spans for node %d: %s", sp.SQLInstanceID, sp)
log.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
}
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
for watchIdx, nodeSpan := range sp.Spans {
@@ -549,6 +553,7 @@ func (w *changefeedResultWriter) Err() error {
return w.err
}

// TODO(#120427): improve this to be more useful.
var rebalanceThreshold = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.balance_range_distribution.sensitivity",
@@ -557,80 +562,178 @@ var rebalanceThreshold = settings.RegisterFloatSetting(
settings.PositiveFloat,
)

type rangeResolver interface {
getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
type rangeIterator interface {
Desc() *roachpb.RangeDescriptor
NeedAnother(rs roachpb.RSpan) bool
Valid() bool
Error() error
Next(ctx context.Context)
Seek(ctx context.Context, key roachpb.RKey, scanDir kvcoord.ScanDirection)
}

type distResolver struct {
*kvcoord.DistSender
// rebalancingPartition is a container used to store a partition undergoing
// rebalancing.
type rebalancingPartition struct {
// These fields track the current number of ranges and the set of spans in
// this partition. They are initialized from the sql.SpanPartition below and
// mutated during rebalancing.
numRanges int
group roachpb.SpanGroup

// The original span partition corresponding to this bucket and its
// index in the original []sql.SpanPartition.
part sql.SpanPartition
pIdx int
}

func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
spans, _, err := r.DistSender.AllRangeSpans(ctx, spans)
return spans, err
}
// Setting expensiveReblanceChecksEnabled = true causes rebalancing to fail
// with an error if the output list of partitions does not cover the same keys
// as the input list of partitions, or if any output partition exceeds the
// target range count.
var expensiveReblanceChecksEnabled = buildutil.CrdbTestBuild || envutil.EnvOrDefaultBool(
"COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS", false)

func rebalanceSpanPartitions(
ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
ctx context.Context, ri rangeIterator, sensitivity float64, partitions []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
if len(p) <= 1 {
return p, nil
if len(partitions) <= 1 {
return partitions, nil
}

// Explode set of spans into set of ranges.
// TODO(yevgeniy): This might not be great if the tables are huge.
numRanges := 0
for i := range p {
spans, err := r.getRangesForSpans(ctx, p[i].Spans)
if err != nil {
return nil, err
// Create partition builder structs for the partitions array above.
var builders = make([]rebalancingPartition, len(partitions))
var totalRanges int
for i, p := range partitions {
builders[i].part = p
builders[i].pIdx = i
nRanges, ok := p.NumRanges()
// We cannot rebalance if we're missing range information.
if !ok {
log.Warning(ctx, "skipping rebalance due to missing range info")
return partitions, nil
}
p[i].Spans = spans
numRanges += len(spans)
builders[i].numRanges = nRanges
totalRanges += nRanges
builders[i].group.Add(p.Spans...)
}

// Sort descending based on the number of ranges.
sort.Slice(p, func(i, j int) bool {
return len(p[i].Spans) > len(p[j].Spans)
sort.Slice(builders, func(i, j int) bool {
return builders[i].numRanges > builders[j].numRanges
})

targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))

for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
from, to := i, j

// Figure out how many ranges we can move.
numToMove := len(p[from].Spans) - targetRanges
canMove := targetRanges - len(p[to].Spans)
if numToMove <= canMove {
i++
}
if canMove <= numToMove {
numToMove = canMove
j--
targetRanges := int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(len(partitions))))
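// Worked example with hypothetical numbers: 10 total ranges spread over 3
// partitions with a sensitivity of 0.05 gives
// targetRanges = ceil(1.05 * 10 / 3) = ceil(3.5) = 4,
// i.e. no partition should be left holding more than 4 ranges.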
to := len(builders) - 1
from := 0

// In each iteration of the outer loop, if `from` has more ranges than the
// target, move the excess to partitions which still need ranges, starting
// from `to` and moving down, then advance `from`. Because the builders are
// sorted by range count, the loop stops as soon as `from` is at or below
// the target.
for ; from < to && builders[from].numRanges > targetRanges; from++ {
// numToMove is the number of ranges which need to be moved out of `from`
// to other partitions.
numToMove := builders[from].numRanges - targetRanges
count := 0
needMore := func() bool {
return count < numToMove
}
if numToMove == 0 {
break
// Iterate over all the spans in `from`.
for spanIdx := 0; from < to && needMore() && spanIdx < len(builders[from].part.Spans); spanIdx++ {
sp := builders[from].part.Spans[spanIdx]
rSpan, err := keys.SpanAddr(sp)
if err != nil {
return nil, err
}
// Iterate over the ranges in the current span.
for ri.Seek(ctx, rSpan.Key, kvcoord.Ascending); from < to && needMore(); ri.Next(ctx) {
// Error check.
if !ri.Valid() {
return nil, ri.Error()
}

// Move one range from `from` to `to`.
count += 1
builders[from].numRanges -= 1
builders[to].numRanges += 1
// If the range boundaries are outside the original span, trim
// the range.
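// For example (hypothetical keys): if the span is [b, d) but the range
// descriptor covers [a, c), only the overlap [b, c) is moved.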
startKey := ri.Desc().StartKey
if startKey.Compare(rSpan.Key) == -1 {
startKey = rSpan.Key
}
endKey := ri.Desc().EndKey
if endKey.Compare(rSpan.EndKey) == 1 {
endKey = rSpan.EndKey
}
diff := roachpb.Span{
Key: startKey.AsRawKey(), EndKey: endKey.AsRawKey(),
}
builders[from].group.Sub(diff)
builders[to].group.Add(diff)

// Since we moved a range, `to` may have enough ranges.
// Decrement `to` until we find a new partition which needs more
// ranges.
for from < to && builders[to].numRanges >= targetRanges {
to--
}
// No more ranges in this span.
if !ri.NeedAnother(rSpan) {
break
}
}
}
}

// Move numToMove spans from 'from' to 'to'.
idx := len(p[from].Spans) - numToMove
p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
p[from].Spans = p[from].Spans[:idx]
// Overwrite the original partitions slice with the balanced partitions.
for _, b := range builders {
partitions[b.pIdx] = sql.MakeSpanPartitionWithRangeCount(
b.part.SQLInstanceID, b.group.Slice(), b.numRanges)
}

// Collapse ranges into nice set of contiguous spans.
for i := range p {
var g roachpb.SpanGroup
g.Add(p[i].Spans...)
p[i].Spans = g.Slice()
if err := verifyPartitionsIfExpensiveChecksEnabled(builders, partitions, targetRanges); err != nil {
return nil, err
}

// Finally, re-sort based on the node id.
sort.Slice(p, func(i, j int) bool {
return p[i].SQLInstanceID < p[j].SQLInstanceID
})
return p, nil
return partitions, nil
}

// verifyPartitionsIfExpensiveChecksEnabled returns an error if the output
// partitions cover a different set of keys than the input partitions, or if
// any output partition is left with more ranges than targetRanges.
func verifyPartitionsIfExpensiveChecksEnabled(
builderWithInputSpans []rebalancingPartition,
outputPartitions []sql.SpanPartition,
targetRanges int,
) error {
if !expensiveReblanceChecksEnabled {
return nil
}
var originalSpansG roachpb.SpanGroup
var originalSpansArr []roachpb.Span
var newSpansG roachpb.SpanGroup
var newSpansArr []roachpb.Span
for _, b := range builderWithInputSpans {
originalSpansG.Add(b.part.Spans...)
originalSpansArr = append(originalSpansArr, b.part.Spans...)
}
for _, p := range outputPartitions {
if numRanges, ok := p.NumRanges(); !ok {
return changefeedbase.WithTerminalError(
errors.Newf("partition missing number of ranges info, partition: %v, partitions: %v", p, outputPartitions))
} else if numRanges > targetRanges {
return changefeedbase.WithTerminalError(
errors.Newf("found partition with too many ranges, target: %d, partition: %v, partitions: %v",
targetRanges, p, outputPartitions))
}

newSpansG.Add(p.Spans...)
newSpansArr = append(newSpansArr, p.Spans...)
}
// If the original spans enclose the new spans and the new spans enclose the original spans,
// then the two groups must cover exactly the same keys.
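// For example, {[a, c)} and {[a, b), [b, c)} enclose each other, so they
// cover the same key space even though the individual spans differ.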
if !originalSpansG.Encloses(newSpansArr...) || !newSpansG.Encloses(originalSpansArr...) {
return changefeedbase.WithTerminalError(errors.Newf("incorrect rebalance. input spans: %v, output spans: %v",
originalSpansArr, newSpansArr))
}
return nil
}
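Below is a hedged sketch, not part of this PR, of how the new entry point might be exercised together with the hypothetical fakeRangeIterator from earlier; the instance IDs, keys, range counts, and the 0.05 sensitivity are made-up illustration values.

func exampleRebalance(ctx context.Context) ([]sql.SpanPartition, error) {
mkSpan := func(start, end string) roachpb.Span {
return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
}
// Two partitions: instance 1 starts with 6 ranges, instance 2 with 2.
parts := []sql.SpanPartition{
sql.MakeSpanPartitionWithRangeCount(1, []roachpb.Span{mkSpan("a", "g")}, 6),
sql.MakeSpanPartitionWithRangeCount(2, []roachpb.Span{mkSpan("g", "i")}, 2),
}
// targetRanges = ceil(1.05 * 8 / 2) = 5, so one range-sized slice of
// instance 1's span should be handed to instance 2.
ri := &fakeRangeIterator{}
return rebalanceSpanPartitions(ctx, ri, 0.05, parts)
}

With those inputs the output covers the same overall key space, with instance 1 left holding 5 ranges and instance 2 holding 3, which is exactly what the expensive checks above verify.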