Skip to content

Commit

Permalink
Fixed issue with filtering before the key project and not after
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 committed Sep 12, 2020
1 parent 6112e1b commit fa1215b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,12 @@ trait GpuHashJoin extends GpuExec with HashJoin {
builtBatch
}

private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch =
if (shouldFilterStreamTableForNulls) {
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
} else {
streamedBatch
}

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
Expand All @@ -197,11 +201,7 @@ trait GpuHashJoin extends GpuExec with HashJoin {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
val cb = if (shouldFilterStreamTableForNulls) {
filterStreamedTable(stream.next())
} else {
stream.next()
}
val cb = stream.next()
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
Expand Down Expand Up @@ -239,16 +239,14 @@ trait GpuHashJoin extends GpuExec with HashJoin {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

val streamedTable = try {
val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
try {
val combined = combine(streamedKeysBatch, streamedBatch)
GpuColumnVector.from(combined)
} finally {
streamedKeysBatch.close()
val combined = withResource(streamedBatch) { streamedBatch =>
withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
streamedKeysBatch =>
GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
}
} finally {
streamedBatch.close()
}
val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,24 @@ trait GpuHashJoin extends GpuExec with HashJoin {
private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

/**
* Filter the builtBatch if needed. builtBatch will be closed.
* @param builtBatch
* @return
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
GpuFilter(builtBatch, builtTableNullFilterExpression)
} else {
builtBatch
}

private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch =
if (shouldFilterStreamTableForNulls) {
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
} else {
streamedBatch
}

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
Expand All @@ -193,11 +202,7 @@ trait GpuHashJoin extends GpuExec with HashJoin {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
val cb = if (shouldFilterStreamTableForNulls) {
filterStreamedTable(stream.next())
} else {
stream.next()
}
val cb = stream.next()
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
Expand Down Expand Up @@ -235,16 +240,14 @@ trait GpuHashJoin extends GpuExec with HashJoin {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

val streamedTable = try {
val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
try {
val combined = combine(streamedKeysBatch, streamedBatch)
GpuColumnVector.from(combined)
} finally {
streamedKeysBatch.close()
val combined = withResource(streamedBatch) { streamedBatch =>
withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
streamedKeysBatch =>
GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
}
} finally {
streamedBatch.close()
}
val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,24 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
private[this] lazy val streamedTableNullFilterExpression: GpuExpression =
mkNullFilterExpr(gpuStreamedKeys)

/**
* Filter the builtBatch if needed. builtBatch will be closed.
* @param builtBatch
* @return
*/
def filterBuiltTableIfNeeded(builtBatch: ColumnarBatch): ColumnarBatch =
if (shouldFilterBuiltTableForNulls) {
GpuFilter(builtBatch, builtTableNullFilterExpression)
} else {
builtBatch
}

private[this] def filterStreamedTable(streamedBatch:ColumnarBatch): ColumnarBatch =
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
private[this] def filterStreamedTableIfNeeded(streamedBatch:ColumnarBatch): ColumnarBatch =
if (shouldFilterStreamTableForNulls) {
GpuFilter(streamedBatch, streamedTableNullFilterExpression)
} else {
streamedBatch
}

def doJoin(builtTable: Table,
stream: Iterator[ColumnarBatch],
Expand All @@ -193,11 +202,7 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
override def hasNext: Boolean = {
while (nextCb.isEmpty && (first || stream.hasNext)) {
if (stream.hasNext) {
val cb = if (shouldFilterStreamTableForNulls) {
filterStreamedTable(stream.next())
} else {
stream.next()
}
val cb = stream.next()
val startTime = System.nanoTime()
nextCb = doJoin(builtTable, cb, boundCondition, joinOutputRows, numOutputRows,
numOutputBatches, joinTime, filterTime)
Expand Down Expand Up @@ -235,16 +240,14 @@ trait GpuHashJoin extends GpuExec with HashJoinWithoutCodegen {
joinTime: SQLMetric,
filterTime: SQLMetric): Option[ColumnarBatch] = {

val streamedTable = try {
val streamedKeysBatch = GpuProjectExec.project(streamedBatch, gpuStreamedKeys)
try {
val combined = combine(streamedKeysBatch, streamedBatch)
GpuColumnVector.from(combined)
} finally {
streamedKeysBatch.close()
val combined = withResource(streamedBatch) { streamedBatch =>
withResource(GpuProjectExec.project(streamedBatch, gpuStreamedKeys)) {
streamedKeysBatch =>
GpuHashJoin.incRefCount(combine(streamedKeysBatch, streamedBatch))
}
} finally {
streamedBatch.close()
}
val streamedTable = withResource(filterStreamedTableIfNeeded(combined)) { cb =>
GpuColumnVector.from(cb)
}

val nvtxRange = new NvtxWithMetrics("hash join", NvtxColor.ORANGE, joinTime)
Expand Down

0 comments on commit fa1215b

Please sign in to comment.