-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support SortMergeJoin spilling #11218
Conversation
all existing spilling tests are okay, I will add 3 more tests to test the spilling |
Multi batch spill tests still fails |
All initial tests passed, I'm planning to add more tests related to result correctness in separate PR |
"Spill file {:?} does not exist", | ||
spill.path() | ||
))); | ||
return internal_err!("Spill file {:?} does not exist", spill.path()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a clean up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drive by cleanups: 👍
I will review this in next few days. |
TestCase::new() | ||
.with_query( | ||
"select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time", | ||
) | ||
.with_memory_limit(1_000) | ||
.with_config(config) | ||
.with_disk_manager_config(DiskManagerConfig::NewOs) | ||
.run() | ||
.await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder how do we know if it triggers spilling or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we check metrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that is great idea, I was overthinking how to check that file spilled to disk but metrics is much easier, I'm adding it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya I added metrics tests in sort_merge_join.rs
like https://github.com/apache/datafusion/pull/11218/files#diff-825342e035aec56595dce761afb00dd54e3ae663a2e24ebf3a597123e636f9e2R3140
For this exact test which runs on SQL level I'm thinking if I can access metrics some how
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesnt seem possible to access any metrics in this case. We can rely that if test with disabled spill is failing on mem issues, then the same test with enabled spilling is passing. Hope that is enough
I plan to review this PR later today -- sorry for the delay |
} | ||
|
||
#[tokio::test] | ||
async fn sort_merge_join_spill() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case can only make sure the query can run, it may or may not be spilling.
We should have some ways to verify the spilling is actually happened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunately exactly this test case we cannot access any spilling metrics, but there is another test above sort_merge_join_no_spill
which is exactly the same but expectedly fails by mem issue and have the spilling disabled explicitly. This test passes without issues and with spilling enabled so we can conclude the spilling happened.
self.join_type, | ||
on, | ||
self.filter | ||
.as_ref() | ||
.map(|f| format!(", filter={}", f.expression())) | ||
.unwrap_or("".to_string()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why moving the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inlined the display filter and changed map_or_else
to map with default
if buffered_batch.spill_file.is_none() && buffered_batch.batch.is_some() { | ||
self.reservation | ||
.try_shrink(buffered_batch.size_estimation)?; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also handle else
cases, i.e., spilling file is Some
and batch
is also Some
, and both are None
, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think those cases are not possible but the current code doesn't make that clear
Here is a proposal that I think makes it clearer what states are possible: comphead#297
// If the batch was spilled to disk, less likely | ||
(Some(spill_file), None) => { | ||
let mut buffered_cols: Vec<ArrayRef> = | ||
Vec::with_capacity(buffered_indices.len()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buffered_indices.len()
is the length of arrays. I think the capacity should be the number of columns of the batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the should be the same right? the take
kernel will check the bounds
/// Spill the `RecordBatch` to disk as smaller batches | ||
/// split by `batch_size_rows` | ||
/// Return `total_rows` what is spilled | ||
pub fn spill_record_batch_by_size( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this function used other than in test? I don't find it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it may be left over from an earlier version of this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, Im planning to keep it and reuse it in row_hash in following PR, basically the subbatch slicing is from row_hash.rs
Thanks @viirya for your review, I'll address the comments today/tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @comphead and @viirya
I think this code is now correct, though I also think it could be improved (both with the comments from @viirya , my suggestion in comphead#297 as well as more testing)
Specifically, for testing, given the subtlety of the code involved I am not 100% sure it works for all corner cases. I suggest (as a follow on) we invest in fuzz testing both for SMJ in general as well as for spilling SMJ
I think in particular, making sure we adjust the random inputs to have different numbers of repeated values (as the code in this PR is only going to be exercised when there are many of the same join keys I think)
/// Spill the `RecordBatch` to disk as smaller batches | ||
/// split by `batch_size_rows` | ||
/// Return `total_rows` what is spilled | ||
pub fn spill_record_batch_by_size( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it may be left over from an earlier version of this PR
@@ -565,7 +583,7 @@ impl StreamedBatch { | |||
#[derive(Debug)] | |||
struct BufferedBatch { | |||
/// The buffered record batch | |||
pub batch: RecordBatch, | |||
pub batch: Option<RecordBatch>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While reviewing this PR, I found having to reason about what the valid batch
or spill_file
combinations was confusing (like there is an invariant I think that they can't both be Some)
Rather than use two fields, I tried making an enum that encoded the state and I thought it was easier to reason about. Here is a proposal here: comphead#297
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its great idea. I'll include this to follow up to simplify double option check in favor of enum.
if buffered_batch.spill_file.is_none() && buffered_batch.batch.is_some() { | ||
self.reservation | ||
.try_shrink(buffered_batch.size_estimation)?; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think those cases are not possible but the current code doesn't make that clear
Here is a proposal that I think makes it clearer what states are possible: comphead#297
Filed #11541 |
* Support SortMerge spilling
* Support SortMerge spilling
Which issue does this PR close?
Closes #9359 .
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?