Skip to content

Commit

Permalink
Spark SQL hash function using murmur3 (NVIDIA#1207)
Browse files Browse the repository at this point in the history
GpuMurmur3Hash implementation

Signed-off-by: Ryan Lee <[email protected]>
  • Loading branch information
rwlee authored Jan 6, 2021
1 parent 601d9fe commit e16493b
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ class UnaryOperatorsSuite extends SparkQueryCompareTestSuite {
frame => frame.selectExpr("md5(strings)", "md5(cast(ints as string))",
"md5(cast(longs as binary))")
}

testSparkResultsAreEqual("Test murmur3", mixedDfWithNulls) {
frame => frame.selectExpr("hash(longs, doubles, 1, null, 'stock string', ints, strings)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,19 @@ object GpuOverrides {
(a, conf, p, r) => new ComplexTypeMergingExprMeta[Concat](a, conf, p, r) {
override def convertToGpu(child: Seq[Expression]): GpuExpression = GpuConcat(child)
}),
expr[Murmur3Hash] (
"Murmur3 hash operator",
ExprChecks.projectNotLambda(TypeSig.INT, TypeSig.INT,
repeatingParamCheck = Some(RepeatingParamCheck("input",
TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG +
TypeSig.FLOAT + TypeSig.DOUBLE + TypeSig.STRING + TypeSig.NULL,
TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG +
TypeSig.FLOAT + TypeSig.DOUBLE + TypeSig.STRING + TypeSig.NULL))),
(a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) {
override val childExprs: Seq[BaseExprMeta[_]] = a.children
.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
def convertToGpu(): GpuExpression = GpuMurmur3Hash(childExprs.map(_.convertToGpu()))
}),
expr[Contains](
"Contains",
ExprChecks.binaryProjectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package org.apache.spark.sql.rapids

import ai.rapids.cudf.{BinaryOp, ColumnVector, DType}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, Scalar}
import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes, NullIntolerant}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String

case class GpuMd5(child: Expression)
extends GpuUnaryExpression with ImplicitCastInputTypes with NullIntolerant {
Expand All @@ -34,3 +39,32 @@ case class GpuMd5(child: Expression)
}
}
}

case class GpuMurmur3Hash(child: Seq[Expression]) extends GpuExpression {
override def dataType: DataType = IntegerType

override def toString: String = s"hash($child)"
def nullable: Boolean = children.exists(_.nullable)
def children: Seq[Expression] = child

def columnarEval(batch: ColumnarBatch): Any = {
val rows = batch.numRows()
val columns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector]()
try {
children.foreach { child => child.columnarEval(batch) match {
case vector: GpuColumnVector =>
columns += vector.getBase
case col => if (col != null) {
withResource(GpuScalar.from(col)) { scalarValue =>
columns += ai.rapids.cudf.ColumnVector.fromScalar(scalarValue, rows)
}
}
}
}
GpuColumnVector.from(
ColumnVector.spark32BitMurmurHash3(42, columns.toArray[ColumnView]), dataType)
} finally {
columns.safeClose()
}
}
}

0 comments on commit e16493b

Please sign in to comment.