Sub-partitioning supports repartitioning the input data multiple times #7996

Merged: 5 commits into NVIDIA:branch-23.06 on Apr 10, 2023

Conversation

@firestarman (Collaborator) commented on Apr 3, 2023:

closes #7911

This adds support for repartitioning the input data multiple times with different hash seeds during sub-partitioning, for cases where the initial partition count is not large enough to over-partition the data. It also calculates the actual partition count needed for the over-partitioning.
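
For context, a minimal, self-contained Scala sketch of the idea behind the different seeds (partitionOf is a hypothetical helper, not the plugin's code): rehashing the same keys with a new seed produces a different partition mapping, which is what lets a second pass break up a skewed partition.

    import scala.util.hashing.MurmurHash3

    // Hypothetical helper: map a key to a partition index for a given seed.
    def partitionOf(key: String, numPartitions: Int, seed: Int): Int = {
      val hash = MurmurHash3.stringHash(key, seed)
      java.lang.Math.floorMod(hash, numPartitions)
    }

    // Repartitioning the same keys with a different seed produces a different
    // mapping, so rows that piled up in one partition under the first seed are
    // generally spread out again under the second.
    val keys = Seq("a1", "a2", "a3", "a4")
    val firstPass  = keys.map(k => partitionOf(k, numPartitions = 4, seed = 42))
    val secondPass = keys.map(k => partitionOf(k, numPartitions = 4, seed = 43))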

@firestarman (Collaborator, Author) commented:

build

* Always return the current seed.
* This is intended to share the same seed across sub-partitioners.
*/
override final def nextSeed: Int = seedGenerator.currentSeed
Collaborator:

nit: This is confusing. I would prefer it if we had a way to pass the seed directly into a sub-partitioner instead.
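
(As a purely illustrative sketch of that suggestion, with hypothetical names that are not the plugin's API: the seed becomes an explicit constructor argument, so sharing one seed across the build- and stream-side sub-partitioners is visible at the call site rather than hidden in a shared generator.)

    // Illustrative only: the seed is passed in directly rather than read from a
    // shared seed generator.
    class SimpleSubPartitioner(numPartitions: Int, hashSeed: Int) {
      def partitionIndexOf(rowHash: Int): Int =
        java.lang.Math.floorMod(rowHash ^ hashSeed, numPartitions)
    }

    // Both sides are constructed with the same explicit seed.
    val seed = 100
    val buildSide  = new SimpleSubPartitioner(numPartitions = 16, hashSeed = seed)
    val streamSide = new SimpleSubPartitioner(numPartitions = 16, hashSeed = seed)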

@firestarman (Author) replied:

Done

}

private[this] def needRepartition(parts: SubPartitionBuffer): Boolean = {
// FIXME Is it good enough to ask for repartitioning when there exists any sub
Collaborator:

I think we would want to stop for a given batch if we try to partition the batch and it didn't change. Meaning we tried to partition it N ways and we got back N-1 empty batches and 1 batch with all of the data in it.
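
(A tiny sketch of that stopping condition, assuming per-sub-partition row counts are available; the name is hypothetical:)

    // Stop repartitioning a batch if splitting it N ways left all of its rows in
    // a single sub-partition: splitting it again the same way will not help.
    def repartitionMadeProgress(subPartitionRowCounts: Seq[Long]): Boolean =
      subPartitionRowCounts.count(_ > 0) > 1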

That said, I think we only need at most two passes through the data. The algorithm I see would be something like the following (a rough Scala sketch follows the list).

  • If the build side < target batch size
    • do the join as normal
  • else
    • partition the build/stream sides according to the configured number of splits
    • group build partitions to try and be just under target batch size and do the join for all build partitions that met this goal
    • for each build partition > target batch size
      • num_partitions = ceil(build size / target batch size)
      • partition the build and stream batches using the new seed and the calculated number of partitions
      • group partitions together again like before and do the join no matter how large the size is.
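
A rough Scala sketch of the plan above, assuming we only track each build partition's size in bytes (all names are hypothetical; the grouping and join steps are elided):

    final case class SubPartition(sizeBytes: Long /* plus the actual batches */)

    // For each build partition still larger than the target batch size, compute
    // the seed and partition count for its second (and final) repartition pass.
    // Partitions already under the target would be grouped and joined as normal.
    def planSecondPass(
        buildParts: Seq[SubPartition],
        targetBatchSize: Long,
        firstSeed: Int): Seq[(SubPartition, Int, Int)] = {
      buildParts
        .filter(_.sizeBytes > targetBatchSize)
        .map { p =>
          val numPartitions = math.ceil(p.sizeBytes.toDouble / targetBatchSize).toInt
          (p, firstSeed + 1, numPartitions) // any seed different from the first pass
        }
    }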

@firestarman (Author) replied:

Done.

@sameerz added the label reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin) on Apr 7, 2023
@firestarman requested a review from revans2 on April 7, 2023 09:06
@firestarman (Collaborator, Author) commented:

build

@revans2 (Collaborator) left a comment:

Looks good to me, but I would like to have at least one other person look at this too.

@abellina (Collaborator) left a comment:

I just had a non-blocking comment.

pair = Some(new PartitionPair(buildBatch, streamBatches))
}
}
} else if (bigBuildBatches.nonEmpty) {
Collaborator:

The way this seems to be working is that big batches are handled later, after the original sub-partitioner iterators are flushed. Do I understand that correctly? If so, I wonder if it makes sense to stop pulling from the sub-partitioner and temporarily give priority to the big batches, because that should alleviate the need to spill all the batches in bigBuildBatches.

Collaborator:

Yes, the big batches come later. That is what I suggested we do above in #7996 (comment).

They come later because it allows us to know their size and repartition them in a single pass, instead of possibly needing to split the data more than twice. I think we might be able to do something similar to what you want, but we would still have to process the entire build side first. Once we have pulled in the build side and repartitioned it the first time, we would know which batches need to be repartitioned a second time and how many sub-partitions each would need. But then we have to hand the same plan over to the stream-side partitioner: we would have to tell it the seed for the first partition pass and how many partitions to use, along with which partitions need a second pass and the corresponding seeds and partition counts for each of them.
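
(For illustration, such a shared plan might look roughly like this; none of these names exist in the plugin.)

    // Illustrative shape of a shared repartition plan: the first-pass seed and
    // partition count, plus, for each first-pass partition that is still too big,
    // the seed and partition count to use for its second pass.
    final case class RepartitionPlan(
        firstSeed: Int,
        firstNumPartitions: Int,
        secondPass: Map[Int, (Int, Int)]) // partition index -> (seed, numPartitions)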

It would be doable, but it is a fairly large change, and it would be a performance improvement when we are in a memory-constrained situation. So we would want to have a few queries that exhibit this behavior that we could use to benchmark it.

That all sounds rather involved, so I think it would be best to just file a follow-on issue to look at it, and we can then prioritize it in the backlog according to what management decides.

Collaborator:

This sounds good to me. I filed #8057

@firestarman merged commit 19a658f into NVIDIA:branch-23.06 on Apr 10, 2023
@firestarman deleted the multi-sub-part branch on April 10, 2023 05:02
abellina pushed a commit to abellina/spark-rapids that referenced this pull request Apr 14, 2023
Successfully merging this pull request may close these issues: Support re-partitioning large data multiple times and each time with a different seed.