changefeedccl: Uniformly distribute work during export. #88672

Merged 1 commit on Sep 28, 2022
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,7 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer
changefeed.event_consumer_workers integer 8 the number of workers to use when processing events; 0 or 1 disables
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -16,6 +16,7 @@
<tr><td><code>bulkio.backup.read_timeout</code></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.balance_range_distribution.enable</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>8</code></td><td>the number of workers to use when processing events; 0 or 1 disables</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeds</td></tr>
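These two generated-docs entries describe the new cluster setting. The following is a usage sketch, not part of this diff: it assumes a running cluster reachable over pgwire via the lib/pq driver, and the connection string, table name, and sink URI are placeholders.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Turn on balanced range distribution (off by default).
	if _, err := db.Exec(
		`SET CLUSTER SETTING changefeed.balance_range_distribution.enable = true`,
	); err != nil {
		log.Fatal(err)
	}

	// Balancing currently applies only to export-style changefeeds, i.e.
	// those that run just an initial scan. Table and sink are placeholders.
	if _, err := db.Exec(
		`CREATE CHANGEFEED FOR TABLE defaultdb.public.t
		 INTO 'nodelocal://1/export' WITH initial_scan = 'only'`,
	); err != nil {
		log.Fatal(err)
	}
}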
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/protectedts",
@@ -96,6 +97,7 @@ go_library(
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
134 changes: 124 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,15 @@ package changefeedccl

import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
@@ -307,6 +311,14 @@ func startDistChangefeed(
return err
}

var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.balance_range_distribution.enable",
"if enabled, the ranges are balanced equally among all nodes",
util.ConstantWithMetamorphicTestBool(
"changefeed.balance_range_distribution.enable", false),
).WithPublic()
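// Note: util.ConstantWithMetamorphicTestBool above returns the constant false
// in production builds, but test builds may randomly flip it so that both
// distribution paths get exercised.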

func makePlan(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
@@ -316,24 +328,41 @@
trackedSpans []roachpb.Span,
selectClause string,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {

return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
var blankTxn *kv.Txn

distMode := sql.DistributionTypeAlways
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on this node.
distMode = sql.DistributionTypeNone
}

planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
sql.DistributionTypeAlways)
sql.DistributionType(distMode))
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
if err != nil {
return nil, nil, err
}

var spanPartitions []sql.SpanPartition
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on the gateway.
spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}}
} else {
// All other feeds get a ChangeAggregator local on the leaseholder.
var err error
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans)
sv := &execCtx.ExecCfg().Settings.SV
if enableBalancedRangeDistribution.Get(sv) {
scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
if err != nil {
return nil, nil, err
}

// Currently, balanced range distribution is supported only in export mode.
// TODO(yevgeniy): Consider lifting this restriction.
if scanType == changefeedbase.OnlyInitialScan {
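// Rebalancing needs physical range boundaries, so reach through the
// non-transactional sender to the underlying DistSender.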
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
}
}

// Use the same checkpoint for all aggregators; each aggregator will only look at
@@ -442,3 +471,88 @@ func (w *changefeedResultWriter) SetError(err error) {
func (w *changefeedResultWriter) Err() error {
return w.err
}

var rebalanceThreshold = settings.RegisterFloatSetting(
settings.TenantWritable,
"changefeed.balance_range_distribution.sensitivity",
"rebalance if the number of ranges on a node exceeds the average by this fraction",
0.05,
settings.PositiveFloat,
)

type rangeResolver interface {
getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
}

type distResolver struct {
*kvcoord.DistSender
}

func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
}

func rebalanceSpanPartitions(
ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
if len(p) <= 1 {
return p, nil
}

// Explode set of spans into set of ranges.
// TODO(yevgeniy): This might not be great if the tables are huge.
numRanges := 0
for i := range p {
spans, err := r.getRangesForSpans(ctx, p[i].Spans)
if err != nil {
return nil, err
}
p[i].Spans = spans
numRanges += len(spans)
}

// Sort descending based on the number of ranges.
sort.Slice(p, func(i, j int) bool {
return len(p[i].Spans) > len(p[j].Spans)
})

targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))
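// For example, 6 ranges across 3 nodes with sensitivity 0.05 gives
// targetRanges = int(1.05 * 6 / 3) = int(2.1) = 2.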

for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
from, to := i, j

// Figure out how many ranges we can move.
numToMove := len(p[from].Spans) - targetRanges
canMove := targetRanges - len(p[to].Spans)
if numToMove <= canMove {
i++
}
if canMove <= numToMove {
numToMove = canMove
j--
}
if numToMove == 0 {
break
}

// Move numToMove spans from 'from' to 'to'.
idx := len(p[from].Spans) - numToMove
p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
p[from].Spans = p[from].Spans[:idx]
}

// Collapse the ranges back into a minimal set of contiguous spans.
for i := range p {
var g roachpb.SpanGroup
g.Add(p[i].Spans...)
p[i].Spans = g.Slice()
}

// Finally, re-sort based on the node id.
sort.Slice(p, func(i, j int) bool {
return p[i].SQLInstanceID < p[j].SQLInstanceID
})
return p, nil
}
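For intuition, here is a minimal standalone sketch, not part of this diff, of the same greedy rebalancing reduced to plain per-node range counts; the helper name and the simplified move logic are illustrative only.

package main

import (
	"fmt"
	"sort"
)

// rebalance redistributes range counts so that no node exceeds the mean by
// more than the sensitivity fraction, mirroring rebalanceSpanPartitions.
func rebalance(counts []int, sensitivity float64) []int {
	total := 0
	for _, c := range counts {
		total += c
	}
	target := int((1 + sensitivity) * float64(total) / float64(len(counts)))

	// Fullest node first; donate surplus ranges to the emptiest node.
	sort.Sort(sort.Reverse(sort.IntSlice(counts)))
	for i, j := 0, len(counts)-1; i < j && counts[i] > target && counts[j] < target; {
		move := counts[i] - target // surplus on the donor
		if room := target - counts[j]; room < move {
			move = room // the recipient can only absorb this much
		}
		counts[i] -= move
		counts[j] += move
		if counts[i] <= target {
			i++
		}
		if counts[j] >= target {
			j--
		}
	}
	return counts
}

func main() {
	// 9 ranges over 3 nodes, target = int(1.05 * 9 / 3) = 3.
	fmt.Println(rebalance([]int{6, 1, 2}, 0.05)) // prints [3 3 3]
}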
81 changes: 81 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -7450,3 +7450,84 @@ func TestRedactedSchemaRegistry(t *testing.T) {
// kafka supports the confluent_schema_registry option.
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

type echoResolver struct {
result []roachpb.Spans
pos int
}

func (r *echoResolver) getRangesForSpans(
_ context.Context, _ []roachpb.Span,
) (spans []roachpb.Span, _ error) {
spans = r.result[r.pos]
r.pos++
return spans, nil
}

func TestPartitionSpans(t *testing.T) {
defer leaktest.AfterTest(t)()

partitions := func(p ...sql.SpanPartition) []sql.SpanPartition {
return p
}
mkPart := func(n base.SQLInstanceID, spans ...roachpb.Span) sql.SpanPartition {
return sql.SpanPartition{SQLInstanceID: n, Spans: spans}
}
mkSpan := func(start, end string) roachpb.Span {
return roachpb.Span{Key: []byte(start), EndKey: []byte(end)}
}
spans := func(s ...roachpb.Span) roachpb.Spans {
return s
}
const sensitivity = 0.01

for i, tc := range []struct {
input []sql.SpanPartition
resolve []roachpb.Spans
expect []sql.SpanPartition
}{
{
input: partitions(
mkPart(1, mkSpan("a", "j")),
mkPart(2, mkSpan("j", "q")),
mkPart(3, mkSpan("q", "z")),
),
// 6 total ranges, 2 per node.
resolve: []roachpb.Spans{
spans(mkSpan("a", "c"), mkSpan("c", "e"), mkSpan("e", "j")),
spans(mkSpan("j", "q")),
spans(mkSpan("q", "y"), mkSpan("y", "z")),
},
expect: partitions(
mkPart(1, mkSpan("a", "e")),
mkPart(2, mkSpan("e", "q")),
mkPart(3, mkSpan("q", "z")),
),
},
{
input: partitions(
mkPart(1, mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
mkPart(2),
mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
),
// 5 total ranges -- on 2 nodes; target should be 1 per node.
resolve: []roachpb.Spans{
spans(mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
spans(),
spans(mkSpan("c", "e"), mkSpan("p", "r")),
},
expect: partitions(
mkPart(1, mkSpan("a", "c"), mkSpan("e", "p")),
mkPart(2, mkSpan("r", "z")),
mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
),
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
sp, err := rebalanceSpanPartitions(context.Background(),
&echoResolver{result: tc.resolve}, sensitivity, tc.input)
require.NoError(t, err)
require.Equal(t, tc.expect, sp)
})
}
}
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -187,7 +187,7 @@ func (p *scanRequestScanner) exportSpan(
func getSpansToProcess(
ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
) ([]roachpb.Span, error) {
ranges, err := allRangeSpans(ctx, ds, targetSpans)
ranges, err := AllRangeSpans(ctx, ds, targetSpans)
if err != nil {
return nil, err
}
@@ -254,7 +254,8 @@ func slurpScanResponse(
return nil
}

func allRangeSpans(
// AllRangeSpans returns the spans of all ranges that make up the specified list of spans.
func AllRangeSpans(
ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
) ([]roachpb.Span, error) {
