From 6653e3f04d56e83acfc12f649ba2c41e4937e99a Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Fri, 26 Mar 2021 10:21:14 +0800 Subject: [PATCH 1/5] support explode with GenerateOuter Signed-off-by: sperlingxx --- .../src/main/python/generate_expr_test.py | 42 +++++++++++++++++++ .../nvidia/spark/rapids/GpuGenerateExec.scala | 8 +++- .../nvidia/spark/rapids/GpuOverrides.scala | 2 + 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index d3e283b8ae6..2ae776ba09b 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -69,6 +69,27 @@ def test_explode_nested_array_data(spark_tmp_path, data_gen): 'a', 'explode(b) as c').selectExpr('a', 'explode(c)'), conf=conf_to_enforce_split_input) +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_explode_outer_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(data_gen)] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'), + conf=conf_to_enforce_split_input) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_explode_outer_nested_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr( + 'a', 'explode_outer(b) as c').selectExpr('a', 'explode_outer(c)'), + conf=conf_to_enforce_split_input) + #sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -108,3 +129,24 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen): lambda spark: two_col_df(spark, *data_gen).selectExpr( 'a', 'posexplode(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode(c)'), conf=conf_to_enforce_split_input) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_posexplode_outer_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(data_gen)] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode(b)'), + conf=conf_to_enforce_split_input) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +# After 3.1.0 is the min spark version we can drop this +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_posexplode_nested_outer_array_data(spark_tmp_path, data_gen): + data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))] + assert_gpu_and_cpu_are_equal_collect( + lambda spark: two_col_df(spark, *data_gen).selectExpr( + 'a', 'posexplode_outer(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode_outer(c)'), + conf=conf_to_enforce_split_input) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala index 9fdb3b90bd4..13f55290033 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala @@ -342,8 +342,10 @@ case class GpuExplode(child: Expression) extends GpuExplodeBase { require(inputBatch.numCols() - 1 == generatorOffset, "Internal Error GpuExplode supports one and only one input attribute.") val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset) + val explodeFun = (t: Table) => + if (outer) t.explodeOuter(generatorOffset) else t.explode(generatorOffset) withResource(GpuColumnVector.from(inputBatch)) { table => - withResource(table.explode(generatorOffset)) { exploded => + withResource(explodeFun(table)) { exploded => GpuColumnVector.from(exploded, schema) } } @@ -362,8 +364,10 @@ case class GpuPosExplode(child: Expression) extends GpuExplodeBase { "Internal Error GpuPosExplode supports one and only one input attribute.") val schema = resultSchema( GpuColumnVector.extractTypes(inputBatch), generatorOffset, includePos = true) + val explodePosFun = (t: Table) => + if (outer) t.explodeOuterPosition(generatorOffset) else t.explodePosition(generatorOffset) withResource(GpuColumnVector.from(inputBatch)) { table => - withResource(table.explodePosition(generatorOffset)) { exploded => + withResource(explodePosFun(table)) { exploded => GpuColumnVector.from(exploded, schema) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 874b4df020a..249b9eb7087 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2405,6 +2405,7 @@ object GpuOverrides { TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY), (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)), (a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) { + override val supportOuter: Boolean = true override def convertToGpu(): GpuExpression = GpuExplode(childExprs(0).convertToGpu()) }), expr[PosExplode]( @@ -2421,6 +2422,7 @@ object GpuOverrides { TypeSig.ARRAY.nested( TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY)), (a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) { + override val supportOuter: Boolean = true override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs(0).convertToGpu()) }), expr[CollectList]( From 7c3ee8500d64cf2be7a269b21cfd8e4988758898 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 31 Mar 2021 13:46:14 +0800 Subject: [PATCH 2/5] fix typo --- integration_tests/src/main/python/generate_expr_test.py | 2 +- .../main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index 2ae776ba09b..57ce5eb0df8 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -137,7 +137,7 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen): def test_posexplode_outer_array_data(spark_tmp_path, data_gen): data_gen = [int_gen, ArrayGen(data_gen)] assert_gpu_and_cpu_are_equal_collect( - lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode(b)'), + lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'), conf=conf_to_enforce_split_input) #sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 249b9eb7087..3c024aa128a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2393,8 +2393,7 @@ object GpuOverrides { GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow) }), expr[Explode]( - "Given an input array produces a sequence of rows for each value in the array. " - + "Explode with outer Generate is not supported under GPU runtime." , + "Given an input array produces a sequence of rows for each value in the array.", ExprChecks.unaryProject( // Here is a walk-around representation, since multi-level nested type is not supported yet. // related issue: https://github.com/NVIDIA/spark-rapids/issues/1901 @@ -2409,8 +2408,7 @@ object GpuOverrides { override def convertToGpu(): GpuExpression = GpuExplode(childExprs(0).convertToGpu()) }), expr[PosExplode]( - "Given an input array produces a sequence of rows for each value in the array. " - + "PosExplode with outer Generate is not supported under GPU runtime." , + "Given an input array produces a sequence of rows for each value in the array.", ExprChecks.unaryProject( // Here is a walk-around representation, since multi-level nested type is not supported yet. // related issue: https://github.com/NVIDIA/spark-rapids/issues/1901 From 0b964f8603d416519b78a5c0fcb45b6cd730ce18 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Thu, 1 Apr 2021 10:10:20 +0800 Subject: [PATCH 3/5] update doc Signed-off-by: sperlingxx --- docs/configs.md | 4 ++-- docs/supported_ops.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 8abcf8f00a2..14e0df08894 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -153,7 +153,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.EqualNullSafe|`<=>`|Check if the values are equal including nulls <=>|true|None| spark.rapids.sql.expression.EqualTo|`=`, `==`|Check if the values are equal|true|None| spark.rapids.sql.expression.Exp|`exp`|Euler's number e raised to a power|true|None| -spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.|true|None| +spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array.|true|None| spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None| spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None| spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None| @@ -203,7 +203,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Not|`!`, `not`|Boolean not operator|true|None| spark.rapids.sql.expression.Or|`or`|Logical OR|true|None| spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None| -spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.|true|None| +spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None| spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None| spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None| spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index fe803eb5d21..d33efdd3dd6 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -5328,7 +5328,7 @@ Accelerator support is described below. Explode `explode`, `explode_outer` -Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime. +Given an input array produces a sequence of rows for each value in the array. None project input @@ -10286,7 +10286,7 @@ Accelerator support is described below. PosExplode `posexplode_outer`, `posexplode` -Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime. +Given an input array produces a sequence of rows for each value in the array. None project input From f389cbd6df2b31e85809e45d46fa336bc82916fc Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 21 Apr 2021 12:04:18 +0800 Subject: [PATCH 4/5] sign Signed-off-by: sperlingxx --- integration_tests/src/main/python/generate_expr_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index 57ce5eb0df8..5f5c7c23c0b 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -90,7 +90,6 @@ def test_explode_outer_nested_array_data(spark_tmp_path, data_gen): 'a', 'explode_outer(b) as c').selectExpr('a', 'explode_outer(c)'), conf=conf_to_enforce_split_input) - #sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @ignore_order(local=True) From dbb3a64d9468b0ea5efbd092ce384b324614bd50 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Wed, 21 Apr 2021 14:29:52 +0800 Subject: [PATCH 5/5] append sign Signed-off-by: sperlingxx --- integration_tests/src/main/python/generate_expr_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index 5f5c7c23c0b..e41d49b35f4 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -147,5 +147,6 @@ def test_posexplode_nested_outer_array_data(spark_tmp_path, data_gen): data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))] assert_gpu_and_cpu_are_equal_collect( lambda spark: two_col_df(spark, *data_gen).selectExpr( - 'a', 'posexplode_outer(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode_outer(c)'), + 'a', 'posexplode_outer(b) as (pos, c)').selectExpr( + 'a', 'pos', 'posexplode_outer(c)'), conf=conf_to_enforce_split_input)