Make fetch shuffle partition data in parallel #256

Merged: 3 commits merged into apache:master on Sep 27, 2022

Conversation

@yahoNanJing (Contributor)

Which issue does this PR close?

Closes #208.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@yahoNanJing mentioned this pull request on Sep 21, 2022
@yahoNanJing (Contributor, Author)

Hi @andygrove, @thinkharderdev, @avantgardnerio, could you help review this PR?

@thinkharderdev (Contributor) left a comment

One question but otherwise LGTM

Comment on lines +125 to +132
let mut partition_locations: Vec<PartitionLocation> = partition_locations
    .into_values()
    .flat_map(|ps| ps.into_iter().enumerate())
    .sorted_by(|(p1_idx, _), (p2_idx, _)| Ord::cmp(p1_idx, p2_idx))
    .map(|(_, p)| p)
    .collect();
// Shuffle the partitions so fetch requests are spread evenly across executors,
// avoiding hot executors when multiple tasks fetch concurrently
partition_locations.shuffle(&mut thread_rng());
Contributor

Why do we sort by index before shuffling here?

@yahoNanJing (Contributor, Author)

Sorting first may help reduce bias in the random selection, but it may not be strictly necessary.

Comment on lines +115 to +116
// TODO: make the maximum size configurable, or make it depend on global memory control
let max_request_num = 50usize;
Member

+1 for making this a config option that is documented in the user guide. I can help with this in a follow-on PR.
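
For illustration, a minimal sketch of what such a knob could look like. The helper, the config key name, and the plain HashMap-backed config are assumptions for the example, not the actual Ballista configuration API:

use std::collections::HashMap;

// Hypothetical helper: read the maximum number of concurrent fetch requests from a
// string-keyed config map, falling back to the current hard-coded default of 50.
fn max_fetch_request_num(config: &HashMap<String, String>) -> usize {
    config
        .get("ballista.shuffle.max_fetch_requests") // hypothetical key name
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(50)
}

In a real change this would presumably be wired through the existing Ballista/session configuration rather than a bare map.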

.try_flatten()
});
let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
info!("ShuffleReaderExec::execute({})", task_id);
Member

This could make the logs quite noisy again?

@andygrove (Member) left a comment

LGTM. Thanks @yahoNanJing!

@mingmwang (Contributor)

@yahoNanJing Could you please explain a little about the error-handling case?
For example, if one of the parallel fetches fails, what happens to the others?
And if there are multiple parallel fetch failures, will all of the error information be correctly propagated to the stream consumer?

@yahoNanJing (Contributor, Author)

@mingmwang Just added one commit to abort fast when an error occurs.
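
For readers following the thread, a minimal sketch of the general fail-fast pattern with bounded concurrency (not the PR's actual code; fetch_partition, its signature, and the error type are assumptions): at most max_request_num fetches run at once, and the first error ends the stream, dropping the remaining in-flight futures.

use futures::stream::{self, StreamExt, TryStreamExt};

// Hypothetical stand-in for the real per-partition fetch.
async fn fetch_partition(id: usize) -> Result<Vec<u8>, String> {
    Ok(vec![id as u8])
}

// Fetch all partitions with at most `max_request_num` requests in flight.
// `try_collect` returns the first Err it sees, so the whole call aborts fast
// and the not-yet-finished futures are cancelled when the stream is dropped.
async fn fetch_all(ids: Vec<usize>, max_request_num: usize) -> Result<Vec<Vec<u8>>, String> {
    stream::iter(ids)
        .map(fetch_partition)
        .buffer_unordered(max_request_num)
        .try_collect()
        .await
}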

@yahoNanJing merged commit 913f675 into apache:master on Sep 27, 2022

Successfully merging this pull request may close these issues.

Make executor fetch shuffle partition data in parallel