Skip to content

Commit

Permalink
Fix asymmetric join crash when stream side is empty (#11411)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <[email protected]>
  • Loading branch information
jlowe authored Aug 30, 2024
1 parent db1d580 commit ee2049a
Showing 1 changed file with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -901,19 +901,26 @@ object GpuShuffledAsymmetricHashJoinExec {
concatTime = metrics(CONCAT_TIME),
opTime = metrics(OP_TIME),
opName = "stream as build")
val streamBatch = streamBatchIter.next()
val singleStreamIter = new SingleGpuColumnarBatchIterator(streamBatch)
assert(!streamBatchIter.hasNext, "stream side not exhausted")
val streamStats = JoinBuildSideStats.fromBatch(streamBatch, exprs.boundStreamKeys)
if (buildStats.streamMagnificationFactor <
streamStats.streamMagnificationFactor) {
metrics(BUILD_DATA_SIZE).set(buildSize)
JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats),
singleStreamIter, exprs)
if (streamBatchIter.hasNext) {
val streamBatch = streamBatchIter.next()
val singleStreamIter = new SingleGpuColumnarBatchIterator(streamBatch)
assert(!streamBatchIter.hasNext, "stream side not exhausted")
val streamStats = JoinBuildSideStats.fromBatch(streamBatch, exprs.boundStreamKeys)
if (buildStats.streamMagnificationFactor <
streamStats.streamMagnificationFactor) {
metrics(BUILD_DATA_SIZE).set(buildSize)
JoinInfo(joinType, buildSide, buildIter, buildSize, Some(buildStats),
singleStreamIter, exprs)
} else {
metrics(BUILD_DATA_SIZE).set(streamSize)
val flippedSide = flipped(buildSide)
JoinInfo(joinType, flippedSide, singleStreamIter, streamSize, Some(streamStats),
buildIter, exprs.flipped(joinType, flippedSide, condition))
}
} else {
metrics(BUILD_DATA_SIZE).set(streamSize)
val flippedSide = flipped(buildSide)
JoinInfo(joinType, flippedSide, singleStreamIter, streamSize, Some(streamStats),
JoinInfo(joinType, flippedSide, streamBatchIter, streamSize, None,
buildIter, exprs.flipped(joinType, flippedSide, condition))
}
} else {
Expand Down

0 comments on commit ee2049a

Please sign in to comment.