From 892b4c5dd57e7b393c68f7e6e35167403baca81b Mon Sep 17 00:00:00 2001 From: Liangcai Li Date: Wed, 24 Mar 2021 08:17:28 +0800 Subject: [PATCH] Support running scalar pandas UDF with array type. (#1944) This PR is to support running scalar pandas UDF with array type. Add array type signature for related expressions and plans. Flatten the names of nested struct columns from schema, which is also required by the cudf Arrow IPC writer. This PR depends on rapidsai/cudf#7598 closes #1912 Signed-off-by: Firestarman --- docs/supported_ops.md | 20 ++++++------ integration_tests/src/main/python/data_gen.py | 20 ++++++++++++ integration_tests/src/main/python/udf_test.py | 13 ++++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 6 ++-- .../python/GpuArrowEvalPythonExec.scala | 31 +++++++++++++------ 5 files changed, 68 insertions(+), 22 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 7682d2f16ba..c112c80a359 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -699,9 +699,9 @@ Accelerator supports are described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10573,9 +10573,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10616,9 +10616,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10659,9 +10659,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10702,9 +10702,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 0abb3b72597..22d9d94ddc7 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -826,6 +826,26 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit] +# Pyarrow will complain the error as below if the timestamp is out of range for both CPU and GPU, +# so narrow down the time range to avoid exceptions causing test failures. +# +# "pyarrow.lib.ArrowInvalid: Casting from timestamp[us, tz=UTC] to timestamp[ns] +# would result in out of bounds timestamp: 51496791452587000" +# +# This issue has been fixed in pyarrow by the PR https://github.com/apache/arrow/pull/7169 +# However it still requires PySpark to specify the new argument "timestamp_as_object". 
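+#
+# As an illustrative aside (not something this patch relies on): pandas and pyarrow store
+# nanosecond timestamps as signed 64-bit counts since the Unix epoch, so the representable
+# range is roughly 1677-09-21 through 2262-04-11 (see pd.Timestamp.min / pd.Timestamp.max),
+# which is why the generators below stop at datetime(2262, 1, 1).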
+arrow_common_gen = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, + string_gen, boolean_gen, date_gen, + TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc), + end=datetime(2262, 1, 1, tzinfo=timezone.utc))] + +arrow_array_gens = [ArrayGen(subGen) for subGen in arrow_common_gen] + nested_array_gens_sample + +arrow_one_level_struct_gen = StructGen([ + ['child'+str(i), sub_gen] for i, sub_gen in enumerate(arrow_common_gen)]) + +arrow_struct_gens = [arrow_one_level_struct_gen, + StructGen([['child0', ArrayGen(short_gen)], ['child1', arrow_one_level_struct_gen]])] # This function adds a new column named uniq_int where each row # has a new unique integer value. It just starts at 0 and diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index 95cbd8cbeb8..36ba96ac499 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -45,6 +45,8 @@ 'spark.rapids.sql.exec.WindowInPandasExec': 'true' } +data_gens_nested_for_udf = arrow_array_gens + arrow_struct_gens + #################################################################### # NOTE: pytest does not play well with pyspark udfs, because pyspark # tries to import the dependencies for top level functions and @@ -78,6 +80,17 @@ def iterator_add(to_process: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[ conf=arrow_udf_conf) +@pytest.mark.parametrize('data_gen', data_gens_nested_for_udf, ids=idfn) +def test_pandas_scalar_udf_nested_type(data_gen): + def nested_size(nested): + return pd.Series([nested.size]).repeat(len(nested)) + + my_udf = f.pandas_udf(nested_size, returnType=LongType()) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, data_gen).select(my_udf(f.col('a'))), + conf=arrow_udf_conf) + + @approximate_float @allow_non_gpu('AggregateInPandasExec', 'PythonUDF', 'Alias') @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 1d32fe53b33..b5cddb63421 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1989,7 +1989,7 @@ object GpuOverrides { TypeSig.all, repeatingParamCheck = Some(RepeatingParamCheck( "param", - TypeSig.commonCudfTypes, + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), TypeSig.all))), (a, conf, p, r) => new ExprMeta[PythonUDF](a, conf, p, r) { override def replaceMessage: String = "not block GPU acceleration" @@ -2649,7 +2649,9 @@ object GpuOverrides { "The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" + " Java process and the Python process. 
It also supports scheduling GPU resources" + " for the Python process when enabled", - ExecChecks(TypeSig.commonCudfTypes, TypeSig.all), + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), (e, conf, p, r) => new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) { val udfs: Seq[BaseExprMeta[PythonUDF]] = diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index 318e7dfbd74..cd0770757b0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -26,14 +26,14 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{AggregationOnColumn, ArrowIPCOptions, ArrowIPCWriterOptions, ColumnVector, HostBufferConsumer, HostBufferProvider, HostMemoryBuffer, NvtxColor, NvtxRange, StreamedTableReader, Table} -import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuAggregateWindowFunction, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuMetric, GpuProjectExec, GpuSemaphore, GpuUnevaluable, RapidsBuffer, SpillableColumnarBatch, SpillPriorities} +import ai.rapids.cudf._ +import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonRDD, SpecialLengths} +import org.apache.spark.api.python._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.util.toPrettySQL import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.python.PythonUDFRunner import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils @@ -436,12 +436,13 @@ class GpuArrowPythonRunner( table.close() GpuSemaphore.releaseIfNecessary(TaskContext.get()) }) - pythonInSchema.foreach { field => - if (field.nullable) { - builder.withColumnNames(field.name) - } else { - builder.withNotNullableColumnNames(field.name) - } + // Flatten the names of nested struct columns, required by cudf arrow IPC writer. 
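+      // For example (schema chosen for illustration, not taken from this change), a batch
+      // schema like
+      //   struct<s: struct<a: int, b: struct<c: string>>, arr: array<int>>
+      // flattens depth-first into the name sequence s, a, b, c, arr, each name paired with its
+      // nullability, so the writer receives a name for every struct field at any nesting depth.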
+ flattenNames(pythonInSchema).foreach { case (name, nullable) => + if (nullable) { + builder.withColumnNames(name) + } else { + builder.withNotNullableColumnNames(name) + } } Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) } @@ -463,6 +464,16 @@ class GpuArrowPythonRunner( if (onDataWriteFinished != null) onDataWriteFinished() } } + + private def flattenNames(d: DataType, nullable: Boolean=true): Seq[(String, Boolean)] = + d match { + case s: StructType => + s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable)) + case m: MapType => + flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable) + case a: ArrayType => flattenNames(a.elementType, nullable) + case _ => Nil + } } } }
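
For reference, a minimal PySpark sketch of the case this patch enables: a scalar pandas UDF applied to an array column, mirroring the new test_pandas_scalar_udf_nested_type test. The session setup, DataFrame, and UDF name below are illustrative only; running the same plan on the GPU additionally assumes the RAPIDS plugin is on the classpath with the pandas execs enabled (see arrow_udf_conf in udf_test.py).

    import pandas as pd
    from pyspark.sql import SparkSession, functions as f
    from pyspark.sql.types import LongType

    spark = SparkSession.builder.appName("pandas-udf-array-sketch").getOrCreate()

    # Column 'a' is inferred as ARRAY<BIGINT>; the UDF receives it as a pandas Series
    # whose elements are the per-row arrays.
    df = spark.createDataFrame([([1, 2, 3],), ([4, 5],)], ["a"])

    @f.pandas_udf(LongType())
    def array_len(col: pd.Series) -> pd.Series:
        # Return the length of each row's array.
        return col.apply(len)

    df.select(array_len(f.col("a")).alias("len")).show()

On the CPU this runs through Spark's ArrowEvalPythonExec; with this patch the plugin can replace it with GpuArrowEvalPythonExec, since array (and struct) inputs are now part of the supported type signatures.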