Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid issues where more scalars that expected show up in an expression #1084

Merged
merged 1 commit into from
Nov 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions integration_tests/src/main/python/conditionals_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def test_coalesce(data_gen):
lambda spark : gen_df(spark, gen).select(
f.coalesce(*command_args)))

def test_coalesce_constant_output():
# Coalesce can allow a constant value as output. Technically Spark should mark this
# as foldable and turn it into a constant, but it does not, so make sure our code
# can deal with it. (This means something like + will get two constant scalar values)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.range(1, 100).selectExpr("4 + coalesce(5, id) as nine"))

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_nvl2(data_gen):
(s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,28 @@ abstract class GpuUnaryExpression extends UnaryExpression with GpuExpression {

def outputTypeOverride: DType = null

private[this] def doItColumnar(input: GpuColumnVector): GpuColumnVector = {
withResource(doColumnar(input)) { vec =>
if (outputTypeOverride != null && outputTypeOverride != vec.getType) {
GpuColumnVector.from(vec.castTo(outputTypeOverride), dataType)
} else {
GpuColumnVector.from(vec.incRefCount(), dataType)
}
}
}

override def columnarEval(batch: ColumnarBatch): Any = {
val input = child.columnarEval(batch)
try {
input match {
case vec: GpuColumnVector =>
withResource(doColumnar(vec)) { base =>
if (outputTypeOverride != null && outputTypeOverride != base.getType) {
GpuColumnVector.from(base.castTo(outputTypeOverride), dataType)
} else {
GpuColumnVector.from(base.incRefCount(), dataType)
doItColumnar(vec)
case other =>
withResource(GpuScalar.from(other, child.dataType)) { s =>
withResource(GpuColumnVector.from(s, batch.numRows(), child.dataType)) { vec =>
doItColumnar(vec)
}
}
case _ => throw new IllegalStateException(
s"Unary expression $this should only see a column result from child eval")
}
} finally {
if (input.isInstanceOf[AutoCloseable]) {
Expand All @@ -145,6 +153,7 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression {
def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector
def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector
def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector

override def columnarEval(batch: ColumnarBatch): Any = {
var lhs: Any = null
Expand All @@ -164,7 +173,12 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression {
withResource(GpuScalar.from(r, right.dataType)) { scalar =>
GpuColumnVector.from(doColumnar(l, scalar), dataType)
}
case (l, r) if (l != null && r != null) => nullSafeEval(l, r)
case (l, r) if l != null && r != null =>
withResource(GpuScalar.from(l, left.dataType)) { leftScalar =>
withResource(GpuScalar.from(r, right.dataType)) { rightScalar =>
GpuColumnVector.from(doColumnar(batch.numRows(), leftScalar, rightScalar), dataType)
}
}
case _ => null
}
} finally {
Expand Down Expand Up @@ -211,6 +225,12 @@ trait CudfBinaryExpression extends GpuBinaryExpression {
val outType = outputType(lBase, rhs)
lBase.binaryOp(binaryOp, rhs, outType)
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}
}

abstract class CudfBinaryOperator extends GpuBinaryOperator with CudfBinaryExpression
Expand Down Expand Up @@ -268,6 +288,7 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression {
def doColumnar(val0: GpuColumnVector, val1: Scalar, val2: GpuColumnVector): ColumnVector
def doColumnar(val0: GpuColumnVector, val1: Scalar, val2: Scalar): ColumnVector
def doColumnar(val0: GpuColumnVector, val1: GpuColumnVector, val2: Scalar): ColumnVector
def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector

override def columnarEval(batch: ColumnarBatch): Any = {
var val0: Any = null
Expand Down Expand Up @@ -329,8 +350,15 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression {
scalar1.close()
scalar2.close()
}
case (v0, v1, v2) if (v0 != null && v1 != null && v2 != null) =>
nullSafeEval(v0, v1, v2)
case (v0, v1, v2) if v0 != null && v1 != null && v2 != null =>
withResource(GpuScalar.from(v0, children(0).dataType)) { v0Scalar =>
withResource(GpuScalar.from(v1, children(1).dataType)) { v1Scalar =>
withResource(GpuScalar.from(v2, children(2).dataType)) { v2Scalar =>
GpuColumnVector.from(doColumnar(batch.numRows(), v0Scalar, v1Scalar, v2Scalar),
dataType)
}
}
}
case _ => null
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,12 @@ case class GpuNaNvl(left: Expression, right: Expression) extends GpuBinaryExpres
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def dataType: DataType = left.dataType

// Access to AbstractDataType is not allowed, and not really needed here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ trait GpuShiftBase extends GpuBinaryExpression with ImplicitCastInputTypes {
lBase.binaryOp(shiftOp, distance, lBase.getType)
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}
}

case class GpuShiftLeft(left: Expression, right: Expression) extends GpuShiftBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ case class GpuGetArrayItem(child: Expression, ordinal: Expression)
}
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}
}

class GpuGetMapValueMeta(
Expand Down Expand Up @@ -139,6 +145,11 @@ case class GpuGetMapValue(child: Expression, key: Expression)
override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector =
lhs.getBase.getMapValue(rhs)

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector =
throw new IllegalStateException("This is not supported yet")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ case class GpuDateDiff(endDate: Expression, startDate: Expression)
}
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}
}

case class GpuQuarter(child: Expression) extends GpuDateUnaryExpression {
Expand Down Expand Up @@ -328,6 +334,7 @@ abstract class GpuToTimestamp
throw new IllegalArgumentException("lhs has to be a vector and rhs has to be a scalar for " +
"the unixtimestamp to work")
}

override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = {
val tmp = if (lhs.dataType == StringType) {
// rhs is ignored we already parsed the format
Expand All @@ -344,6 +351,12 @@ abstract class GpuToTimestamp
}
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}
}

/**
Expand Down Expand Up @@ -462,6 +475,12 @@ case class GpuFromUnixTime(
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
copy(timeZoneId = Option(timeZoneId))
}
Expand Down Expand Up @@ -512,6 +531,12 @@ trait GpuDateMathBase extends GpuBinaryExpression with ExpectsInputTypes {
}
}
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}
}

case class GpuDateSub(startDate: Expression, days: Expression)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ case class GpuStringLocate(substr: Expression, col: Expression, start: Expressio
}
}

override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(val1, numRows, col.dataType)) { val1Col =>
doColumnar(val0, val1Col, val2)
}
}

override def doColumnar(
val0: GpuColumnVector,
val1: Scalar,
Expand All @@ -147,7 +153,7 @@ case class GpuStartsWith(left: Expression, right: Expression)

override def sql: String = {
val inputSQL = left.sql
val listSQL = right.sql.toString
val listSQL = right.sql
s"($inputSQL STARTSWITH ($listSQL))"
}

Expand All @@ -156,6 +162,12 @@ case class GpuStartsWith(left: Expression, right: Expression)
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector =
lhs.getBase.startsWith(rhs)

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector =
throw new IllegalStateException(
"Really should not be here, cannot have two column vectors as input in StartsWith")
Expand All @@ -172,7 +184,7 @@ case class GpuEndsWith(left: Expression, right: Expression)

override def sql: String = {
val inputSQL = left.sql
val listSQL = right.sql.toString
val listSQL = right.sql
s"($inputSQL ENDSWITH ($listSQL))"
}

Expand All @@ -181,6 +193,12 @@ case class GpuEndsWith(left: Expression, right: Expression)
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector =
lhs.getBase.endsWith(rhs)

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector =
throw new IllegalStateException(
"Really should not be here, cannot have two column vectors as input in EndsWith")
Expand Down Expand Up @@ -300,6 +318,12 @@ case class GpuContains(left: Expression, right: Expression) extends GpuBinaryExp
def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector =
lhs.getBase.stringContains(rhs)

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector =
throw new IllegalStateException("Really should not be here, " +
"Cannot have two column vectors as input in Contains")
Expand Down Expand Up @@ -365,6 +389,12 @@ case class GpuSubstring(str: Expression, pos: Expression, len: Expression)
}
}

override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col =>
doColumnar(val0Col, val1, val2)
}
}

override def doColumnar(
val0: GpuColumnVector,
val1: GpuColumnVector,
Expand Down Expand Up @@ -442,6 +472,12 @@ case class GpuStringReplace(
}
}

override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(val0, numRows, srcExpr.dataType)) { val0Col =>
doColumnar(val0Col, val1, val2)
}
}

override def doColumnar(
strExpr: GpuColumnVector,
searchExpr: GpuColumnVector,
Expand Down Expand Up @@ -490,6 +526,12 @@ case class GpuLike(left: Expression, right: Expression, escapeChar: Char)
lhs.getBase.matchesRe(regexStr)
}

override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs =>
doColumnar(expandedLhs, rhs)
}
}

override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType)

override def dataType: DataType = BooleanType
Expand Down Expand Up @@ -628,6 +670,12 @@ case class GpuSubstringIndex(strExpr: Expression,
}
}

override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(val0, numRows, strExpr.dataType)) { val0Col =>
doColumnar(val0Col, val1, val2)
}
}

override def doColumnar(
str: GpuColumnVector,
delim: GpuColumnVector,
Expand Down Expand Up @@ -694,6 +742,12 @@ trait BasePad extends GpuTernaryExpression with ImplicitCastInputTypes with Null
}
}

override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col =>
doColumnar(val0Col, val1, val2)
}
}

override def doColumnar(
str: GpuColumnVector,
len: GpuColumnVector,
Expand Down Expand Up @@ -800,6 +854,12 @@ case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression)
str.getBase.stringSplitRecord(regex, intLimit)
}

override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = {
withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col =>
doColumnar(val0Col, val1, val2)
}
}

override def doColumnar(
str: GpuColumnVector,
regex: GpuColumnVector,
Expand Down Expand Up @@ -835,4 +895,4 @@ case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression)
regex: GpuColumnVector,
limit: Scalar): ColumnVector =
throw new IllegalStateException("This is not supported yet")
}
}