Skip to content

Commit

Permalink
Support explode outer (NVIDIA#2215)
Browse files Browse the repository at this point in the history
support explode_outer and posexplode_outer with corresponding test cases
  • Loading branch information
sperlingxx authored Apr 22, 2021
1 parent 4b4a7a1 commit 6f04038
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 10 deletions.
4 changes: 2 additions & 2 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.EqualNullSafe"></a>spark.rapids.sql.expression.EqualNullSafe|`<=>`|Check if the values are equal including nulls <=>|true|None|
<a name="sql.expression.EqualTo"></a>spark.rapids.sql.expression.EqualTo|`=`, `==`|Check if the values are equal|true|None|
<a name="sql.expression.Exp"></a>spark.rapids.sql.expression.Exp|`exp`|Euler's number e raised to a power|true|None|
<a name="sql.expression.Explode"></a>spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.|true|None|
<a name="sql.expression.Explode"></a>spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array.|true|None|
<a name="sql.expression.Expm1"></a>spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None|
<a name="sql.expression.Floor"></a>spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None|
<a name="sql.expression.FromUnixTime"></a>spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
Expand Down Expand Up @@ -213,7 +213,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Not"></a>spark.rapids.sql.expression.Not|`!`, `not`|Boolean not operator|true|None|
<a name="sql.expression.Or"></a>spark.rapids.sql.expression.Or|`or`|Logical OR|true|None|
<a name="sql.expression.Pmod"></a>spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None|
<a name="sql.expression.PosExplode"></a>spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.|true|None|
<a name="sql.expression.PosExplode"></a>spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None|
<a name="sql.expression.Pow"></a>spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None|
<a name="sql.expression.PromotePrecision"></a>spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None|
<a name="sql.expression.PythonUDF"></a>spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None|
Expand Down
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -5683,7 +5683,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="2">Explode</td>
<td rowSpan="2">`explode`, `explode_outer`</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array.</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
Expand Down Expand Up @@ -11073,7 +11073,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="2">PosExplode</td>
<td rowSpan="2">`posexplode_outer`, `posexplode`</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array.</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
Expand Down
42 changes: 42 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,26 @@ def test_explode_nested_array_data(spark_tmp_path, data_gen):
'a', 'explode(b) as c').selectExpr('a', 'explode(c)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_explode_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_explode_outer_nested_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'explode_outer(b) as c').selectExpr('a', 'explode_outer(c)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
Expand Down Expand Up @@ -108,3 +128,25 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'posexplode(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode(c)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_posexplode_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_posexplode_nested_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'posexplode_outer(b) as (pos, c)').selectExpr(
'a', 'pos', 'posexplode_outer(c)'),
conf=conf_to_enforce_split_input)
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,10 @@ case class GpuExplode(child: Expression) extends GpuExplodeBase {
require(inputBatch.numCols() - 1 == generatorOffset,
"Internal Error GpuExplode supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)
val explodeFun = (t: Table) =>
if (outer) t.explodeOuter(generatorOffset) else t.explode(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(table.explode(generatorOffset)) { exploded =>
withResource(explodeFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
Expand All @@ -362,8 +364,10 @@ case class GpuPosExplode(child: Expression) extends GpuExplodeBase {
"Internal Error GpuPosExplode supports one and only one input attribute.")
val schema = resultSchema(
GpuColumnVector.extractTypes(inputBatch), generatorOffset, includePos = true)
val explodePosFun = (t: Table) =>
if (outer) t.explodeOuterPosition(generatorOffset) else t.explodePosition(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(table.explodePosition(generatorOffset)) { exploded =>
withResource(explodePosFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2471,8 +2471,7 @@ object GpuOverrides {
GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow)
}),
expr[Explode](
"Given an input array produces a sequence of rows for each value in the array. "
+ "Explode with outer Generate is not supported under GPU runtime." ,
"Given an input array produces a sequence of rows for each value in the array.",
ExprChecks.unaryProject(
// Here is a walk-around representation, since multi-level nested type is not supported yet.
// related issue: https://github.com/NVIDIA/spark-rapids/issues/1901
Expand All @@ -2483,11 +2482,11 @@ object GpuOverrides {
TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) {
override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuExplode(childExprs.head.convertToGpu())
}),
expr[PosExplode](
"Given an input array produces a sequence of rows for each value in the array. "
+ "PosExplode with outer Generate is not supported under GPU runtime." ,
"Given an input array produces a sequence of rows for each value in the array.",
ExprChecks.unaryProject(
// Here is a walk-around representation, since multi-level nested type is not supported yet.
// related issue: https://github.com/NVIDIA/spark-rapids/issues/1901
Expand All @@ -2499,6 +2498,7 @@ object GpuOverrides {
TypeSig.ARRAY.nested(
TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY)),
(a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) {
override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu())
}),
expr[CollectList](
Expand Down

0 comments on commit 6f04038

Please sign in to comment.