Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support explode outer #2215

Merged
merged 6 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.EqualNullSafe"></a>spark.rapids.sql.expression.EqualNullSafe|`<=>`|Check if the values are equal including nulls <=>|true|None|
<a name="sql.expression.EqualTo"></a>spark.rapids.sql.expression.EqualTo|`=`, `==`|Check if the values are equal|true|None|
<a name="sql.expression.Exp"></a>spark.rapids.sql.expression.Exp|`exp`|Euler's number e raised to a power|true|None|
<a name="sql.expression.Explode"></a>spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.|true|None|
<a name="sql.expression.Explode"></a>spark.rapids.sql.expression.Explode|`explode`, `explode_outer`|Given an input array produces a sequence of rows for each value in the array.|true|None|
<a name="sql.expression.Expm1"></a>spark.rapids.sql.expression.Expm1|`expm1`|Euler's number e raised to a power minus 1|true|None|
<a name="sql.expression.Floor"></a>spark.rapids.sql.expression.Floor|`floor`|Floor of a number|true|None|
<a name="sql.expression.FromUnixTime"></a>spark.rapids.sql.expression.FromUnixTime|`from_unixtime`|Get the string from a unix timestamp|true|None|
Expand Down Expand Up @@ -213,7 +213,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Not"></a>spark.rapids.sql.expression.Not|`!`, `not`|Boolean not operator|true|None|
<a name="sql.expression.Or"></a>spark.rapids.sql.expression.Or|`or`|Logical OR|true|None|
<a name="sql.expression.Pmod"></a>spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None|
<a name="sql.expression.PosExplode"></a>spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.|true|None|
<a name="sql.expression.PosExplode"></a>spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None|
<a name="sql.expression.Pow"></a>spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None|
<a name="sql.expression.PromotePrecision"></a>spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None|
<a name="sql.expression.PythonUDF"></a>spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None|
Expand Down
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -5660,7 +5660,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="2">Explode</td>
<td rowSpan="2">`explode`, `explode_outer`</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array. Explode with outer Generate is not supported under GPU runtime.</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array.</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
Expand Down Expand Up @@ -11050,7 +11050,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="2">PosExplode</td>
<td rowSpan="2">`posexplode_outer`, `posexplode`</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array. PosExplode with outer Generate is not supported under GPU runtime.</td>
<td rowSpan="2">Given an input array produces a sequence of rows for each value in the array.</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
Expand Down
42 changes: 42 additions & 0 deletions integration_tests/src/main/python/generate_expr_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,26 @@ def test_explode_nested_array_data(spark_tmp_path, data_gen):
'a', 'explode(b) as c').selectExpr('a', 'explode(c)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_explode_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'explode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_explode_outer_nested_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'explode_outer(b) as c').selectExpr('a', 'explode_outer(c)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
Expand Down Expand Up @@ -108,3 +128,25 @@ def test_posexplode_nested_array_data(spark_tmp_path, data_gen):
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'posexplode(b) as (pos, c)').selectExpr('a', 'pos', 'posexplode(c)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_posexplode_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(data_gen)]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr('a', 'posexplode_outer(b)'),
conf=conf_to_enforce_split_input)

#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84
# After 3.1.0 is the min spark version we can drop this
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_posexplode_nested_outer_array_data(spark_tmp_path, data_gen):
data_gen = [int_gen, ArrayGen(ArrayGen(data_gen))]
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, *data_gen).selectExpr(
'a', 'posexplode_outer(b) as (pos, c)').selectExpr(
'a', 'pos', 'posexplode_outer(c)'),
conf=conf_to_enforce_split_input)
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,10 @@ case class GpuExplode(child: Expression) extends GpuExplodeBase {
require(inputBatch.numCols() - 1 == generatorOffset,
"Internal Error GpuExplode supports one and only one input attribute.")
val schema = resultSchema(GpuColumnVector.extractTypes(inputBatch), generatorOffset)
val explodeFun = (t: Table) =>
if (outer) t.explodeOuter(generatorOffset) else t.explode(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(table.explode(generatorOffset)) { exploded =>
withResource(explodeFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
Expand All @@ -362,8 +364,10 @@ case class GpuPosExplode(child: Expression) extends GpuExplodeBase {
"Internal Error GpuPosExplode supports one and only one input attribute.")
val schema = resultSchema(
GpuColumnVector.extractTypes(inputBatch), generatorOffset, includePos = true)
val explodePosFun = (t: Table) =>
if (outer) t.explodeOuterPosition(generatorOffset) else t.explodePosition(generatorOffset)
withResource(GpuColumnVector.from(inputBatch)) { table =>
withResource(table.explodePosition(generatorOffset)) { exploded =>
withResource(explodePosFun(table)) { exploded =>
GpuColumnVector.from(exploded, schema)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2471,8 +2471,7 @@ object GpuOverrides {
GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow)
}),
expr[Explode](
"Given an input array produces a sequence of rows for each value in the array. "
+ "Explode with outer Generate is not supported under GPU runtime." ,
"Given an input array produces a sequence of rows for each value in the array.",
ExprChecks.unaryProject(
// Here is a walk-around representation, since multi-level nested type is not supported yet.
// related issue: https://github.com/NVIDIA/spark-rapids/issues/1901
Expand All @@ -2483,11 +2482,11 @@ object GpuOverrides {
TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY),
(TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)),
(a, conf, p, r) => new GeneratorExprMeta[Explode](a, conf, p, r) {
override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuExplode(childExprs.head.convertToGpu())
}),
expr[PosExplode](
"Given an input array produces a sequence of rows for each value in the array. "
+ "PosExplode with outer Generate is not supported under GPU runtime." ,
"Given an input array produces a sequence of rows for each value in the array.",
ExprChecks.unaryProject(
// Here is a walk-around representation, since multi-level nested type is not supported yet.
// related issue: https://github.com/NVIDIA/spark-rapids/issues/1901
Expand All @@ -2499,6 +2498,7 @@ object GpuOverrides {
TypeSig.ARRAY.nested(
TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + TypeSig.ARRAY)),
(a, conf, p, r) => new GeneratorExprMeta[PosExplode](a, conf, p, r) {
override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu())
}),
expr[CollectList](
Expand Down