Add in basic support for OOM retry for project and filter #7864
Conversation
Signed-off-by: Robert (Bobby) Evans <[email protected]>
@@ -332,34 +408,14 @@ case class GpuProjectAstExec(
      }
    case exprSet :: tail =>
      val projectCb = withResource(new NvtxRange("project tier", NvtxColor.ORANGE)) { _ =>
It seems we should closeOnExcept(cb) while doing the project.
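For context on the pattern the reviewer is suggesting, here is a minimal standalone sketch of closeOnExcept. In spark-rapids the real helper lives in the Arm trait; this version is only an illustration of the idea, not the plugin's implementation, and the Batch class is a stand-in invented for the example.

```scala
// Minimal sketch of the closeOnExcept pattern (illustrative, not the
// plugin's actual Arm implementation).
object CloseOnExceptExample {
  def closeOnExcept[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
    try {
      block(resource)
    } catch {
      case t: Throwable =>
        // Close the resource only on the failure path; on success the
        // caller stays responsible for closing it.
        try resource.close() catch {
          case suppressed: Throwable => t.addSuppressed(suppressed)
        }
        throw t
    }
  }

  // A stand-in for a ColumnarBatch, just to observe close() being called.
  final class Batch extends AutoCloseable {
    var closed = false
    override def close(): Unit = closed = true
  }
}
```

The point of the suggestion is that if the project throws, the input batch `cb` is closed instead of leaking, while the normal success path leaves ownership unchanged.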
@abellina I added a few tests and addressed your review feedback. I think I am done at this point. I cannot easily test retry on a non-deterministic project. It is a rare enough case that I decided not to test it outside of normal integration tests, but if you want me to, I am happy to try and add something in.
build
build
vulnerability scan is failing with a docker error where a checksum is not registered. Not sure how to work around this...
build
Overall I just had a question on the temporary batches as we recurse; the rest looks good to me. It would be great to get @jbrennan333's feedback though.
LGTM
I ran some performance tests locally. I need to do it in a more stable environment, but it looks to be around a 3% performance regression on NDS with this patch. I think that is acceptable, especially if we have a plan on how to improve the likely culprit (extra contig split calls to make the data spillable).
lgtm. Just one minor nit.
  }, inputTiers)
    input: AttributeSeq,
    runTiered: Boolean): GpuTieredProject = {
I wish I had thought to pass runTiered like this. Definitely cleans up the call sites.
val newExprTiers = exprTiers.zipWithIndex.map {
  case (exprTier, index) =>
    // get what the output should look like.
    val atInput = index + 1
    if (atInput < inputTiers.length) {
      inputTiers(atInput).attrs.map { attr =>
        exprTier.find { expr =>
          expr.asInstanceOf[NamedExpression].toAttribute == attr
        }.getOrElse(attr)
      }
    } else {
      exprTier
    }
}
Really nice refactoring of the code to remove the unneeded columns - simplifies the project code.
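To make the pruning idea in that snippet concrete in isolation: each intermediate tier keeps only the outputs that the next tier's input actually references, passing untouched columns through. The sketch below models expressions as a simple name/body pair instead of Catalyst NamedExpressions; all names here are invented for illustration.

```scala
// Simplified model of the tier-pruning refactor: each tier's output is
// trimmed to the attributes the following tier consumes. Expr is a
// stand-in for NamedExpression (illustrative only).
object TierPruneExample {
  final case class Expr(name: String, body: String)

  def pruneTiers(exprTiers: Seq[Seq[Expr]],
      inputTiers: Seq[Seq[String]]): Seq[Seq[Expr]] =
    exprTiers.zipWithIndex.map { case (tier, index) =>
      val atInput = index + 1
      if (atInput < inputTiers.length) {
        inputTiers(atInput).map { attr =>
          // Reuse the computed expression when the next tier needs it;
          // otherwise pass the input column through unchanged.
          tier.find(_.name == attr).getOrElse(Expr(attr, attr))
        }
      } else {
        tier // the last tier produces the final output as-is
      }
    }
}
```

With two tiers where the first computes columns `c` and `d` but the second only reads `c`, the pruned first tier drops `d`, which is the "remove the unneeded columns" effect being praised above.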
    }
  } else {
    @tailrec
    def recurse(boundExprs: Seq[Seq[GpuExpression]],
(nit) maybe stylistic, but I'd prefer if this recurse inner function were moved up to the top of this function.
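For readers following along, the shape of that tail-recursive inner function is roughly the following, with each tier modeled as a plain function over a batch of ints rather than bound GPU expressions (names and types here are assumptions for the sketch, not the plugin's code):

```scala
import scala.annotation.tailrec

// Illustrative shape of a tail-recursive tiered project: apply one tier
// to the batch, then recurse on the remaining tiers.
object RecurseSketch {
  @tailrec
  def recurse(tiers: List[Seq[Int] => Seq[Int]], batch: Seq[Int]): Seq[Int] =
    tiers match {
      case Nil => batch // no tiers left: the batch is the final projection
      case tier :: rest => recurse(rest, tier(batch))
    }
}
```

Because the recursive call is in tail position, @tailrec verifies the compiler turns it into a loop, so a deep tier list cannot overflow the stack.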
build
This adds basic support for OOM retry for project and filter, and specifically wires that retry into hash aggregate pre- and post-processing. The pre-processing can be quite memory intensive and, in our testing, is one of the places where GPU OOM errors most often happen.
This is not 100% ready to go in. I am working on some unit tests for the code. I have manually tested it, but I would like some more automated tests.
There is still a lot of follow-on work to do too.
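The core retry idea described above can be sketched as a loop that reruns an operation when a (simulated) GPU OOM is signaled. The names here (RetryOOM, withRetry) are invented for the sketch; the plugin's real retry support is more involved, rolling back work and making inputs spillable before retrying.

```scala
// Hedged sketch of an OOM-retry loop: rerun the operation on a simulated
// GPU OOM, up to a bounded number of attempts (illustrative names only).
object RetrySketch {
  final class RetryOOM extends RuntimeException("simulated GPU OOM")

  def withRetry[T](maxRetries: Int)(op: => T): T = {
    var attempt = 0
    while (true) {
      try {
        return op
      } catch {
        case _: RetryOOM if attempt < maxRetries =>
          // In the real plugin, memory would be freed or spilled here
          // before the operation is attempted again.
          attempt += 1
      }
    }
    throw new IllegalStateException("unreachable")
  }
}
```

The key design point for project and filter is that the input batch must stay valid (spillable, not consumed) across attempts, which is what motivates the extra contiguous-split calls mentioned in the performance discussion above.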