refine && merge
sperlingxx committed May 17, 2021
Signed-off-by: sperlingxx <[email protected]>
1 parent 0d76e0c, commit ce02997
Showing 3 changed files with 82 additions and 135 deletions.
integration_tests/src/main/python/collection_ops_test.py (10 additions, 0 deletions)

@@ -29,18 +29,28 @@

 @pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn)
 def test_concat_list(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a)'))
+
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a, b)'))
 
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: three_col_df(spark, data_gen, data_gen, data_gen
                                    ).selectExpr('concat(a, b, c)'))
 
+def test_empty_concat_list():
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, ArrayGen(LongGen())).selectExpr('concat()'))
+
 def test_concat_string():
     gen = mk_str_gen('.{0,5}')
     (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: binary_op_df(spark, gen).select(
+            f.concat(),
+            f.concat(f.col('a')),
+            f.concat(s1),
             f.concat(f.col('a'), f.col('b')),
             f.concat(f.col('a'), f.col('b'), f.col('a')),
             f.concat(s1, f.col('b')),
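The added cases pin down the CPU semantics the GPU plugin must reproduce: a zero-argument concat evaluates to NULL, and a single-argument concat is a pass-through of its child. A minimal spark-shell sketch of the expected CPU behavior (a running SparkSession named `spark` is assumed; the results follow the semantics described in this commit):

    // Zero-argument concat: Spark yields NULL, which GpuConcat below mirrors
    // with a null scalar, since cuDF has no empty concatenate.
    spark.sql("SELECT concat() AS empty").show()               // empty -> null
    // Single-argument concat: the child column is passed through unchanged.
    spark.sql("SELECT concat(array(1, 2)) AS single").show()   // single -> [1, 2]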

This file was deleted.

@@ -16,36 +16,62 @@

 package org.apache.spark.sql.rapids
 
-import ai.rapids.cudf.{ColumnVector, Scalar}
-import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuScalar, GpuUnaryExpression}
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{ColumnVector, ColumnView, Scalar}
+import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuExpressionsUtils, GpuScalar, GpuUnaryExpression}
+import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
-case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
-    extends GpuUnaryExpression {
-
-  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
-    s"The size function doesn't support the operand type ${child.dataType}")
-
-  override def dataType: DataType = IntegerType
-  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
-
-  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
-
-    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering
-    // MapData is represented as List of Struct in terms of cuDF.
-    withResource(input.getBase.countElements()) { collectionSize =>
-      if (legacySizeOfNull) {
-        withResource(Scalar.fromInt(-1)) { nullScalar =>
-          withResource(input.getBase.isNull) { inputIsNull =>
-            inputIsNull.ifElse(nullScalar, collectionSize)
-          }
-        }
-      } else {
-        collectionSize.incRefCount()
-      }
-    }
-  }
-}
+case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {
+
+  @transient override lazy val dataType: DataType = {
+    if (children.isEmpty) {
+      StringType
+    } else {
+      super.dataType
+    }
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def columnarEval(batch: ColumnarBatch): Any = dataType match {
+    // Explicitly return null for empty concat, as Spark does, since cuDF doesn't support empty concatenate.
+    case dt if children.isEmpty => GpuScalar.from(null, dt)
+    // For single-column concat, pass the result of the child node through to avoid an extra cuDF call.
+    case _ if children.length == 1 => children.head.columnarEval(batch)
+    case StringType => stringConcat(batch)
+    case ArrayType(_, _) => listConcat(batch)
+    case _ => throw new IllegalArgumentException(s"unsupported dataType $dataType")
+  }
+
+  private def stringConcat(batch: ColumnarBatch): GpuColumnVector = {
+    withResource(ArrayBuffer.empty[ColumnVector]) { buffer =>
+      // build the input buffer from the columnar results of the children
+      children.foreach { child =>
+        buffer += GpuExpressionsUtils.columnarEvalToColumn(child, batch).getBase
+      }
+      // run the cuDF string concatenate
+      GpuColumnVector.from(
+        ColumnVector.stringConcatenate(buffer.toArray[ColumnView]), StringType)
+    }
+  }
+
+  private def listConcat(batch: ColumnarBatch): GpuColumnVector = {
+    withResource(ArrayBuffer[ColumnVector]()) { buffer =>
+      // build the input buffer; list concat only accepts columnar inputs
+      children.foreach { child =>
+        child.columnarEval(batch) match {
+          case cv: GpuColumnVector => buffer += cv.getBase
+          case _ => throw new UnsupportedOperationException("Unsupported GpuScalar of List")
+        }
+      }
+      // run the cuDF row-wise list concatenate
+      GpuColumnVector.from(ColumnVector.listConcatenateByRow(buffer: _*), dataType)
+    }
+  }
+}
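Both helpers bottom out in the cuDF row-wise concatenate entry points that the diff calls: ColumnVector.stringConcatenate for strings and ColumnVector.listConcatenateByRow for arrays. A hedged standalone sketch of the string path (it assumes the cudf Java bindings on the classpath and a CUDA-capable GPU; the input values are illustrative):

    import ai.rapids.cudf.{ColumnVector, ColumnView}

    val left = ColumnVector.fromStrings("a", "b")
    val right = ColumnVector.fromStrings("1", "2")
    try {
      // Row-wise string concatenation, as in stringConcat above: ["a1", "b2"].
      val joined = ColumnVector.stringConcatenate(Array[ColumnView](left, right))
      joined.close()
    } finally {
      left.close()
      right.close()
    }

The list path is analogous: listConcatenateByRow(buffer: _*) concatenates the list in each row across all input columns.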
@@ -129,3 +155,30 @@ case class GpuElementAt(left: Expression, right: Expression)

   override def prettyName: String = "element_at"
 }

+case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
+    extends GpuUnaryExpression {
+
+  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
+    s"The size function doesn't support the operand type ${child.dataType}")
+
+  override def dataType: DataType = IntegerType
+  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
+
+  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
+
+    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering
+    // MapData is represented as List of Struct in terms of cuDF.
+    withResource(input.getBase.countElements()) { collectionSize =>
+      if (legacySizeOfNull) {
+        withResource(Scalar.fromInt(-1)) { nullScalar =>
+          withResource(input.getBase.isNull) { inputIsNull =>
+            inputIsNull.ifElse(nullScalar, collectionSize)
+          }
+        }
+      } else {
+        collectionSize.incRefCount()
+      }
+    }
+  }
+}
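GpuSize itself is unchanged by this commit, only relocated below GpuConcat. It maps size() onto cuDF's countElements and then patches nulls according to legacySizeOfNull. A brief spark-shell sketch of the CPU semantics it reproduces (the config round trip is for illustration only):

    // With the legacy flag on, size(NULL) is -1; with it off, size(NULL) is NULL.
    spark.conf.set("spark.sql.legacy.sizeOfNull", "true")
    spark.sql("SELECT size(CAST(NULL AS ARRAY<INT>)) AS s").show()   // s -> -1
    spark.conf.set("spark.sql.legacy.sizeOfNull", "false")
    spark.sql("SELECT size(CAST(NULL AS ARRAY<INT>)) AS s").show()   // s -> null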
