changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans)

Previously, `rebalanceSpanPartitions` used O(ranges) memory. This change
rewrites it to use range iterators, reducing the memory usage to O(spans).

This change also adds a randomized test asserting that all spans are accounted for
after rebalancing, as well as one additional unit test.

Informs: #113898
Epic: None
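
For illustration, here is a minimal sketch of the O(spans) iteration pattern the rewritten code relies on: walking the ranges that overlap a single span with a kvcoord.RangeIterator instead of materializing one span per range up front. The helper name countRangesInSpan is hypothetical and not part of this commit.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// countRangesInSpan counts the ranges overlapping sp while holding only the
// iterator's O(1) state, rather than collecting a slice with one entry per range.
func countRangesInSpan(ctx context.Context, ri *kvcoord.RangeIterator, sp roachpb.Span) (int, error) {
	rSpan, err := keys.SpanAddr(sp)
	if err != nil {
		return 0, err
	}
	n := 0
	for ri.Seek(ctx, rSpan.Key, kvcoord.Ascending); ; ri.Next(ctx) {
		if !ri.Valid() {
			return 0, ri.Error()
		}
		n++
		// Stop once the current range extends to or past the end of the span.
		if !ri.NeedAnother(rSpan) {
			break
		}
	}
	return n, nil
}
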
jayshrivastava committed Mar 18, 2024
1 parent 82ed768 commit ab3298b
Showing 3 changed files with 259 additions and 92 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -299,6 +299,7 @@ go_test(
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
168 changes: 109 additions & 59 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,14 @@ package changefeedccl

import (
"context"
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -400,9 +402,9 @@ func makePlan(
}
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

ri := kvcoord.MakeRangeIterator(distSender)
spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
ctx, &ri, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
@@ -549,6 +551,7 @@ func (w *changefeedResultWriter) Err() error {
return w.err
}

// TODO(#120427): improve this to be more useful.
var rebalanceThreshold = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.balance_range_distribution.sensitivity",
@@ -557,80 +560,127 @@ var rebalanceThreshold = settings.RegisterFloatSetting(
settings.PositiveFloat,
)

type rangeResolver interface {
getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
}

type distResolver struct {
*kvcoord.DistSender
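// rangeIterator abstracts the kvcoord.RangeIterator methods used by
// rebalanceSpanPartitions.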
type rangeIterator interface {
Desc() *roachpb.RangeDescriptor
NeedAnother(rs roachpb.RSpan) bool
Valid() bool
Error() error
Next(ctx context.Context)
Seek(ctx context.Context, key roachpb.RKey, scanDir kvcoord.ScanDirection)
}

func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
spans, _, err := r.DistSender.AllRangeSpans(ctx, spans)
return spans, err
// rebalancingPartition is a container used to store a partition undergoing
// rebalancing.
type rebalancingPartition struct {
// These fields track the current number of ranges and the set of spans in
// this partition. They are initialized from the sql.SpanPartition below and
// mutated during rebalancing.
numRanges int
group roachpb.SpanGroup

// The original span partition corresponding to this bucket and its
// index in the original []sql.SpanPartition.
part sql.SpanPartition
pIdx int
}

func rebalanceSpanPartitions(
ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
ctx context.Context, ri rangeIterator, sensitivity float64, partitions []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
if len(p) <= 1 {
return p, nil
if len(partitions) <= 1 {
return partitions, nil
}

// Explode set of spans into set of ranges.
// TODO(yevgeniy): This might not be great if the tables are huge.
numRanges := 0
for i := range p {
spans, err := r.getRangesForSpans(ctx, p[i].Spans)
if err != nil {
return nil, err
// Create partition builder structs for the partitions array above.
var builders = make([]rebalancingPartition, len(partitions))
var totalRanges int
for i, p := range partitions {
builders[i].part = p
builders[i].pIdx = i
nRanges, ok := p.NumRanges()
// We cannot rebalance if we're missing range information.
if !ok {
log.Warning(ctx, "skipping rebalance due to missing range info")
return partitions, nil
}
p[i].Spans = spans
numRanges += len(spans)
builders[i].numRanges = nRanges
totalRanges += nRanges
builders[i].group.Add(p.Spans...)
}

// Sort descending based on the number of ranges.
sort.Slice(p, func(i, j int) bool {
return len(p[i].Spans) > len(p[j].Spans)
sort.Slice(builders, func(i, j int) bool {
return builders[i].numRanges > builders[j].numRanges
})

targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))

for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
from, to := i, j

// Figure out how many ranges we can move.
numToMove := len(p[from].Spans) - targetRanges
canMove := targetRanges - len(p[to].Spans)
if numToMove <= canMove {
i++
}
if canMove <= numToMove {
numToMove = canMove
j--
}
if numToMove == 0 {
break
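// targetRanges caps how many ranges a partition may keep: an even share of
// totalRanges inflated by the sensitivity (e.g. with a sensitivity of 0.05,
// a partition may hold up to ~5% more than the mean before ranges are moved).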
targetRanges := int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(len(partitions))))
to := len(builders) - 1
from := 0

// In each iteration of the outer loop, check if `from` has too many ranges.
// If so, move them to other partitions which need more ranges
// starting from `to` and moving down. Otherwise, increment `from` and check
// again.
for ; from < to && builders[from].numRanges > targetRanges; from++ {
// numToMove is the number of ranges which need to be moved out of `from`
// to other partitions.
numToMove := builders[from].numRanges - targetRanges
count := 0
needMore := func() bool {
return count < numToMove
}
// Iterate over all the spans in `from`.
for spanIdx := 0; from < to && needMore() && spanIdx < len(builders[from].part.Spans); spanIdx++ {
sp := builders[from].part.Spans[spanIdx]
rSpan, err := keys.SpanAddr(sp)
if err != nil {
return nil, err
}
// Iterate over the ranges in the current span.
for ri.Seek(ctx, rSpan.Key, kvcoord.Ascending); from < to && needMore(); ri.Next(ctx) {
// Error check.
if !ri.Valid() {
return nil, ri.Error()
}

// Move numToMove spans from 'from' to 'to'.
idx := len(p[from].Spans) - numToMove
p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
p[from].Spans = p[from].Spans[:idx]
// Move one range from `from` to `to`.
count += 1
builders[from].numRanges -= 1
builders[to].numRanges += 1
// If the range boundaries are outside the original span, trim
// the range.
startKey := ri.Desc().StartKey
if startKey.Compare(rSpan.Key) == -1 {
startKey = rSpan.Key
}
endKey := ri.Desc().EndKey
if endKey.Compare(rSpan.EndKey) == 1 {
endKey = rSpan.EndKey
}
diff := roachpb.Span{
Key: startKey.AsRawKey(), EndKey: endKey.AsRawKey(),
}
builders[from].group.Sub(diff)
builders[to].group.Add(diff)

// Since we moved a range, `to` may have enough ranges.
// Decrement `to` until we find a new partition which needs more
// ranges.
for from < to && builders[to].numRanges >= targetRanges {
to--
}
// No more ranges in this span.
if !ri.NeedAnother(rSpan) {
break
}
}
}
}

// Collapse ranges into nice set of contiguous spans.
for i := range p {
var g roachpb.SpanGroup
g.Add(p[i].Spans...)
p[i].Spans = g.Slice()
// Overwrite the original partitions slice with the balanced partitions.
for _, b := range builders {
partitions[b.pIdx] = sql.MakeSpanPartitionWithRangeCount(
b.part.SQLInstanceID, b.group.Slice(), b.numRanges)
}

// Finally, re-sort based on the node id.
sort.Slice(p, func(i, j int) bool {
return p[i].SQLInstanceID < p[j].SQLInstanceID
})
return p, nil
return partitions, nil
}
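
To exercise this path in isolation, a test can supply its own rangeIterator. The sketch below is a hypothetical test double over a fixed, sorted list of range boundaries; the randomized test added in this commit may construct its iterator differently, and in practice such a double would live alongside the changefeedccl tests so it can satisfy the unexported interface.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// fakeRangeIterator iterates over synthetic ranges defined by splits, where
// range i covers [splits[i], splits[i+1]).
type fakeRangeIterator struct {
	splits []roachpb.RKey
	cur    int
}

// Seek positions the iterator at the range containing key. It assumes key is
// at or after splits[0] and ignores the scan direction.
func (f *fakeRangeIterator) Seek(_ context.Context, key roachpb.RKey, _ kvcoord.ScanDirection) {
	f.cur = 0
	for f.cur < len(f.splits)-2 && f.splits[f.cur+1].Compare(key) <= 0 {
		f.cur++
	}
}

// Desc returns a descriptor for the current synthetic range.
func (f *fakeRangeIterator) Desc() *roachpb.RangeDescriptor {
	return &roachpb.RangeDescriptor{StartKey: f.splits[f.cur], EndKey: f.splits[f.cur+1]}
}

// NeedAnother reports whether more ranges are needed to cover rs.
func (f *fakeRangeIterator) NeedAnother(rs roachpb.RSpan) bool {
	return f.splits[f.cur+1].Compare(rs.EndKey) < 0
}

func (f *fakeRangeIterator) Valid() bool            { return f.cur+1 < len(f.splits) }
func (f *fakeRangeIterator) Error() error           { return nil }
func (f *fakeRangeIterator) Next(_ context.Context) { f.cur++ }
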