changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans) #115375

Merged
merged 3 commits into from
Mar 18, 2024

Contributor

@jayshrivastava jayshrivastava commented Nov 30, 2023

sql: count ranges per partition in PartitionSpans

This change updates span partitioning to count ranges while making
partitions. This allows callers to rebalance partitions based on
range counts without having to iterate over the spans to count
ranges.

Release note: None
Epic: None

changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans) #115375

Previously, rebalanceSpanPartitions used O(ranges) memory. This change
rewrites it to use range iterators, reducing the memory usage to O(spans).

This change also adds a randomized test to assert that all spans are accounted for after
rebalancing. It also adds one more unit test.

Informs: #113898
Epic: None

changefeedccl: add rebalancing checks

This change adds extra test coverage for partition rebalancing in
changefeeds. It adds checks which are performed after rebalancing
to assert that the output list of spans covers exactly the same keys
as the input list of spans. These checks are expensive so they only
run if the environment variable COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS
is true. This variable is true in cdc roachtests and unit tests.

Release note: None
Epic: None

@cockroach-teamcity
Member

This change is Reviewable

@jayshrivastava jayshrivastava changed the title from "Changefeed/planning dist 2" to "changefeedccl: reduce rebalancing memory usage from O(ranges) to O(spans)" Nov 30, 2023
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 2 times, most recently from 710bcf7 to a285d32 Compare November 30, 2023 22:39
@jayshrivastava
Contributor Author

Note that the first commit can be ignored as it's being reviewed separately in #115166

@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch from a285d32 to 47111e9 Compare November 30, 2023 22:44
@jayshrivastava jayshrivastava marked this pull request as ready for review November 30, 2023 22:45
@jayshrivastava jayshrivastava requested review from a team as code owners November 30, 2023 22:45
@jayshrivastava jayshrivastava requested review from herkolategan, renatolabs, miretskiy and DrewKimball and removed request for a team November 30, 2023 22:45
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 4 times, most recently from 6480f6d to f3e74a4 Compare December 1, 2023 19:56
Member

@srosenberg srosenberg left a comment

Previously, rebalanceSpanPartitions used O(ranges) memory. This change
rewrites it to use range iterators, reducing the memory usage to O(spans).

I am looking at the previous version, using AllRangeSpans. It also uses a range iterator. I guess I am not seeing where O(ranges) is being allocated.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @miretskiy, and @renatolabs)

Contributor Author

@jayshrivastava jayshrivastava left a comment

If I'm reading the code correctly, getRangesForSpans returns a list of ranges. Then we save this list of ranges in p[i].Spans for each partition. So, we end up with all the ranges in memory stored as 1 span per range. This is the code:

	numRanges := 0
	for i := range p {
		spans, err := r.getRangesForSpans(ctx, p[i].Spans)
		if err != nil {
			return nil, err
		}
		p[i].Spans = spans
		numRanges += len(spans)
	}
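
To make the memory difference concrete, here is a small sketch that contrasts storing one span per range with counting ranges while iterating. The `span` type, the string keys, and the `splits` slice of range boundaries are all hypothetical stand-ins; this is not the changefeed code or the real range iterator API.

```go
package main

import (
	"fmt"
	"sort"
)

// span is a half-open key interval [key, endKey). Hypothetical stand-in for
// roachpb.Span, with strings as keys.
type span struct{ key, endKey string }

// forEachRange visits each range-sized piece of sp, given the sorted range
// split keys. Only the current piece is held in memory.
func forEachRange(sp span, splits []string, visit func(span)) {
	start := sp.key
	// Index of the first split key strictly greater than the span's start.
	i := sort.Search(len(splits), func(i int) bool { return splits[i] > sp.key })
	for ; i < len(splits) && splits[i] < sp.endKey; i++ {
		visit(span{start, splits[i]})
		start = splits[i]
	}
	visit(span{start, sp.endKey})
}

// materializeRanges mirrors the older approach: one span is stored per range.
func materializeRanges(spans []span, splits []string) []span {
	var out []span
	for _, sp := range spans {
		forEachRange(sp, splits, func(piece span) { out = append(out, piece) })
	}
	return out
}

// countRanges only counts, so its memory use does not grow with the number
// of ranges.
func countRanges(spans []span, splits []string) int {
	n := 0
	for _, sp := range spans {
		forEachRange(sp, splits, func(span) { n++ })
	}
	return n
}

func main() {
	splits := []string{"c", "f", "m"}
	spans := []span{{"a", "z"}}
	fmt.Println(len(materializeRanges(spans, splits)), countRanges(spans, splits))
}
```

Both functions do the same amount of iteration; the difference is only in what they retain afterwards.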

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @miretskiy, and @renatolabs)

@srosenberg
Member

If I'm reading the code correctly, getRangesForSpans returns a list of ranges.

It returns a list of ranges confined to the given spans, right?

Then we save this list of ranges in p[i].Spans for each partition. So, we end up with all the ranges in memory stored as 1 span per range.

True, we use spans as an intermediate buffer, which your PR removes. That's a great improvement! I was only confused by the commit message, which says the improvement is from O(ranges) to O(spans). I don't see an asymptotic improvement in this PR; rather, the improvement is a smaller constant factor within the same asymptotic complexity.

Contributor Author

@jayshrivastava jayshrivastava left a comment

I think you're right. It's hard to say because we can't express the number of ranges as a function of spans. Regardless, I think O(ranges) -> O(spans) is a simple shorthand which gets the point across. It's nicer to have one span in memory than 250k ranges :)

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @miretskiy, and @renatolabs)

Contributor Author

@jayshrivastava jayshrivastava left a comment

You're definitely right. Ranges have a bounded size. If a table has a certain size in bytes, the number of ranges is a constant multiple of that.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @miretskiy, and @renatolabs)

@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 2 times, most recently from 4280271 to d5d62aa Compare December 18, 2023 18:20
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch from d5d62aa to ee2ae69 Compare December 19, 2023 20:39
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 2 times, most recently from 612ddbe to 1a16233 Compare December 20, 2023 17:26
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 3 times, most recently from f7108e7 to 3c57453 Compare January 10, 2024 20:53
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 2 times, most recently from 29b5fce to 258f2be Compare January 16, 2024 16:13
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch from 258f2be to 15bf814 Compare February 21, 2024 15:00
Collaborator

@rharding6373 rharding6373 left a comment

Nice! Is there a microbenchmark that demonstrates these improvements?

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @jayshrivastava, @miretskiy, and @renatolabs)


pkg/ccl/changefeedccl/changefeed_dist.go line 563 at r6 (raw file):

	// and mutated during rebalancing.
	numRanges int
	g         roachpb.SpanGroup

nit: use more descriptive names for struct members, e.g. group and part


pkg/ccl/changefeedccl/changefeed_dist.go line 587 at r6 (raw file):

		// We cannot rebalance if we're missing range information.
		if !ok {
			log.Infof(ctx, "skipping rebalance due to missing range info")

This seems worth a warning.


pkg/ccl/changefeedccl/changefeed_dist.go line 600 at r6 (raw file):

	})

	targetRanges := int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(len(partitions))))

For my education, how do we normally determine sensitivity? I noticed that in the test sensitivity is 0 and there's a TODO to get rid of it. Does that affect only the sensitivity in the test? Do we test that the algorithm works with sensitivity in another test?


pkg/ccl/changefeedccl/changefeed_dist.go line 667 at r6 (raw file):

	for _, b := range builders {
		partitions[b.pIdx] = sql.MakeSpanPartition(
			b.p.SQLInstanceID, b.g.Slice(), true, b.numRanges)

nit: add comment with param name to bool arg for all these calls.


pkg/sql/distsql_physical_planner.go line 1222 at r5 (raw file):

// MakeSpanPartition constructs a SpanPartition.
func MakeSpanPartition(
	instanceID base.SQLInstanceID, spans roachpb.Spans, haveRangeInfo bool, numRanges int,

In all calls to MakeSpanPartition, haveRangeInfo is true. I would get rid of the bool param and just put true in the struct below.


pkg/ccl/changefeedccl/changefeed_dist_test.go line 136 at r6 (raw file):

				[]roachpb.Span{mkRange('z' - i)},
				true,
				1,

nit: the constant arg could also use a param comment


pkg/ccl/changefeedccl/changefeed_dist_test.go line 268 at r6 (raw file):

	// Create a random input and assert that the output has the same
	// spans as the input.
	t.Run("random", func(t *testing.T) {

Nice test!


pkg/ccl/changefeedccl/changefeed_dist_test.go line 322 at r6 (raw file):

		g2 := copySpans(output)
		require.True(t, g1.Encloses(g2.Slice()...))
		require.True(t, g2.Encloses(g1.Slice()...))

Is there another check here we can make around how many spans per partition there are? Assert some max threshold is not exceeded, maybe?

@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch from 15bf814 to b6c6c47 Compare March 13, 2024 18:26
Contributor Author

@jayshrivastava jayshrivastava left a comment

Thanks for giving this a look!

Unfortunately, I don't have a microbenchmark. That being said, I'm pretty confident in this. The old code would load all the ranges into memory. Now we load spans into memory.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @miretskiy, @renatolabs, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_dist.go line 600 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

it's a bit sad that we have to have all this code just to count the number of ranges.
I think we can easily extend sql.SpanPartition struct to have num ranges counters which we can update as we're building span partitions.

This will remove the need to create 2 iterators.

Done.


pkg/ccl/changefeedccl/changefeed_dist.go line 612 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

terminate is so.... fatal... Maybe canRebalance?

Done.


pkg/ccl/changefeedccl/changefeed_dist.go line 639 at r2 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

i wonder if there is a way to do it in sql.PartitionSpans...
Going to think a bit more about this... but for now, flushing out comments so far.

Done.


pkg/ccl/changefeedccl/changefeed_dist.go line 587 at r6 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

This seems worth a warning.

Done.


pkg/ccl/changefeedccl/changefeed_dist.go line 600 at r6 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

For my education, how do we normally determine sensitivity? I noticed that in the test sensitivity is 0 and there's a TODO to get rid of it. Does that affect only the sensitivity in the test? Do we test that the algorithm works with sensitivity in another test?

It's changed by the setting changefeed.balance_range_distribution.sensitivity. Some of the other, larger tests in changefeed_dist_test.go use the default sensitivity of 5%.

I filed #120427. I think we should actually change it to some other sort of sensitivity (e.g., if the number of ranges is below X, then don't rebalance or rebalance in a different way). Something like this would be more useful, but we need to do some thinking/design first.
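
As a worked example of how sensitivity enters the target, the sketch below evaluates the `targetRanges` expression quoted from changefeed_dist.go. The wrapper function and the sample numbers are illustrative assumptions; only the formula itself and the 5% default come from this thread.

```go
package main

import (
	"fmt"
	"math"
)

// targetRanges evaluates the expression quoted in the review: the per-partition
// range target, padded by the sensitivity factor and rounded up. The function
// wrapper is hypothetical; the PR computes this inline.
func targetRanges(totalRanges, numPartitions int, sensitivity float64) int {
	return int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(numPartitions)))
}

func main() {
	for _, s := range []float64{0, 0.05} {
		fmt.Printf("sensitivity=%.2f target=%d\n", s, targetRanges(100, 4, s))
	}
}
```

With 100 ranges over 4 partitions, the target is 25 at sensitivity 0 and 27 at 0.05, so a higher sensitivity tolerates slightly fuller partitions before anything needs to move.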


pkg/sql/distsql_physical_planner.go line 1222 at r5 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

In all calls to MakeSpanPartition, haveRangeInfo is true. I would get rid of the bool param and just put true in the struct below.

Done. I changed the name as well.


pkg/ccl/changefeedccl/changefeed_dist_test.go line 268 at r6 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Nice test!

Thanks!


pkg/ccl/changefeedccl/changefeed_dist_test.go line 322 at r6 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

Is there another check here we can make around how many spans per partition there are? Assert some max threshold is not exceeded, maybe?

Encloses also checks the spans contained, not just the count. I believe I asked in Slack how to check two sets of spans for equivalence, and someone mentioned that the double Encloses is the way to do it.
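
The idea behind the double Encloses check can be sketched with a toy interval set: two span lists cover the same keys exactly when each one encloses the other. The `span` type and the `merge`, `encloses`, and `sameKeys` helpers below are hypothetical stand-ins written for this example; the test in the PR uses roachpb.SpanGroup instead.

```go
package main

import (
	"fmt"
	"sort"
)

// span is a non-empty half-open key interval [key, endKey). Hypothetical
// stand-in for roachpb.Span.
type span struct{ key, endKey string }

// merge returns the spans sorted by start key with overlapping or adjacent
// spans coalesced. The input slice is not modified.
func merge(spans []span) []span {
	sorted := append([]span(nil), spans...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].key < sorted[j].key })
	var out []span
	for _, s := range sorted {
		if n := len(out); n > 0 && s.key <= out[n-1].endKey {
			if s.endKey > out[n-1].endKey {
				out[n-1].endKey = s.endKey
			}
			continue
		}
		out = append(out, s)
	}
	return out
}

// encloses reports whether every span in b lies within the keys covered by a.
func encloses(a, b []span) bool {
	merged := merge(a)
	for _, s := range b {
		covered := false
		for _, m := range merged {
			if m.key <= s.key && s.endKey <= m.endKey {
				covered = true
				break
			}
		}
		if !covered {
			return false
		}
	}
	return true
}

// sameKeys is the "double enclose": both lists cover exactly the same keys.
func sameKeys(a, b []span) bool { return encloses(a, b) && encloses(b, a) }

func main() {
	in := []span{{"a", "c"}, {"c", "f"}}
	fmt.Println(sameKeys(in, []span{{"a", "f"}}), sameKeys(in, []span{{"a", "e"}}))
}
```

This check says nothing about how evenly the spans are distributed across partitions, which needs a separate assertion on range counts.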

Collaborator

@rharding6373 rharding6373 left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @jayshrivastava, @miretskiy, and @renatolabs)


pkg/ccl/changefeedccl/changefeed_dist.go line 600 at r6 (raw file):

Previously, jayshrivastava (Jayant) wrote…

It's changed by the setting changefeed.balance_range_distribution.sensitivity. Some of the other, larger tests in changefeed_dist_test.go use the default sensitivity of 5%.

I filed #120427. I think we should actually change it to some other sort of sensitivity (e.g., if the number of ranges is below X, then don't rebalance or rebalance in a different way). Something like this would be more useful, but we need to do some thinking/design first.

Thanks!


pkg/ccl/changefeedccl/changefeed_dist_test.go line 322 at r6 (raw file):

Previously, jayshrivastava (Jayant) wrote…

Encloses also checks the spans contained, not just the count. I believe I asked in Slack how to check two sets of spans for equivalence, and someone mentioned that the double Encloses is the way to do it.

To clarify, my question is whether there is a way to verify that the algorithm rebalanced so that all partitions have an equal number of spans. Encloses just shows that the same set of spans exist before and after, right?

@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch 2 times, most recently from d73e39e to 4c8b301 Compare March 13, 2024 21:03
Contributor Author

@jayshrivastava jayshrivastava left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @miretskiy, @renatolabs, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_dist_test.go line 322 at r6 (raw file):

Previously, rharding6373 (Rachael Harding) wrote…

To clarify, my question is whether there is a way to verify that the algorithm rebalanced so that all partitions have an equal number of spans. Encloses just shows that the same set of spans exist before and after, right?

Good point! I updated the code to check the range counts.

Collaborator

@rharding6373 rharding6373 left a comment

LGTM Nice work!

Collaborator

@andyyang890 andyyang890 left a comment

Reviewed 1 of 11 files at r5, 1 of 5 files at r8.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @jayshrivastava, @miretskiy, @renatolabs, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_dist.go line 602 at r13 (raw file):

	}

	//Create partition builder structs for the partitions array above.

nit: missing space


pkg/ccl/changefeedccl/changefeed_dist.go line 611 at r13 (raw file):

		// We cannot rebalance if we're missing range information.
		if !ok {
			log.Warning(ctx, "skipping rebalance due to missing range info")

Should we maybe fall back on the old code instead of not rebalancing if range info is missing? I worry that hypothetically if a bug caused the range info not to be populated, we would lose the ability to rebalance at all.


pkg/sql/distsql_physical_planner.go line 1209 at r11 (raw file):

	haveRangeInfo bool
	numRanges     int

Thoughts on making this field a *int so you don't need haveRangeInfo? If you do decide to go down that route, you could also have a numRangesAdd receiver to make the pointer dereferencing needed for adding a little simpler.


pkg/sql/distsql_physical_planner_test.go line 1237 at r11 (raw file):

				rangeCount += n
			}
			require.Equal(t, countRanges(partitions), rangeCount)

Should this test maybe assert equality for each partition instead of just the total?

@andyyang890 andyyang890 requested review from yuzefovich and removed request for miretskiy March 14, 2024 06:17
@yuzefovich yuzefovich requested a review from rharding6373 March 14, 2024 19:52
Member

@yuzefovich yuzefovich left a comment

LGTM.

Reviewed 1 of 11 files at r5, 1 of 5 files at r8, 4 of 4 files at r11, 3 of 3 files at r12, 2 of 2 files at r13, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @herkolategan, @jayshrivastava, @miretskiy, @renatolabs, and @rharding6373)


pkg/ccl/changefeedccl/changefeed_dist.go line 682 at r12 (raw file):

	// Overwrite the original partitions slice with the balanced partitions.
	for _, b := range builders {
		partitions[b.pIdx] = sql.MakeSpanPartitionWithRangeCount(

For my own edification - IIUC we don't really need to store the updated range count information, right? In other words, we used this information during the rebalancing loop and we won't use it in the future, correct?


pkg/sql/distsql_physical_planner.go line 1371 at r11 (raw file):

	getSQLInstanceIDForKVNodeID func(roachpb.NodeID) (base.SQLInstanceID, SpanPartitionReason),
	ignoreMisplannedRanges *bool,
) (_ []SpanPartition, lastPartitionIdx int, err error) {

nit: why give the error return argument a name?


pkg/ccl/changefeedccl/changefeed_dist_test.go line 125 at r12 (raw file):

// TestPartitionSpans unit tests the rebalanceSpanPartitions function.
func TestPartitionSpans(t *testing.T) {
	defer leaktest.AfterTest(t)()

nit: now that the test logs more stuff, consider adding defer log.Scope(t).Close(t).

Contributor Author

@jayshrivastava jayshrivastava left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @DrewKimball, @herkolategan, @miretskiy, @renatolabs, @rharding6373, and @yuzefovich)


pkg/ccl/changefeedccl/changefeed_dist.go line 682 at r12 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

For my own edification - IIUC we don't really need to store the updated range count information, right? In other words, we used this information during the rebalancing loop and we won't use it in the future, correct?

Yes.


pkg/ccl/changefeedccl/changefeed_dist.go line 611 at r13 (raw file):

Previously, andyyang890 (Andy Yang) wrote…

Should we maybe fall back on the old code instead of not rebalancing if range info is missing? I worry that hypothetically if a bug caused the range info not to be populated, we would lose the ability to rebalance at all.

I think this would only happen if someone made a code change, but tests would fail if such a change was made. I don't think it's possible for the info to be missing for any other reason. Distsql does it while updating the Spans in the partition:

func (dsp *DistSQLPlanner) partitionSpan(

There's no way for spans to be there without range info. I left the warning there just in case, but it really shouldn't happen.


pkg/sql/distsql_physical_planner.go line 1209 at r11 (raw file):

Previously, andyyang890 (Andy Yang) wrote…

Thoughts on making this field a *int so you don't need haveRangeInfo? If you do decide to go down that route, you could also have a numRangesAdd receiver to make the pointer dereferencing needed for adding a little simpler.

I don't really want to make this change for an int. Feels wrong to put the int field on the heap unnecessarily. I know that avro/json does it but I think that's because they really need to.
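
For comparison, the two shapes under discussion can be sketched side by side. The type names `flagged` and `pointered` are hypothetical, and allocating the counter on first use inside `numRangesAdd` is an assumption of this sketch; only the field names and the idea of a `numRangesAdd` receiver come from the review comments.

```go
package main

import "fmt"

// flagged mirrors the shape quoted in the review: a count plus a validity flag.
type flagged struct {
	haveRangeInfo bool
	numRanges     int
}

// NumRanges returns the count and whether it is known.
func (p flagged) NumRanges() (int, bool) { return p.numRanges, p.haveRangeInfo }

// pointered is the suggested alternative: a nil pointer means "no range info".
type pointered struct {
	numRanges *int
}

// numRangesAdd hides the pointer dereference. Allocating on first use is an
// assumption made for this sketch.
func (p *pointered) numRangesAdd(n int) {
	if p.numRanges == nil {
		p.numRanges = new(int)
	}
	*p.numRanges += n
}

// NumRanges returns the count and whether it is known.
func (p pointered) NumRanges() (int, bool) {
	if p.numRanges == nil {
		return 0, false
	}
	return *p.numRanges, true
}

func main() {
	a := flagged{haveRangeInfo: true, numRanges: 3}
	var b pointered
	b.numRangesAdd(3)
	fmt.Println(a.NumRanges())
	fmt.Println(b.NumRanges())
}
```

The pointer version saves a field but adds an allocation and a nil check on every access, which is the trade-off the reply above is weighing.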


pkg/sql/distsql_physical_planner_test.go line 1237 at r11 (raw file):

Previously, andyyang890 (Andy Yang) wrote…

Should this test maybe assert equality for each partition instead of just the total?

It does right here

// Assert that the PartitionState is what we expect it to be.
tc.partitionState.testingOverrideRandomSelection = nil
planCtx.spanPartitionState.testingOverrideRandomSelection = nil
if !reflect.DeepEqual(*planCtx.spanPartitionState, tc.partitionState) {
	t.Errorf("expected partition state:\n  %v\ngot:\n  %v",
		tc.partitionState, *planCtx.spanPartitionState)
}
resMap := make(map[int][][2]string)
for _, p := range partitions {
	if _, ok := resMap[int(p.SQLInstanceID)]; ok {
		t.Fatalf("node %d shows up in multiple partitions", p.SQLInstanceID)
	}
	var spans [][2]string
	for _, s := range p.Spans {
		spans = append(spans, [2]string{string(s.Key), string(s.EndKey)})
	}
	resMap[int(p.SQLInstanceID)] = spans
}
recording := getRecAndFinish()
t.Logf("recording is %s", recording)
for _, expectedMsg := range tc.partitionStates {
	require.NotEqual(t, -1, tracing.FindMsgInRecording(recording, expectedMsg))
}
if !reflect.DeepEqual(resMap, tc.partitions) {
	t.Errorf("expected partitions:\n  %v\ngot:\n  %v", tc.partitions, resMap)
}

@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch from 4c8b301 to 64674cf Compare March 18, 2024 15:10
@jayshrivastava jayshrivastava force-pushed the changefeed/planning-dist-2 branch from 64674cf to 8fee491 Compare March 18, 2024 17:17
@jayshrivastava
Contributor Author

bors r+

@craig
Contributor

craig bot commented Mar 18, 2024

@craig craig bot merged commit 4deb9e3 into cockroachdb:master Mar 18, 2024
21 of 22 checks passed