Skip to content

Commit

Permalink
Avoid issues where more scalars than expected show up in an expression (
Browse files Browse the repository at this point in the history
NVIDIA#1084)

Signed-off-by: Robert (Bobby) Evans <[email protected]>
  • Loading branch information
revans2 authored Nov 7, 2020
1 parent d57a638 commit 39e2313
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 13 deletions.
7 changes: 7 additions & 0 deletions integration_tests/src/main/python/conditionals_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def test_coalesce(data_gen):
lambda spark : gen_df(spark, gen).select(
f.coalesce(*command_args)))

def test_coalesce_constant_output():
    # coalesce(5, id) always yields the constant 5. Spark could mark the expression
    # as foldable and replace it with a literal, but it does not, so an operator such
    # as + ends up being handed two constant scalar inputs. Verify that the GPU and
    # CPU results still agree in that situation.
    def nine_from_constant_coalesce(spark):
        return spark.range(1, 100).selectExpr("4 + coalesce(5, id) as nine")

    assert_gpu_and_cpu_are_equal_collect(nine_from_constant_coalesce)

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_nvl2(data_gen):
(s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,28 @@ abstract class GpuUnaryExpression extends UnaryExpression with GpuExpression {

// Optional cudf type the unary result should be cast to. The default of null
// means "no override": doItColumnar only casts when this is non-null and
// differs from the type of the computed vector.
def outputTypeOverride: DType = null

/**
 * Runs `doColumnar` on a column input and wraps the outcome as a `GpuColumnVector`
 * typed with this expression's `dataType`.
 *
 * When `outputTypeOverride` is set and does not match the computed vector's type, the
 * vector is cast to the override first. Otherwise the computed vector is returned with
 * its reference count incremented.
 * NOTE(review): presumably `withResource` closes `computed` on exit, which is why the
 * non-cast branch calls `incRefCount()` - confirm against the helper's definition.
 */
private[this] def doItColumnar(input: GpuColumnVector): GpuColumnVector =
  withResource(doColumnar(input)) { computed =>
    if (outputTypeOverride == null || outputTypeOverride == computed.getType) {
      GpuColumnVector.from(computed.incRefCount(), dataType)
    } else {
      GpuColumnVector.from(computed.castTo(outputTypeOverride), dataType)
    }
  }

override def columnarEval(batch: ColumnarBatch): Any = {
val input = child.columnarEval(batch)
try {
input match {
case vec: GpuColumnVector =>
withResource(doColumnar(vec)) { base =>
if (outputTypeOverride != null && outputTypeOverride != base.getType) {
GpuColumnVector.from(base.castTo(outputTypeOverride), dataType)
} else {
GpuColumnVector.from(base.incRefCount(), dataType)
doItColumnar(vec)
case other =>
withResource(GpuScalar.from(other, child.dataType)) { s =>
withResource(GpuColumnVector.from(s, batch.numRows(), child.dataType)) { vec =>
doItColumnar(vec)
}
}
case _ => throw new IllegalStateException(
s"Unary expression $this should only see a column result from child eval")
}
} finally {
if (input.isInstanceOf[AutoCloseable]) {
Expand All @@ -145,6 +153,7 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression {
// Evaluate with both operands available as columns.
def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector
// Evaluate with a scalar left operand and a column right operand.
def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector
// Evaluate with a column left operand and a scalar right operand.
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector
// Evaluate with both operands as scalars. columnarEval passes batch.numRows() as
// numRows, so implementations know how many rows the produced column should cover.
def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector

override def columnarEval(batch: ColumnarBatch): Any = {
var lhs: Any = null
Expand All @@ -164,7 +173,12 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression {
withResource(GpuScalar.from(r, right.dataType)) { scalar =>
GpuColumnVector.from(doColumnar(l, scalar), dataType)
}
case (l, r) if (l != null && r != null) => nullSafeEval(l, r)
case (l, r) if l != null && r != null =>
withResource(GpuScalar.from(l, left.dataType)) { leftScalar =>
withResource(GpuScalar.from(r, right.dataType)) { rightScalar =>
GpuColumnVector.from(doColumnar(batch.numRows(), leftScalar, rightScalar), dataType)
}
}
case _ => null
}
} finally {
Expand Down Expand Up @@ -211,6 +225,12 @@ trait CudfBinaryExpression extends GpuBinaryExpression {
val outType = outputType(lBase, rhs)
lBase.binaryOp(binaryOp, rhs, outType)
}

/**
 * Handles the case where both operands are scalars.
 *
 * Builds a `GpuColumnVector` from the left scalar, `numRows` and `left.dataType`, then
 * delegates to the (column, scalar) overload of `doColumnar`.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { lhsColumn =>
    doColumnar(lhsColumn, rhs)
  }
}

abstract class CudfBinaryOperator extends GpuBinaryOperator with CudfBinaryExpression
Expand Down Expand Up @@ -268,6 +288,7 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression {
// Evaluate with val0 and val2 as columns and val1 as a scalar.
def doColumnar(val0: GpuColumnVector, val1: Scalar, val2: GpuColumnVector): ColumnVector
// Evaluate with val0 as a column and val1, val2 as scalars.
def doColumnar(val0: GpuColumnVector, val1: Scalar, val2: Scalar): ColumnVector
// Evaluate with val0 and val1 as columns and val2 as a scalar.
def doColumnar(val0: GpuColumnVector, val1: GpuColumnVector, val2: Scalar): ColumnVector
// Evaluate with all three inputs as scalars. columnarEval passes batch.numRows() as
// numRows, so implementations know how many rows the produced column should cover.
def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector

override def columnarEval(batch: ColumnarBatch): Any = {
var val0: Any = null
Expand Down Expand Up @@ -329,8 +350,15 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression {
scalar1.close()
scalar2.close()
}
case (v0, v1, v2) if (v0 != null && v1 != null && v2 != null) =>
nullSafeEval(v0, v1, v2)
case (v0, v1, v2) if v0 != null && v1 != null && v2 != null =>
withResource(GpuScalar.from(v0, children(0).dataType)) { v0Scalar =>
withResource(GpuScalar.from(v1, children(1).dataType)) { v1Scalar =>
withResource(GpuScalar.from(v2, children(2).dataType)) { v2Scalar =>
GpuColumnVector.from(doColumnar(batch.numRows(), v0Scalar, v1Scalar, v2Scalar),
dataType)
}
}
}
case _ => null
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ case class GpuNaNvl(left: Expression, right: Expression) extends GpuBinaryExpres
}
}

/**
 * Scalar/scalar evaluation: a column is built from `lhs` (using `numRows` and
 * `left.dataType`) and the (column, scalar) overload does the actual work.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { leftAsColumn =>
    doColumnar(leftAsColumn, rhs)
  }

// The result type is the type of the left child expression.
override def dataType: DataType = left.dataType

// Access to AbstractDataType is not allowed, and not really needed here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ trait GpuShiftBase extends GpuBinaryExpression with ImplicitCastInputTypes {
lBase.binaryOp(shiftOp, distance, lBase.getType)
}
}

/**
 * Evaluates the shift when both the value and the distance are scalars.
 *
 * A column is built from the left scalar with `numRows` and `left.dataType`, and the
 * (column, scalar) overload is then invoked with it.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { valueColumn =>
    doColumnar(valueColumn, rhs)
  }
}

case class GpuShiftLeft(left: Expression, right: Expression) extends GpuShiftBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ case class GpuGetArrayItem(child: Expression, ordinal: Expression)
}
}
}

/**
 * Both inputs are scalars: build a column from `lhs` (with `numRows` and
 * `left.dataType`) and defer to the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { lhsAsColumn =>
    doColumnar(lhsAsColumn, rhs)
  }
}

class GpuGetMapValueMeta(
Expand Down Expand Up @@ -139,6 +145,11 @@ case class GpuGetMapValue(child: Expression, key: Expression)
/** Column/scalar evaluation: calls `getMapValue` on the base of `lhs` with `rhs`. */
override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = {
  val mapColumn = lhs.getBase
  mapColumn.getMapValue(rhs)
}

/**
 * Scalar/scalar evaluation. The left scalar is used to build a column (with `numRows`
 * and `left.dataType`), which is passed to the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { mapAsColumn =>
    doColumnar(mapAsColumn, rhs)
  }

/** Scalar/column evaluation is not implemented; always throws `IllegalStateException`. */
override def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector = {
  throw new IllegalStateException("This is not supported yet")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ case class GpuDateDiff(endDate: Expression, startDate: Expression)
}
}
}

/**
 * Handles two scalar inputs by building a column from `lhs` (using `numRows` and
 * `left.dataType`) and reusing the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { lhsVector =>
    doColumnar(lhsVector, rhs)
  }
}

case class GpuQuarter(child: Expression) extends GpuDateUnaryExpression {
Expand Down Expand Up @@ -328,6 +334,7 @@ abstract class GpuToTimestamp
throw new IllegalArgumentException("lhs has to be a vector and rhs has to be a scalar for " +
"the unixtimestamp to work")
}

override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = {
val tmp = if (lhs.dataType == StringType) {
// rhs is ignored we already parsed the format
Expand All @@ -344,6 +351,12 @@ abstract class GpuToTimestamp
}
}
}

/**
 * Scalar/scalar evaluation: builds a column from the left scalar with `numRows` and
 * `left.dataType`, then calls the (column, scalar) overload with it.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { inputColumn =>
    doColumnar(inputColumn, rhs)
  }
}

/**
Expand Down Expand Up @@ -462,6 +475,12 @@ case class GpuFromUnixTime(
}
}

/**
 * Evaluates with both operands as scalars. A column is created from `lhs` with
 * `numRows` and `left.dataType`, and the (column, scalar) overload handles the rest.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { lhsExpanded =>
    doColumnar(lhsExpanded, rhs)
  }

/** Returns a copy of this expression whose `timeZoneId` is `Option(timeZoneId)`. */
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
  copy(timeZoneId = Option(timeZoneId))
Expand Down Expand Up @@ -512,6 +531,12 @@ trait GpuDateMathBase extends GpuBinaryExpression with ExpectsInputTypes {
}
}
}

/**
 * Both operands are scalars: create a column from `lhs` (with `numRows` and
 * `left.dataType`) and forward it to the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { startColumn =>
    doColumnar(startColumn, rhs)
  }
}

case class GpuDateSub(startDate: Expression, days: Expression)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ case class GpuStringLocate(substr: Expression, col: Expression, start: Expressio
}
}

/**
 * Evaluates with all three inputs as scalars.
 *
 * Only the middle input is turned into a column (built from `val1`, `numRows` and
 * `col.dataType`); the call is then forwarded to the (scalar, column, scalar) overload.
 */
override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(val1, numRows, col.dataType)) { searchedColumn =>
    doColumnar(val0, searchedColumn, val2)
  }

override def doColumnar(
val0: GpuColumnVector,
val1: Scalar,
Expand All @@ -147,7 +153,7 @@ case class GpuStartsWith(left: Expression, right: Expression)

/**
 * Renders this expression as SQL text of the form `(<left> STARTSWITH (<right>))`.
 *
 * `right.sql` is used directly: it is already a `String`, so no `.toString` is needed,
 * and `listSQL` must be defined only once in this scope.
 */
override def sql: String = {
  val inputSQL = left.sql
  val listSQL = right.sql
  s"($inputSQL STARTSWITH ($listSQL))"
}

Expand All @@ -156,6 +162,12 @@ case class GpuStartsWith(left: Expression, right: Expression)
/** Column/scalar evaluation: calls `startsWith` on the base of `lhs` with `rhs`. */
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = {
  val strings = lhs.getBase
  strings.startsWith(rhs)
}

/**
 * Scalar/scalar evaluation: a column is built from `lhs` with `numRows` and
 * `left.dataType`, and the (column, scalar) overload is applied to it.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { stringColumn =>
    doColumnar(stringColumn, rhs)
  }

/** Column/column evaluation is rejected; always throws `IllegalStateException`. */
override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = {
  val message =
    "Really should not be here, cannot have two column vectors as input in StartsWith"
  throw new IllegalStateException(message)
}
Expand All @@ -172,7 +184,7 @@ case class GpuEndsWith(left: Expression, right: Expression)

/**
 * Renders this expression as SQL text of the form `(<left> ENDSWITH (<right>))`.
 *
 * `right.sql` is used directly: it is already a `String`, so no `.toString` is needed,
 * and `listSQL` must be defined only once in this scope.
 */
override def sql: String = {
  val inputSQL = left.sql
  val listSQL = right.sql
  s"($inputSQL ENDSWITH ($listSQL))"
}

Expand All @@ -181,6 +193,12 @@ case class GpuEndsWith(left: Expression, right: Expression)
/** Column/scalar evaluation: calls `endsWith` on the base of `lhs` with `rhs`. */
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = {
  val strings = lhs.getBase
  strings.endsWith(rhs)
}

/**
 * Handles two scalar operands by building a column from `lhs` (with `numRows` and
 * `left.dataType`) and calling the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { stringColumn =>
    doColumnar(stringColumn, rhs)
  }

/** Column/column evaluation is rejected; always throws `IllegalStateException`. */
override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = {
  val message =
    "Really should not be here, cannot have two column vectors as input in EndsWith"
  throw new IllegalStateException(message)
}
Expand Down Expand Up @@ -300,6 +318,12 @@ case class GpuContains(left: Expression, right: Expression) extends GpuBinaryExp
/** Column/scalar evaluation: calls `stringContains` on the base of `lhs` with `rhs`. */
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = {
  val strings = lhs.getBase
  strings.stringContains(rhs)
}

/**
 * Scalar/scalar evaluation: builds a column from the left scalar (using `numRows` and
 * `left.dataType`) and hands it to the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { haystackColumn =>
    doColumnar(haystackColumn, rhs)
  }

/** Column/column evaluation is rejected; always throws `IllegalStateException`. */
override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = {
  val message = "Really should not be here, " +
    "Cannot have two column vectors as input in Contains"
  throw new IllegalStateException(message)
}
Expand Down Expand Up @@ -365,6 +389,12 @@ case class GpuSubstring(str: Expression, pos: Expression, len: Expression)
}
}

/**
 * Evaluates with all three inputs as scalars.
 *
 * A column is built from `val0` with `numRows` and `str.dataType`, and the
 * (column, scalar, scalar) overload is invoked with it.
 */
override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { strColumn =>
    doColumnar(strColumn, val1, val2)
  }

override def doColumnar(
val0: GpuColumnVector,
val1: GpuColumnVector,
Expand Down Expand Up @@ -442,6 +472,12 @@ case class GpuStringReplace(
}
}

/**
 * All-scalar evaluation: a column is created from `val0` (with `numRows` and
 * `srcExpr.dataType`) and passed on to the (column, scalar, scalar) overload.
 */
override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(val0, numRows, srcExpr.dataType)) { srcColumn =>
    doColumnar(srcColumn, val1, val2)
  }

override def doColumnar(
strExpr: GpuColumnVector,
searchExpr: GpuColumnVector,
Expand Down Expand Up @@ -490,6 +526,12 @@ case class GpuLike(left: Expression, right: Expression, escapeChar: Char)
lhs.getBase.matchesRe(regexStr)
}

/**
 * Scalar/scalar evaluation: builds a column from `lhs` with `numRows` and
 * `left.dataType`, then reuses the (column, scalar) overload.
 */
override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { inputColumn =>
    doColumnar(inputColumn, rhs)
  }

// Both inputs are declared as StringType.
override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType)

// The expression produces a BooleanType result.
override def dataType: DataType = BooleanType
Expand Down Expand Up @@ -628,6 +670,12 @@ case class GpuSubstringIndex(strExpr: Expression,
}
}

