support GpuConcat on ArrayType (NVIDIA#2379)
Closes NVIDIA#2013

Support GpuConcat on ArrayType, and introduce some refinements to GpuConcat on StringType.

Signed-off-by: sperlingxx <[email protected]>
sperlingxx authored and abellina committed May 21, 2021
1 parent 184dd1a commit cdbd6bb
Showing 7 changed files with 141 additions and 78 deletions.
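In practice, this change lets list-concatenation expressions run on the GPU instead of falling back to the CPU. A minimal usage sketch (hypothetical column names; assumes a SparkSession `spark` with the RAPIDS accelerator enabled):

import org.apache.spark.sql.functions.{array, col, concat, lit}

val df = spark.range(4).select(
  array(col("id"), lit(1L)).as("a"),
  array(lit(2L), lit(3L)).as("b"))

// Row-wise list concatenation: concat([id, 1], [2, 3]) -> [id, 1, 2, 3]
df.select(concat(col("a"), col("b")).as("ab")).show()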
docs/configs.md: 2 changes (1 addition, 1 deletion)
@@ -155,7 +155,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Ceil"></a>spark.rapids.sql.expression.Ceil|`ceiling`, `ceil`|Ceiling of a number|true|None|
<a name="sql.expression.CheckOverflow"></a>spark.rapids.sql.expression.CheckOverflow| |CheckOverflow after arithmetic operations between DecimalType data|true|None|
<a name="sql.expression.Coalesce"></a>spark.rapids.sql.expression.Coalesce|`coalesce`|Returns the first non-null argument if exists. Otherwise, null|true|None|
<a name="sql.expression.Concat"></a>spark.rapids.sql.expression.Concat|`concat`|String concatenate NO separator|true|None|
<a name="sql.expression.Concat"></a>spark.rapids.sql.expression.Concat|`concat`|List/String concatenate|true|None|
<a name="sql.expression.Contains"></a>spark.rapids.sql.expression.Contains| |Contains|true|None|
<a name="sql.expression.Cos"></a>spark.rapids.sql.expression.Cos|`cos`|Cosine|true|None|
<a name="sql.expression.Cosh"></a>spark.rapids.sql.expression.Cosh|`cosh`|Hyperbolic cosine|true|None|
docs/supported_ops.md: 6 changes (3 additions, 3 deletions)
@@ -3463,7 +3463,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="4">Concat</td>
<td rowSpan="4">`concat`</td>
<td rowSpan="4">String concatenate NO separator</td>
<td rowSpan="4">List/String concatenate</td>
<td rowSpan="4">None</td>
<td rowSpan="2">project</td>
<td>input</td>
@@ -3481,7 +3481,7 @@ Accelerator support is described below.
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -3502,7 +3502,7 @@ Accelerator support is described below.
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
integration_tests/src/main/python/collection_ops_test.py: 59 changes (59 additions, 0 deletions)
@@ -17,10 +17,69 @@
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *
+from string_test import mk_str_gen
import pyspark.sql.functions as f

nested_gens = [ArrayGen(LongGen()),
               StructGen([("a", LongGen())]),
               MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())]
+# additional tests for non-null arrays because of https://github.com/rapidsai/cudf/pull/8181
+non_nested_array_gens = [ArrayGen(sub_gen, nullable=nullable)
+                         for nullable in [True, False] for sub_gen in all_gen + [null_gen]]
+
+@pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn)
+def test_concat_list(data_gen):
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a)'))
+
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a, b)'))
+
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: three_col_df(spark, data_gen, data_gen, data_gen
+                                   ).selectExpr('concat(a, b, c)'))
+
+def test_empty_concat_list():
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, ArrayGen(LongGen())).selectExpr('concat()'))
+
+@pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn)
+def test_concat_list_with_lit(data_gen):
+    array_lit = gen_scalar(data_gen)
+    array_lit2 = gen_scalar(data_gen)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, data_gen).select(
+            f.concat(f.col('a'),
+                     f.col('b'),
+                     f.lit(array_lit).cast(data_gen.data_type))))
+
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, data_gen).select(
+            f.concat(f.lit(array_lit).cast(data_gen.data_type),
+                     f.col('a'),
+                     f.lit(array_lit2).cast(data_gen.data_type))))
+
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, data_gen).select(
+            f.concat(f.lit(array_lit).cast(data_gen.data_type),
+                     f.lit(array_lit2).cast(data_gen.data_type))))
+
+def test_concat_string():
+    gen = mk_str_gen('.{0,5}')
+    (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: binary_op_df(spark, gen).select(
+            f.concat(),
+            f.concat(f.col('a')),
+            f.concat(s1),
+            f.concat(f.col('a'), f.col('b')),
+            f.concat(f.col('a'), f.col('b'), f.col('a')),
+            f.concat(s1, f.col('b')),
+            f.concat(f.col('a'), s2),
+            f.concat(f.lit(None).cast('string'), f.col('b')),
+            f.concat(f.col('a'), f.lit(None).cast('string')),
+            f.concat(f.lit(''), f.col('b')),
+            f.concat(f.col('a'), f.lit(''))))
+
@pytest.mark.parametrize('data_gen', all_gen + nested_gens, ids=idfn)
@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)
integration_tests/src/main/python/string_test.py: 16 changes (0 additions, 16 deletions)
@@ -145,22 +145,6 @@ def test_endswith():
            f.col('a').endswith(None),
            f.col('a').endswith('A\ud720')))

-# We currently only support strings, but this should be extended to other types
-# later on
-def test_concat():
-    gen = mk_str_gen('.{0,5}')
-    (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: binary_op_df(spark, gen).select(
-            f.concat(f.col('a'), f.col('b')),
-            f.concat(f.col('a'), f.col('b'), f.col('a')),
-            f.concat(s1, f.col('b')),
-            f.concat(f.col('a'), s2),
-            f.concat(f.lit(None).cast('string'), f.col('b')),
-            f.concat(f.col('a'), f.lit(None).cast('string')),
-            f.concat(f.lit(''), f.col('b')),
-            f.concat(f.col('a'), f.lit(''))))
-
def test_substring():
    gen = mk_str_gen('.{0,30}')
    assert_gpu_and_cpu_are_equal_collect(
@@ -2500,10 +2500,13 @@ object GpuOverrides {
          GpuEndsWith(lhs, rhs)
        }),
    expr[Concat](
-      "String concatenate NO separator",
-      ExprChecks.projectNotLambda(TypeSig.STRING,
+      "List/String concatenate",
+      ExprChecks.projectNotLambda((TypeSig.STRING + TypeSig.ARRAY).nested(
+        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
+        (TypeSig.STRING + TypeSig.BINARY + TypeSig.ARRAY).nested(TypeSig.all),
-      repeatingParamCheck = Some(RepeatingParamCheck("input", TypeSig.STRING,
+      repeatingParamCheck = Some(RepeatingParamCheck("input",
+        (TypeSig.STRING + TypeSig.ARRAY).nested(
+          TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
+        (TypeSig.STRING + TypeSig.BINARY + TypeSig.ARRAY).nested(TypeSig.all)))),
      (a, conf, p, r) => new ComplexTypeMergingExprMeta[Concat](a, conf, p, r) {
        override def convertToGpu(child: Seq[Expression]): GpuExpression = GpuConcat(child)
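The two type signatures in the check above describe, respectively, what the GPU version accepts (STRING, or ARRAY of common cuDF types plus NULL and DECIMAL) and what Spark's own Concat allows (STRING, BINARY, and ARRAY of anything); the gap between the two is what supported_ops.md renders as NS or PS*. A self-contained mini-model of that idea (hypothetical names; the plugin's real TypeSig is much richer):

sealed trait SqlType
case object StringT extends SqlType
case object ArrayT extends SqlType
case object BinaryT extends SqlType

final case class TypeSigModel(types: Set[SqlType]) {
  def +(other: TypeSigModel): TypeSigModel = TypeSigModel(types ++ other.types)
}

val gpuSig = TypeSigModel(Set(StringT, ArrayT))            // what GpuConcat handles
val sparkSig = TypeSigModel(Set(StringT, ArrayT, BinaryT)) // what Spark's Concat allows

// Types Spark allows but the GPU does not will fall back to the CPU;
// the docs render them as NS (not supported) or PS* (partial support).
val fallbackTypes: Set[SqlType] = sparkSig.types -- gpuSig.types // Set(BinaryT)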
@@ -16,37 +16,61 @@

package org.apache.spark.sql.rapids

-import ai.rapids.cudf.{ColumnVector, Scalar}
-import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuScalar, GpuUnaryExpression}
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{ColumnVector, ColumnView, Scalar}
+import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuExpressionsUtils, GpuScalar, GpuUnaryExpression}
+import com.nvidia.spark.rapids.GpuExpressionsUtils.columnarEvalToColumn
+import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String

-case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
-    extends GpuUnaryExpression {
+case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {

-  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
-    s"The size function doesn't support the operand type ${child.dataType}")
+  @transient override lazy val dataType: DataType = {
+    if (children.isEmpty) {
+      StringType
+    } else {
+      super.dataType
+    }
+  }

-  override def dataType: DataType = IntegerType
-  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
+  override def nullable: Boolean = children.exists(_.nullable)

-  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
+  override def columnarEval(batch: ColumnarBatch): Any = dataType match {
+    // Explicitly return a null scalar for an empty concat, as Spark does, since cuDF
+    // does not support concatenating zero columns.
+    case dt if children.isEmpty => GpuScalar.from(null, dt)
+    // For a single-column concat, pass through the child's result to avoid an extra cuDF call.
+    case _ if children.length == 1 => children.head.columnarEval(batch)
+    case StringType => stringConcat(batch)
+    case ArrayType(_, _) => listConcat(batch)
+    case _ => throw new IllegalArgumentException(s"unsupported dataType $dataType")
+  }

-    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering
-    // MapData is represented as List of Struct in terms of cuDF.
-    withResource(input.getBase.countElements()) { collectionSize =>
-      if (legacySizeOfNull) {
-        withResource(Scalar.fromInt(-1)) { nullScalar =>
-          withResource(input.getBase.isNull) { inputIsNull =>
-            inputIsNull.ifElse(nullScalar, collectionSize)
-          }
-        }
-      } else {
-        collectionSize.incRefCount()
-      }
-    }
-  }
-}
+  private def stringConcat(batch: ColumnarBatch): GpuColumnVector = {
+    withResource(ArrayBuffer.empty[ColumnVector]) { buffer =>
+      // build input buffer
+      children.foreach { child =>
+        buffer += columnarEvalToColumn(child, batch).getBase
+      }
+      // run string concatenate
+      GpuColumnVector.from(
+        ColumnVector.stringConcatenate(buffer.toArray[ColumnView]), StringType)
+    }
+  }
+
+  private def listConcat(batch: ColumnarBatch): GpuColumnVector = {
+    withResource(ArrayBuffer[ColumnVector]()) { buffer =>
+      // build input buffer
+      children.foreach { child =>
+        buffer += columnarEvalToColumn(child, batch).getBase
+      }
+      // run list concatenate
+      GpuColumnVector.from(ColumnVector.listConcatenateByRow(buffer: _*), dataType)
+    }
+  }
+}
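Both helpers rely on the plugin's withResource pattern to guarantee that the cuDF columns are closed even when evaluation throws. A minimal sketch of that helper, assuming it behaves like a try/finally wrapper (the real one lives in the plugin's Arm trait and also has overloads for closing whole sequences of resources, which is what the ArrayBuffer usage above leans on):

// Run `block` with `resource`, closing the resource on every path.
def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
  try {
    block(resource)
  } finally {
    resource.close()
  }
}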
@@ -164,3 +188,30 @@ case class GpuElementAt(left: Expression, right: Expression, failOnError: Boolea

override def prettyName: String = "element_at"
}

+case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
+    extends GpuUnaryExpression {
+
+  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
+    s"The size function doesn't support the operand type ${child.dataType}")
+
+  override def dataType: DataType = IntegerType
+  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
+
+  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
+
+    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering
+    // MapData is represented as List of Struct in terms of cuDF.
+    withResource(input.getBase.countElements()) { collectionSize =>
+      if (legacySizeOfNull) {
+        withResource(Scalar.fromInt(-1)) { nullScalar =>
+          withResource(input.getBase.isNull) { inputIsNull =>
+            inputIsNull.ifElse(nullScalar, collectionSize)
+          }
+        }
+      } else {
+        collectionSize.incRefCount()
+      }
+    }
+  }
+}
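For context on the relocated GpuSize (its body is unchanged by this commit): the legacySizeOfNull flag mirrors Spark's spark.sql.legacy.sizeOfNull setting. A small illustration of the two modes (assumes a SparkSession `spark`):

import spark.implicits._

val df = Seq(Some(Seq(1, 2, 3)), Option.empty[Seq[Int]]).toDF("arr")

// legacySizeOfNull = true  -> size(NULL) returns -1
// legacySizeOfNull = false -> size(NULL) returns NULL
df.selectExpr("size(arr)").show()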
@@ -16,15 +16,11 @@

package org.apache.spark.sql.rapids

-import scala.collection.mutable.ArrayBuffer
-
-import ai.rapids.cudf.{ColumnVector, ColumnView, DType, PadSide, Scalar, Table}
+import ai.rapids.cudf.{ColumnVector, DType, PadSide, Scalar, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, Predicate, StringSplit, SubstringIndex}
import org.apache.spark.sql.types._
-import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String

abstract class GpuUnaryString2StringExpression extends GpuUnaryExpression with ExpectsInputTypes {
@@ -267,36 +263,6 @@ case class GpuStringTrimRight(column: Expression, trimParameters: Option[Express
    GpuColumnVector.from(column.getBase.rstrip(t), dataType)
}

-case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {
-  override def dataType: DataType = StringType
-  override def nullable: Boolean = children.exists(_.nullable)
-
-  override def columnarEval(batch: ColumnarBatch): Any = {
-    var nullStrScalar: Scalar = null
-    var emptyStrScalar: Scalar = null
-    val columns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector](children.size)
-    try {
-      children.foreach { childExpr =>
-        withResource(GpuExpressionsUtils.columnarEvalToColumn(childExpr, batch)) {
-          gcv => columns += gcv.getBase.incRefCount()
-        }
-      }
-      emptyStrScalar = Scalar.fromString("")
-      nullStrScalar = Scalar.fromNull(DType.STRING)
-      GpuColumnVector.from(ColumnVector.stringConcatenate(emptyStrScalar, nullStrScalar,
-        columns.toArray[ColumnView]), dataType)
-    } finally {
-      columns.safeClose()
-      if (emptyStrScalar != null) {
-        emptyStrScalar.close()
-      }
-      if (nullStrScalar != null) {
-        nullStrScalar.close()
-      }
-    }
-  }
-}
-
case class GpuContains(left: Expression, right: Expression) extends GpuBinaryExpression
    with Predicate with ImplicitCastInputTypes with NullIntolerant {

