
Add support for ReplicateRows #4388

Merged · 6 commits · Dec 21, 2021
Changes from 3 commits
1 change: 1 addition & 0 deletions docs/configs.md
@@ -265,6 +265,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.RegExpExtract"></a>spark.rapids.sql.expression.RegExpExtract|`regexp_extract`|RegExpExtract|false|This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.|
<a name="sql.expression.RegExpReplace"></a>spark.rapids.sql.expression.RegExpReplace|`regexp_replace`|RegExpReplace support for string literal input patterns|false|This is disabled by default because the implementation is not 100% compatible. See the compatibility guide for more information.|
<a name="sql.expression.Remainder"></a>spark.rapids.sql.expression.Remainder|`%`, `mod`|Remainder or modulo|true|None|
<a name="sql.expression.ReplicateRows"></a>spark.rapids.sql.expression.ReplicateRows| |Given an input row, replicates the row N times|true|None|
<a name="sql.expression.Rint"></a>spark.rapids.sql.expression.Rint|`rint`|Rounds up a double value to the nearest double equal to an integer|true|None|
<a name="sql.expression.Round"></a>spark.rapids.sql.expression.Round|`round`|Round an expression to d decimal places using HALF_UP rounding mode|true|None|
<a name="sql.expression.RowNumber"></a>spark.rapids.sql.expression.RowNumber|`row_number`|Window function that returns the index for the row within the aggregation window|true|None|
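Like the other expression configs in this table, the new entry can be toggled at runtime. A minimal sketch (assuming a running SparkSession named `spark`; not part of this diff):

    // Disable the GPU implementation of ReplicateRows; it is enabled by default.
    spark.conf.set("spark.rapids.sql.expression.ReplicateRows", "false")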
47 changes: 47 additions & 0 deletions docs/supported_ops.md
@@ -10119,6 +10119,53 @@ are limited.
<th>UDT</th>
</tr>
<tr>
<td rowSpan="2">ReplicateRows</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Given an input row, replicates the row N times</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>input</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><em>PS<br/>UTC is only supported TZ for TIMESTAMP</em></td>
<td>S</td>
<td><em>PS<br/>max DECIMAL precision of 18</em></td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>max child DECIMAL precision of 18;<br/>UTC is only supported TZ for child TIMESTAMP;<br/>unsupported child types BINARY, CALENDAR, MAP, UDT</em></td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="4">Rint</td>
<td rowSpan="4">`rint`</td>
<td rowSpan="4">Rounds up a double value to the nearest double equal to an integer</td>
17 changes: 17 additions & 0 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -416,6 +416,23 @@ def test_hash_avg_nulls_partial_only(data_gen):
conf=_no_nans_float_conf_partial
)

@approximate_float
@ignore_order
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimal, ids=idfn)
def test_intersectAll(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, data_gen, length=100).intersectAll(gen_df(spark, data_gen, length=100)),
conf=allow_negative_scale_of_decimal_conf)

@approximate_float
@ignore_order
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimal, ids=idfn)
def test_exceptAll(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, data_gen, length=100).exceptAll(gen_df(spark, data_gen, length=100).filter('a != b')),
conf=allow_negative_scale_of_decimal_conf)

@approximate_float
@ignore_order(local=True)
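ReplicateRows has no SQL function of its own; Spark's optimizer rewrites `intersectAll` and `exceptAll` into an aggregation followed by a Generate of ReplicateRows, which is why the two new tests above exercise the GPU expression. A minimal spark-shell sketch (illustrative only, not part of the PR):

    val df1 = Seq(1, 1, 2).toDF("a")
    val df2 = Seq(1, 1, 3).toDF("a")
    // The physical plan shows a Generate over replicate_rows(...), which the
    // plugin can now replace with GpuGenerateExec and GpuReplicateRows.
    df1.intersectAll(df2).explain()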
@@ -24,11 +24,11 @@ import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Generator}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Generator, ReplicateRows}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{GenerateExec, SparkPlan}
import org.apache.spark.sql.rapids.GpuCreateArray
import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, MapType, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, MapType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

class GpuGenerateExecSparkPlanMeta(
@@ -68,6 +68,22 @@ abstract class GeneratorExprMeta[INPUT <: Generator](
val supportOuter: Boolean = false
}

/**
 * Base class for metadata around `ReplicateRows`. Concrete subclasses build the
 * GPU expression from the already-converted child expressions; see the
 * `expr[ReplicateRows]` rule in GpuOverrides later in this diff.
 */
abstract class ReplicateRowsExprMeta[INPUT <: ReplicateRows](
gen: INPUT,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends GeneratorExprMeta[INPUT](gen, conf, parent, rule) {

override final def convertToGpu(): GpuExpression =
convertToGpu(childExprs.map(_.convertToGpu()))

def convertToGpu(childExprs: Seq[Expression]): GpuExpression
}

/**
 * GPU overrides of `Generator`, cooperating with `GpuGenerateExec`.
*/
@@ -164,6 +180,63 @@ trait GpuGenerator extends GpuUnevaluable {
}
}

case class GpuReplicateRows(children: Seq[Expression]) extends GpuGenerator {

override def elementSchema: StructType =
StructType(children.tail.zipWithIndex.map {
case (e, index) => StructField(s"col$index", e.dataType)
})
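  // Note: `children.tail` above skips the first child, which is the replication
  // count; the remaining children are the columns of the row being replicated.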

  override def generate(inputBatch: ColumnarBatch,
      generatorOffset: Int,
      outer: Boolean): ColumnarBatch = {

    val schema = GpuColumnVector.extractTypes(inputBatch)
    val vectors = GpuColumnVector.extractBases(inputBatch)
    // The generator column holds the per-row replication counts.
    val replicateVector = vectors(generatorOffset)

    withResource(GpuColumnVector.from(inputBatch)) { table =>
      withResource(table.repeat(replicateVector)) { replicatedTable =>
        GpuColumnVector.from(replicatedTable, schema)
      }
    }
  }
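  // Illustration of cudf's Table.repeat semantics: for a column [a, b, c] with
  // counts [2, 0, 3], the repeated column is [a, a, c, c, c] and the output has
  // sum(counts) rows in total.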

  override def inputSplitIndices(inputBatch: ColumnarBatch,
      generatorOffset: Int,
      outer: Boolean,
      targetSizeBytes: Long): Array[Int] = {
    val vectors = GpuColumnVector.extractBases(inputBatch)
    val inputRows = inputBatch.numRows()
    if (inputRows == 0) return Array()

    // Calculate the number of rows that need to be replicated. Here we find the mean of the
    // generator column. Multiplying the mean by the size of the projected columns gives us
    // the approximate memory required.
    val meanOutputRows = math.ceil(vectors(generatorOffset).mean().getDouble)
    val estimatedOutputRows = meanOutputRows * inputRows

    // input size of the columns to be repeated
    val repeatColsInputSize = vectors.slice(0, generatorOffset).map(_.getDeviceMemorySize).sum
    // estimated total output size
    val estimatedOutputSizeBytes = repeatColsInputSize * estimatedOutputRows / inputRows

    // how many splits we need to keep the output size under the target size
    val numSplitsForTargetSize = math.ceil(estimatedOutputSizeBytes / targetSizeBytes).toInt
    // how many splits we need to keep the output row count under Int.MaxValue
    val numSplitsForTargetRow = math.ceil(estimatedOutputRows / Int.MaxValue).toInt
    // how many splits we need for replicateRows to work safely
    val numSplits = numSplitsForTargetSize max numSplitsForTargetRow

    if (numSplits == 0) Array()
    else GpuBatchUtils.generateSplitIndices(inputRows, numSplits)
}
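  // Worked example for the estimate above, with illustrative numbers:
  // 1,000,000 input rows, a mean replication count of 4, and 80 MiB of input
  // data to repeat give an estimatedOutputSizeBytes of about 320 MiB; with a
  // 128 MiB target batch size that is ceil(320 / 128) = 3 splits.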
}

abstract class GpuExplodeBase extends GpuUnevaluableUnaryExpression with GpuGenerator {

/** The position of an element within the collection should also be returned. */
@@ -3104,6 +3104,22 @@ object GpuOverrides extends Logging {
override val supportOuter: Boolean = true
override def convertToGpu(): GpuExpression = GpuPosExplode(childExprs.head.convertToGpu())
}),
    expr[ReplicateRows](
      "Given an input row, replicates the row N times",
ExprChecks.projectOnly(
        // Spark optimizes the plan to run a HashAggregate on the rows to be replicated, and
        // HashAggregateExec doesn't support DECIMAL 128 yet, so this currently supports only
        // DECIMAL 64.
TypeSig.ARRAY.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64 +
TypeSig.ARRAY + TypeSig.STRUCT),
TypeSig.ARRAY.nested(TypeSig.all),
repeatingParamCheck = Some(RepeatingParamCheck("input",
(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_64
+ TypeSig.ARRAY + TypeSig.STRUCT).nested(),
TypeSig.all))),
(a, conf, p, r) => new ReplicateRowsExprMeta[ReplicateRows](a, conf, p, r) {
override def convertToGpu(childExpr: Seq[Expression]): GpuExpression =
GpuReplicateRows(childExpr)
}),
expr[CollectList](
"Collect a list of non-unique elements, not supported in reduction",
// GpuCollectList is not yet supported in Reduction context.