/**
 * Handles three scalar inputs by creating a column from `val0` (using `numRows` and
 * `strExpr.dataType`) and calling the (column, scalar, scalar) overload.
 */
override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(val0, numRows, strExpr.dataType)) { strColumn =>
    doColumnar(strColumn, val1, val2)
  }

override def doColumnar(
str: GpuColumnVector,
delim: GpuColumnVector,
Expand Down Expand Up @@ -694,6 +742,12 @@ trait BasePad extends GpuTernaryExpression with ImplicitCastInputTypes with Null
}
}

/**
 * All-scalar evaluation for padding: a column is built from `val0` with `numRows` and
 * `str.dataType`, then the (column, scalar, scalar) overload is used.
 */
override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { strColumn =>
    doColumnar(strColumn, val1, val2)
  }

override def doColumnar(
str: GpuColumnVector,
len: GpuColumnVector,
Expand Down Expand Up @@ -800,6 +854,12 @@ case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression)
str.getBase.stringSplitRecord(regex, intLimit)
}

/**
 * Evaluates with all three inputs as scalars: builds a column from `val0` (with
 * `numRows` and `str.dataType`) and forwards to the (column, scalar, scalar) overload.
 */
override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector =
  withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { inputColumn =>
    doColumnar(inputColumn, val1, val2)
  }

override def doColumnar(
str: GpuColumnVector,
regex: GpuColumnVector,
Expand Down Expand Up @@ -835,4 +895,4 @@ case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression)
regex: GpuColumnVector,
limit: Scalar): ColumnVector =
throw new IllegalStateException("This is not supported yet")
}
}

0 comments on commit 39e2313

Please sign in to comment.