support GpuConcat on ArrayType #2379

Merged (12 commits, May 20, 2021)
docs/configs.md (1 addition, 1 deletion)

@@ -155,7 +155,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Ceil"></a>spark.rapids.sql.expression.Ceil|`ceiling`, `ceil`|Ceiling of a number|true|None|
<a name="sql.expression.CheckOverflow"></a>spark.rapids.sql.expression.CheckOverflow| |CheckOverflow after arithmetic operations between DecimalType data|true|None|
<a name="sql.expression.Coalesce"></a>spark.rapids.sql.expression.Coalesce|`coalesce`|Returns the first non-null argument if exists. Otherwise, null|true|None|
-<a name="sql.expression.Concat"></a>spark.rapids.sql.expression.Concat|`concat`|String concatenate NO separator|true|None|
+<a name="sql.expression.Concat"></a>spark.rapids.sql.expression.Concat|`concat`|List/String concatenate|true|None|
<a name="sql.expression.Contains"></a>spark.rapids.sql.expression.Contains| |Contains|true|None|
<a name="sql.expression.Cos"></a>spark.rapids.sql.expression.Cos|`cos`|Cosine|true|None|
<a name="sql.expression.Cosh"></a>spark.rapids.sql.expression.Cosh|`cosh`|Hyperbolic cosine|true|None|
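For context on the broadened description: Spark's `concat` accepts either string or array arguments, and this PR extends the GPU path to the array form. A minimal PySpark illustration (not part of this PR; assumes a local session):

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("ab", "cd", [1, 2], [3, 4])], ["s1", "s2", "a1", "a2"])

# concat over strings yields "abcd"; over arrays it yields [1, 2, 3, 4].
df.select(f.concat("s1", "s2").alias("s"), f.concat("a1", "a2").alias("a")).show()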

docs/supported_ops.md (3 additions, 3 deletions)

@@ -3463,7 +3463,7 @@ Accelerator support is described below.
<tr>
<td rowSpan="4">Concat</td>
<td rowSpan="4">`concat`</td>
-<td rowSpan="4">String concatenate NO separator</td>
+<td rowSpan="4">List/String concatenate</td>
<td rowSpan="4">None</td>
<td rowSpan="2">project</td>
<td>input</td>
@@ -3481,7 +3481,7 @@ Accelerator support is described below.
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
@@ -3502,7 +3502,7 @@ Accelerator support is described below.
<td> </td>
<td><b>NS</b></td>
<td> </td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT)</em></td>
<td> </td>
<td> </td>
<td> </td>
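The PS* cells above mean partial support: array inputs run on the GPU when the element type is one of the common cuDF types (plus NULL and DECIMAL), while arrays nesting BINARY, CALENDAR, ARRAY, MAP, STRUCT, or UDT fall back to the CPU. A hedged sketch of the distinction, assuming a `spark` session with the RAPIDS plugin enabled:

# Supported on GPU after this PR: concat of arrays with a primitive element type.
flat = spark.range(1).selectExpr("array(1L, 2L) as a", "array(3L) as b")
flat.selectExpr("concat(a, b)").collect()

# Expected to fall back to the CPU: array<array<bigint>> nests ARRAY,
# which the type signature in this PR does not yet allow.
nested = spark.range(1).selectExpr("array(array(1L)) as a", "array(array(2L)) as b")
nested.selectExpr("concat(a, b)").collect()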

integration_tests/src/main/python/collection_ops_test.py (38 additions)

@@ -17,10 +17,48 @@
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *
from string_test import mk_str_gen
import pyspark.sql.functions as f

nested_gens = [ArrayGen(LongGen()),
               StructGen([("a", LongGen())]),
               MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())]
# additional test for NonNull Array because of https://github.com/rapidsai/cudf/pull/8181
non_nested_array_gens = [ArrayGen(sub_gen, nullable=nullable)
                         for nullable in [True, False] for sub_gen in all_gen + [null_gen]]

@pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn)
def test_concat_list(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a)'))

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a, b)'))

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: three_col_df(spark, data_gen, data_gen, data_gen
            ).selectExpr('concat(a, b, c)'))

def test_empty_concat_list():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, ArrayGen(LongGen())).selectExpr('concat()'))

def test_concat_string():
    gen = mk_str_gen('.{0,5}')
    (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, gen).select(
            f.concat(),
            f.concat(f.col('a')),
            f.concat(s1),
            f.concat(f.col('a'), f.col('b')),
            f.concat(f.col('a'), f.col('b'), f.col('a')),
            f.concat(s1, f.col('b')),
            f.concat(f.col('a'), s2),
            f.concat(f.lit(None).cast('string'), f.col('b')),
            f.concat(f.col('a'), f.lit(None).cast('string')),
[Review thread attached to the null-literal concat lines above]

Collaborator: Do we have an all nulls test case? I guess this line might hit it if col a can generate nulls.

Collaborator: Similar for arrays, do we want an all nulls test case for arrays?

@sperlingxx (author, May 20, 2021): To be frank, I am not sure whether there exist rows consisting of all null fields among the test data for concat_array, with the default random seed.

@sperlingxx (author): But I think the all null case may not be that special, since the output row will be null only if one input field is null.
            f.concat(f.lit(''), f.col('b')),
            f.concat(f.col('a'), f.lit(''))))
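Following the review thread above, an explicit all-nulls case could look like the sketch below. This is not part of this PR; the test names are hypothetical, and the helpers come from the suite's data_gen and string_test modules already imported in this file. The casts force every input to be null on all rows.

# Sketch only: possible all-null-input cases suggested by the review thread.
def test_concat_all_null_inputs():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, mk_str_gen('.{0,5}')).selectExpr(
            "concat(cast(null as string), cast(null as string))"))

def test_concat_list_all_null_inputs():
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, ArrayGen(LongGen())).selectExpr(
            "concat(cast(null as array<bigint>), cast(null as array<bigint>))"))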

@pytest.mark.parametrize('data_gen', all_gen + nested_gens, ids=idfn)
@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)

integration_tests/src/main/python/string_test.py (16 deletions)

@@ -145,22 +145,6 @@ def test_endswith():
            f.col('a').endswith(None),
            f.col('a').endswith('A\ud720')))

-# We currently only support strings, but this should be extended to other types
-# later on
-def test_concat():
-    gen = mk_str_gen('.{0,5}')
-    (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: binary_op_df(spark, gen).select(
-            f.concat(f.col('a'), f.col('b')),
-            f.concat(f.col('a'), f.col('b'), f.col('a')),
-            f.concat(s1, f.col('b')),
-            f.concat(f.col('a'), s2),
-            f.concat(f.lit(None).cast('string'), f.col('b')),
-            f.concat(f.col('a'), f.lit(None).cast('string')),
-            f.concat(f.lit(''), f.col('b')),
-            f.concat(f.col('a'), f.lit(''))))

def test_substring():
    gen = mk_str_gen('.{0,30}')
    assert_gpu_and_cpu_are_equal_collect(

@@ -2473,10 +2473,13 @@ object GpuOverrides {
         GpuEndsWith(lhs, rhs)
       }),
     expr[Concat](
-      "String concatenate NO separator",
-      ExprChecks.projectNotLambda(TypeSig.STRING,
+      "List/String concatenate",
+      ExprChecks.projectNotLambda((TypeSig.STRING + TypeSig.ARRAY).nested(
+        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
         (TypeSig.STRING + TypeSig.BINARY + TypeSig.ARRAY).nested(TypeSig.all),
-        repeatingParamCheck = Some(RepeatingParamCheck("input", TypeSig.STRING,
+        repeatingParamCheck = Some(RepeatingParamCheck("input",
+          (TypeSig.STRING + TypeSig.ARRAY).nested(
+            TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
           (TypeSig.STRING + TypeSig.BINARY + TypeSig.ARRAY).nested(TypeSig.all)))),
       (a, conf, p, r) => new ComplexTypeMergingExprMeta[Concat](a, conf, p, r) {
         override def convertToGpu(child: Seq[Expression]): GpuExpression = GpuConcat(child)
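Since the registration above is gated by the config documented in docs/configs.md, the GPU path for concat can be disabled independently of the rest of the plugin if an issue turns up; for example:

# Fall back to Spark's CPU Concat while keeping other GPU expressions enabled.
spark.conf.set("spark.rapids.sql.expression.Concat", "false")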

@@ -16,37 +16,63 @@

 package org.apache.spark.sql.rapids

-import ai.rapids.cudf.{ColumnVector, Scalar}
-import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuScalar, GpuUnaryExpression}
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{ColumnVector, ColumnView, Scalar}
+import com.nvidia.spark.rapids.{GpuBinaryExpression, GpuColumnVector, GpuComplexTypeMergingExpression, GpuExpressionsUtils, GpuScalar, GpuUnaryExpression}
 import com.nvidia.spark.rapids.RapidsPluginImplicits._

 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.unsafe.types.UTF8String

-case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
-    extends GpuUnaryExpression {
-
-  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
-    s"The size function doesn't support the operand type ${child.dataType}")
-
-  override def dataType: DataType = IntegerType
-  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
-
-  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
-
-    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering
-    // MapData is represented as List of Struct in terms of cuDF.
-    withResource(input.getBase.countElements()) { collectionSize =>
-      if (legacySizeOfNull) {
-        withResource(Scalar.fromInt(-1)) { nullScalar =>
-          withResource(input.getBase.isNull) { inputIsNull =>
-            inputIsNull.ifElse(nullScalar, collectionSize)
-          }
-        }
-      } else {
-        collectionSize.incRefCount()
-      }
-    }
-  }
-}
+case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {
+
+  @transient override lazy val dataType: DataType = {
+    if (children.isEmpty) {
+      StringType
+    } else {
+      super.dataType
+    }
+  }
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def columnarEval(batch: ColumnarBatch): Any = dataType match {
+    // Explicitly return null for empty concat as Spark, since cuDF doesn't support empty concat.
+    case dt if children.isEmpty => GpuScalar.from(null, dt)
+    // For single column concat, we pass the result of child node to avoid extra cuDF call.
+    case _ if children.length == 1 => children.head.columnarEval(batch)
+    case StringType => stringConcat(batch)
+    case ArrayType(_, _) => listConcat(batch)
+    case _ => throw new IllegalArgumentException(s"unsupported dataType $dataType")
+  }
+
+  private def stringConcat(batch: ColumnarBatch): GpuColumnVector = {
+    withResource(ArrayBuffer.empty[ColumnVector]) { buffer =>
+      // build input buffer
+      children.foreach {
+        buffer += GpuExpressionsUtils.columnarEvalToColumn(_, batch).getBase
+      }
+      // run string concatenate
+      GpuColumnVector.from(
+        ColumnVector.stringConcatenate(buffer.toArray[ColumnView]), StringType)
+    }
+  }
+
+  private def listConcat(batch: ColumnarBatch): GpuColumnVector = {
+    withResource(ArrayBuffer[ColumnVector]()) { buffer =>
+      // build input buffer
+      children.foreach { child =>
+        child.columnarEval(batch) match {
+          case cv: GpuColumnVector => buffer += cv.getBase
+          case _ => throw new UnsupportedOperationException("Unsupported GpuScalar of List")
+        }
+      }
+      // run list concatenate
+      GpuColumnVector.from(ColumnVector.listConcatenateByRow(buffer: _*), dataType)
+    }
+  }
+}
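A Spark-level sketch of the dispatch paths in columnarEval above (illustrative only, assuming a `spark` session with the plugin enabled; the zero-argument case returns a null scalar, per the comment in the code):

df = spark.createDataFrame([([1, 2], [3],)], ["a", "b"])

df.selectExpr("concat(a)").collect()         # single input: child result passed straight through
df.selectExpr("concat(a, b)").collect()      # list inputs: cuDF listConcatenateByRow
df.selectExpr("concat('x', 'y')").collect()  # string inputs: cuDF stringConcatenate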

@@ -164,3 +190,30 @@ case class GpuElementAt(left: Expression, right: Expression, failOnError: Boolean)

   override def prettyName: String = "element_at"
 }
+
+case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
+    extends GpuUnaryExpression {
+
+  require(child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType],
+    s"The size function doesn't support the operand type ${child.dataType}")
+
+  override def dataType: DataType = IntegerType
+  override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable
+
+  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
+
+    // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering
+    // MapData is represented as List of Struct in terms of cuDF.
+    withResource(input.getBase.countElements()) { collectionSize =>
+      if (legacySizeOfNull) {
+        withResource(Scalar.fromInt(-1)) { nullScalar =>
+          withResource(input.getBase.isNull) { inputIsNull =>
+            inputIsNull.ifElse(nullScalar, collectionSize)
+          }
+        }
+      } else {
+        collectionSize.incRefCount()
+      }
+    }
+  }
+}
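GpuSize is only relocated below GpuConcat by this diff, not changed. For reference, legacySizeOfNull mirrors Spark's spark.sql.legacy.sizeOfNull setting, under which size(NULL) is -1 instead of NULL; a quick illustration (not from this PR):

spark.conf.set("spark.sql.legacy.sizeOfNull", "true")
spark.sql("SELECT size(cast(null as array<int>))").show()   # -1

spark.conf.set("spark.sql.legacy.sizeOfNull", "false")
spark.sql("SELECT size(cast(null as array<int>))").show()   # NULL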

@@ -16,15 +16,11 @@

 package org.apache.spark.sql.rapids

-import scala.collection.mutable.ArrayBuffer
-
-import ai.rapids.cudf.{ColumnVector, ColumnView, DType, PadSide, Scalar, Table}
+import ai.rapids.cudf.{ColumnVector, DType, PadSide, Scalar, Table}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._

 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, Predicate, StringSplit, SubstringIndex}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.unsafe.types.UTF8String

 abstract class GpuUnaryString2StringExpression extends GpuUnaryExpression with ExpectsInputTypes {

@@ -267,36 +263,6 @@ case class GpuStringTrimRight(column: Expression, trimParameters: Option[Expression]
     GpuColumnVector.from(column.getBase.rstrip(t), dataType)
 }

-case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {
-  override def dataType: DataType = StringType
-  override def nullable: Boolean = children.exists(_.nullable)
-
-  override def columnarEval(batch: ColumnarBatch): Any = {
-    var nullStrScalar: Scalar = null
-    var emptyStrScalar: Scalar = null
-    val columns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector](children.size)
-    try {
-      children.foreach { childExpr =>
-        withResource(GpuExpressionsUtils.columnarEvalToColumn(childExpr, batch)) {
-          gcv => columns += gcv.getBase.incRefCount()
-        }
-      }
-      emptyStrScalar = Scalar.fromString("")
-      nullStrScalar = Scalar.fromNull(DType.STRING)
-      GpuColumnVector.from(ColumnVector.stringConcatenate(emptyStrScalar, nullStrScalar,
-        columns.toArray[ColumnView]), dataType)
-    } finally {
-      columns.safeClose()
-      if (emptyStrScalar != null) {
-        emptyStrScalar.close()
-      }
-      if (nullStrScalar != null) {
-        nullStrScalar.close()
-      }
-    }
-  }
-}
-
 case class GpuContains(left: Expression, right: Expression) extends GpuBinaryExpression
     with Predicate with ImplicitCastInputTypes with NullIntolerant {