Filter rows with null keys when coalescing due to reaching cuDF row limits [databricks] #5531
Conversation
Commit "Filter rows with null keys when coalescing due to reaching cuDF row limits" (Signed-off-by: Alessandro Bellina <[email protected]>), force-pushed from eb08d44 to b9ca330.
val cb = if (inputFilterExpression.isDefined) {
  // If we have reached the cuDF limit once, proactively filter batches
  // after that first limit is reached.
  GpuFilter.apply(cbFromIter, inputFilterExpression.get)
nit: the `apply` is not needed.
GpuFilter(cbFromIter, inputFilterExpression.get)
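For context: in Scala, applying an object directly is just sugar for calling its apply method, so the two forms are equivalent. A tiny standalone illustration, using a hypothetical Greeter object rather than GpuFilter:

```scala
// Hypothetical object, used only to show Scala's apply sugar.
object Greeter {
  def apply(name: String): String = s"hello, $name"
}

val explicit = Greeter.apply("world") // explicit call
val sugared  = Greeter("world")       // equivalent, and the more idiomatic spelling
assert(explicit == sugared)
```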
this should be done.
@@ -351,7 +373,14 @@ abstract class AbstractGpuCoalesceIterator(

    // there is a hard limit of 2^31 rows
    while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) {
-     closeOnExcept(iter.next()) { cb =>
+     closeOnExcept(iter.next()) { cbFromIter =>
The closing of this appears to be off by a bit in error cases. GpuFilter.apply closes the input batch (ideally we should rename it to make that clear). So if an exception is thrown after that happens, we will get a double close on cbFromIter. This is kind of minor, but the result of the filter, `cb`, is now left unprotected if an exception is thrown and would leak.
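A self-contained sketch of the hazard being described, using a plain AutoCloseable and local stand-ins for the plugin's closeOnExcept helper and for a GpuFilter-like call that closes its input (none of this is the real plugin code):

```scala
// FakeBatch stands in for ColumnarBatch; close() is not idempotent so a
// double close is easy to observe.
final class FakeBatch(name: String) extends AutoCloseable {
  private var closed = false
  override def close(): Unit = {
    require(!closed, s"double close on $name")
    closed = true
  }
}

// Local stand-in for a closeOnExcept helper: close the resource only if the
// body throws, then rethrow.
def closeOnExcept[T <: AutoCloseable, R](resource: T)(body: T => R): R =
  try body(resource) catch { case t: Throwable => resource.close(); throw t }

// Mimics a filter that closes its input and returns a brand new batch that
// nothing is tracking yet.
def filterAndCloseInput(in: FakeBatch): FakeBatch = { in.close(); new FakeBatch("cb") }

closeOnExcept(new FakeBatch("cbFromIter")) { cbFromIter =>
  val cb = filterAndCloseInput(cbFromIter) // cbFromIter is already closed here
  // If anything past this point throws, closeOnExcept closes cbFromIter a
  // second time (double close) and `cb` is never closed at all (leak).
  throw new RuntimeException("boom")
}
```

Run as a script, this fails on the double close of cbFromIter rather than on the original exception, which is roughly the failure mode described above.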
I have taken care of two of these cases, but I am still left with a potential double close on `cb`: 751dce0#diff-26bc5860b4878c986610d72135b63fedf0051e84e2a89c61a8df18aea942e139R417
I think the problem is GpuFilter and our reliance on it to close things for us. We should have a version that does not close the input, and then we can have clearly defined boundaries.
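A rough sketch of what those boundaries could look like once the filter no longer closes its input. All the names here (FakeBatch, filterNoClose, the withResource/closeOnExcept stand-ins) are illustrative, not the plugin's actual API:

```scala
final class FakeBatch extends AutoCloseable { override def close(): Unit = () }

// Stand-ins for the usual resource helpers.
def withResource[T <: AutoCloseable, R](resource: T)(body: T => R): R =
  try body(resource) finally resource.close()

def closeOnExcept[T <: AutoCloseable, R](resource: T)(body: T => R): R =
  try body(resource) catch { case t: Throwable => resource.close(); throw t }

// A filter that does NOT close its input: ownership stays with the caller.
def filterNoClose(in: FakeBatch): FakeBatch = new FakeBatch

withResource(new FakeBatch) { cbFromIter =>              // the input has exactly one owner
  closeOnExcept(filterNoClose(cbFromIter)) { filtered => // the output is protected until handed off
    // ... use or enqueue `filtered`; whoever receives it closes it ...
    filtered.close()
  }
}
```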
Yes, that makes sense. Will do.
Ok, I may need to cover one more case, and that is where we still have …; I am looking into it. Before I PRed this I had also changed …
Ok, the reason for what I am seeing with some …
build
triggering the databricks CI
build
build
Revert "Filter rows with null keys when coalescing due to reaching cuDF row limits [databricks] (NVIDIA#5531)". This reverts commit 5f33368.
This PR is a workaround to handle cases where we would go above the cuDF row limit when coalescing a build-side batch in the hash join, but we have a chance to potentially rescue things if the batch is built mostly of nulls. The real fix is to not materialize those rows with null keys (https://issues.apache.org/jira/browse/SPARK-39131), but that fix in the logical plan introduces other issues that we don't have an answer to yet.
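For intuition on why dropping build-side rows with null keys can be safe, here is a small, self-contained example at the plain Spark SQL level. It assumes an inner equi-join and is only an illustration of the semantics, not the plugin's coalesce code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("null-key-filter").getOrCreate()
import spark.implicits._

// The build side contains a row whose join key is null.
val build  = Seq((Some(1), "a"), (None, "b"), (Some(2), "c")).toDF("k", "v")
val stream = Seq((1, "x"), (2, "y")).toDF("k", "v2")

// With an equi-join, build rows whose key is null can never match, so
// filtering them only shrinks the build side; it does not change the result.
val joined   = stream.join(build, Seq("k"), "inner")
val filtered = stream.join(build.filter(col("k").isNotNull), Seq("k"), "inner")

assert(joined.collect().toSet == filtered.collect().toSet)
spark.stop()
```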
Posting this as a draft to get some comments on the implementation. Specifically, it would be nice to get another pair of eyes on the join types this applies to.
I'll look into adding a test for this.