From 6f04038a2dfa5da37684fea3b89ecd42a42fe134 Mon Sep 17 00:00:00 2001 From: Alfred Xu Date: Thu, 22 Apr 2021 08:09:36 +0800 Subject: [PATCH] Support explode outer (#2215) support explode_outer and posexplode_outer with corresponding test cases --- docs/configs.md | 4 +- docs/supported_ops.md | 4 +- .../src/main/python/generate_expr_test.py | 42 +++++++++++++++++++ .../nvidia/spark/rapids/GpuGenerateExec.scala | 8 +++- .../nvidia/spark/rapids/GpuOverrides.scala | 8 ++-- 5 files changed, 56 insertions(+), 10 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 2f4ad2304f9..3088e954050 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -162,7 +162,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.EqualNullSafe|`<=>`|Check if the values are equal including nulls <=>|true|None| spark.rapids.sql.expression.EqualTo|`=`, `==`|Check if the values are equal|true|None| spark.rapids.sql.expression.Exp|`exp`|Euler's number e raised to a power|true|None| -spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.|true|None| +spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array.|true|None| spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None| spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None| spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None| @@ -213,7 +213,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Not|`!`, `not`|Boolean not operator|true|None| spark.rapids.sql.expression.Or|`or`|Logical OR|true|None| spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None| -spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.|true|None| +spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None| spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None| spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None| spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 1adc9ae4647..87d4bd8acff 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -5683,7 +5683,7 @@ Accelerator support is described below. Explode `explode`, `explode_outer` -Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime. +Given an input array produces a sequence of rows for each value in the array. None project input @@ -11073,7 +11073,7 @@ Accelerator support is described below. PosExplode `posexplode_outer`, `posexplode` -Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime. +Given an input array produces a sequence of rows for each value in the array. None project input diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index d3e283b8ae6..e41d49b35f4 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -69,6 +69,26 @@ def test_explode_nested_array_data(spark_tmp_path, data_gen): 'a', 'explode(b) as c').selectExpr('a', 'explode(c)'), conf=conf_to_enforce_split_input) +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_explode_outer_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(data_gen)] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'), + conf=conf_to_enforce_split_input) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_explode_outer_nested_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr( + 'a', 'explode_outer(b) as c').selectExpr('a', 'explode_outer(c)'), + conf=conf_to_enforce_split_input) #sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -108,3 +128,25 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen): lambda spark: two_col_df(spark, *data_gen).selectExpr( 'a', 'posexplode(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode(c)'), conf=conf_to_enforce_split_input) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_posexplode_outer_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(data_gen)] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'), + conf=conf_to_enforce_split_input) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_posexplode_nested_outer_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr( + 'a', 'posexplode_outer(b) as (pos, c)').selectExpr( + 'a', 'pos', 'posexplode_outer(c)'), + conf=conf_to_enforce_split_input) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala index 9fdb3b90bd4..13f55290033 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala @@ -342,8 +342,10 @@ case class GpuExplode(child: Expression) extends GpuExplodeBase { require(inputBatch.numCols() - 1 == generatorOffset, "Internal Error GpuExplode supports one and only one input attribute.") val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset) + val explodeFun = (t: Table) => + if (outer) t.explodeOuter(generatorOffset) else t.explode(generatorOffset) withResource(GpuColumnVector.from(inputBatch)) { table => - withResource(table.explode(generatorOffset)) { exploded => + withResource(explodeFun(table)) { exploded => GpuColumnVector.from(exploded, schema) } } @@ -362,8 +364,10 @@ case class GpuPosExplode(child: Expression) extends GpuExplodeBase { "Internal Error GpuPosExplode supports one and only one input attribute.") val schema = resultSchema( GpuColumnVector.extractTypes(inputBatch), generatorOffset, includePos = true) + val explodePosFun = (t: Table) => + if (outer) t.explodeOuterPosition(generatorOffset) else t.explodePosition(generatorOffset) withResource(GpuColumnVector.from(inputBatch)) { table => - withResource(table.explodePosition(generatorOffset)) { exploded => + withResource(explodePosFun(table)) { exploded => GpuColumnVector.from(exploded, schema) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 48a793ecd96..3565b9bf1ad 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2471,8 +2471,7 @@ object GpuOverrides { GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow) }), expr[Explode]( - "Given an input array produces a sequence of rows for each value in the array. " - + "Explode with outer Generate is not supported under GPU runtime." , + "Given an input array produces a sequence of rows for each value in the array.", ExprChecks.unaryProject( // Here is a walk-around representation, since multi-level nested type is not supported yet. // related issue: https://github.com/NVIDIA/spark-rapids/issues/1901 @@ -2483,11 +2482,11 @@ object GpuOverrides { TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY), (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)), (a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) { + override val supportOuter: Boolean = true override def convertToGpu(): GpuExpression = GpuExplode(childExprs.head.convertToGpu()) }), expr[PosExplode]( - "Given an input array produces a sequence of rows for each value in the array. " - + "PosExplode with outer Generate is not supported under GPU runtime." , + "Given an input array produces a sequence of rows for each value in the array.", ExprChecks.unaryProject( // Here is a walk-around representation, since multi-level nested type is not supported yet. // related issue: https://github.com/NVIDIA/spark-rapids/issues/1901 @@ -2499,6 +2498,7 @@ object GpuOverrides { TypeSig.ARRAY.nested( TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY)), (a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) { + override val supportOuter: Boolean = true override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu()) }), expr[CollectList](