diff --git a/docs/configs.md b/docs/configs.md
index 708fcbbfb06..c42b009fd20 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -415,7 +415,7 @@ Name | Description | Default Value | Notes
spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
spark.rapids.sql.exec.AggregateInPandasExec|The backend for an Aggregation Pandas UDF, this accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled|true|None|
-spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports scheduling GPU resources for the Python process when enabled|false|This is disabled by default because Performance is not ideal now|
+spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|false|This is disabled by default because Performance is not ideal with many small groups|
spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Flat Map Groups Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled.|true|None|
spark.rapids.sql.exec.WindowInPandasExec|The backend for Window Aggregation Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. For now it only supports row based window frame.|false|This is disabled by default because it only supports row based frame for now|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index c890ed89f74..fb0c79d7611 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -1132,6 +1132,30 @@ Accelerator supports are described below.
NS |
+FlatMapCoGroupsInPandasExec |
+The backend for CoGrouped Aggregation Pandas UDF. Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. |
+This is disabled by default because Performance is not ideal with many small groups |
+Input/Output |
+S |
+S |
+S |
+S |
+S |
+S |
+S |
+S |
+PS UTC is only supported TZ for TIMESTAMP |
+S |
+NS |
+NS |
+NS |
+NS |
+NS |
+NS |
+NS |
+NS |
+
+
FlatMapGroupsInPandasExec |
The backend for Flat Map Groups Pandas UDF, Accelerates the data transfer between the Java process and the Python process. It also supports scheduling GPU resources for the Python process when enabled. |
None |
@@ -1204,6 +1228,30 @@ Accelerator supports are described below.
NS |
+Executor |
+Description |
+Notes |
+Param(s) |
+BOOLEAN |
+BYTE |
+SHORT |
+INT |
+LONG |
+FLOAT |
+DOUBLE |
+DATE |
+TIMESTAMP |
+STRING |
+DECIMAL |
+NULL |
+BINARY |
+CALENDAR |
+ARRAY |
+MAP |
+STRUCT |
+UDT |
+
+
WindowExec |
Window-operator backend |
None |
diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py
index 7fbf76664a8..ead92abb832 100644
--- a/integration_tests/src/main/python/udf_cudf_test.py
+++ b/integration_tests/src/main/python/udf_cudf_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -37,7 +37,7 @@
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from spark_session import with_cpu_session, with_gpu_session
-from marks import allow_non_gpu, cudf_udf
+from marks import cudf_udf
_conf = {
@@ -300,7 +300,6 @@ def gpu_run(spark):
# ======= Test CoGroup Map In Pandas =======
-@allow_non_gpu('GpuFlatMapCoGroupsInPandasExec','PythonUDF')
@cudf_udf
def test_cogroup(enable_cudf_udf):
def cpu_run(spark):
diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py
index 4cb1a72ee3f..aa25f45480a 100644
--- a/integration_tests/src/main/python/udf_test.py
+++ b/integration_tests/src/main/python/udf_test.py
@@ -32,7 +32,7 @@
raise AssertionError("incorrect pyarrow version during required testing " + str(e))
pytestmark = pytest.mark.skip(reason=str(e))
-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from data_gen import *
from marks import incompat, approximate_float, allow_non_gpu, ignore_order
from pyspark.sql import Window
@@ -44,7 +44,7 @@
arrow_udf_conf = {
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.rapids.sql.exec.WindowInPandasExec': 'true',
- 'spark.rapids.sql.exec.AggregateInPandasExec': 'true'
+ 'spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec': 'true'
}
data_gens_nested_for_udf = arrow_array_gens + arrow_struct_gens
@@ -323,7 +323,6 @@ def create_df(spark, data_gen, left_length, right_length):
@ignore_order
-@allow_non_gpu('FlatMapCoGroupsInPandasExec', 'PythonUDF', 'Alias')
@pytest.mark.parametrize('data_gen', [ShortGen(nullable=False)], ids=idfn)
def test_cogroup_apply_udf(data_gen):
def asof_join(l, r):
@@ -335,3 +334,18 @@ def do_it(spark):
right.groupby('a')).applyInPandas(
asof_join, schema="a int, b int")
assert_gpu_and_cpu_are_equal_collect(do_it, conf=arrow_udf_conf)
+
+
+@ignore_order
+@allow_non_gpu('FlatMapCoGroupsInPandasExec')
+def test_cogroup_apply_fallback():
+ def asof_join(l, r):
+ return r
+
+ def do_it(spark):
+ left = two_col_df(spark, int_gen, int_gen, length=100)
+ right = two_col_df(spark, short_gen, int_gen, length=100)
+ return left.groupby('a').cogroup(
+ right.groupby('a')).applyInPandas(
+ asof_join, schema="a int, b int")
+ assert_gpu_fallback_collect(do_it, 'FlatMapCoGroupsInPandasExec', conf=arrow_udf_conf)
diff --git a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala
index eeef0f8685e..27abaaa834f 100644
--- a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala
+++ b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/GpuWindowInPandasExec.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids.shims
import scala.collection.mutable.ArrayBuffer
import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import org.apache.spark.TaskContext
@@ -27,7 +26,7 @@ import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.rapids.execution.python.{BatchQueue, GpuArrowPythonRunner, GpuPythonHelper, GpuPythonUDF, GpuWindowInPandasExecBase, GroupingIterator}
+import org.apache.spark.sql.rapids.execution.python.{BatchQueue, CombiningIterator, GpuArrowPythonRunner, GpuPythonHelper, GpuPythonUDF, GpuWindowInPandasExecBase, GroupingIterator}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -86,11 +85,8 @@ case class GpuWindowInPandasExec(
// Override doExecuteColumnar so we use the correct GpuArrowPythonRunner
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val numInputRows = gpuLongMetric(NUM_INPUT_ROWS)
- val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
- val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
- val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
- val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
+ val (numInputRows, numInputBatches, numOutputRows, numOutputBatches,
+ spillCallback) = commonGpuMetrics()
val sessionLocalTimeZone = conf.sessionLocalTimeZone
// 1) Unwrap the expressions and build some info data:
@@ -228,45 +224,12 @@ case class GpuWindowInPandasExec(
/* The whole group data should be written in a single call, so here is unlimited */
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- () => queue.finish(),
- pythonOutputSchema)
+ pythonOutputSchema,
+ () => queue.finish())
- val outputBatchIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
- new Iterator[ColumnarBatch] {
- // for hasNext we are waiting on the queue to have something inserted into it
- // instead of waiting for a result to be ready from python. The reason for this
- // is to let us know the target number of rows in the batch that we want when reading.
- // It is a bit hacked up but it works. In the future when we support spilling we should
- // store the number of rows separate from the batch. That way we can get the target batch
- // size out without needing to grab the GpuSemaphore which we cannot do if we might block
- // on a read operation.
- // Besides, when the queue is empty, need to call the `hasNext` of the out iterator to
- // trigger reading and handling the control data followed with the stream data.
- override def hasNext: Boolean = queue.hasNext || outputBatchIterator.hasNext
-
- private [this] def combine(
- origBatch: ColumnarBatch,
- retBatch: ColumnarBatch): ColumnarBatch = {
- val lColumns = GpuColumnVector.extractColumns(origBatch)
- val rColumns = GpuColumnVector.extractColumns(retBatch)
- new ColumnarBatch(lColumns.map(_.incRefCount()) ++ rColumns.map(_.incRefCount()),
- origBatch.numRows())
- }
-
- override def next(): ColumnarBatch = {
- val numRows = queue.peekBatchSize
- // Update the expected batch size for next read
- pyRunner.minReadTargetBatchSize = numRows
- withResource(outputBatchIterator.next()) { cbFromPython =>
- assert(cbFromPython.numRows() == numRows)
- withResource(queue.remove()) { origBatch =>
- numOutputBatches += 1
- numOutputRows += numRows
- projectResult(combine(origBatch, cbFromPython))
- }
- }
- }
- } // End of new Iterator
+ val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
+ new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows,
+ numOutputBatches).map(projectResult(_))
} else {
// Empty partition, return the input iterator directly
inputIter
diff --git a/sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala b/sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala
index 58fca943cb9..2ab3f88ee0b 100644
--- a/sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala
+++ b/sql-plugin/src/main/311+-db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupUDFArrowPythonRunner.scala
@@ -28,7 +28,7 @@ import com.nvidia.spark.rapids._
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python._
import org.apache.spark.sql.execution.python.PythonUDFRunner
-import org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuArrowPythonRunner}
+import org.apache.spark.sql.rapids.execution.python.{BufferToStreamWriter, GpuArrowPythonRunner, GpuPythonArrowOutput, GpuPythonRunnerBase}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
@@ -54,12 +54,10 @@ class GpuGroupUDFArrowPythonRunner(
timeZoneId: String,
conf: Map[String, String],
batchSize: Long,
- semWait: GpuMetric,
- onDataWriteFinished: () => Unit,
- override val pythonOutSchema: StructType,
- minReadTargetBatchSize: Int)
- extends GpuArrowPythonRunner(funcs, evalType, argOffsets, pythonInSchema, timeZoneId, conf,
- batchSize, semWait, onDataWriteFinished, pythonOutSchema, minReadTargetBatchSize) {
+ val semWait: GpuMetric,
+ val pythonOutSchema: StructType)
+ extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets)
+ with GpuPythonArrowOutput {
protected override def newWriterThread(
env: SparkEnv,
@@ -91,7 +89,7 @@ class GpuGroupUDFArrowPythonRunner(
GpuSemaphore.releaseIfNecessary(TaskContext.get())
})
// Flatten the names of nested struct columns, required by cudf Arrow IPC writer.
- flattenNames(pythonInSchema).foreach { case (name, nullable) =>
+ GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) =>
if (nullable) {
builder.withColumnNames(name)
} else {
@@ -121,19 +119,8 @@ class GpuGroupUDFArrowPythonRunner(
// tell serializer we are done
dataOut.writeInt(0)
dataOut.flush()
- if (onDataWriteFinished != null) onDataWriteFinished()
}
}
-
- private def flattenNames(d: DataType, nullable: Boolean=true): Seq[(String, Boolean)] =
- d match {
- case s: StructType =>
- s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
- case m: MapType =>
- flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
- case a: ArrayType => flattenNames(a.elementType, nullable)
- case _ => Nil
- }
}
}
}
diff --git a/sql-plugin/src/main/311+-nondb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/311+-nondb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
index acc9be2e399..cc6f95a4efc 100644
--- a/sql-plugin/src/main/311+-nondb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
+++ b/sql-plugin/src/main/311+-nondb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
@@ -149,13 +149,7 @@ case class GpuFlatMapGroupsInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- onDataWriteFinished = null,
- pythonOutputSchema,
- // We can not assert the result batch from Python has the same row number with the
- // input batch. Because Grouped Map UDF allows the output of arbitrary length.
- // So try to read as many as possible by specifying `minReadTargetBatchSize` as
- // `Int.MaxValue` here.
- Int.MaxValue)
+ pythonOutputSchema)
executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches)
} else {
diff --git a/sql-plugin/src/main/311until320-nondb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/311until320-nondb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
index 9a613fbb676..71472618926 100644
--- a/sql-plugin/src/main/311until320-nondb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
+++ b/sql-plugin/src/main/311until320-nondb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
@@ -24,13 +24,12 @@ import com.nvidia.spark.rapids.Arm
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.BasePythonRunner
-import org.apache.spark.sql.vectorized.ColumnarBatch
// pid is not a constructor argument in 30x and 31x
abstract class ShimBasePythonRunner[IN, OUT](
funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions],
evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]]
-) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
+) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets)
with Arm {
protected abstract class ShimReaderIterator(
stream: DataInputStream,
diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
index b3fbdb985b0..cb18e29cc60 100644
--- a/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
+++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
@@ -24,13 +24,12 @@ import com.nvidia.spark.rapids.Arm
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.BasePythonRunner
-import org.apache.spark.sql.vectorized.ColumnarBatch
// pid is not a constructor argument in 30x and 31x
abstract class ShimBasePythonRunner[IN, OUT](
funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions],
evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]]
-) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
+) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets)
with Arm {
protected abstract class ShimReaderIterator(
stream: DataInputStream,
diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
index 0b87c1d2182..575e42dc53d 100644
--- a/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
+++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
@@ -154,13 +154,7 @@ case class GpuFlatMapGroupsInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- onDataWriteFinished = null,
- pythonOutputSchema,
- // We can not assert the result batch from Python has the same row number with the
- // input batch. Because Grouped Map UDF allows the output of arbitrary length.
- // So try to read as many as possible by specifying `minReadTargetBatchSize` as
- // `Int.MaxValue` here.
- Int.MaxValue)
+ pythonOutputSchema)
} else {
new GpuArrowPythonRunner(
chainedFunc,
@@ -171,9 +165,7 @@ case class GpuFlatMapGroupsInPandasExec(
pythonRunnerConf,
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- onDataWriteFinished = null,
- pythonOutputSchema,
- Int.MaxValue)
+ pythonOutputSchema)
}
executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches)
diff --git a/sql-plugin/src/main/320+/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala b/sql-plugin/src/main/320+/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
index 661c6eaf8a0..8f7f68427cc 100644
--- a/sql-plugin/src/main/320+/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
+++ b/sql-plugin/src/main/320+/scala/org/apache/spark/rapids/shims/api/python/ShimBasePythonRunner.scala
@@ -24,12 +24,11 @@ import com.nvidia.spark.rapids.Arm
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.BasePythonRunner
-import org.apache.spark.sql.vectorized.ColumnarBatch
abstract class ShimBasePythonRunner[IN, OUT](
funcs : scala.Seq[org.apache.spark.api.python.ChainedPythonFunctions],
evalType : scala.Int, argOffsets : scala.Array[scala.Array[scala.Int]]
-) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
+) extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets)
with Arm {
protected abstract class ShimReaderIterator(
stream: DataInputStream,
diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
index d78704c5ebc..a80ec0c49cd 100644
--- a/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
+++ b/sql-plugin/src/main/321db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuFlatMapGroupsInPandasExec.scala
@@ -155,13 +155,7 @@ case class GpuFlatMapGroupsInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- onDataWriteFinished = null,
- pythonOutputSchema,
- // We can not assert the result batch from Python has the same row number with the
- // input batch. Because Grouped Map UDF allows the output of arbitrary length.
- // So try to read as many as possible by specifying `minReadTargetBatchSize` as
- // `Int.MaxValue` here.
- Int.MaxValue)
+ pythonOutputSchema)
} else {
new GpuArrowPythonRunner(
chainedFunc,
@@ -172,9 +166,7 @@ case class GpuFlatMapGroupsInPandasExec(
pythonRunnerConf,
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- onDataWriteFinished = null,
- pythonOutputSchema,
- Int.MaxValue)
+ pythonOutputSchema)
}
executePython(pyInputIter, localOutput, pyRunner, mNumOutputRows, mNumOutputBatches)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index d69f494ad6f..0b54594289f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -4061,11 +4061,12 @@ object GpuOverrides extends Logging {
e.evalType)
}),
exec[FlatMapCoGroupsInPandasExec](
- "The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
- " scheduling GPU resources for the Python process when enabled",
- ExecChecks.hiddenHack(),
+ "The backend for CoGrouped Aggregation Pandas UDF. Accelerates the data transfer" +
+ " between the Java process and the Python process. It also supports scheduling GPU" +
+ " resources for the Python process when enabled.",
+ ExecChecks(TypeSig.commonCudfTypes, TypeSig.all),
(flatCoPy, conf, p, r) => new GpuFlatMapCoGroupsInPandasExecMeta(flatCoPy, conf, p, r))
- .disabledByDefault("Performance is not ideal now"),
+ .disabledByDefault("Performance is not ideal with many small groups"),
exec[FlatMapGroupsInPandasExec](
"The backend for Flat Map Groups Pandas UDF, Accelerates the data transfer between the" +
" Java process and the Python process. It also supports scheduling GPU resources" +
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/execution/python/rapids/GpuPandasUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/python/rapids/GpuPandasUtils.scala
deleted file mode 100644
index acf2c211bcb..00000000000
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/execution/python/rapids/GpuPandasUtils.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2020, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.python.rapids
-
-import org.apache.spark.api.python.BasePythonRunner
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.python.PandasGroupUtils
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-/*
- * This is to expose the APIs of PandasGroupUtils to rapids Execs
- */
-private[sql] object GpuPandasUtils {
-
- def executePython[T](
- data: Iterator[T],
- output: Seq[Attribute],
- runner: BasePythonRunner[T, ColumnarBatch]): Iterator[InternalRow] = {
- PandasGroupUtils.executePython(data, output, runner)
- }
-
- def groupAndProject(
- input: Iterator[InternalRow],
- groupingAttributes: Seq[Attribute],
- inputSchema: Seq[Attribute],
- dedupSchema: Seq[Attribute]):
- Iterator[(InternalRow, Iterator[InternalRow])] = {
- PandasGroupUtils.groupAndProject(input, groupingAttributes, inputSchema, dedupSchema)
- }
-
- def resolveArgOffsets(
- child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
- PandasGroupUtils.resolveArgOffsets(child, groupingAttributes)
- }
-}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala
index c04f0c02ae3..08ee400c735 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/BatchGroupUtils.scala
@@ -24,6 +24,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -190,10 +191,10 @@ private[python] object BatchGroupUtils extends Arm {
* @param outputBatches a metric to record the output batches.
* @return an iterator of the resulting batches from the Python runner.
*/
- def executePython(
- pyInputIterator: Iterator[ColumnarBatch],
+ def executePython[IN](
+ pyInputIterator: Iterator[IN],
output: Seq[Attribute],
- pyRunner: GpuArrowPythonRunner,
+ pyRunner: GpuPythonRunnerBase[IN],
outputRows: GpuMetric,
outputBatches: GpuMetric): Iterator[ColumnarBatch] = {
val context = TaskContext.get()
@@ -376,7 +377,7 @@ private[python] object BatchGroupedIterator extends Arm {
* @param numOutputRows a metric for output rows.
* @param numOutputBatches a metric for output batches
*/
-private[python] class CombiningIterator(
+class CombiningIterator(
inputBatchQueue: BatchQueue,
pythonOutputIter: Iterator[ColumnarBatch],
pythonArrowReader: GpuPythonArrowOutput,
@@ -395,7 +396,7 @@ private[python] class CombiningIterator(
override def next(): ColumnarBatch = {
val numRows = inputBatchQueue.peekBatchSize
// Updates the expected batch size for next read
- pythonArrowReader.updateMinReadTargetBatchSize(numRows)
+ pythonArrowReader.setMinReadTargetBatchSize(numRows)
// Reads next batch from Python and combines it with the input batch by the left side.
withResource(pythonOutputIter.next()) { cbFromPython =>
assert(cbFromPython.numRows() == numRows)
@@ -414,3 +415,134 @@ private[python] class CombiningIterator(
}
}
+
+/**
+ * Iterates over the left and right BatchGroupedIterators and returns the cogrouped data,
+ * i.e. each record is rows having the same grouping key from the two BatchGroupedIterators.
+ *
+ * Note: we assume the output of each BatchGroupedIterator is ordered by the grouping key.
+ */
+class CoGroupedIterator(
+ leftGroupedIter: Iterator[ColumnarBatch],
+ leftSchema: Seq[Attribute],
+ leftGroupOffsets: Seq[Int],
+ rightGroupedIter: Iterator[ColumnarBatch],
+ rightSchema: Seq[Attribute],
+ rightGroupOffsets: Seq[Int]) extends Iterator[(ColumnarBatch, ColumnarBatch)] with Arm {
+
+ // Same with CPU, use the left grouping key for comparison.
+ private val groupSchema = leftGroupOffsets.map(leftSchema(_))
+ private val keyOrdering =
+ GenerateOrdering.generate(groupSchema.map(SortOrder(_, Ascending)), groupSchema)
+
+ private var currentLeftData: ColumnarBatch = _
+ private var currentRightData: ColumnarBatch = _
+
+ // An empty table is required to indicate an empty group according to
+ // the cuDF Arrow writer and the communication protocol.
+ // We don't want to create multiple empty batches, instead leverage the ref count.
+ private lazy val emptyLeftBatch: ColumnarBatch =
+ GpuColumnVector.emptyBatch(StructType.fromAttributes(leftSchema))
+ private lazy val emptyRightBatch: ColumnarBatch =
+ GpuColumnVector.emptyBatch(StructType.fromAttributes(rightSchema))
+
+ // Suppose runs inside a task context.
+ TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+ Seq(currentLeftData, currentRightData, emptyLeftBatch, emptyRightBatch)
+ .filter(_ != null)
+ .safeClose()
+ }
+
+ override def hasNext: Boolean = {
+ if (currentLeftData == null && leftGroupedIter.hasNext) {
+ currentLeftData = leftGroupedIter.next()
+ }
+ closeOnExcept(Option(currentLeftData)) { _ =>
+ if (currentRightData == null && rightGroupedIter.hasNext) {
+ currentRightData = rightGroupedIter.next()
+ }
+ }
+
+ currentLeftData != null || currentRightData != null
+ }
+
+ override def next(): (ColumnarBatch, ColumnarBatch) = {
+ assert(hasNext)
+
+ if (currentLeftData.eq(null)) {
+ // left is null, right is not null, consume the right data.
+ rightOnly()
+ } else if (currentRightData.eq(null)) {
+ // left is not null, right is null, consume the left data.
+ leftOnly()
+ } else {
+ // Neither is null.
+ val leftKey = getHostKeyBatch(currentLeftData, leftSchema, leftGroupOffsets)
+ val rightKey = closeOnExcept(leftKey) { _ =>
+ getHostKeyBatch(currentRightData, rightSchema, rightGroupOffsets)
+ }
+ val compared = withResource(Seq(leftKey, rightKey)) { _ =>
+ compareHostKeyBatch(leftKey, rightKey)
+ }
+ if (compared < 0) {
+ // the grouping key of left is smaller, consume the left data.
+ leftOnly()
+ } else if (compared > 0) {
+ // the grouping key of right is smaller, consume the right data.
+ rightOnly()
+ } else {
+ // left and right have the same grouping key, consume both of them.
+ val result = (currentLeftData, currentRightData)
+ currentLeftData = null
+ currentRightData = null
+ result
+ }
+ }
+ }
+
+ private def leftOnly(): (ColumnarBatch, ColumnarBatch) = {
+ val result = (currentLeftData, GpuColumnVector.incRefCounts(emptyRightBatch))
+ currentLeftData = null
+ result
+ }
+
+ private def rightOnly(): (ColumnarBatch, ColumnarBatch) = {
+ val result = (GpuColumnVector.incRefCounts(emptyLeftBatch), currentRightData)
+ currentRightData = null
+ result
+ }
+
+ private def getHostKeyBatch(
+ batch: ColumnarBatch,
+ schema: Seq[Attribute],
+ groupKeys: Seq[Int]): ColumnarBatch = {
+
+ val groupAttrs = groupKeys.map(schema(_))
+ val keyRefs = GpuBindReferences.bindGpuReferences(groupAttrs, schema)
+ val oneRowKeyTable = withResource(GpuProjectExec.project(batch, keyRefs)) { keyBatch =>
+ withResource(GpuColumnVector.from(keyBatch)) { keyTable =>
+ // The group batch will not be empty, since an empty group will be skipped when
+ // doing group splitting previously.
+ // Only one row is needed since keys are the same in a group.
+ withResource(cudf.ColumnVector.fromInts(0)) { gatherMap =>
+ keyTable.gather(gatherMap)
+ }
+ }
+ }
+ withResource(oneRowKeyTable) { _ =>
+ val hostCols = GpuColumnVector.extractHostColumns(oneRowKeyTable,
+ groupAttrs.map(_.dataType).toArray)
+ new ColumnarBatch(hostCols.toArray, oneRowKeyTable.getRowCount.toInt)
+ }
+ }
+
+ private def compareHostKeyBatch(leftKey: ColumnarBatch, rightKey: ColumnarBatch): Int = {
+ // This is not an ETL operation, so cuDF does not have this kind of comparison API,
+ // Then here borrows the CPU way.
+ // Assume the data of the input batches are on host.
+ assert(leftKey.numRows() > 0 && rightKey.numRows() > 0)
+ val leftKeyRow = leftKey.rowIterator().next()
+ val rightKeyRow = rightKey.rowIterator().next()
+ keyOrdering.compare(leftKeyRow, rightKeyRow)
+ }
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
index 377c0f1ad42..ec7f6593bbc 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
@@ -251,8 +251,8 @@ case class GpuAggregateInPandasExec(
// The whole group data should be written in a single call, so here is unlimited
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- () => queue.finish(),
- StructType.fromAttributes(pyOutAttributes))
+ StructType.fromAttributes(pyOutAttributes),
+ () => queue.finish())
val pyOutputIterator = pyRunner.compute(pyInputIter, context.partitionId(), context)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
index 009f3ac3453..95cda337197 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
@@ -19,33 +19,23 @@
package org.apache.spark.sql.rapids.execution.python
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
-import java.util.concurrent.atomic.AtomicBoolean
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf._
import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.TaskContext
import org.apache.spark.api.python._
-import org.apache.spark.rapids.shims.api.python.ShimBasePythonRunner
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.python.PythonUDFRunner
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.Utils
/**
* This iterator will round incoming batches to multiples of targetRoundoff rows, if possible.
@@ -244,246 +234,6 @@ class BatchQueue extends AutoCloseable with Arm {
}
}
-/**
- * A trait that can be mixed-in with `GpuArrowPythonRunner`. It implements the logic from
- * Python (Arrow) to GPU/JVM (ColumnarBatch).
- */
-trait GpuPythonArrowOutput extends Arm { self: GpuArrowPythonRunner =>
-
- /**
- * Update the expected batch size for next reading.
- */
- private[python] final def updateMinReadTargetBatchSize(size: Int) = {
- self.minReadTargetBatchSize = size
- }
-
- def semWait: GpuMetric
-
- protected def newReaderIterator(
- stream: DataInputStream,
- writerThread: WriterThread,
- startTime: Long,
- env: SparkEnv,
- worker: Socket,
- releasedOrClosed: AtomicBoolean,
- context: TaskContext
- ): Iterator[ColumnarBatch] = {
- newReaderIterator(stream, writerThread, startTime, env, worker, None, releasedOrClosed,
- context)
- }
-
- protected def newReaderIterator(
- stream: DataInputStream,
- writerThread: WriterThread,
- startTime: Long,
- env: SparkEnv,
- worker: Socket,
- pid: Option[Int],
- releasedOrClosed: AtomicBoolean,
- context: TaskContext): Iterator[ColumnarBatch] = {
-
- new ShimReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed,
- context) {
-
- private[this] var arrowReader: StreamedTableReader = _
-
- context.addTaskCompletionListener[Unit] { _ =>
- if (arrowReader != null) {
- arrowReader.close()
- arrowReader = null
- }
- }
-
- private var batchLoaded = true
-
- protected override def read(): ColumnarBatch = {
- if (writerThread.exception.isDefined) {
- throw writerThread.exception.get
- }
- try {
- // Because of batching and other things we have to be sure that we release the semaphore
- // before any operation that could block. This is because we are using multiple threads
- // for a single task and the GpuSemaphore might not wake up both threads associated with
- // the task, so a reader can be blocked waiting for data, while a writer is waiting on
- // the semaphore
- GpuSemaphore.releaseIfNecessary(TaskContext.get())
- if (arrowReader != null && batchLoaded) {
- // The GpuSemaphore is acquired in a callback
- val table =
- withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ =>
- arrowReader.getNextIfAvailable(self.minReadTargetBatchSize)
- }
- if (table == null) {
- batchLoaded = false
- arrowReader.close()
- arrowReader = null
- read()
- } else {
- withResource(table) { table =>
- batchLoaded = true
- GpuColumnVector.from(table, GpuColumnVector.extractTypes(self.pythonOutSchema))
- }
- }
- } else {
- stream.readInt() match {
- case SpecialLengths.START_ARROW_STREAM =>
- val builder = ArrowIPCOptions.builder()
- builder.withCallback(() =>
- GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWait))
- arrowReader = Table.readArrowIPCChunked(builder.build(),
- new StreamToBufferProvider(stream))
- read()
- case SpecialLengths.TIMING_DATA =>
- handleTimingData()
- read()
- case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
- throw handlePythonException()
- case SpecialLengths.END_OF_DATA_SECTION =>
- handleEndOfDataSection()
- null
- }
- }
- } catch handleException
- }
- }
- }
-}
-
-
-/**
- * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream.
- */
-class GpuArrowPythonRunner(
- funcs: Seq[ChainedPythonFunctions],
- evalType: Int,
- argOffsets: Array[Array[Int]],
- pythonInSchema: StructType,
- timeZoneId: String,
- conf: Map[String, String],
- batchSize: Long,
- val semWait: GpuMetric,
- onDataWriteFinished: () => Unit,
- val pythonOutSchema: StructType,
- var minReadTargetBatchSize: Int = 1)
- extends ShimBasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets)
- with GpuPythonArrowOutput {
-
- override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
- require(
- bufferSize >= 4,
- "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
- s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
-
- protected override def newWriterThread(
- env: SparkEnv,
- worker: Socket,
- inputIterator: Iterator[ColumnarBatch],
- partitionIndex: Int,
- context: TaskContext): WriterThread = {
- new WriterThread(env, worker, inputIterator, partitionIndex, context) {
-
- protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-
- // Write config for the worker as a number of key -> value pairs of strings
- dataOut.writeInt(conf.size)
- for ((k, v) <- conf) {
- PythonRDD.writeUTF(k, dataOut)
- PythonRDD.writeUTF(v, dataOut)
- }
-
- PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
- }
-
- protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
- val writer = {
- val builder = ArrowIPCWriterOptions.builder()
- builder.withMaxChunkSize(batchSize)
- builder.withCallback((table: Table) => {
- table.close()
- GpuSemaphore.releaseIfNecessary(TaskContext.get())
- })
- // Flatten the names of nested struct columns, required by cudf arrow IPC writer.
- flattenNames(pythonInSchema).foreach { case (name, nullable) =>
- if (nullable) {
- builder.withColumnNames(name)
- } else {
- builder.withNotNullableColumnNames(name)
- }
- }
- Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut))
- }
- Utils.tryWithSafeFinally {
- while(inputIterator.hasNext) {
- val table = withResource(inputIterator.next()) { nextBatch =>
- GpuColumnVector.from(nextBatch)
- }
- withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ =>
- // The callback will handle closing table and releasing the semaphore
- writer.write(table)
- }
- }
- // The iterator can grab the semaphore even on an empty batch
- GpuSemaphore.releaseIfNecessary(TaskContext.get())
- } {
- writer.close()
- dataOut.flush()
- if (onDataWriteFinished != null) onDataWriteFinished()
- }
- }
-
- private def flattenNames(d: DataType, nullable: Boolean=true): Seq[(String, Boolean)] =
- d match {
- case s: StructType =>
- s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
- case m: MapType =>
- flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
- case a: ArrayType => flattenNames(a.elementType, nullable)
- case _ => Nil
- }
- }
- }
-}
-
-class BufferToStreamWriter(outputStream: DataOutputStream) extends HostBufferConsumer with Arm {
- private[this] val tempBuffer = new Array[Byte](128 * 1024)
-
- override def handleBuffer(hostBuffer: HostMemoryBuffer, length: Long): Unit = {
- withResource(hostBuffer) { buffer =>
- var len = length
- var offset: Long = 0
- while(len > 0) {
- val toCopy = math.min(tempBuffer.length, len).toInt
- buffer.getBytes(tempBuffer, 0, offset, toCopy)
- outputStream.write(tempBuffer, 0, toCopy)
- len = len - toCopy
- offset = offset + toCopy
- }
- }
- }
-}
-
-class StreamToBufferProvider(inputStream: DataInputStream) extends HostBufferProvider {
- private[this] val tempBuffer = new Array[Byte](128 * 1024)
-
- override def readInto(hostBuffer: HostMemoryBuffer, length: Long): Long = {
- var amountLeft = length
- var totalRead : Long = 0
- while (amountLeft > 0) {
- val amountToRead = Math.min(tempBuffer.length, amountLeft).toInt
- val amountRead = inputStream.read(tempBuffer, 0, amountToRead)
- if (amountRead <= 0) {
- // Reached EOF
- amountLeft = 0
- } else {
- amountLeft -= amountRead
- hostBuffer.setBytes(totalRead, tempBuffer, 0, amountRead)
- totalRead += amountRead
- }
- }
- totalRead
- }
-}
-
/**
* A physical plan that evaluates a [[GpuPythonUDF]]. The transformation of the data to arrow
* happens on the GPU (practically a noop), But execution of the UDFs are on the CPU.
@@ -492,7 +242,7 @@ case class GpuArrowEvalPythonExec(
udfs: Seq[GpuPythonUDF],
resultAttrs: Seq[Attribute],
child: SparkPlan,
- evalType: Int) extends ShimUnaryExecNode with GpuExec {
+ evalType: Int) extends ShimUnaryExecNode with GpuPythonExecBase {
// We split the input batch up into small pieces when sending to python for compatibility reasons
override def coalesceAfter: Boolean = true
@@ -517,24 +267,9 @@ case class GpuArrowEvalPythonExec(
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
-
- override protected def doExecute(): RDD[InternalRow] =
- throw new IllegalStateException(s"Row-based execution should not occur for $this")
-
- override lazy val allMetrics: Map[String, GpuMetric] = Map(
- NUM_OUTPUT_ROWS -> createMetric(outputRowsLevel, DESCRIPTION_NUM_OUTPUT_ROWS),
- NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES),
- NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
- NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES)
- ) ++ spillMetrics
-
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
- val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
- val numInputRows = gpuLongMetric(NUM_INPUT_ROWS)
- val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
- val semWait = gpuLongMetric(SEMAPHORE_WAIT_TIME)
- val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
+ val (numInputRows, numInputBatches, numOutputRows, numOutputBatches,
+ spillCallback) = commonGpuMetrics()
lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
@@ -611,47 +346,13 @@ case class GpuArrowEvalPythonExec(
timeZone,
runnerConf,
targetBatchSize,
- semWait,
- () => queue.finish(),
- pythonOutputSchema)
-
- val outputBatchIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
-
- new Iterator[ColumnarBatch] {
- // for hasNext we are waiting on the queue to have something inserted into it
- // instead of waiting for a result to be ready from python. The reason for this
- // is to let us know the target number of rows in the batch that we want when reading.
- // It is a bit hacked up but it works. In the future when we support spilling we should
- // store the number of rows separate from the batch. That way we can get the target batch
- // size out without needing to grab the GpuSemaphore which we cannot do if we might block
- // on a read operation.
- // Besides, when the queue is empty, need to call the `hasNext` of the out iterator to
- // trigger reading and handling the control data followed with the stream data.
- override def hasNext: Boolean = queue.hasNext || outputBatchIterator.hasNext
-
- private [this] def combine(
- origBatch: ColumnarBatch,
- retBatch: ColumnarBatch): ColumnarBatch = {
- val lColumns = GpuColumnVector.extractColumns(origBatch)
- val rColumns = GpuColumnVector.extractColumns(retBatch)
- new ColumnarBatch(lColumns.map(_.incRefCount()) ++ rColumns.map(_.incRefCount()),
- origBatch.numRows())
- }
+ spillCallback.semaphoreWaitTime,
+ pythonOutputSchema,
+ () => queue.finish())
- override def next(): ColumnarBatch = {
- val numRows = queue.peekBatchSize
- // Update the expected batch size for next read
- pyRunner.minReadTargetBatchSize = numRows
- withResource(outputBatchIterator.next()) { cbFromPython =>
- assert(cbFromPython.numRows() == numRows)
- withResource(queue.remove()) { origBatch =>
- numOutputBatches += 1
- numOutputRows += numRows
- combine(origBatch, cbFromPython)
- }
- }
- }
- }
+ val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
+ new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows,
+ numOutputBatches)
} else {
// Empty partition, return it directly
iter
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala
new file mode 100644
index 00000000000..ab56643e6af
--- /dev/null
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowPythonRunner.scala
@@ -0,0 +1,295 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.execution.python
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.util.concurrent.atomic.AtomicBoolean
+
+import ai.rapids.cudf._
+import com.nvidia.spark.rapids._
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.rapids.shims.api.python.ShimBasePythonRunner
+import org.apache.spark.sql.execution.python.PythonUDFRunner
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+class BufferToStreamWriter(outputStream: DataOutputStream) extends HostBufferConsumer with Arm {
+ private[this] val tempBuffer = new Array[Byte](128 * 1024)
+
+ override def handleBuffer(hostBuffer: HostMemoryBuffer, length: Long): Unit = {
+ withResource(hostBuffer) { buffer =>
+ var len = length
+ var offset: Long = 0
+ while(len > 0) {
+ val toCopy = math.min(tempBuffer.length, len).toInt
+ buffer.getBytes(tempBuffer, 0, offset, toCopy)
+ outputStream.write(tempBuffer, 0, toCopy)
+ len = len - toCopy
+ offset = offset + toCopy
+ }
+ }
+ }
+}
+
+class StreamToBufferProvider(inputStream: DataInputStream) extends HostBufferProvider {
+ private[this] val tempBuffer = new Array[Byte](128 * 1024)
+
+ override def readInto(hostBuffer: HostMemoryBuffer, length: Long): Long = {
+ var amountLeft = length
+ var totalRead : Long = 0
+ while (amountLeft > 0) {
+ val amountToRead = Math.min(tempBuffer.length, amountLeft).toInt
+ val amountRead = inputStream.read(tempBuffer, 0, amountToRead)
+ if (amountRead <= 0) {
+ // Reached EOF
+ amountLeft = 0
+ } else {
+ amountLeft -= amountRead
+ hostBuffer.setBytes(totalRead, tempBuffer, 0, amountRead)
+ totalRead += amountRead
+ }
+ }
+ totalRead
+ }
+}
+
+/**
+ * A trait that can be mixed-in with `GpuPythonRunnerBase`. It implements the logic from
+ * Python (Arrow) to GPU/JVM (ColumnarBatch).
+ */
+trait GpuPythonArrowOutput extends Arm { _: GpuPythonRunnerBase[_] =>
+
+ /**
+ * Default to `Int.MaxValue` to try to read as many as possible.
+ * Change it by calling `setMinReadTargetBatchSize` before a reading.
+ */
+ private var minReadTargetBatchSize: Int = Int.MaxValue
+
+ /**
+ * Update the expected batch size for next reading.
+ */
+ private[python] final def setMinReadTargetBatchSize(size: Int): Unit = {
+ minReadTargetBatchSize = size
+ }
+
+ def pythonOutSchema: StructType
+
+ def semWait: GpuMetric
+
+ protected def newReaderIterator(
+ stream: DataInputStream,
+ writerThread: WriterThread,
+ startTime: Long,
+ env: SparkEnv,
+ worker: Socket,
+ releasedOrClosed: AtomicBoolean,
+ context: TaskContext
+ ): Iterator[ColumnarBatch] = {
+ newReaderIterator(stream, writerThread, startTime, env, worker, None, releasedOrClosed,
+ context)
+ }
+
+ protected def newReaderIterator(
+ stream: DataInputStream,
+ writerThread: WriterThread,
+ startTime: Long,
+ env: SparkEnv,
+ worker: Socket,
+ pid: Option[Int],
+ releasedOrClosed: AtomicBoolean,
+ context: TaskContext): Iterator[ColumnarBatch] = {
+
+ new ShimReaderIterator(stream, writerThread, startTime, env, worker, pid, releasedOrClosed,
+ context) {
+
+ private[this] var arrowReader: StreamedTableReader = _
+
+ context.addTaskCompletionListener[Unit] { _ =>
+ if (arrowReader != null) {
+ arrowReader.close()
+ arrowReader = null
+ }
+ }
+
+ private var batchLoaded = true
+
+ protected override def read(): ColumnarBatch = {
+ if (writerThread.exception.isDefined) {
+ throw writerThread.exception.get
+ }
+ try {
+ // Because of batching and other things we have to be sure that we release the semaphore
+ // before any operation that could block. This is because we are using multiple threads
+ // for a single task and the GpuSemaphore might not wake up both threads associated with
+ // the task, so a reader can be blocked waiting for data, while a writer is waiting on
+ // the semaphore
+ GpuSemaphore.releaseIfNecessary(TaskContext.get())
+ if (arrowReader != null && batchLoaded) {
+ // The GpuSemaphore is acquired in a callback
+ val table =
+ withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ =>
+ arrowReader.getNextIfAvailable(minReadTargetBatchSize)
+ }
+ if (table == null) {
+ batchLoaded = false
+ arrowReader.close()
+ arrowReader = null
+ read()
+ } else {
+ withResource(table) { table =>
+ batchLoaded = true
+ GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
+ }
+ }
+ } else {
+ stream.readInt() match {
+ case SpecialLengths.START_ARROW_STREAM =>
+ val builder = ArrowIPCOptions.builder()
+ builder.withCallback(() =>
+ GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWait))
+ arrowReader = Table.readArrowIPCChunked(builder.build(),
+ new StreamToBufferProvider(stream))
+ read()
+ case SpecialLengths.TIMING_DATA =>
+ handleTimingData()
+ read()
+ case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+ throw handlePythonException()
+ case SpecialLengths.END_OF_DATA_SECTION =>
+ handleEndOfDataSection()
+ null
+ }
+ }
+ } catch handleException
+ }
+ }
+ }
+}
+
+/**
+ * Base class of GPU Python runners who will be mixed with GpuPythonArrowOutput
+ * to produce columnar batches.
+ */
+abstract class GpuPythonRunnerBase[IN](
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]])
+ extends ShimBasePythonRunner[IN, ColumnarBatch](funcs, evalType, argOffsets)
+
+/**
+ * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream.
+ */
+class GpuArrowPythonRunner(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]],
+ pythonInSchema: StructType,
+ timeZoneId: String,
+ conf: Map[String, String],
+ batchSize: Long,
+ val semWait: GpuMetric,
+ val pythonOutSchema: StructType,
+ onDataWriteFinished: () => Unit = null)
+ extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets)
+ with GpuPythonArrowOutput {
+
+ override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+ require(
+ bufferSize >= 4,
+ "Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+ s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+ protected override def newWriterThread(
+ env: SparkEnv,
+ worker: Socket,
+ inputIterator: Iterator[ColumnarBatch],
+ partitionIndex: Int,
+ context: TaskContext): WriterThread = {
+ new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+
+ protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+
+ // Write config for the worker as a number of key -> value pairs of strings
+ dataOut.writeInt(conf.size)
+ for ((k, v) <- conf) {
+ PythonRDD.writeUTF(k, dataOut)
+ PythonRDD.writeUTF(v, dataOut)
+ }
+
+ PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+ }
+
+ protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
+ val writer = {
+ val builder = ArrowIPCWriterOptions.builder()
+ builder.withMaxChunkSize(batchSize)
+ builder.withCallback((table: Table) => {
+ table.close()
+ GpuSemaphore.releaseIfNecessary(TaskContext.get())
+ })
+ // Flatten the names of nested struct columns, required by cudf arrow IPC writer.
+ GpuArrowPythonRunner.flattenNames(pythonInSchema).foreach { case (name, nullable) =>
+ if (nullable) {
+ builder.withColumnNames(name)
+ } else {
+ builder.withNotNullableColumnNames(name)
+ }
+ }
+ Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut))
+ }
+
+ Utils.tryWithSafeFinally {
+ while(inputIterator.hasNext) {
+ val table = withResource(inputIterator.next()) { nextBatch =>
+ GpuColumnVector.from(nextBatch)
+ }
+ withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ =>
+ // The callback will handle closing table and releasing the semaphore
+ writer.write(table)
+ }
+ }
+ // The iterator can grab the semaphore even on an empty batch
+ GpuSemaphore.releaseIfNecessary(TaskContext.get())
+ } {
+ writer.close()
+ dataOut.flush()
+ if (onDataWriteFinished != null) onDataWriteFinished()
+ }
+ }
+ }
+ }
+}
+
+object GpuArrowPythonRunner {
+ def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] =
+ d match {
+ case s: StructType =>
+ s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable))
+ case m: MapType =>
+ flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable)
+ case a: ArrayType => flattenNames(a.elementType, nullable)
+ case _ => Nil
+ }
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuCoGroupedArrowPythonRunner.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuCoGroupedArrowPythonRunner.scala
new file mode 100644
index 00000000000..14ece2286d6
--- /dev/null
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuCoGroupedArrowPythonRunner.scala
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.rapids.execution.python
+
+import java.io.DataOutputStream
+import java.net.Socket
+
+import ai.rapids.cudf.{ArrowIPCWriterOptions, NvtxColor, NvtxRange, Table}
+import com.nvidia.spark.rapids.{GpuColumnVector, GpuMetric, GpuSemaphore}
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRDD}
+import org.apache.spark.sql.execution.python.PythonUDFRunner
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+/**
+ * Python UDF Runner for cogrouped UDFs, designed for `GpuFlatMapCoGroupsInPandasExec` only.
+ *
+ * It sends Arrow bathes from two different DataFrames, groups them in Python,
+ * and receive it back in JVM as batches of single DataFrame.
+ */
+class GpuCoGroupedArrowPythonRunner(
+ funcs: Seq[ChainedPythonFunctions],
+ evalType: Int,
+ argOffsets: Array[Array[Int]],
+ leftSchema: StructType,
+ rightSchema: StructType,
+ timeZoneId: String,
+ conf: Map[String, String],
+ batchSize: Int,
+ val semWait: GpuMetric,
+ val pythonOutSchema: StructType)
+ extends GpuPythonRunnerBase[(ColumnarBatch, ColumnarBatch)](funcs, evalType, argOffsets)
+ with GpuPythonArrowOutput {
+
+ protected override def newWriterThread(
+ env: SparkEnv,
+ worker: Socket,
+ inputIterator: Iterator[(ColumnarBatch, ColumnarBatch)],
+ partitionIndex: Int,
+ context: TaskContext): WriterThread = {
+ new WriterThread(env, worker, inputIterator, partitionIndex, context) {
+
+ protected override def writeCommand(dataOut: DataOutputStream): Unit = {
+
+ // Write config for the worker as a number of key -> value pairs of strings
+ dataOut.writeInt(conf.size)
+ for ((k, v) <- conf) {
+ PythonRDD.writeUTF(k, dataOut)
+ PythonRDD.writeUTF(v, dataOut)
+ }
+
+ PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
+ }
+
+ protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
+ // For each we first send the number of dataframes in each group then send
+ // first df, then send second df. End of data is marked by sending 0.
+ while (inputIterator.hasNext) {
+ dataOut.writeInt(2)
+ val (leftGroupBatch, rightGroupBatch) = inputIterator.next()
+ withResource(Seq(leftGroupBatch, rightGroupBatch)) { _ =>
+ writeGroupBatch(leftGroupBatch, leftSchema, dataOut)
+ writeGroupBatch(rightGroupBatch, rightSchema, dataOut)
+ }
+ }
+ // The iterator can grab the semaphore even on an empty batch
+ GpuSemaphore.releaseIfNecessary(TaskContext.get())
+ dataOut.writeInt(0)
+ }
+
+ private def writeGroupBatch(groupBatch: ColumnarBatch, batchSchema: StructType,
+ dataOut: DataOutputStream): Unit = {
+ val writer = {
+ val builder = ArrowIPCWriterOptions.builder()
+ builder.withMaxChunkSize(batchSize)
+ builder.withCallback((table: Table) => {
+ table.close()
+ GpuSemaphore.releaseIfNecessary(TaskContext.get())
+ })
+ // Flatten the names of nested struct columns, required by cudf arrow IPC writer.
+ GpuArrowPythonRunner.flattenNames(batchSchema).foreach { case (name, nullable) =>
+ if (nullable) {
+ builder.withColumnNames(name)
+ } else {
+ builder.withNotNullableColumnNames(name)
+ }
+ }
+ Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut))
+ }
+
+ Utils.tryWithSafeFinally {
+ withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ =>
+ // The callback will handle closing table and releasing the semaphore
+ writer.write(GpuColumnVector.from(groupBatch))
+ }
+ } {
+ writer.close()
+ dataOut.flush()
+ }
+ } // end of writeGroup
+ }
+ } // end of newWriterThread
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala
index 4d61a65c6d4..9687d8619da 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapCoGroupsInPandasExec.scala
@@ -23,13 +23,12 @@ import com.nvidia.spark.rapids.shims.ShimBinaryExecNode
import org.apache.spark.TaskContext
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
-import org.apache.spark.sql.execution.{CoGroupedIterator, SparkPlan}
-import org.apache.spark.sql.execution.python.{CoGroupedArrowPythonRunner, FlatMapCoGroupsInPandasExec}
-import org.apache.spark.sql.execution.python.rapids.GpuPandasUtils._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.python.FlatMapCoGroupsInPandasExec
+import org.apache.spark.sql.rapids.execution.python.BatchGroupUtils._
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -45,47 +44,69 @@ class GpuFlatMapCoGroupsInPandasExecMeta(
override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"
- // Ignore the expressions since columnar way is not supported yet
- override val childExprs: Seq[BaseExprMeta[_]] = Seq.empty
+ override def tagPlanForGpu(): Unit = {
+ // Fall back to CPU when the two grouping columns in a pair have different types.
+ // e.g.
+ // Left grouping column is (a: Int),
+ // Right grouping columns are (a: String, b: Int)
+ // The first pair has different types. This is a meaningless case,
+ // but Spark can run unexpectedly due to no type check in the UnsafeRow, while GPU
+ // will throw an exception if Java Assertion is enabled.
+ if (flatPandas.leftGroup.zipWithIndex.exists { case (leftGpAttr, at) =>
+ flatPandas.rightGroup.lift(at).exists { rightGpAttr =>
+ !leftGpAttr.dataType.sameType(rightGpAttr.dataType)
+ }
+ }) {
+ willNotWorkOnGpu("grouping columns from two DataFrames have different types.")
+ }
+ }
+
+ private val leftGroupingAttrs: Seq[BaseExprMeta[Attribute]] =
+ flatPandas.leftGroup.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+
+ private val rightGroupingAttrs: Seq[BaseExprMeta[Attribute]] =
+ flatPandas.rightGroup.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+
+ private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
+ flatPandas.func.asInstanceOf[PythonUDF], conf, Some(this))
+
+ private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
+ flatPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+
+ override val childExprs: Seq[BaseExprMeta[_]] =
+ leftGroupingAttrs ++ rightGroupingAttrs ++ resultAttrs :+ udf
override def convertToGpu(): GpuExec = {
val Seq(left, right) = childPlans.map(_.convertIfNeeded())
GpuFlatMapCoGroupsInPandasExec(
- flatPandas.leftGroup, flatPandas.rightGroup,
- flatPandas.func,
- flatPandas.output,
+ leftGroupingAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
+ rightGroupingAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
+ udf.convertToGpu(),
+ resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
left,
right
)
}
}
-/*
- *
- * This GpuFlatMapCoGroupsInPandasExec aims at supporting running Pandas functional code
- * on GPU at Python side.
- *
- * (Currently it will not run on GPU itself, since the columnar way is not implemented yet.)
+/**
+ * GPU version of Spark's `FlatMapCoGroupsInPandasExec`
*
+ * This node aims at accelerating the data transfer between JVM and Python for GPU pipeline, and
+ * scheduling GPU resources for its Python processes.
*/
case class GpuFlatMapCoGroupsInPandasExec(
leftGroup: Seq[Attribute],
rightGroup: Seq[Attribute],
- func: Expression,
+ udf: Expression,
output: Seq[Attribute],
left: SparkPlan,
right: SparkPlan)
- extends SparkPlan with ShimBinaryExecNode with GpuExec {
+ extends SparkPlan with ShimBinaryExecNode with GpuPythonExecBase {
- override def supportsColumnar = false
- override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- throw new IllegalStateException(s"Columnar execution is not supported by $this yet")
- }
-
- // Most code is copied from FlatMapCoGroupsInPandasExec, except two GPU related calls
private val sessionLocalTimeZone = conf.sessionLocalTimeZone
private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
- private val pandasFunction = func.asInstanceOf[PythonUDF].func
+ private val pandasFunction = udf.asInstanceOf[GpuPythonUDF].func
private val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
override def producedAttributes: AttributeSet = AttributeSet(output)
@@ -103,39 +124,57 @@ case class GpuFlatMapCoGroupsInPandasExec(
rightGroup.map(SortOrder(_, Ascending)) :: Nil
}
- override protected def doExecute(): RDD[InternalRow] = {
- lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
-
- val (leftDedup, leftArgOffsets) = resolveArgOffsets(left, leftGroup)
- val (rightDedup, rightArgOffsets) = resolveArgOffsets(right, rightGroup)
-
- // Map cogrouped rows to ArrowPythonRunner results, Only execute if partition is not empty
- left.execute().zipPartitions(right.execute()) { (leftData, rightData) =>
- if (leftData.isEmpty && rightData.isEmpty) Iterator.empty else {
+ /* Rapids things start */
+ // One batch as input to keep the integrity for each group
+ override def childrenCoalesceGoal: Seq[CoalesceGoal] =
+ Seq(RequireSingleBatch, RequireSingleBatch)
- val leftGrouped = groupAndProject(leftData, leftGroup, left.output, leftDedup)
- val rightGrouped = groupAndProject(rightData, rightGroup, right.output, rightDedup)
- val data = new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup)
- .map { case (_, l, r) => (l, r) }
-
- // Start of GPU things
- if (isPythonOnGpuEnabled) {
- GpuPythonHelper.injectGpuInfo(chainedFunc, isPythonOnGpuEnabled)
- PythonWorkerSemaphore.acquireIfNecessary(TaskContext.get())
- }
- // End of GPU things
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val (numInputRows, numInputBatches, numOutputRows, numOutputBatches,
+ spillCallback) = commonGpuMetrics()
+ lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
+ // Python wraps the resulting columns in a single struct column.
+ val pythonOutputSchema = StructType(
+ StructField("out_struct", StructType.fromAttributes(output)) :: Nil)
+
+ // Resolve the argument offsets and related attributes.
+ val GroupArgs(leftDedupAttrs, leftArgOffsets, leftGroupingOffsets) =
+ resolveArgOffsets(left, leftGroup)
+ val GroupArgs(rightDedupAttrs, rightArgOffsets, rightGroupingOffsets) =
+ resolveArgOffsets(right, rightGroup)
+
+ left.executeColumnar().zipPartitions(right.executeColumnar()) { (leftIter, rightIter) =>
+ if (isPythonOnGpuEnabled) {
+ GpuPythonHelper.injectGpuInfo(chainedFunc, isPythonOnGpuEnabled)
+ PythonWorkerSemaphore.acquireIfNecessary(TaskContext.get())
+ }
- val runner = new CoGroupedArrowPythonRunner(
+ // Only execute if partition is not empty
+ if (leftIter.isEmpty && rightIter.isEmpty) Iterator.empty else {
+ // project and group for left and right
+ val leftGroupedIter = projectAndGroup(leftIter, left.output, leftDedupAttrs,
+ leftGroupingOffsets, numInputRows, numInputBatches, spillCallback)
+ val rightGroupedIter = projectAndGroup(rightIter, right.output, rightDedupAttrs,
+ rightGroupingOffsets, numInputRows, numInputBatches, spillCallback)
+ // Cogroup the data
+ val pyInputIter = new CoGroupedIterator(leftGroupedIter, leftDedupAttrs,
+ leftGroupingOffsets, rightGroupedIter, rightDedupAttrs, rightGroupingOffsets)
+
+ val pyRunner = new GpuCoGroupedArrowPythonRunner(
chainedFunc,
PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF,
Array(leftArgOffsets ++ rightArgOffsets),
- StructType.fromAttributes(leftDedup),
- StructType.fromAttributes(rightDedup),
+ StructType.fromAttributes(leftDedupAttrs),
+ StructType.fromAttributes(rightDedupAttrs),
sessionLocalTimeZone,
- pythonRunnerConf)
+ pythonRunnerConf,
+ // The whole group data should be written in a single call, so here is unlimited
+ Int.MaxValue,
+ spillCallback.semaphoreWaitTime,
+ pythonOutputSchema)
- executePython(data, output, runner)
+ executePython(pyInputIter, output, pyRunner, numOutputRows, numOutputBatches)
}
}
- }
+ } // end of doExecuteColumnar
}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala
index 86ce99f3d5e..1eb94af5481 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala
@@ -139,14 +139,7 @@ case class GpuMapInPandasExec(
pythonRunnerConf,
batchSize,
spillCallback.semaphoreWaitTime,
- onDataWriteFinished = null,
- pythonOutputSchema,
- // We can not assert the result batch from Python has the same row number with the
- // input batch. Because Map Pandas UDF allows the output of arbitrary length
- // and columns.
- // Then try to read as many as possible by specifying `minReadTargetBatchSize` as
- // `Int.MaxValue` here.
- Int.MaxValue)
+ pythonOutputSchema)
executePython(pyInputIterator, output, pyRunner, mNumOutputRows, mNumOutputBatches)
} else {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
index cae989f4621..88eeb9e88cd 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf
import ai.rapids.cudf.{GroupByAggregation, NullPolicy, OrderByArg}
import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -30,7 +29,6 @@ import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
import org.apache.spark.TaskContext
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.python._
@@ -177,7 +175,7 @@ class GroupingIterator(
* The base class of GpuWindowInPandasExec in different shim layers
*
*/
-trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuExec {
+trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuPythonExecBase {
def windowExpression: Seq[Expression]
def gpuPartitionSpec: Seq[Expression]
@@ -395,22 +393,9 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuExec {
new ColumnarBatch(boundsCVs ++ dataCVs.map(_.incRefCount()), numRows)
}
- override lazy val allMetrics: Map[String, GpuMetric] = Map(
- NUM_OUTPUT_ROWS -> createMetric(outputRowsLevel, DESCRIPTION_NUM_OUTPUT_ROWS),
- NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES),
- NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
- NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES)
- ) ++ spillMetrics
-
- override protected def doExecute(): RDD[InternalRow] =
- throw new IllegalStateException(s"Row-based execution should not occur for $this")
-
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val numInputRows = gpuLongMetric(NUM_INPUT_ROWS)
- val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
- val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
- val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
- val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
+ val (numInputRows, numInputBatches, numOutputRows, numOutputBatches,
+ spillCallback) = commonGpuMetrics()
val sessionLocalTimeZone = conf.sessionLocalTimeZone
// 1) Unwrap the expressions and build some info data:
@@ -548,45 +533,12 @@ trait GpuWindowInPandasExecBase extends ShimUnaryExecNode with GpuExec {
/* The whole group data should be written in a single call, so here is unlimited */
Int.MaxValue,
spillCallback.semaphoreWaitTime,
- () => queue.finish(),
- pythonOutputSchema)
-
- val outputBatchIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
- new Iterator[ColumnarBatch] {
- // for hasNext we are waiting on the queue to have something inserted into it
- // instead of waiting for a result to be ready from python. The reason for this
- // is to let us know the target number of rows in the batch that we want when reading.
- // It is a bit hacked up but it works. In the future when we support spilling we should
- // store the number of rows separate from the batch. That way we can get the target batch
- // size out without needing to grab the GpuSemaphore which we cannot do if we might block
- // on a read operation.
- // Besides, when the queue is empty, need to call the `hasNext` of the out iterator to
- // trigger reading and handling the control data followed with the stream data.
- override def hasNext: Boolean = queue.hasNext || outputBatchIterator.hasNext
-
- private [this] def combine(
- origBatch: ColumnarBatch,
- retBatch: ColumnarBatch): ColumnarBatch = {
- val lColumns = GpuColumnVector.extractColumns(origBatch)
- val rColumns = GpuColumnVector.extractColumns(retBatch)
- new ColumnarBatch(lColumns.map(_.incRefCount()) ++ rColumns.map(_.incRefCount()),
- origBatch.numRows())
- }
+ pythonOutputSchema,
+ () => queue.finish())
- override def next(): ColumnarBatch = {
- val numRows = queue.peekBatchSize
- // Update the expected batch size for next read
- pyRunner.minReadTargetBatchSize = numRows
- withResource(outputBatchIterator.next()) { cbFromPython =>
- assert(cbFromPython.numRows() == numRows)
- withResource(queue.remove()) { origBatch =>
- numOutputBatches += 1
- numOutputRows += numRows
- projectResult(combine(origBatch, cbFromPython))
- }
- }
- }
- } // End of new Iterator
+ val outputIterator = pyRunner.compute(pyInputIterator, context.partitionId(), context)
+ new CombiningIterator(queue, outputIterator, pyRunner, numOutputRows,
+ numOutputBatches).map(projectResult(_))
} else {
// Empty partition, return the input iterator directly
inputIter
diff --git a/tools/src/main/resources/operatorsScore.csv b/tools/src/main/resources/operatorsScore.csv
index edb1599bc0b..4d5d0b1ba4a 100644
--- a/tools/src/main/resources/operatorsScore.csv
+++ b/tools/src/main/resources/operatorsScore.csv
@@ -30,6 +30,7 @@ ShuffledHashJoinExec,3.0
SortMergeJoinExec,22.7
AggregateInPandasExec,1.2
ArrowEvalPythonExec,1.2
+FlatMapCoGroupsInPandasExec,3.0
FlatMapGroupsInPandasExec,1.2
MapInPandasExec,1.2
WindowInPandasExec,1.2
diff --git a/tools/src/main/resources/supportedExecs.csv b/tools/src/main/resources/supportedExecs.csv
index 9809884f315..03f009b9f06 100644
--- a/tools/src/main/resources/supportedExecs.csv
+++ b/tools/src/main/resources/supportedExecs.csv
@@ -40,6 +40,7 @@ SortMergeJoinExec,S,None,condition,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,N
SortMergeJoinExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS
AggregateInPandasExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,NS,NS,NS,NS
ArrowEvalPythonExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS
+FlatMapCoGroupsInPandasExec,NS,This is disabled by default because Performance is not ideal with many small groups,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,NS,NS,NS,NS
FlatMapGroupsInPandasExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,NS,NS,NS,NS
MapInPandasExec,S,None,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,PS,NS
WindowInPandasExec,NS,This is disabled by default because it only supports row based frame for now,Input/Output,S,S,S,S,S,S,S,S,PS,S,NS,NS,NS,NS,PS,NS,NS,NS