changefeedccl: Uniformly distribute work during export.
By default, a changefeed distributes work to nodes based
on which node is the leaseholder for each range.
This makes sense, since running a rangefeed against the
local node is more efficient.

In a cluster where ranges are assigned almost uniformly
to each node, running a changefeed export is efficient:
all nodes stay busy until they are done.

The KV server is responsible for keeping ranges more or
less uniformly distributed across the cluster; however,
this determination is based on the set of all ranges in
the cluster, not on the ranges of any particular table.

As a result, it is possible for a table's ranges not to
be uniformly distributed across all the nodes. When this
happens, a changefeed export can take a long time due to
the long tail: as each node completes its set of assigned
ranges, it idles until the changefeed completes.

This PR introduces a change (controlled via the
`changefeed.balance_range_distribution.enable` setting)
where the changefeed tries to produce a more balanced
assignment, in which each node is responsible for roughly
1/Nth of the work in a cluster of N nodes.

Release note (enterprise change): Changefeed exports are
up to 25% faster due to uniform work assignment.
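
The new behavior is opt-in: the setting defaults to false, and the
balancing currently applies only to export-style feeds (those whose
initial scan is the entire job). A minimal sketch of enabling it,
assuming a changefeed created with `initial_scan = 'only'`:

    SET CLUSTER SETTING changefeed.balance_range_distribution.enable = true;
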
Yevgeniy Miretskiy committed Sep 27, 2022
1 parent ae38d90 commit 89b19f4
Showing 6 changed files with 212 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -10,6 +10,7 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer
changefeed.event_consumer_workers integer 8 the number of workers to use when processing events; 0 or 1 disables
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeds
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -16,6 +16,7 @@
<tr><td><code>bulkio.backup.read_timeout</code></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
<tr><td><code>bulkio.backup.read_with_priority_after</code></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><code>bulkio.stream_ingestion.minimum_flush_interval</code></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><code>changefeed.balance_range_distribution.enable</code></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
<tr><td><code>changefeed.event_consumer_worker_queue_size</code></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maximum number of events which a worker can buffer</td></tr>
<tr><td><code>changefeed.event_consumer_workers</code></td><td>integer</td><td><code>8</code></td><td>the number of workers to use when processing events; 0 or 1 disables</td></tr>
<tr><td><code>changefeed.node_throttle_config</code></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeds</td></tr>
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/protectedts",
@@ -96,6 +97,7 @@
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/cache",
134 changes: 124 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,15 @@ package changefeedccl

import (
"context"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
@@ -29,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
@@ -307,6 +311,14 @@ func startDistChangefeed(
return err
}

var enableBalancedRangeDistribution = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.balance_range_distribution.enable",
"if enabled, the ranges are balanced equally among all nodes",
util.ConstantWithMetamorphicTestBool(
"changefeed.balance_range_distribution.enable", false),
).WithPublic()

func makePlan(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
@@ -316,24 +328,41 @@
trackedSpans []roachpb.Span,
selectClause string,
) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {

return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
var blankTxn *kv.Txn

distMode := sql.DistributionTypeAlways
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on this node.
distMode = sql.DistributionTypeNone
}

planCtx := dsp.NewPlanningCtx(ctx, execCtx.ExtendedEvalContext(), nil /* planner */, blankTxn,
sql.DistributionTypeAlways)
sql.DistributionType(distMode))
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
if err != nil {
return nil, nil, err
}

var spanPartitions []sql.SpanPartition
if details.SinkURI == `` {
// Sinkless feeds get one ChangeAggregator on the gateway.
spanPartitions = []sql.SpanPartition{{SQLInstanceID: dsp.GatewayID(), Spans: trackedSpans}}
} else {
// All other feeds get a ChangeAggregator local on the leaseholder.
var err error
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans)
sv := &execCtx.ExecCfg().Settings.SV
if enableBalancedRangeDistribution.Get(sv) {
scanType, err := changefeedbase.MakeStatementOptions(details.Opts).GetInitialScanType()
if err != nil {
return nil, nil, err
}

// Currently, balanced range distribution is supported only in export mode.
// TODO(yevgeniy): Consider lifting this restriction.
if scanType == changefeedbase.OnlyInitialScan {
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)

spanPartitions, err = rebalanceSpanPartitions(
ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
}
}

// Use the same checkpoint for all aggregators; each aggregator will only look at
@@ -442,3 +471,88 @@ func (w *changefeedResultWriter) SetError(err error) {
func (w *changefeedResultWriter) Err() error {
return w.err
}

var rebalanceThreshold = settings.RegisterFloatSetting(
settings.TenantWritable,
"changefeed.balance_range_distribution.sensitivity",
"rebalance if the number of ranges on a node exceeds the average by this fraction",
0.05,
settings.PositiveFloat,
)
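
A hypothetical illustration of tuning this knob through the same
cluster-settings mechanism (note the setting is not marked public, and
0.1 below is only an example value, not a recommendation):

    SET CLUSTER SETTING changefeed.balance_range_distribution.sensitivity = 0.1;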

type rangeResolver interface {
getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
}

type distResolver struct {
*kvcoord.DistSender
}

func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
}

func rebalanceSpanPartitions(
ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
if len(p) <= 1 {
return p, nil
}

// Explode set of spans into set of ranges.
// TODO(yevgeniy): This might not be great if the tables are huge.
numRanges := 0
for i := range p {
spans, err := r.getRangesForSpans(ctx, p[i].Spans)
if err != nil {
return nil, err
}
p[i].Spans = spans
numRanges += len(spans)
}

// Sort descending based on the number of ranges.
sort.Slice(p, func(i, j int) bool {
return len(p[i].Spans) > len(p[j].Spans)
})

targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))
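// Worked example of the line above: 6 ranges over 3 partitions with
// sensitivity 0.05 gives targetRanges = int(1.05 * 6 / 3) = int(2.1) = 2;
// partitions holding more than 2 ranges donate their excess below.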

for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
from, to := i, j

// Figure out how many ranges we can move.
numToMove := len(p[from].Spans) - targetRanges
canMove := targetRanges - len(p[to].Spans)
if numToMove <= canMove {
i++
}
if canMove <= numToMove {
numToMove = canMove
j--
}
if numToMove == 0 {
break
}

// Move numToMove spans from 'from' to 'to'.
idx := len(p[from].Spans) - numToMove
p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
p[from].Spans = p[from].Spans[:idx]
}

// Collapse ranges into nice set of contiguous spans.
for i := range p {
var g roachpb.SpanGroup
g.Add(p[i].Spans...)
p[i].Spans = g.Slice()
}

// Finally, re-sort based on the node id.
sort.Slice(p, func(i, j int) bool {
return p[i].SQLInstanceID < p[j].SQLInstanceID
})
return p, nil
}
81 changes: 81 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -7450,3 +7450,84 @@ func TestRedactedSchemaRegistry(t *testing.T) {
// kafka supports the confluent_schema_registry option.
cdcTest(t, testFn, feedTestForceSink("kafka"))
}

type echoResolver struct {
result []roachpb.Spans
pos int
}

func (r *echoResolver) getRangesForSpans(
_ context.Context, _ []roachpb.Span,
) (spans []roachpb.Span, _ error) {
spans = r.result[r.pos]
r.pos++
return spans, nil
}

func TestPartitionSpans(t *testing.T) {
defer leaktest.AfterTest(t)()

partitions := func(p ...sql.SpanPartition) []sql.SpanPartition {
return p
}
mkPart := func(n base.SQLInstanceID, spans ...roachpb.Span) sql.SpanPartition {
return sql.SpanPartition{SQLInstanceID: n, Spans: spans}
}
mkSpan := func(start, end string) roachpb.Span {
return roachpb.Span{Key: []byte(start), EndKey: []byte(end)}
}
spans := func(s ...roachpb.Span) roachpb.Spans {
return s
}
const sensitivity = 0.01

for i, tc := range []struct {
input []sql.SpanPartition
resolve []roachpb.Spans
expect []sql.SpanPartition
}{
{
input: partitions(
mkPart(1, mkSpan("a", "j")),
mkPart(2, mkSpan("j", "q")),
mkPart(3, mkSpan("q", "z")),
),
// 6 total ranges, 2 per node.
resolve: []roachpb.Spans{
spans(mkSpan("a", "c"), mkSpan("c", "e"), mkSpan("e", "j")),
spans(mkSpan("j", "q")),
spans(mkSpan("q", "y"), mkSpan("y", "z")),
},
expect: partitions(
mkPart(1, mkSpan("a", "e")),
mkPart(2, mkSpan("e", "q")),
mkPart(3, mkSpan("q", "z")),
),
},
{
input: partitions(
mkPart(1, mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
mkPart(2),
mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
),
// 5 total ranges -- on 2 nodes; target should be 1 per node.
resolve: []roachpb.Spans{
spans(mkSpan("a", "c"), mkSpan("e", "p"), mkSpan("r", "z")),
spans(),
spans(mkSpan("c", "e"), mkSpan("p", "r")),
},
expect: partitions(
mkPart(1, mkSpan("a", "c"), mkSpan("e", "p")),
mkPart(2, mkSpan("r", "z")),
mkPart(3, mkSpan("c", "e"), mkSpan("p", "r")),
),
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
sp, err := rebalanceSpanPartitions(context.Background(),
&echoResolver{result: tc.resolve}, sensitivity, tc.input)
require.NoError(t, err)
require.Equal(t, tc.expect, sp)
})
}
}
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -187,7 +187,7 @@ func (p *scanRequestScanner) exportSpan(
func getSpansToProcess(
ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
) ([]roachpb.Span, error) {
ranges, err := allRangeSpans(ctx, ds, targetSpans)
ranges, err := AllRangeSpans(ctx, ds, targetSpans)
if err != nil {
return nil, err
}
@@ -254,7 +254,8 @@ func slurpScanResponse(
return nil
}

func allRangeSpans(
// AllRangeSpans returns the list of all ranges for the specified list of spans.
func AllRangeSpans(
ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
) ([]roachpb.Span, error) {
