diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index fb3251871ba..e6cf44d5790 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -93,6 +93,13 @@ def test_coalesce(data_gen): lambda spark : gen_df(spark, gen).select( f.coalesce(*command_args))) +def test_coalesce_constant_output(): + # Coalesce can allow a constant value as output. Technically Spark should mark this + # as foldable and turn it into a constant, but it does not, so make sure our code + # can deal with it. (This means something like + will get two constant scalar values) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.range(1, 100).selectExpr("4 + coalesce(5, id) as nine")) + @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn) def test_nvl2(data_gen): (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=True) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala index bb1a433139a..a75e47e3745 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala @@ -111,20 +111,28 @@ abstract class GpuUnaryExpression extends UnaryExpression with GpuExpression { def outputTypeOverride: DType = null + private[this] def doItColumnar(input: GpuColumnVector): GpuColumnVector = { + withResource(doColumnar(input)) { vec => + if (outputTypeOverride != null && outputTypeOverride != vec.getType) { + GpuColumnVector.from(vec.castTo(outputTypeOverride), dataType) + } else { + GpuColumnVector.from(vec.incRefCount(), dataType) + } + } + } + override def columnarEval(batch: ColumnarBatch): Any = { val input = child.columnarEval(batch) try { input match { case vec: GpuColumnVector => - withResource(doColumnar(vec)) { base => - if (outputTypeOverride != null && outputTypeOverride != base.getType) { - GpuColumnVector.from(base.castTo(outputTypeOverride), dataType) - } else { - GpuColumnVector.from(base.incRefCount(), dataType) + doItColumnar(vec) + case other => + withResource(GpuScalar.from(other, child.dataType)) { s => + withResource(GpuColumnVector.from(s, batch.numRows(), child.dataType)) { vec => + doItColumnar(vec) } } - case _ => throw new IllegalStateException( - s"Unary expression $this should only see a column result from child eval") } } finally { if (input.isInstanceOf[AutoCloseable]) { @@ -145,6 +153,7 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression { def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector + def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector override def columnarEval(batch: ColumnarBatch): Any = { var lhs: Any = null @@ -164,7 +173,12 @@ trait GpuBinaryExpression extends BinaryExpression with GpuExpression { withResource(GpuScalar.from(r, right.dataType)) { scalar => GpuColumnVector.from(doColumnar(l, scalar), dataType) } - case (l, r) if (l != null && r != null) => nullSafeEval(l, r) + case (l, r) if l != null && r != null => + withResource(GpuScalar.from(l, left.dataType)) { leftScalar => + withResource(GpuScalar.from(r, right.dataType)) { rightScalar => + GpuColumnVector.from(doColumnar(batch.numRows(), leftScalar, rightScalar), dataType) + } + } case _ => null } } finally { @@ -211,6 +225,12 @@ trait CudfBinaryExpression extends GpuBinaryExpression { val outType = outputType(lBase, rhs) lBase.binaryOp(binaryOp, rhs, outType) } + + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } } abstract class CudfBinaryOperator extends GpuBinaryOperator with CudfBinaryExpression @@ -268,6 +288,7 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression { def doColumnar(val0: GpuColumnVector, val1: Scalar, val2: GpuColumnVector): ColumnVector def doColumnar(val0: GpuColumnVector, val1: Scalar, val2: Scalar): ColumnVector def doColumnar(val0: GpuColumnVector, val1: GpuColumnVector, val2: Scalar): ColumnVector + def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector override def columnarEval(batch: ColumnarBatch): Any = { var val0: Any = null @@ -329,8 +350,15 @@ trait GpuTernaryExpression extends TernaryExpression with GpuExpression { scalar1.close() scalar2.close() } - case (v0, v1, v2) if (v0 != null && v1 != null && v2 != null) => - nullSafeEval(v0, v1, v2) + case (v0, v1, v2) if v0 != null && v1 != null && v2 != null => + withResource(GpuScalar.from(v0, children(0).dataType)) { v0Scalar => + withResource(GpuScalar.from(v1, children(1).dataType)) { v1Scalar => + withResource(GpuScalar.from(v2, children(2).dataType)) { v2Scalar => + GpuColumnVector.from(doColumnar(batch.numRows(), v0Scalar, v1Scalar, v2Scalar), + dataType) + } + } + } case _ => null } } finally { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala index 55837236121..d41bb560a6b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/nullExpressions.scala @@ -273,6 +273,12 @@ case class GpuNaNvl(left: Expression, right: Expression) extends GpuBinaryExpres } } + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } + override def dataType: DataType = left.dataType // Access to AbstractDataType is not allowed, and not really needed here diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/bitwise.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/bitwise.scala index 1772136f788..45f40e82ba7 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/bitwise.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/bitwise.scala @@ -78,6 +78,12 @@ trait GpuShiftBase extends GpuBinaryExpression with ImplicitCastInputTypes { lBase.binaryOp(shiftOp, distance, lBase.getType) } } + + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } } case class GpuShiftLeft(left: Expression, right: Expression) extends GpuShiftBase { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala index 53954c0fb59..fc336084f6f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala @@ -88,6 +88,12 @@ case class GpuGetArrayItem(child: Expression, ordinal: Expression) } } } + + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } } class GpuGetMapValueMeta( @@ -139,6 +145,11 @@ case class GpuGetMapValue(child: Expression, key: Expression) override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = lhs.getBase.getMapValue(rhs) + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } override def doColumnar(lhs: Scalar, rhs: GpuColumnVector): ColumnVector = throw new IllegalStateException("This is not supported yet") diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index e79f80ab1fe..3c48dfcd2e4 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -243,6 +243,12 @@ case class GpuDateDiff(endDate: Expression, startDate: Expression) } } } + + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } } case class GpuQuarter(child: Expression) extends GpuDateUnaryExpression { @@ -328,6 +334,7 @@ abstract class GpuToTimestamp throw new IllegalArgumentException("lhs has to be a vector and rhs has to be a scalar for " + "the unixtimestamp to work") } + override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format @@ -344,6 +351,12 @@ abstract class GpuToTimestamp } } } + + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } } /** @@ -462,6 +475,12 @@ case class GpuFromUnixTime( } } + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } + override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) } @@ -512,6 +531,12 @@ trait GpuDateMathBase extends GpuBinaryExpression with ExpectsInputTypes { } } } + + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } } case class GpuDateSub(startDate: Expression, days: Expression) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala index eee84236fc2..f4313dfb7b3 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala @@ -121,6 +121,12 @@ case class GpuStringLocate(substr: Expression, col: Expression, start: Expressio } } + override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(val1, numRows, col.dataType)) { val1Col => + doColumnar(val0, val1Col, val2) + } + } + override def doColumnar( val0: GpuColumnVector, val1: Scalar, @@ -147,7 +153,7 @@ case class GpuStartsWith(left: Expression, right: Expression) override def sql: String = { val inputSQL = left.sql - val listSQL = right.sql.toString + val listSQL = right.sql s"($inputSQL STARTSWITH ($listSQL))" } @@ -156,6 +162,12 @@ case class GpuStartsWith(left: Expression, right: Expression) def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = lhs.getBase.startsWith(rhs) + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = throw new IllegalStateException( "Really should not be here, cannot have two column vectors as input in StartsWith") @@ -172,7 +184,7 @@ case class GpuEndsWith(left: Expression, right: Expression) override def sql: String = { val inputSQL = left.sql - val listSQL = right.sql.toString + val listSQL = right.sql s"($inputSQL ENDSWITH ($listSQL))" } @@ -181,6 +193,12 @@ case class GpuEndsWith(left: Expression, right: Expression) def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = lhs.getBase.endsWith(rhs) + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = throw new IllegalStateException( "Really should not be here, cannot have two column vectors as input in EndsWith") @@ -300,6 +318,12 @@ case class GpuContains(left: Expression, right: Expression) extends GpuBinaryExp def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = lhs.getBase.stringContains(rhs) + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = throw new IllegalStateException("Really should not be here, " + "Cannot have two column vectors as input in Contains") @@ -365,6 +389,12 @@ case class GpuSubstring(str: Expression, pos: Expression, len: Expression) } } + override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col => + doColumnar(val0Col, val1, val2) + } + } + override def doColumnar( val0: GpuColumnVector, val1: GpuColumnVector, @@ -442,6 +472,12 @@ case class GpuStringReplace( } } + override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(val0, numRows, srcExpr.dataType)) { val0Col => + doColumnar(val0Col, val1, val2) + } + } + override def doColumnar( strExpr: GpuColumnVector, searchExpr: GpuColumnVector, @@ -490,6 +526,12 @@ case class GpuLike(left: Expression, right: Expression, escapeChar: Char) lhs.getBase.matchesRe(regexStr) } + override def doColumnar(numRows: Int, lhs: Scalar, rhs: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(lhs, numRows, left.dataType)) { expandedLhs => + doColumnar(expandedLhs, rhs) + } + } + override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType) override def dataType: DataType = BooleanType @@ -628,6 +670,12 @@ case class GpuSubstringIndex(strExpr: Expression, } } + override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(val0, numRows, strExpr.dataType)) { val0Col => + doColumnar(val0Col, val1, val2) + } + } + override def doColumnar( str: GpuColumnVector, delim: GpuColumnVector, @@ -694,6 +742,12 @@ trait BasePad extends GpuTernaryExpression with ImplicitCastInputTypes with Null } } + override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col => + doColumnar(val0Col, val1, val2) + } + } + override def doColumnar( str: GpuColumnVector, len: GpuColumnVector, @@ -800,6 +854,12 @@ case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression) str.getBase.stringSplitRecord(regex, intLimit) } + override def doColumnar(numRows: Int, val0: Scalar, val1: Scalar, val2: Scalar): ColumnVector = { + withResource(GpuColumnVector.from(val0, numRows, str.dataType)) { val0Col => + doColumnar(val0Col, val1, val2) + } + } + override def doColumnar( str: GpuColumnVector, regex: GpuColumnVector, @@ -835,4 +895,4 @@ case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression) regex: GpuColumnVector, limit: Scalar): ColumnVector = throw new IllegalStateException("This is not supported yet") -} \ No newline at end of file +}