Skip to content

Commit

Permalink
Support columnar processing for FlatMapCoGroupInPandas[databricks] (#…
Browse files Browse the repository at this point in the history
…6751)

* Support columnar processing for FlatMapCoGroupInPandas

Signed-off-by: firestarman <[email protected]>
  • Loading branch information
firestarman authored Oct 14, 2022
1 parent 4777935 commit 03b1164
Show file tree
Hide file tree
Showing 24 changed files with 760 additions and 590 deletions.
2 changes: 1 addition & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ Name | Description | Default Value | Notes
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for an Aggregation Pandas UDF, this accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled|true|None|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|false|This is disabled by default because Performance is not ideal with many small groups|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Flat Map Groups Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.|false|This is disabled by default because it only supports row based frame for now|
Expand Down
48 changes: 48 additions & 0 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,30 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">FlatMapCoGroupsInPandasExec</td>
<td rowspan="1">The backend for CoGrouped Aggregation Pandas UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.</td>
<td rowspan="1">This is disabled by default because Performance is not ideal with many small groups</td>
<td>Input/Output</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td rowspan="1">FlatMapGroupsInPandasExec</td>
<td rowspan="1">The backend for Flat Map Groups Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.</td>
<td rowspan="1">None</td>
Expand Down Expand Up @@ -1204,6 +1228,30 @@ Accelerator supports are described below.
<td><b>NS</b></td>
</tr>
<tr>
<th>Executor</th>
<th>Description</th>
<th>Notes</th>
<th>Param(s)</th>
<th>BOOLEAN</th>
<th>BYTE</th>
<th>SHORT</th>
<th>INT</th>
<th>LONG</th>
<th>FLOAT</th>
<th>DOUBLE</th>
<th>DATE</th>
<th>TIMESTAMP</th>
<th>STRING</th>
<th>DECIMAL</th>
<th>NULL</th>
<th>BINARY</th>
<th>CALENDAR</th>
<th>ARRAY</th>
<th>MAP</th>
<th>STRUCT</th>
<th>UDT</th>
</tr>
<tr>
<td rowspan="2">WindowExec</td>
<td rowspan="2">Window-operator backend</td>
<td rowspan="2">None</td>
Expand Down
5 changes: 2 additions & 3 deletions integration_tests/src/main/python/udf_cudf_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,7 @@
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from spark_session import with_cpu_session, with_gpu_session
from marks import allow_non_gpu, cudf_udf
from marks import cudf_udf


_conf = {
Expand Down Expand Up @@ -300,7 +300,6 @@ def gpu_run(spark):


# ======= Test CoGroup Map In Pandas =======
@allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_cogroup(enable_cudf_udf):
def cpu_run(spark):
Expand Down
20 changes: 17 additions & 3 deletions integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
raise AssertionError("incorrect pyarrow version during required testing " + str(e))
pytestmark = pytest.mark.skip(reason=str(e))

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from data_gen import *
from marks import incompat, approximate_float, allow_non_gpu, ignore_order
from pyspark.sql import Window
Expand All @@ -44,7 +44,7 @@
arrow_udf_conf = {
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.WindowInPandasExec': 'true',
'spark.rapids.sql.exec.AggregateInPandasExec': 'true'
'spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec': 'true'
}

data_gens_nested_for_udf = arrow_array_gens + arrow_struct_gens
Expand Down Expand Up @@ -323,7 +323,6 @@ def create_df(spark, data_gen, left_length, right_length):


@ignore_order
@allow_non_gpu('FlatMapCoGroupsInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', [ShortGen(nullable=False)], ids=idfn)
def test_cogroup_apply_udf(data_gen):
def asof_join(l, r):
Expand All @@ -335,3 +334,18 @@ def do_it(spark):
right.groupby('a')).applyInPandas(
asof_join, schema="a int, b int")
assert_gpu_and_cpu_are_equal_collect(do_it, conf=arrow_udf_conf)


@ignore_order
@allow_non_gpu('FlatMapCoGroupsInPandasExec')
def test_cogroup_apply_fallback():
def asof_join(l, r):
return r

