diff --git a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
index 4821fb6805e..ce13571e910 100644
--- a/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
+++ b/sql-plugin/src/main/301db/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
@@ -15,10 +15,10 @@
  */
 package com.nvidia.spark.rapids.shims.v2
 
-import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, RapidsConf, RapidsMeta, RegexUnsupportedException, TernaryExprMeta}
+import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, RapidsConf, RapidsMeta, RegexReplaceMode, RegexUnsupportedException, TernaryExprMeta}
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpReplace}
-import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace, RegexReplaceMode}
+import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace}
 import org.apache.spark.sql.types.DataTypes
 import org.apache.spark.unsafe.types.UTF8String
 
diff --git a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
index aab6c705cec..973948518c5 100644
--- a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
+++ b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
@@ -15,10 +15,10 @@
  */
 package com.nvidia.spark.rapids.shims.v2
 
-import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexUnsupportedException}
+import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexReplaceMode, RegexUnsupportedException}
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpReplace}
-import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace, RegexReplaceMode}
+import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace}
 import org.apache.spark.sql.types.DataTypes
 import org.apache.spark.unsafe.types.UTF8String
 
diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
index aab6c705cec..973948518c5 100644
--- a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
+++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/v2/GpuRegExpReplaceExec.scala
@@ -15,10 +15,10 @@
  */
 package com.nvidia.spark.rapids.shims.v2
 
-import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexUnsupportedException}
+import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexReplaceMode, RegexUnsupportedException}
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpReplace}
-import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace, RegexReplaceMode}
+import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace}
 import org.apache.spark.sql.types.DataTypes
 import org.apache.spark.unsafe.types.UTF8String
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala
index 9781ab3e6e4..68924cfca72 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala
@@ -121,6 +121,10 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr
     // predicate boolean array results in the two T values mapping to
     // indices 0 and 1, respectively.
 
+    // predicate:            [F, null, T, F, T]
+    // exclusive prefix sum: [0, 0, 0, 1, 1]
+    // gather indices:       [0, 1]
+
     val prefixSumExclusive = withResource(boolToInt(predicate)) { boolsAsInts =>
       boolsAsInts.scan(
         ScanAggregation.sum(),
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/predicates.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/predicates.scala
index a98a728a1b3..f42484d7a54 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/predicates.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/predicates.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.rapids
 import ai.rapids.cudf._
 import ai.rapids.cudf.ast.BinaryOperator
 import com.nvidia.spark.rapids._
-
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes, NullIntolerant, Predicate}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BooleanType, DataType, DoubleType, FloatType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 trait GpuPredicateHelper {
   protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
@@ -66,6 +66,122 @@ case class GpuAnd(left: Expression, right: Expression) extends CudfBinaryOperato
 
   override def binaryOp: BinaryOp = BinaryOp.NULL_LOGICAL_AND
   override def astOperator: Option[BinaryOperator] = Some(ast.BinaryOperator.NULL_LOGICAL_AND)
+
+  protected def filterBatch(
+      tbl: Table,
+      pred: ColumnVector,
+      colTypes: Array[DataType]): ColumnarBatch = {
+    withResource(tbl.filter(pred)) { filteredData =>
+      GpuColumnVector.from(filteredData, colTypes)
+    }
+  }
+
+  private def columnarEvalWithSideEffects(batch: ColumnarBatch): Any = {
+    val leftExpr = left.asInstanceOf[GpuExpression]
+    val rightExpr = right.asInstanceOf[GpuExpression]
+    val colTypes = GpuColumnVector.extractTypes(batch)
+
+    withResource(GpuColumnVector.from(batch)) { tbl =>
+      withResource(GpuExpressionsUtils.columnarEvalToColumn(leftExpr, batch)) { lhsBool =>
+
+        GpuColumnVector.debug("lhsBool", lhsBool.getBase)
+
+        // filter to get only the rows where lhs was true, and evaluate rhs on those rows
+        withResource(filterBatch(tbl, lhsBool.getBase, colTypes)) { rhsBatch =>
+          withResource(GpuExpressionsUtils.columnarEvalToColumn(rightExpr, rhsBatch)) { rhsBool =>
+            GpuColumnVector.debug("rhsBool", rhsBool.getBase)
+          }
+        }
+
+        // a AND (CAST(b as INT) + 2) > 0
+        //
+        // a       b
+        // true    MAX_INT - 2    ... a AND b = true
+        // false   MAX_INT - 2
+        // false   MAX_INT        <-- currently fails
+        //
+        // lhsBool = { true, false, false }
+        //
+        // filtered batch:
+        // true    MAX_INT - 2    ... a AND b = true
+        //
+        // rhsBool = { true }
+        //
+        // to compute lhsBool AND rhsBool, the filtered rhsBool has to be expanded
+        // back to the original row count before the two columns can be combined:
+        //   gather map from lhsBool = { 0, 1, 1 }
+        //   rhsAdjusted = gather(lhsBool, rhsBool) = { true, false, false }
+        //   result = lhsBool.and(rhsAdjusted)
+        //
+        // lhsBool = { true, false, false }
+        // rhsBool = { true } -> { true, false, false }
+
+        // TODO: verify the best way to create FALSE_EXPR
+        // get the inverse of lhsBool
+        withResource(lhsBool.getBase.unaryOp(UnaryOp.NOT)) { leftInverted =>
+          // TODO: How to evaluate RHS? on the filtered batch or on the full batch?
+          val cView = withResourceIfAllowed(rightExpr.columnarEval(batch)) { rhs =>
+            (lhsBool, rhs) match {
+              case (l: GpuColumnVector, r: GpuColumnVector) =>
+                GpuColumnVector.from(doColumnar(l, r), dataType)
+              case (l: GpuColumnVector, r: GpuScalar) =>
+                GpuColumnVector.from(doColumnar(l, r), dataType)
+              case (l, r) =>
+                throw new UnsupportedOperationException(s"Unsupported data '($l: " +
+                  s"${l.getClass}, $r: ${r.getClass})' for GPU binary expression.")
+            }
+          }
+          withResource(cView) { cView =>
+            // column of false values to use for the rows where lhs was not true
+            val falseExpr = withResource(GpuScalar.from(false, BooleanType)) { falseScalar =>
+              GpuColumnVector.from(falseScalar, batch.numRows(), dataType)
+            }
+            withResource(falseExpr) { falseExpr =>
+              val finalReturn = leftInverted.ifElse(falseExpr.getBase, cView.getBase)
+              GpuColumnVector.from(finalReturn, dataType)
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // TODO: Is this the right place, or should doColumnar be overridden instead?
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    val rightExpr = right.asInstanceOf[GpuExpression]
+
+    if (rightExpr.hasSideEffects) {
+      columnarEvalWithSideEffects(batch)
+    } else {
+      super.columnarEval(batch)
+    }
+  }
 }
 
 case class GpuOr(left: Expression, right: Expression) extends CudfBinaryOperator with Predicate {
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
index ae4db3996a6..3a28cd26a47 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
@@ -950,6 +950,14 @@ class CastOpSuite extends GpuExpressionTestSuite {
     }
   }
 
+  // Scaffolding for the GpuAnd side-effect work: AND two boolean columns on the GPU and
+  // compare against the expected column (assumes ai.rapids.cudf.ColumnVector and
+  // Arm.withResource are available in this suite).
+  test("AND two boolean columns") {
+    withResource(ColumnVector.fromBooleans(true, false, false)) { lhs =>
+      withResource(ColumnVector.fromBooleans(true, false, false)) { rhs =>
+        withResource(ColumnVector.fromBooleans(true, false, false)) { expected =>
+          withResource(lhs.and(rhs)) { result =>
+            withResource(result.copyToHost()) { hostResult =>
+              withResource(expected.copyToHost()) { hostExpected =>
+                (0 until hostExpected.getRowCount.toInt).foreach { i =>
+                  assert(hostResult.getBoolean(i) == hostExpected.getBoolean(i))
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("CAST string to float - sanitize step") {
     val testPairs = Seq(
       ("\tinf", "inf"),
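
Note (not part of the diff above): the comments in columnarEvalWithSideEffects describe stitching the filtered RHS result back onto the full batch, using the same exclusive-prefix-sum/gather idea that the conditionalExpressions.scala comment illustrates for [F, null, T, F, T]. The standalone sketch below shows that stitching step with plain cudf Java APIs. It is an illustration only: the names (SideEffectAndSketch, closing, lhs, rhsFiltered) are made up, the three-valued NULL semantics of SQL AND are deliberately ignored (nulls are treated as false), and it needs a GPU plus the cudf Java bindings on the classpath.

import ai.rapids.cudf.{ColumnVector, DType, NullPolicy, Scalar, ScanAggregation, ScanType, Table}

object SideEffectAndSketch {
  // tiny stand-in for Arm.withResource: close the resource once the block finishes
  private def closing[T <: AutoCloseable, R](r: T)(f: T => R): R =
    try f(r) finally r.close()

  def main(args: Array[String]): Unit = {
    // lhs predicate: [false, null, true, false, true]
    closing(ColumnVector.fromBoxedBooleans(false, null, true, false, true)) { lhs =>
      // rhs evaluated only on the two rows where lhs is true: [true, false]
      closing(ColumnVector.fromBooleans(true, false)) { rhsFiltered =>
        closing(Scalar.fromBool(false)) { falseScalar =>
          // treat nulls in the predicate as false (simplification)
          closing(lhs.replaceNulls(falseScalar)) { lhsNoNulls =>
            // exclusive prefix sum of [0, 0, 1, 0, 1] gives the gather map [0, 0, 0, 1, 1]
            closing(lhsNoNulls.castTo(DType.INT32)) { lhsAsInts =>
              closing(lhsAsInts.scan(ScanAggregation.sum(), ScanType.EXCLUSIVE,
                  NullPolicy.INCLUDE)) { gatherMap =>
                // expand rhsFiltered back to 5 rows; rows where lhs is false pick up
                // placeholder values that get masked out below
                closing(new Table(rhsFiltered)) { rhsTable =>
                  closing(rhsTable.gather(gatherMap)) { expanded =>
                    // keep the expanded rhs where lhs is true, false everywhere else
                    closing(lhsNoNulls.ifElse(expanded.getColumn(0), falseScalar)) { result =>
                      closing(result.copyToHost()) { host =>
                        // prints: false false true false false
                        (0 until host.getRowCount.toInt)
                          .foreach(i => print(s"${host.getBoolean(i)} "))
                        println()
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

The WIP code in the diff reaches the same end state differently: it evaluates the RHS on the full batch and then uses ifElse against the inverted LHS to force false where the LHS did not hold, whereas the gather route only ever evaluates the RHS on the filtered rows, which is what matters when the RHS has side effects such as ANSI overflow errors.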