changefeedccl: add balanced_full distribution strategy
This change adds support for `changefeed.default_range_distribution_strategy='balanced_full'`.
`balanced_full` distributes ranges evenly across all healthy nodes. The implementation first
obtains a plan from DistSQL and then rebalances it. Rebalancing an existing plan is better than
assigning ranges from scratch because the final plan retains some DistSQL optimizations. For
example, DistSQL tends to assign ranges to the nodes that already hold them locally. If a node
holds many local ranges and is assigned more ranges than other nodes, rebalancing reassigns some
of those ranges while letting the node keep as many of its local ranges as possible.
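
The rebalancing step is simple in outline. The sketch below is a minimal, self-contained
illustration of the idea, not the actual implementation (which operates on `sql.SpanPartition`s
and a range iterator, as shown in the diff): nodes above the per-node target shed ranges to
nodes below it, and assignments already at or under the target are left untouched.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// partition is a hypothetical stand-in for sql.SpanPartition: a node plus a
// count of the ranges currently assigned to it.
type partition struct {
	node   int
	ranges int
}

// rebalance moves ranges from overloaded nodes to underloaded ones until every
// node holds at most ceil(total / numNodes) ranges. Nodes already at or below
// the target are not touched, so local-range assignments made by the planner
// are preserved where possible.
func rebalance(parts []partition) []partition {
	total := 0
	for _, p := range parts {
		total += p.ranges
	}
	target := int(math.Ceil(float64(total) / float64(len(parts))))

	sort.Slice(parts, func(i, j int) bool { return parts[i].ranges > parts[j].ranges })
	from, to := 0, len(parts)-1
	for from < to {
		switch {
		case parts[from].ranges <= target:
			from++
		case parts[to].ranges >= target:
			to--
		default:
			// Shift one range from the most loaded node to the least loaded one.
			parts[from].ranges--
			parts[to].ranges++
		}
	}
	return parts
}

func main() {
	// Suppose the planner gave node 1 ten local ranges and two ranges to each other node.
	fmt.Println(rebalance([]partition{{1, 10}, {2, 2}, {3, 2}, {4, 2}}))
	// Prints: [{1 4} {2 4} {3 4} {4 4}]
}
```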

This change also adds the setting `changefeed.range_distribution_threshold`, which controls when
distribution strategies take effect. It defines the minimum number of ranges a changefeed must
watch for a distribution strategy to be applied, which avoids rebalancing a small changefeed
across many nodes. The default value is 1024.
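
As a rough sketch of the guard this threshold implies (a hedged illustration with assumed names,
not the actual implementation), a changefeed only rebalances once it watches more ranges than the
threshold, and never when it watches fewer ranges than it has nodes:

```go
package main

import "fmt"

// shouldRebalance reports whether a changefeed watching totalRanges ranges,
// planned across numNodes nodes, is large enough for a non-default
// distribution strategy to take effect under the given threshold.
func shouldRebalance(totalRanges, numNodes int, threshold int64) bool {
	return int64(totalRanges) > threshold && totalRanges >= numNodes
}

func main() {
	fmt.Println(shouldRebalance(500, 5, 1024))  // false: below the default threshold
	fmt.Println(shouldRebalance(4096, 5, 1024)) // true
}
```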

This change also retires the `changefeed.balance_range_distribution.sensitivity` setting. That
setting existed to reduce the amount of work performed during rebalancing (i.e., only rebalance
a partition to within 5% of the ideal number of ranges per partition). This optimization is no
longer necessary because rebalancing is relatively cheap and only happens at the start of a
changefeed. The mechanism would also sometimes produce bad distributions. Consider a scenario in
which 25 ranges are rebalanced over 5 nodes. With the 5% sensitivity, up to 6 ranges would be
allowed per node, which can result in the imbalanced distribution 1, 6, 6, 6, 6. Removing the
sensitivity yields the ideal distribution 5, 5, 5, 5, 5 (or close to the ideal when ranges cannot
be divided evenly).
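
The arithmetic behind this example, assuming the rebalancer caps each node at the computed
per-node target:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	totalRanges, nodes := 25.0, 5.0

	// Old behavior: a 5% sensitivity raises the per-node cap to
	// ceil(1.05 * 25 / 5) = 6, so one node can be left with a single range
	// while the others hold six each (1, 6, 6, 6, 6).
	withSensitivity := int(math.Ceil(1.05 * totalRanges / nodes))

	// New behavior: the cap is ceil(25 / 5) = 5, giving the even split
	// 5, 5, 5, 5, 5.
	withoutSensitivity := int(math.Ceil(totalRanges / nodes))

	fmt.Println(withSensitivity, withoutSensitivity) // 6 5
}
```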

Informs: cockroachdb#113898
Epic: None
Release note (enterprise change): This change adds a new value `balanced_full` for the setting
`changefeed.default_range_distribution_strategy`. Setting this value will make changefeeds attempt
to balance work evenly over as many nodes as possible (while obeying locality restrictions). This
is different from `balanced_simple`, which does not always leverage all possible nodes.

This change also adds the setting `changefeed.range_distribution_threshold`, which controls when
distribution strategies (i.e., non-default values of `changefeed.default_range_distribution_strategy`)
take effect. It defines the minimum number of ranges a changefeed must watch for a distribution
strategy to be applied.
jayshrivastava committed Jan 11, 2024
1 parent 29b5fce commit 9de25bc
Showing 6 changed files with 209 additions and 43 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -13,7 +13,7 @@ changefeed.aggregator.flush_jitter float 0.1 jitter aggregator flushes as a frac
changefeed.backfill.concurrent_scan_requests integer 0 number of concurrent scan requests per node issued during a backfill application
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request application
changefeed.batch_reduction_retry.enabled boolean false if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes application
changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1] application
changefeed.default_range_distribution_strategy enumeration default configures how work is distributed among nodes for a given changefeed. for the most balanced distribution across as many nodes as possible use 'balanced_full'. for a smaller selection of nodes, use 'balanced_simple'. changing this setting will not override locality restrictions. non-default values for this setting will only take effect if the number of ranges targeted by the changefeed is greater than 'changefeed.range_distribution_threshold'. see https://www.cockroachlabs.com/docs/stable/show-ranges for more information about ranges [default = 0, balanced_simple = 1, balanced_full = 2] application
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer application
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled application
changefeed.fast_gzip.enabled boolean true use fast gzip implementation application
@@ -23,6 +23,7 @@ changefeed.min_highwater_advance duration 0s minimum amount of time the changefe
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds application
changefeed.protect_timestamp.max_age duration 96h0m0s fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration application
changefeed.protect_timestamp_interval duration 10m0s controls how often the changefeed forwards its protected timestamp to the resolved timestamp application
changefeed.range_distribution_threshold integer 1024 the number of ranges targeted by a changefeed above which changefeed.default_range_distribution_strategy will take effect application
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables application
changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value application
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload application
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
@@ -18,7 +18,7 @@
<tr><td><div id="setting-changefeed-backfill-concurrent-scan-requests" class="anchored"><code>changefeed.backfill.concurrent_scan_requests</code></div></td><td>integer</td><td><code>0</code></td><td>number of concurrent scan requests per node issued during a backfill</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-batch-reduction-retry-enabled" class="anchored"><code>changefeed.batch_reduction_retry.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution, use `balanced_simple`. changing this setting will not override locality restrictions [default = 0, balanced_simple = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-default-range-distribution-strategy" class="anchored"><code>changefeed.default_range_distribution_strategy</code></div></td><td>enumeration</td><td><code>default</code></td><td>configures how work is distributed among nodes for a given changefeed. for the most balanced distribution across as many nodes as possible use &#39;balanced_full&#39;. for a smaller selection of nodes, use &#39;balanced_simple&#39;. changing this setting will not override locality restrictions. non-default values for this setting will only take effect if the number of ranges targeted by the changefeed is greater than &#39;changefeed.range_distribution_threshold&#39;. see https://www.cockroachlabs.com/docs/stable/show-ranges for more information about ranges [default = 0, balanced_simple = 1, balanced_full = 2]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-fast-gzip-enabled" class="anchored"><code>changefeed.fast_gzip.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>use fast gzip implementation</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
@@ -28,6 +28,7 @@
<tr><td><div id="setting-changefeed-node-throttle-config" class="anchored"><code>changefeed.node_throttle_config</code></div></td><td>string</td><td><code></code></td><td>specifies node level throttling configuration for all changefeeeds</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-max-age" class="anchored"><code>changefeed.protect_timestamp.max_age</code></div></td><td>duration</td><td><code>96h0m0s</code></td><td>fail the changefeed if the protected timestamp age exceeds this threshold; 0 disables expiration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-protect-timestamp-interval" class="anchored"><code>changefeed.protect_timestamp_interval</code></div></td><td>duration</td><td><code>10m0s</code></td><td>controls how often the changefeed forwards its protected timestamp to the resolved timestamp</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-range-distribution-threshold" class="anchored"><code>changefeed.range_distribution_threshold</code></div></td><td>integer</td><td><code>1024</code></td><td>the number of ranges targeted by a changefeed above which changefeed.default_range_distribution_strategy will take effect</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-schema-feed-read-with-priority-after" class="anchored"><code>changefeed.schema_feed.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>retry with high priority if we were not able to read descriptors for too long; 0 disables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-changefeed-sink-io-workers" class="anchored"><code>changefeed.sink_io_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers used by changefeeds when sending requests to the sink (currently webhook only): &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-cloudstorage-azure-concurrent-upload-buffers" class="anchored"><code>cloudstorage.azure.concurrent_upload_buffers</code></div></td><td>integer</td><td><code>1</code></td><td>controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
105 changes: 88 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -14,6 +14,7 @@ import (
"math"
"sort"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -42,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
@@ -335,27 +337,72 @@ const (
// set of nodes to distribute work to. However, changefeeds will try to
// distribute work evenly across this set of nodes.
balancedSimpleDistribution rangeDistributionType = 1
// TODO(jayant): add balancedFullDistribution which takes
// full control of node selection and distribution.
// balancedFullDistribution distributes work uniformly across a list of
// all healthy nodes provided by distsql.
balancedFullDistribution rangeDistributionType = 2
)

const rangeDistributionThresholdSettingName = "changefeed.range_distribution_threshold"

// RangeDisitributionThreshold is the number of ranges targeted by a changefeed
// above which changefeed.default_range_distribution_strategy will take effect.
var RangeDisitributionThreshold = settings.RegisterIntSetting(
settings.ApplicationLevel,
rangeDistributionThresholdSettingName,
fmt.Sprintf("the number of ranges targeted by a changefeed above which %s will take effect",
rangeDistributionStrategySettingName),
1024,
settings.NonNegativeInt,
settings.WithPublic)

const rangeDistributionStrategySettingName = "changefeed.default_range_distribution_strategy"

// RangeDistributionStrategy is used to determine how the changefeed balances
// ranges between nodes.
// TODO: deprecate this setting in favor of a changefeed option.
var RangeDistributionStrategy = settings.RegisterEnumSetting(
settings.ApplicationLevel,
"changefeed.default_range_distribution_strategy",
"configures how work is distributed among nodes for a given changefeed. "+
"for the most balanced distribution, use `balanced_simple`. changing this setting "+
"will not override locality restrictions",
rangeDistributionStrategySettingName,
fmt.Sprintf("configures how work is distributed among nodes for a given changefeed. "+
"for the most balanced distribution across as many nodes as possible use "+
"'balanced_full'. for a smaller selection of nodes, use 'balanced_simple'. "+
"changing this setting will not override locality restrictions. non-default "+
"values for this setting will only take effect if the number of ranges targeted by "+
"the changefeed is greater than '%s'. see https://www.cockroachlabs.com/docs/stable/show-ranges "+
"for more information about ranges",
rangeDistributionThresholdSettingName),
util.ConstantWithMetamorphicTestChoice("default_range_distribution_strategy",
"default", "balanced_simple").(string),
map[int64]string{
int64(defaultDistribution): "default",
int64(balancedSimpleDistribution): "balanced_simple",
int64(balancedFullDistribution): "balanced_full",
},
settings.WithPublic)

// addNewEmptyPartitions adds empty partitions to the input slice `partitions`
// for any sql instances in `allNodes` which do not yet have partitions.
func addNewEmptyPartitions(
allNodes []base.SQLInstanceID, partitions []sql.SpanPartition,
) []sql.SpanPartition {
// len(partitions) > len(allNodes) is unexpected and handled silently here.
// len(partitions) == len(allNodes) where node X is in `allNodes` but not in
// `partitions` is also unexpected and silently handled by this case.
if len(partitions) >= len(allNodes) {
return partitions
}
s := intsets.Fast{}
for _, p := range partitions {
s.Add(int(p.SQLInstanceID))
}
for _, instanceID := range allNodes {
if !s.Contains(int(instanceID)) {
partitions = append(partitions, sql.MakeSpanPartition(instanceID, nil, true, 0))
}
}
return partitions
}

func makePlan(
execCtx sql.JobExecContext,
jobID jobspb.JobID,
@@ -398,7 +445,32 @@ func makePlan(
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
ri := kvcoord.MakeRangeIterator(distSender)
spanPartitions, err = rebalanceSpanPartitions(
ctx, &ri, rebalanceThreshold.Get(sv), spanPartitions)
ctx, &ri, spanPartitions, RangeDisitributionThreshold.Get(&execCtx.ExecCfg().Settings.SV))
if err != nil {
return nil, nil, err
}
case rangeDistribution == int64(balancedFullDistribution):
var allNodes []base.SQLInstanceID
planCtx, allNodes, err = dsp.SetupAllNodesPlanningWithOracle(ctx, execCtx.ExtendedEvalContext(),
planCtx.ExtendedEvalCtx.ExecCfg, oracle, locFilter)
if err != nil {
return nil, nil, err
}
// We still use dsp.PartitionSpans() followed by a minimal rebalance to leverage distsql heuristics
// without being unbalanced. For example, distsql tends to assign local ranges to nodes.
// If a node has many local ranges and is assigned more ranges compared to other nodes,
// then we reassign some of those ranges during rebalancing. The node still gets to keep as many local
// ranges as possible.
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, trackedSpans)
if err != nil {
return nil, nil, err
}
spanPartitions = addNewEmptyPartitions(allNodes, spanPartitions)
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
ri := kvcoord.MakeRangeIterator(distSender)
spanPartitions, err = rebalanceSpanPartitions(
ctx, &ri, spanPartitions, RangeDisitributionThreshold.Get(&execCtx.ExecCfg().Settings.SV))
if err != nil {
return nil, nil, err
}
@@ -539,14 +611,6 @@ func (w *changefeedResultWriter) Err() error {
return w.err
}

var rebalanceThreshold = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.balance_range_distribution.sensitivity",
"rebalance if the number of ranges on a node exceeds the average by this fraction",
0.05,
settings.PositiveFloat,
)

type rangeIterator interface {
Desc() *roachpb.RangeDescriptor
NeedAnother(rs roachpb.RSpan) bool
@@ -576,7 +640,7 @@ var expensiveReblanceChecksEnabled = buildutil.CrdbTestBuild || envutil.EnvOrDef
"COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS", false)

func rebalanceSpanPartitions(
ctx context.Context, ri rangeIterator, sensitivity float64, partitions []sql.SpanPartition,
ctx context.Context, ri rangeIterator, partitions []sql.SpanPartition, threshold int64,
) ([]sql.SpanPartition, error) {
if len(partitions) <= 1 {
return partitions, nil
@@ -599,12 +663,19 @@ func rebalanceSpanPartitions(
builders[i].g.Add(p.Spans...)
}

// If the number of ranges is small, there's no need to rebalance.
// As an extra backstop, don't rebalance if the total ranges is smaller
// than the number of partitions (ie. nodes).
if int64(totalRanges) <= threshold || totalRanges < len(partitions) {
return partitions, nil
}

// Sort descending based on the number of ranges.
sort.Slice(builders, func(i, j int) bool {
return builders[i].numRanges > builders[j].numRanges
})

targetRanges := int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(len(partitions))))
targetRanges := int(math.Ceil(float64(totalRanges) / float64(len(partitions))))
to := len(builders) - 1
from := 0
terminate := func() bool {