diff --git a/docs/configs.md b/docs/configs.md
index 2f4ad2304f9..3088e954050 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -162,7 +162,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
spark.rapids.sql.expression.EqualNullSafe|`<=>`|Check if the values are equal including nulls <=>|true|None|
spark.rapids.sql.expression.EqualTo|`=`, `==`|Check if the values are equal|true|None|
spark.rapids.sql.expression.Exp|`exp`|Euler's number e raised to a power|true|None|
-spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.|true|None|
+spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array.|true|None|
spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None|
spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None|
spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
@@ -213,7 +213,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
spark.rapids.sql.expression.Not|`!`, `not`|Boolean not operator|true|None|
spark.rapids.sql.expression.Or|`or`|Logical OR|true|None|
spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None|
-spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.|true|None|
+spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None|
spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None|
spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None|
spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index 1adc9ae4647..87d4bd8acff 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -5683,7 +5683,7 @@ Accelerator support is described below.
Explode |
`explode`, `explode_outer` |
-Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime. |
+Given an input array produces a sequence of rows for each value in the array. |
None |
project |
input |
@@ -11073,7 +11073,7 @@ Accelerator support is described below.
PosExplode |
`posexplode_outer`, `posexplode` |
-Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime. |
+Given an input array produces a sequence of rows for each value in the array. |
None |
project |
input |
diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py
index d3e283b8ae6..e41d49b35f4 100644
--- a/integration_tests/src/main/python/generate_expr_test.py
+++ b/integration_tests/src/main/python/generate_expr_test.py
@@ -69,6 +69,26 @@ def test_explode_nested_array_data(spark_tmp_path, data_gen):
'a', 'explode(b) as c').selectExpr('a', 'explode(c)'),
conf=conf_to_enforce_split_input)
+#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
+# After 3.1.0 is the min spark version we can drop this
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+def test_explode_outer_array_data(spark_tmp_path, data_gen):
+ data_gen = [int_gen, ArrayGen(data_gen)]
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'),
+ conf=conf_to_enforce_split_input)
+
+#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
+# After 3.1.0 is the min spark version we can drop this
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+def test_explode_outer_nested_array_data(spark_tmp_path, data_gen):
+ data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))]
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: two_col_df(spark, *data_gen).selectExpr(
+ 'a', 'explode_outer(b) as c').selectExpr('a', 'explode_outer(c)'),
+ conf=conf_to_enforce_split_input)
#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@@ -108,3 +128,25 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'posexplode(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode(c)'),
conf=conf_to_enforce_split_input)
+
+#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
+# After 3.1.0 is the min spark version we can drop this
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+def test_posexplode_outer_array_data(spark_tmp_path, data_gen):
+ data_gen = [int_gen, ArrayGen(data_gen)]
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'),
+ conf=conf_to_enforce_split_input)
+
+#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
+# After 3.1.0 is the min spark version we can drop this
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+def test_posexplode_nested_outer_array_data(spark_tmp_path, data_gen):
+ data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))]
+ assert_gpu_and_cpu_are_equal_collect(
+ lambda spark: two_col_df(spark, *data_gen).selectExpr(
+ 'a', 'posexplode_outer(b) as (pos, c)').selectExpr(
+ 'a', 'pos', 'posexplode_outer(c)'),
+ conf=conf_to_enforce_split_input)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
index 9fdb3b90bd4..13f55290033 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuGenerateExec.scala
@@ -342,8 +342,10 @@ case class GpuExplode(child: Expression) extends GpuExplodeBase {
require(inputBatch.numCols() - 1 == generatorOffset,
"Internal Error GpuExplode supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)
+ val explodeFun = (t: Table) =>
+ if (outer) t.explodeOuter(generatorOffset) else t.explode(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
- withResource(table.explode(generatorOffset)) { exploded =>
+ withResource(explodeFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
@@ -362,8 +364,10 @@ case class GpuPosExplode(child: Expression) extends GpuExplodeBase {
"Internal Error GpuPosExplode supports one and only one input attribute.")
val schema = resultSchema(
GpuColumnVector.extractTypes(inputBatch), generatorOffset, includePos = true)
+ val explodePosFun = (t: Table) =>
+ if (outer) t.explodeOuterPosition(generatorOffset) else t.explodePosition(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
- withResource(table.explodePosition(generatorOffset)) { exploded =>
+ withResource(explodePosFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 48a793ecd96..3565b9bf1ad 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2471,8 +2471,7 @@ object GpuOverrides {
GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow)
}),
expr[Explode](
- "Given an input array produces a sequence of rows for each value in the array. "
- + "Explode with outer Generate is not supported under GPU runtime." ,
+ "Given an input array produces a sequence of rows for each value in the array.",
ExprChecks.unaryProject(
// Here is a walk-around representation, since multi-level nested type is not supported yet.
// related issue: https://github.com/NVIDIA/spark-rapids/issues/1901
@@ -2483,11 +2482,11 @@ object GpuOverrides {
TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) {
+ override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuExplode(childExprs.head.convertToGpu())
}),
expr[PosExplode](
- "Given an input array produces a sequence of rows for each value in the array. "
- + "PosExplode with outer Generate is not supported under GPU runtime." ,
+ "Given an input array produces a sequence of rows for each value in the array.",
ExprChecks.unaryProject(
// Here is a walk-around representation, since multi-level nested type is not supported yet.
// related issue: https://github.com/NVIDIA/spark-rapids/issues/1901
@@ -2499,6 +2498,7 @@ object GpuOverrides {
TypeSig.ARRAY.nested(
TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY)),
(a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) {
+ override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu())
}),
expr[CollectList](