fix incorrect imports in shim layer
andygrove committed Feb 11, 2022
1 parent e396c69 commit c70390f
Showing 6 changed files with 135 additions and 7 deletions.
@@ -15,10 +15,10 @@
*/
package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, RapidsConf, RapidsMeta, RegexUnsupportedException, TernaryExprMeta}
import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, RapidsConf, RapidsMeta, RegexReplaceMode, RegexUnsupportedException, TernaryExprMeta}

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpReplace}
import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace, RegexReplaceMode}
import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.unsafe.types.UTF8String

@@ -15,10 +15,10 @@
*/
package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexUnsupportedException}
import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexReplaceMode, RegexUnsupportedException}

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpReplace}
import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace, RegexReplaceMode}
import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.unsafe.types.UTF8String

@@ -15,10 +15,10 @@
*/
package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexUnsupportedException}
import com.nvidia.spark.rapids.{CudfRegexTranspiler, DataFromReplacementRule, GpuExpression, GpuOverrides, QuaternaryExprMeta, RapidsConf, RapidsMeta, RegexReplaceMode, RegexUnsupportedException}

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, RegExpReplace}
import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace, RegexReplaceMode}
import org.apache.spark.sql.rapids.{GpuRegExpReplace, GpuRegExpUtils, GpuStringReplace}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.unsafe.types.UTF8String

@@ -121,6 +121,10 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr
// predicate boolean array results in the two T values mapping to
// indices 0 and 1, respectively.

// predicate:             [F, null, T, F, T]
// exclusive prefix sum:  [0, 0, 0, 1, 1]
// indices of the T rows: [0, 1]

val prefixSumExclusive = withResource(boolToInt(predicate)) { boolsAsInts =>
boolsAsInts.scan(
ScanAggregation.sum(),
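
Aside (not part of the commit): the comment above describes deriving gather indices from an exclusive prefix sum over the predicate column. A minimal pure-Scala sketch of that idea, with illustrative names and nulls treated as false:

// Illustrative CPU-side analog of the exclusive-prefix-sum gather map.
object PrefixSumGatherExample extends App {
  // None stands in for a null predicate value (nulls count as false).
  def gatherIndicesOfTrueRows(predicate: Seq[Option[Boolean]]): Seq[Int] = {
    val asInts = predicate.map(p => if (p.contains(true)) 1 else 0)
    // exclusive prefix sum: position i holds the number of true rows before i
    val prefixSumExclusive = asInts.scanLeft(0)(_ + _).dropRight(1)
    // keep the scan value only at rows where the predicate is true
    predicate.zip(prefixSumExclusive).collect { case (Some(true), idx) => idx }
  }

  // [F, null, T, F, T] -> exclusive prefix sum [0, 0, 0, 1, 1] -> the two
  // T rows land at output indices 0 and 1
  assert(gatherIndicesOfTrueRows(
    Seq(Some(false), None, Some(true), Some(false), Some(true))) == Seq(0, 1))
}
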
@@ -19,11 +19,11 @@ package org.apache.spark.sql.rapids
import ai.rapids.cudf._
import ai.rapids.cudf.ast.BinaryOperator
import com.nvidia.spark.rapids._

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes, NullIntolerant, Predicate}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types.{AbstractDataType, AnyDataType, BooleanType, DataType, DoubleType, FloatType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

trait GpuPredicateHelper {
protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
@@ -66,6 +66,122 @@ case class GpuAnd(left: Expression, right: Expression) extends CudfBinaryOperato

override def binaryOp: BinaryOp = BinaryOp.NULL_LOGICAL_AND
override def astOperator: Option[BinaryOperator] = Some(ast.BinaryOperator.NULL_LOGICAL_AND)

protected def filterBatch(
tbl: Table,
pred: ColumnVector,
colTypes: Array[DataType]): ColumnarBatch = {
withResource(tbl.filter(pred)) { filteredData =>
GpuColumnVector.from(filteredData, colTypes)
}
}

private def columnarEvalWithSideEffects(batch: ColumnarBatch): Any = {
val leftExpr = left.asInstanceOf[GpuExpression]
val rightExpr = right.asInstanceOf[GpuExpression]
val colTypes = GpuColumnVector.extractTypes(batch)

withResource(GpuColumnVector.from(batch)) { tbl =>
withResource(GpuExpressionsUtils.columnarEvalToColumn(leftExpr, batch)) { lhsBool =>

GpuColumnVector.debug("lhsBool", lhsBool.getBase)

// filter to get rows where lhs was true
val rhsBool = withResource(filterBatch(tbl, lhsBool, colTypes)) { rhsBatch =>
rightExpr.columnarEval(rhsBatch)
}

GpuColumnVector.debug("rhsBool", rhsBool.getBase)

// a AND (CAST(b as INT) + 2) > 0
//
// a      b
// true   MAX_INT - 2   ... a AND b = true
// false  MAX_INT - 2
// false  MAX_INT       <-- currently fails

// lhsBool = { true, false, false }

// filtered batch:
// true MAX_INT - 2 ... a AND b = true

// rhsBool = { true }

// perform AND of lhsBool and rhsBool:
//
// gather map from lhsBool (exclusive prefix sum) = { 0, 1, 1 }
// expand rhsBool back to the original row count using that gather map,
// keeping values only where lhsBool is true:
//
// rhsAdjusted = gather(lhsBool, rhsBool) = { true, false, false }
//
// i.e. lhsBool = { true, false, false }
// rhsBool = { true } -> rhsAdjusted = { true, false, false }
//
// result = lhsBool.and(rhsAdjusted)

// TODO: verify the best way to create FALSE_EXPR
// get the inverse of lhsBool
withResource(lhsBool.getBase.unaryOp(UnaryOp.NOT)) { leftInverted =>
// TODO: should the RHS be evaluated on the filtered batch or on the full batch?
val cView = withResourceIfAllowed(lhsBool) { lhs =>
withResource(GpuExpressionsUtils.columnarEvalToColumn(rightExpr, batch)) { rhsBool =>
withResourceIfAllowed(rightExpr.columnarEval(batch)) { rhs =>
(lhs, rhs) match {
case (l: GpuColumnVector, r: GpuColumnVector) =>
GpuColumnVector.from(doColumnar(l, r), dataType)
case (l: GpuScalar, r: GpuColumnVector) =>
GpuColumnVector.from(doColumnar(l, r), dataType)
case (l: GpuColumnVector, r: GpuScalar) =>
GpuColumnVector.from(doColumnar(l, r), dataType)
case (l: GpuScalar, r: GpuScalar) =>
GpuColumnVector.from(doColumnar(batch.numRows(), l, r), dataType)
case (l, r) =>
throw new UnsupportedOperationException(s"Unsupported data '($l: " +
s"${l.getClass}, $r: ${r.getClass})' for GPU binary expression.")
}
}
}
}
val falseExpr = withResource(GpuScalar.from(false, BooleanType)) { falseScalar =>
GpuColumnVector.from(falseScalar, lhsBool.getRowCount.toInt, dataType)
}
val finalReturn = leftInverted.ifElse(falseExpr.getBase, cView.getBase)
GpuColumnVector.from(finalReturn, dataType)
}
}
}
}

// TODO: is this the right place for this check, or should doColumnar be overridden instead?
override def columnarEval(batch: ColumnarBatch): Any = {
val rightExpr = right.asInstanceOf[GpuExpression]

if (rightExpr.hasSideEffects) {
columnarEvalWithSideEffects(batch)
} else {
super.columnarEval(batch)
}
}
}
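
Aside (not part of the commit): the notes inside columnarEvalWithSideEffects sketch a filter/evaluate/expand scheme for an AND whose right-hand side has side effects. A row-wise pure-Scala analog, with illustrative names and an integer overflow standing in for the side effect, might look like this:

// Illustrative only: evaluate the RHS just on rows where the LHS is true,
// then expand the results back to the original row count (false elsewhere).
object SideEffectAndExample extends App {
  def andWithGuardedRhs(lhs: Seq[Boolean], rhs: Int => Boolean): Seq[Boolean] = {
    // "filtered batch": only the rows where lhs is true reach the RHS
    val rhsOnFiltered = lhs.zipWithIndex.collect { case (true, i) => rhs(i) }
    // expand back, consuming one RHS result per true row
    val it = rhsOnFiltered.iterator
    lhs.map(l => if (l) it.next() else false)
  }

  // mirrors the comment's example: a AND (b + 2) > 0, where b + 2 overflows
  // on the last row, but that row is never evaluated because its lhs is false
  val a = Seq(true, false, false)
  val b = Seq(Int.MaxValue - 2, Int.MaxValue - 2, Int.MaxValue)
  val result = andWithGuardedRhs(a, i => Math.addExact(b(i), 2) > 0)
  assert(result == Seq(true, false, false))
}
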

case class GpuOr(left: Expression, right: Expression) extends CudfBinaryOperator with Predicate {
@@ -950,6 +950,14 @@ class CastOpSuite extends GpuExpressionTestSuite {
}
}

test("") {
val lhs = ColumnVector.fromBooleans(true, false, false)
val rhs = ColumnVector.fromBooleans(true, false, false)
val expected = ColumnVector.fromBooleans(true, false, false)


}

test("CAST string to float - sanitize step") {
val testPairs = Seq(
("\tinf", "inf"),