
changefeedccl: Uniformly distribute work during export. #88672

Merged
merged 1 commit into cockroachdb:master from the balance branch on Sep 28, 2022

Conversation

miretskiy
Contributor

By default, changefeed distributes work to nodes based on which node is the
leaseholder for each range. This makes sense since running a rangefeed against the
local node is more efficient.

In a cluster where ranges are assigned almost uniformly to each node,
running a changefeed export is efficient: all nodes stay busy until the export is done.

The KV server is responsible for keeping ranges more or less
uniformly distributed across the cluster; however, this determination is based
on the set of all ranges in the cluster, not on any particular table.

As a result, it is possible for a table's ranges not to be uniformly distributed
across all the nodes. When this happens, a changefeed export
can take a long time due to the long tail: as each node completes
its set of assigned ranges, it idles until the changefeed completes.

This PR introduces a change (controlled via the
`changefeed.balance_range_distribution.enable` setting) where the changefeed
tries to produce a more balanced assignment, with each node responsible
for roughly 1/Nth of the work in a cluster of N nodes.

Release note (enterprise change): Changefeed exports are up to 25% faster due to uniform work assignment.
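
To make the balancing idea concrete, here is a minimal, self-contained Go sketch of the approach described above. The `partition` type, the `rebalance` helper, and the numbers are illustrative assumptions only; they do not mirror the actual DistSQL span-partition code in `changefeed_dist.go`.

```go
package main

import (
	"fmt"
	"sort"
)

// partition is a hypothetical stand-in for a DistSQL span partition:
// the set of ranges assigned to one node.
type partition struct {
	node   int
	ranges []int // hypothetical range IDs assigned to this node
}

// rebalance moves ranges from overloaded partitions to underloaded ones
// until every node holds roughly total/N ranges. This sketches the idea
// described above; it is not the actual changefeed implementation.
func rebalance(parts []partition) {
	total := 0
	for _, p := range parts {
		total += len(p.ranges)
	}
	target := (total + len(parts) - 1) / len(parts) // ceil(total / N)

	// Sort descending by range count: heaviest donors first, lightest last.
	sort.Slice(parts, func(i, j int) bool {
		return len(parts[i].ranges) > len(parts[j].ranges)
	})

	for i, j := 0, len(parts)-1; i < j; {
		switch {
		case len(parts[i].ranges) <= target:
			i++ // donor is already at or below the target
		case len(parts[j].ranges) >= target:
			j-- // recipient is already at or above the target
		default:
			// Move one range from the heaviest donor to the lightest recipient.
			last := len(parts[i].ranges) - 1
			parts[j].ranges = append(parts[j].ranges, parts[i].ranges[last])
			parts[i].ranges = parts[i].ranges[:last]
		}
	}
}

func main() {
	// A skewed assignment: node 1 holds most of the table's ranges.
	parts := []partition{
		{node: 1, ranges: []int{1, 2, 3, 4, 5, 6, 7, 8}},
		{node: 2, ranges: []int{9}},
		{node: 3, ranges: []int{10, 11, 12}},
	}
	rebalance(parts)
	for _, p := range parts {
		fmt.Printf("node %d: %d ranges\n", p.node, len(p.ranges)) // each node ends up with 4
	}
}
```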

@miretskiy miretskiy requested a review from a team as a code owner September 25, 2022 01:21
@miretskiy miretskiy requested review from ajwerner and removed request for a team September 25, 2022 01:21
@cockroach-teamcity
Member

This change is Reviewable

Contributor

@HonoreDB HonoreDB left a comment


This is great analysis! I feel like this logic should ideally live in the DistSQL planner itself (in distsql_physical_planner or replicaoracle). It already contains some primitives that could be useful. In particular, telling it to use a RandomOracle instead of a BinPackingOracle might be a simpler and more scalable solution than readjusting the spans afterwards; in large clusters a RandomOracle will get you a uniform distribution, and the default BinPackingOracle does multiple things that can create a non-uniform distribution and aren't appropriate for changefeeds. We could also look into making maxPreferredRangesPerLeaseHolder configurable/smarter (currently it's just a hardcoded 10 scientifically chosen at random).

Reviewed 1 of 5 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner)
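
As a rough illustration of the distinction drawn above between a random oracle and a leaseholder-preferring (bin-packing style) one, here is a hedged, self-contained Go sketch. The `oracle` interface, node IDs, and the per-node cap below are hypothetical simplifications and are not the real `replicaoracle` API or its actual behavior.

```go
package main

import (
	"fmt"
	"math/rand"
)

// oracle is a toy version of a replica-choice policy: given a span's
// leaseholder, pick the node that should process it. It is NOT the real
// replicaoracle interface; names and behavior here are assumptions.
type oracle interface {
	choose(leaseholder int) int
}

// randomOracle ignores locality and picks a node uniformly at random,
// which converges to a uniform distribution on large clusters.
type randomOracle struct {
	nodes []int
	rng   *rand.Rand
}

func (o randomOracle) choose(int) int {
	return o.nodes[o.rng.Intn(len(o.nodes))]
}

// leaseholderOracle prefers the leaseholder (local rangefeeds are cheaper)
// but spills to a random node once the leaseholder already has maxPerNode
// spans, loosely mimicking a bin-packing style preference with a cap.
type leaseholderOracle struct {
	nodes      []int
	rng        *rand.Rand
	maxPerNode int
	assigned   map[int]int
}

func (o leaseholderOracle) choose(leaseholder int) int {
	if o.assigned[leaseholder] < o.maxPerNode {
		o.assigned[leaseholder]++
		return leaseholder
	}
	n := o.nodes[o.rng.Intn(len(o.nodes))]
	o.assigned[n]++
	return n
}

func main() {
	nodes := []int{1, 2, 3}
	rng := rand.New(rand.NewSource(42))
	random := randomOracle{nodes: nodes, rng: rng}
	local := leaseholderOracle{nodes: nodes, rng: rng, maxPerNode: 10, assigned: map[int]int{}}

	// 30 spans whose leaseholder is node 1: the random oracle spreads them
	// out, while the leaseholder-preferring one piles onto node 1 until the
	// cap kicks in.
	counts := map[string]map[int]int{"random": {}, "leaseholder": {}}
	for i := 0; i < 30; i++ {
		counts["random"][random.choose(1)]++
		counts["leaseholder"][local.choose(1)]++
	}
	fmt.Println(counts)
}
```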

@miretskiy
Contributor Author

RandomOracle

I'm not sure I want to use a random oracle -- there is probably some value in keeping as much locality as possible.
I was thinking of making changes in distsql land, but given how widely those methods are used, I would not feel comfortable backporting such a change.

@ajwerner
Contributor

Would we be better off just re-planning once some of the change aggregators finish, and having that replanning use a different oracle?

@miretskiy
Contributor Author

Would we be better off just re-planning once some of the change aggregators finish, and having that replanning use a different oracle?

It's a possibility, but probably a bit more complex: aggregators do not communicate with each other, so they won't know if one exited; and then there is a need to send an up-to-date frontier, preferably without truncation (due to proto buffer size).

@miretskiy miretskiy force-pushed the balance branch 2 times, most recently from d3bbd36 to 0f59e8b Compare September 26, 2022 18:34
@ajwerner
Contributor

It's a possibility, but probably a bit more complex: aggregators do not communicate with each other, so they won't know if one exited; and then there is a need to send an up-to-date frontier, preferably without truncation (due to proto buffer size).

Aggregators communicate with the frontier, which can know when an aggregator has finished. The root of the query made the plan and has an aggregator->spans mapping, which could be provided to the frontier. It just seems to me like better planning up front is never going to get rid of this tail if there's rebalancing, but one graceful shutdown (if we could figure out how to make it happen) could do a great job rebalancing and make it such that the last part of the export happens quite fast.

@miretskiy
Contributor Author

Aggregators communicate with the frontier, which can know when an aggregator has finished. The root of the query made the plan and has an aggregator->spans mapping, which could be provided to the frontier. It just seems to me like better planning up front is never going to get rid of this tail if there's rebalancing, but one graceful shutdown (if we could figure out how to make it happen) could do a great job rebalancing and make it such that the last part of the export happens quite fast.

I agree that better planning will not help with rebalancing.
We do have the ability to restart the flow if the distribution changes (dramatically); alas, I'm not sure
it would be very helpful in the case of backfills, due to the possibility of a replan causing too much
progress loss.

The question is: is this PR good enough for now? It seems to be reasonably lightweight. Or
do we want to block what appears to be a decent improvement and try to come up with something
else?

@HonoreDB
Contributor

The question is: is this PR good enough for now? It seems to be reasonably lightweight. Or
do we want to block what appears to be a decent improvement and try to come up with something
else?

Fair enough, I'll do a line review now.

Contributor

@HonoreDB HonoreDB left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @miretskiy)


pkg/ccl/changefeedccl/changefeed_dist.go line 523 at r2 (raw file):

	targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))

	for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {

Why not have j start at len(p)-1 so that we're moving ranges from the largest entry to the smallest?

@miretskiy
Contributor Author

pkg/ccl/changefeedccl/changefeed_dist.go line 523 at r2 (raw file):

	targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))

	for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {

Why not have j start at len(p)-1 so that we're moving ranges from the largest entry to the smallest?

Isn't that what I'm doing with `for i, j := 0, len(p)-1 ...`?
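
For a concrete sense of the quoted targetRanges computation, here is a tiny sketch with made-up numbers (the sensitivity and counts are illustrative assumptions, not values from the PR): the target caps how many spans a partition may keep before the loop starts moving spans from partitions above it (index i) to partitions below it (index j).

```go
package main

import "fmt"

func main() {
	// Illustrative values only: 100 ranges, 3 partitions, and a hypothetical
	// sensitivity of 0.01 that allows ~1% slack above a perfectly even split.
	const sensitivity = 0.01
	numRanges, numPartitions := 100, 3

	targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(numPartitions))
	fmt.Println(targetRanges) // 33: partitions above this donate spans, those below receive them
}
```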

@miretskiy miretskiy force-pushed the balance branch 2 times, most recently from 43f16c9 to 3d5fae6 Compare September 27, 2022 00:46
@miretskiy
Contributor Author

bors r+

@craig
Contributor

craig bot commented Sep 27, 2022

Build failed (retrying...):

@craig
Contributor

craig bot commented Sep 28, 2022

Build failed (retrying...):

@craig
Contributor

craig bot commented Sep 28, 2022

Build succeeded:

@craig craig bot merged commit 4982322 into cockroachdb:master Sep 28, 2022