support GpuConcat on ArrayType #2379

Merged (12 commits), May 20, 2021
integration_tests/src/main/python/collection_ops_test.py (28 additions, 0 deletions)

@@ -17,10 +17,38 @@
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from pyspark.sql.types import *
from string_test import mk_str_gen
import pyspark.sql.functions as f

nested_gens = [ArrayGen(LongGen()),
               StructGen([("a", LongGen())]),
               MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())]
# additional test for NonNull Array because of https://github.com/rapidsai/cudf/pull/8181
non_nested_array_gens = [ArrayGen(sub_gen, nullable=nullable)
                         for nullable in [True, False] for sub_gen in all_gen + [null_gen]]

@pytest.mark.parametrize('data_gen', non_nested_array_gens, ids=idfn)
def test_concat_list(data_gen):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, data_gen).selectExpr('concat(a, b)'))

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: three_col_df(spark, data_gen, data_gen, data_gen
                                   ).selectExpr('concat(a, b, c)'))

def test_concat_string():
    gen = mk_str_gen('.{0,5}')
    (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, gen).select(
            f.concat(f.col('a'), f.col('b')),
            f.concat(f.col('a'), f.col('b'), f.col('a')),
            f.concat(s1, f.col('b')),
            f.concat(f.col('a'), s2),
            f.concat(f.lit(None).cast('string'), f.col('b')),
            f.concat(f.col('a'), f.lit(None).cast('string')),
            f.concat(f.lit(''), f.col('b')),
            f.concat(f.col('a'), f.lit(''))))

Review thread on the f.lit(None) lines above:

Collaborator: Do we have an all-nulls test case? I guess this line might hit it if col a can generate nulls.

Collaborator: Similarly for arrays: do we want an all-nulls test case for arrays?

sperlingxx (author, May 20, 2021): To be frank, I am not sure whether there exist rows consisting of all null fields among the test data for concat_array with the default random seed.

sperlingxx (author): But I think the all-null case may not be that special, since the output row will be null as soon as any one input field is null.
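Following up on that thread, here is a minimal sketch (not part of this PR) of how the all-null string case could be pinned down deterministically rather than left to the random seed. It reuses only helpers already imported in this file; the test name is invented:

def test_concat_string_all_nulls():
    gen = mk_str_gen('.{0,5}')
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, gen).select(
            # concat is null whenever any input is null, so concatenating two
            # null literals makes every output row null by construction
            f.concat(f.lit(None).cast('string'),
                     f.lit(None).cast('string'))))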

@pytest.mark.parametrize('data_gen', all_gen + nested_gens, ids=idfn)
@pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn)
integration_tests/src/main/python/string_test.py (0 additions, 16 deletions)

@@ -145,22 +145,6 @@ def test_endswith():
            f.col('a').endswith(None),
            f.col('a').endswith('A\ud720')))

# We currently only support strings, but this should be extended to other types
# later on
def test_concat():
    gen = mk_str_gen('.{0,5}')
    (s1, s2) = gen_scalars(gen, 2, force_no_nulls=True)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, gen).select(
            f.concat(f.col('a'), f.col('b')),
            f.concat(f.col('a'), f.col('b'), f.col('a')),
            f.concat(s1, f.col('b')),
            f.concat(f.col('a'), s2),
            f.concat(f.lit(None).cast('string'), f.col('b')),
            f.concat(f.col('a'), f.lit(None).cast('string')),
            f.concat(f.lit(''), f.col('b')),
            f.concat(f.col('a'), f.lit(''))))

def test_substring():
    gen = mk_str_gen('.{0,30}')
    assert_gpu_and_cpu_are_equal_collect(
@@ -2427,10 +2427,13 @@ object GpuOverrides {
        GpuEndsWith(lhs, rhs)
      }),
    expr[Concat](
-      "String concatenate NO separator",
-      ExprChecks.projectNotLambda(TypeSig.STRING,
+      "List/String concatenate",
+      ExprChecks.projectNotLambda((TypeSig.STRING + TypeSig.ARRAY).nested(
+        TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
        (TypeSig.STRING + TypeSig.BINARY + TypeSig.ARRAY).nested(TypeSig.all),
-        repeatingParamCheck = Some(RepeatingParamCheck("input", TypeSig.STRING,
+        repeatingParamCheck = Some(RepeatingParamCheck("input",
+          (TypeSig.STRING + TypeSig.ARRAY).nested(
+            TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL),
          (TypeSig.STRING + TypeSig.BINARY + TypeSig.ARRAY).nested(TypeSig.all)))),
      (a, conf, p, r) => new ComplexTypeMergingExprMeta[Concat](a, conf, p, r) {
        override def convertToGpu(child: Seq[Expression]): GpuExpression = GpuConcat(child)
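The widened signature above is what lets Spark's Concat be replaced by GpuConcat for array inputs, with elements drawn from the common cudf types plus NULL and DECIMAL. A hypothetical PySpark illustration of a newly eligible query (assumes a session with the plugin enabled; schema and data are invented):

df = spark.createDataFrame(
    [(['a', 'b'], ['c'])],
    'x array<string>, y array<string>')
df.selectExpr('concat(x, y)').collect()
# expected: [Row(concat(x, y)=['a', 'b', 'c'])]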
@@ -16,10 +16,75 @@

package com.nvidia.spark.rapids

-import ai.rapids.cudf.ColumnVector
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{ColumnVector, ColumnView}
+import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.UTF8String

case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {

  @transient override lazy val dataType: DataType = {
    if (children.isEmpty) {
      StringType
    } else {
      super.dataType
    }
  }

  override def nullable: Boolean = children.exists(_.nullable)

  override def columnarEval(batch: ColumnarBatch): Any = dataType match {
    case StringType => stringConcat(batch)
    case ArrayType(_, _) => listConcat(batch)
  }

  private def stringConcat(batch: ColumnarBatch): GpuColumnVector = {
    val rows = batch.numRows()

    withResource(ArrayBuffer[ColumnVector]()) { buffer =>
      withResource(GpuScalar.from(null, StringType)) { nullScalar =>
        // build input buffer
        children.foreach { child =>
          child.columnarEval(batch) match {
            case cv: GpuColumnVector =>
              buffer += cv.getBase
            case null =>
              // a null literal child becomes a fully-null column
              buffer += GpuColumnVector.from(nullScalar, rows, StringType).getBase
            case sv: Any =>
              // a non-null scalar child becomes a column repeating that value
              val scalar = GpuScalar.from(sv.asInstanceOf[UTF8String].toString, StringType)
              withResource(scalar) { scalar =>
                buffer += GpuColumnVector.from(scalar, rows, StringType).getBase
              }
          }
        }
        // run string concatenate
        withResource(GpuScalar.from("", StringType)) { emptyScalar =>
          GpuColumnVector.from(ColumnVector.stringConcatenate(emptyScalar, nullScalar,
            buffer.toArray[ColumnView]), StringType)
        }
      }
    }
  }

Review thread on the ArrayBuffer line above:

Collaborator: nit (here and later): ArrayBuffer.empty[ColumnVector] for readability.

sperlingxx (author): Done.

  private def listConcat(batch: ColumnarBatch): GpuColumnVector = {
    withResource(ArrayBuffer[ColumnVector]()) { buffer =>
      // build input buffer
      children.foreach { child =>
        child.columnarEval(batch) match {
          case cv: GpuColumnVector => buffer += cv.getBase
          // scalar (literal) list inputs are not supported on this path
          case _ => throw new UnsupportedOperationException("Unsupported GpuScalar of List")
        }
      }
      // run list concatenate row-wise
      GpuColumnVector.from(ColumnVector.listConcatenateByRow(buffer: _*), dataType)
    }
  }
}

case class GpuSize(child: Expression, legacySizeOfNull: Boolean)
    extends GpuUnaryExpression {
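Two behaviors of the GpuConcat implementation above, sketched in PySpark (data invented; assumes the plugin is active). First, per the review thread on the tests, row-wise list concatenation produces a null output row as soon as any input list in that row is null. Second, a literal array argument evaluates to a scalar rather than a column, which is presumably the shape the case _ branch in listConcat rejects:

df = spark.createDataFrame(
    [([1, 2], [3]), (None, [4])],
    'a array<bigint>, b array<bigint>')
df.selectExpr('concat(a, b)').collect()
# expected: [Row(concat(a, b)=[1, 2, 3]), Row(concat(a, b)=None)]

# a query shape like this would hand listConcat a scalar list input:
df.selectExpr('concat(array(9L), b)')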
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2020, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -16,15 +16,11 @@

package org.apache.spark.sql.rapids

-import scala.collection.mutable.ArrayBuffer
-
-import ai.rapids.cudf.{ColumnVector, ColumnView, DType, PadSide, Scalar, Table}
+import ai.rapids.cudf.{ColumnVector, DType, PadSide, Scalar, Table}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, Predicate, StringSplit, SubstringIndex}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.unsafe.types.UTF8String

abstract class GpuUnaryString2StringExpression extends GpuUnaryExpression with ExpectsInputTypes {
@@ -262,46 +258,6 @@ case class GpuStringTrimRight(column: Expression, trimParameters: Option[Express
    GpuColumnVector.from(column.getBase.rstrip(t), dataType)
}

case class GpuConcat(children: Seq[Expression]) extends GpuComplexTypeMergingExpression {
  override def dataType: DataType = StringType
  override def nullable: Boolean = children.exists(_.nullable)

  override def columnarEval(batch: ColumnarBatch): Any = {
    var nullStrScalar: Scalar = null
    var emptyStrScalar: Scalar = null
    val rows = batch.numRows()
    val childEvals: ArrayBuffer[Any] = new ArrayBuffer[Any](children.length)
    val columns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector]()
    try {
      nullStrScalar = GpuScalar.from(null, StringType)
      children.foreach(childEvals += _.columnarEval(batch))
      childEvals.foreach {
        case vector: GpuColumnVector =>
          columns += vector.getBase
        case col => if (col == null) {
          columns += GpuColumnVector.from(nullStrScalar, rows, StringType).getBase
        } else {
          withResource(GpuScalar.from(col.asInstanceOf[UTF8String].toString, StringType)) {
            stringScalar =>
              columns += GpuColumnVector.from(stringScalar, rows, StringType).getBase
          }
        }
      }
      emptyStrScalar = GpuScalar.from("", StringType)
      GpuColumnVector.from(ColumnVector.stringConcatenate(emptyStrScalar, nullStrScalar,
        columns.toArray[ColumnView]), dataType)
    } finally {
      columns.safeClose()
      if (emptyStrScalar != null) {
        emptyStrScalar.close()
      }
      if (nullStrScalar != null) {
        nullStrScalar.close()
      }
    }
  }
}

case class GpuContains(left: Expression, right: Expression) extends GpuBinaryExpression
    with Predicate with ImplicitCastInputTypes with NullIntolerant {