def do_it(spark):
left = two_col_df(spark, int_gen, int_gen, length=100)
right = two_col_df(spark, short_gen, int_gen, length=100)
return left.groupby('a').cogroup(
right.groupby('a')).applyInPandas(
asof_join, schema="a int, b int")
assert_gpu_fallback_collect(do_it, 'FlatMapCoGroupsInPandasExec', conf=arrow_udf_conf)
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ package com.nvidia.spark.rapids.shims
import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore

import org.apache.spark.TaskContext
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.rapids.execution.python.{BatchQueue, GpuArrowPythonRunner, GpuPythonHelper, GpuPythonUDF, GpuWindowInPandasExecBase, GroupingIterator}
import org.apache.spark.sql.rapids.execution.python.{BatchQueue, CombiningIterator, GpuArrowPythonRunner, GpuPythonHelper, GpuPythonUDF, GpuWindowInPandasExecBase, GroupingIterator}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -86,11 +85,8 @@ case class GpuWindowInPandasExec(

// Override doExecuteColumnar so we use the correct GpuArrowPythonRunner
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = gpuLongMetric(NUM_INPUT_ROWS)
val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
val (numInputRows, numInputBatches, numOutputRows, numOutputBatches,
spillCallback) = commonGpuMetrics()
val sessionLocalTimeZone = conf.sessionLocalTimeZone

// 1) Unwrap the expressions and build some info data:
Expand Down Expand Up @@ -228,45 +224,12 @@ case class GpuWindowInPandasExec(
/* The whole group data should be written in a single call, so here is unlimited */
Int.MaxValue,
spillCallback.semaphoreWaitTime,
() => queue.finish(),
pythonOutputSchema)
pythonOutputSchema,
() => queue.finish())

val outputBatchIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
new Iterator[ColumnarBatch] {
// for hasNext we are waiting on the queue to have something inserted into it
// instead of waiting for a result to be ready from python. The reason for this
// is to let us know the target number of rows in the batch that we want when reading.
// It is a bit hacked up but it works. In the future when we support spilling we should
// store the number of rows separate from the batch. That way we can get the target batch
// size out without needing to grab the GpuSemaphore which we cannot do if we might block
// on a read operation.
// Besides, when the queue is empty, need to call the `hasNext` of the out iterator to
// trigger reading and handling the control data followed with the stream data.
override def hasNext: Boolean = queue.hasNext || outputBatchIterator.hasNext

private [this] def combine(
origBatch: ColumnarBatch,
retBatch: ColumnarBatch): ColumnarBatch = {
val lColumns = GpuColumnVector.extractColumns(origBatch)
val rColumns = GpuColumnVector.extractColumns(retBatch)
new ColumnarBatch(lColumns.map(_.incRefCount()) ++ rColumns.map(_.incRefCount()),
origBatch.numRows())
}

override def next(): ColumnarBatch = {
val numRows = queue.peekBatchSize
// Update the expected batch size for next read
pyRunner.minReadTargetBatchSize = numRows
withResource(outputBatchIterator.next()) { cbFromPython =>
assert(cbFromPython.numRows() == numRows)
withResource(queue.remove()) { origBatch =>
numOutputBatches += 1
numOutputRows += numRows
projectResult(combine(origBatch, cbFromPython))
}
}
}
} // End of new Iterator
val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows,
numOutputBatches).map(projectResult(_))
} else {
// Empty partition, return the input iterator directly
inputIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.nvidia.spark.rapids._
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python._
import org.apache.spark.sql.execution.python.PythonUDFRunner
import org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuArrowPythonRunner}
import org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuArrowPythonRunner, GpuPythonArrowOutput, GpuPythonRunnerBase}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
Expand All @@ -54,12 +54,10 @@ class GpuGroupUDFArrowPythonRunner(
timeZoneId: String,
conf: Map[String, String],
batchSize: Long,
semWait: GpuMetric,
onDataWriteFinished: () => Unit,
override val pythonOutSchema: StructType,
minReadTargetBatchSize: Int)
extends GpuArrowPythonRunner(funcs, evalType, argOffsets, pythonInSchema, timeZoneId, conf,
batchSize, semWait, onDataWriteFinished, pythonOutSchema, minReadTargetBatchSize) {
val semWait: GpuMetric,
val pythonOutSchema: StructType)
extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets)
with GpuPythonArrowOutput {

protected override def newWriterThread(
env: SparkEnv,
Expand Down Expand Up @@ -91,7 +89,7 @@ class GpuGroupUDFArrowPythonRunner(
GpuSemaphore.releaseIfNecessary(TaskContext.get())
})
// Flatten the names of nested struct columns, required by cudf Arrow IPC writer.
flattenNames(pythonInSchema).foreach { case (name, nullable) =>
GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) =>
if (nullable) {
builder.withColumnNames(name)
} else {
Expand Down Expand Up @@ -121,19 +119,8 @@ class GpuGroupUDFArrowPythonRunner(
// tell serializer we are done
dataOut.writeInt(0)
dataOut.flush()
if (onDataWriteFinished != null) onDataWriteFinished()
}
}

private def flattenNames(d: DataType, nullable: Boolean=true): Seq[(String, Boolean)] =
d match {
case s: StructType =>
s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
case m: MapType =>
flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
case a: ArrayType => flattenNames(a.elementType, nullable)
case _ => Nil
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,7 @@ case class GpuFlatMapGroupsInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
// We can not assert the result batch from Python has the same row number with the
// input batch. Because Grouped Map UDF allows the output of arbitrary length.
// So try to read as many as possible by specifying `minReadTargetBatchSize` as
// `Int.MaxValue` here.
Int.MaxValue)
pythonOutputSchema)

executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import com.nvidia.spark.rapids.Arm

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.BasePythonRunner
import org.apache.spark.sql.vectorized.ColumnarBatch

// pid is not a constructor argument in 30x and 31x
abstract class ShimBasePythonRunner[IN, OUT](
funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions],
evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]]
) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets)
with Arm {
protected abstract class ShimReaderIterator(
stream: DataInputStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ import com.nvidia.spark.rapids.Arm

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.BasePythonRunner
import org.apache.spark.sql.vectorized.ColumnarBatch

// pid is not a constructor argument in 30x and 31x
abstract class ShimBasePythonRunner[IN, OUT](
funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions],
evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]]
) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets)
with Arm {
protected abstract class ShimReaderIterator(
stream: DataInputStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,7 @@ case class GpuFlatMapGroupsInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
// We can not assert the result batch from Python has the same row number with the
// input batch. Because Grouped Map UDF allows the output of arbitrary length.
// So try to read as many as possible by specifying `minReadTargetBatchSize` as
// `Int.MaxValue` here.
Int.MaxValue)
pythonOutputSchema)
} else {
new GpuArrowPythonRunner(
chainedFunc,
Expand All @@ -171,9 +165,7 @@ case class GpuFlatMapGroupsInPandasExec(
pythonRunnerConf,
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
Int.MaxValue)
pythonOutputSchema)
}

executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import com.nvidia.spark.rapids.Arm

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.BasePythonRunner
import org.apache.spark.sql.vectorized.ColumnarBatch

abstract class ShimBasePythonRunner[IN, OUT](
funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions],
evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]]
) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets)
with Arm {
protected abstract class ShimReaderIterator(
stream: DataInputStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,7 @@ case class GpuFlatMapGroupsInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
// We can not assert the result batch from Python has the same row number with the
// input batch. Because Grouped Map UDF allows the output of arbitrary length.
// So try to read as many as possible by specifying `minReadTargetBatchSize` as
// `Int.MaxValue` here.
Int.MaxValue)
pythonOutputSchema)
} else {
new GpuArrowPythonRunner(
chainedFunc,
Expand All @@ -172,9 +166,7 @@ case class GpuFlatMapGroupsInPandasExec(
pythonRunnerConf,
Int.MaxValue,
spillCallback.semaphoreWaitTime,
onDataWriteFinished = null,
pythonOutputSchema,
Int.MaxValue)
pythonOutputSchema)
}

executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches)
Expand Down
Loading

0 comments on commit 03b1164

Please sign in to comment.