From 780156e27ba4c7cd3c875e6da1c663ff0f1c3544 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 20 Jul 2023 16:20:03 -0500 Subject: [PATCH 01/12] Support BloomFilterMightContain expression Signed-off-by: Jason Lowe --- docs/supported_ops.md | 8 +- .../nvidia/spark/rapids/GpuOverrides.scala | 4 +- .../nvidia/spark/rapids/GpuBloomFilter.scala | 127 ++++++++++++++++++ .../rapids/GpuBloomFilterMightContain.scala | 104 ++++++++++++++ .../rapids/shims/Spark330PlusShims.scala | 11 ++ .../BloomFilterAggregateQuerySuite.scala | 102 ++++++++++++++ tools/generated_files/supportedExprs.csv | 2 +- 7 files changed, 351 insertions(+), 7 deletions(-) create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala create mode 100644 tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 8ef7b5d61af..90e3a3222c7 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -18339,11 +18339,11 @@ are limited. S S S +S NS -NS -PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
-PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types CALENDAR, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types CALENDAR, UDT
NS diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index eaf0d9f2b3f..0365d5ab08a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3464,8 +3464,8 @@ object GpuOverrides extends Logging { expr[org.apache.spark.sql.execution.ScalarSubquery]( "Subquery that will return only one row and one column", ExprChecks.projectOnly( - (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 - + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested(), + (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128 + TypeSig.BINARY + + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested(), TypeSig.all, Nil, None), (a, conf, p, r) => diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala new file mode 100644 index 00000000000..b8a66e74d5c --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids + +import java.io.DataInputStream + +import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DeviceMemoryBuffer, DType, HostMemoryBuffer} +import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} +import com.nvidia.spark.rapids.jni.BloomFilter + +import org.apache.spark.sql.types.BinaryType + +/** GPU version of Spark's BloomFilterImpl */ +class GpuBloomFilter(val numHashes: Int, buffer: DeviceMemoryBuffer) extends AutoCloseable { + private val spillableBuffer = SpillableBuffer(buffer, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) + private val numFilterBits = buffer.getLength * 8 + + /** + * Given an input column of longs, return a boolean column with the same row count where each + * output row indicates whether the corresponding input row may have been placed into this + * Bloom filter. A false value indicates definitively that the value was not placed in the filter. + */ + def mightContainLong(col: ColumnVector): ColumnVector = { + require(col.getType == DType.INT64, s"expected longs, got ${col.getType}") + withResource(spillableBuffer.getDeviceBuffer()) { buffer => + buffer.incRefCount() + withResource(new BloomFilter(numHashes, numFilterBits, buffer)) { filter => + filter.probe(col) + } + } + } + + override def close(): Unit = { + spillableBuffer.close() + } +} + +object GpuBloomFilter { + // Spark serializes their bloom filters in a specific format, see BloomFilterImpl.readFrom. 
+ // Data is written via DataOutputStream, so everything is big-endian. + // Byte Offset Size Description + // 0 4 Version ID (see Spark's BloomFilter.Version) + // 4 4 Number of hash functions + // 8 4 Number of longs, N + // 12 N*8 Bloom filter data buffer as longs + private val HEADER_SIZE = 12 + + // version numbers from BloomFilter.Version enum + private val VERSION_V1 = 1 + + def apply(s: GpuScalar): GpuBloomFilter = { + if (s == null) { + null + } else { + s.dataType match { + case BinaryType => + withResource(s.getBase.getListAsColumnView) { childView => + require(childView.getType == DType.UINT8, s"expected UINT8 got ${childView.getType}") + deserialize(childView.getData) + } + case t => throw new IllegalArgumentException(s"Expected binary scalar, found $t") + } + } + } + + def deserialize(data: BaseDeviceMemoryBuffer): GpuBloomFilter = { + // Sanity check bloom filter header + val totalLen = data.getLength + val bufferLen = totalLen - HEADER_SIZE + require(totalLen >= HEADER_SIZE, s"header size is $totalLen") + require(bufferLen % 8 == 0, "buffer length not a multiple of 8") + val numHashes = withResource(HostMemoryBuffer.allocate(HEADER_SIZE, false)) { hostHeader => + hostHeader.copyFromMemoryBuffer(0, data, 0, HEADER_SIZE, Cuda.DEFAULT_STREAM) + parseHeader(hostHeader, bufferLen) + } + // TODO: Can we avoid this copy? Would either need the ability to release data buffers + // from scalars or make scalars spillable. + val filterBuffer = DeviceMemoryBuffer.allocate(bufferLen) + closeOnExcept(filterBuffer) { buf => + buf.copyFromDeviceBufferAsync(0, data, HEADER_SIZE, buf.getLength, Cuda.DEFAULT_STREAM) + } + new GpuBloomFilter(numHashes, filterBuffer) + } + + /** + * Parses the Spark Bloom filter serialization header performing sanity checks + * and retrieving the number of hash functions used for the filter. + * @param buffer serialized header data + * @param dataLen size of the serialized Bloom filter data without header + * @return number of hash functions used in the Bloom filter + */ + private def parseHeader(buffer: HostMemoryBuffer, dataLen: Long): Int = { + val in = new DataInputStream(new HostMemoryInputStream(buffer, buffer.getLength)) + val version = in.readInt + require(version == VERSION_V1, s"unsupported serialization format version $version") + val numHashes = in.readInt() + val sizeFromHeader = in.readInt() * 8L + require(dataLen == sizeFromHeader, + s"data size from header is $sizeFromHeader, received $dataLen") + numHashes + } +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala new file mode 100644 index 00000000000..3f0050f8706 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids + +import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed} +import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression +import com.nvidia.spark.rapids.shims.ShimBinaryExpression + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, PlanExpression} +import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +case class GpuBloomFilterMightContain( + bloomFilterExpression: Expression, + valueExpression: Expression) + extends ShimBinaryExpression with GpuExpression with AutoCloseable { + + @transient private lazy val bloomFilter: GpuBloomFilter = { + Option(TaskContext.get).foreach(_.addTaskCompletionListener[Unit](_ => close())) + withResourceIfAllowed(bloomFilterExpression.columnarEvalAny(new ColumnarBatch(Array.empty))) { + case s: GpuScalar => GpuBloomFilter(s) + case x => throw new IllegalStateException(s"Expected GPU scalar, found $x") + } + } + + override def nullable: Boolean = true + + override def left: Expression = bloomFilterExpression + + override def right: Expression = valueExpression + + override def prettyName: String = "might_contain" + + override def dataType: DataType = BooleanType + + override def checkInputDataTypes(): TypeCheckResult = { + (left.dataType, right.dataType) match { + case (BinaryType, NullType) | (NullType, LongType) | (NullType, NullType) | + (BinaryType, LongType) => + bloomFilterExpression match { + case e: Expression if e.foldable => TypeCheckResult.TypeCheckSuccess + case subquery: PlanExpression[_] if !subquery.containsPattern(OUTER_REFERENCE) => + TypeCheckResult.TypeCheckSuccess + case GetStructField(subquery: PlanExpression[_], _, _) + if !subquery.containsPattern(OUTER_REFERENCE) => + TypeCheckResult.TypeCheckSuccess + case _ => + TypeCheckResult.TypeCheckFailure(s"The Bloom filter binary input to $prettyName " + + "should be either a constant value or a scalar subquery expression") + } + case _ => TypeCheckResult.TypeCheckFailure(s"Input to function $prettyName should have " + + s"been ${BinaryType.simpleString} followed by a value with ${LongType.simpleString}, " + + s"but it's [${left.dataType.catalogString}, ${right.dataType.catalogString}].") + } + } + + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + if (bloomFilter == null) { + null + } else { + withResource(valueExpression.columnarEval(batch)) { value => + if (value == null) { + null + } else { + GpuColumnVector.from(bloomFilter.mightContainLong(value.getBase), BooleanType) + } + } + } + } + + override def close(): Unit = { + if (bloomFilter != null) { + bloomFilter.close() + } + } +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala index 604aae46e9d..7bf701ab1cf 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala @@ -91,6 +91,17 @@ trait 
Spark330PlusShims extends Spark321PlusShims with Spark320PlusNonDBShims { (a, conf, p, r) => new BinaryExprMeta[DivideYMInterval](a, conf, p, r) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuDivideYMInterval(lhs, rhs) + }), + GpuOverrides.expr[BloomFilterMightContain]( + "Bloom filter query", + ExprChecks.binaryProject( + TypeSig.BOOLEAN, + TypeSig.BOOLEAN, + ("lhs", TypeSig.BINARY + TypeSig.NULL, TypeSig.BINARY + TypeSig.NULL), + ("rhs", TypeSig.LONG + TypeSig.NULL, TypeSig.LONG + TypeSig.NULL)), + (a, conf, p, r) => new BinaryExprMeta[BloomFilterMightContain](a, conf, p, r) { + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = + GpuBloomFilterMightContain(lhs, rhs) }) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap super.getExprs ++ map ++ DayTimeIntervalShims.exprs ++ RoundingShims.exprs diff --git a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala new file mode 100644 index 00000000000..b9da935e4fd --- /dev/null +++ b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo} +import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate +import org.apache.spark.sql.internal.SQLConf + +class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { + val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg") + val funcId_might_contain = new FunctionIdentifier("might_contain") + + private def installSqlFuncs(spark: SparkSession): Unit = { + // Register 'bloom_filter_agg' to builtin. + spark.sessionState.functionRegistry.registerFunction(funcId_bloom_filter_agg, + new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"), + (children: Seq[Expression]) => children.size match { + case 1 => new BloomFilterAggregate(children.head) + case 2 => new BloomFilterAggregate(children.head, children(1)) + case 3 => new BloomFilterAggregate(children.head, children(1), children(2)) + }) + + // Register 'might_contain' to builtin. 
+ spark.sessionState.functionRegistry.registerFunction(funcId_might_contain, + new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"), + (children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1))) + } + + private def uninstallSqlFuncs(spark: SparkSession): Unit = { + spark.sessionState.functionRegistry.dropFunction(funcId_bloom_filter_agg) + spark.sessionState.functionRegistry.dropFunction(funcId_might_contain) + } + + private def buildData(spark: SparkSession): DataFrame = { + import spark.implicits._ + (Seq(Long.MinValue, 0, Long.MaxValue) ++ (1L to 10000L)).toDF("col") + } + + for (numEstimated <- Seq(4096L, 4194304L, Long.MaxValue, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { + for (numBits <- Seq(4096L, 4194304L, Long.MaxValue, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { + ALLOW_NON_GPU_testSparkResultsAreEqual( + s"might_contain estimated=$numEstimated numBits=$numBits", + buildData, + Seq("ObjectHashAggregateExec", "ShuffleExchangeExec"))(df => + { + val table = "bloom_filter_test" + val sqlString = + s""" + |SELECT might_contain( + | (SELECT bloom_filter_agg(col, + | cast($numEstimated as long), + | cast($numBits as long)) + | FROM $table), + | col) positive_membership_test, + | might_contain( + | (SELECT bloom_filter_agg(col, + | cast($numEstimated as long), + | cast($numBits as long)) + | FROM values (-1L), (100001L), (20000L) as t(col)), + | col) negative_membership_test + |FROM $table + """.stripMargin + df.createOrReplaceTempView(table) + try { + installSqlFuncs(df.sparkSession) + df.sparkSession.sql(sqlString) + } finally { + uninstallSqlFuncs(df.sparkSession) + } + }) + } + } +} diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index 809b20a676d..8fff0f894d7 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -697,7 +697,7 @@ VarianceSamp,S,`var_samp`; `variance`,None,window,input,NA,NA,NA,NA,NA,NA,NS,NA, VarianceSamp,S,`var_samp`; `variance`,None,window,result,NA,NA,NA,NA,NA,NA,NS,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA NormalizeNaNAndZero,S, ,None,project,input,NA,NA,NA,NA,NA,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA NormalizeNaNAndZero,S, ,None,project,result,NA,NA,NA,NA,NA,S,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA -ScalarSubquery,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,PS,PS,PS,NS +ScalarSubquery,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS HiveGenericUDF,S, ,None,project,param,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,NS HiveGenericUDF,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,NS HiveSimpleUDF,S, ,None,project,param,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,NS From e9f4529a8e2ca856b5e1c221e415acd1e2556a98 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 27 Jul 2023 16:35:32 -0500 Subject: [PATCH 02/12] Fix null scalar handling, add null tests --- .../nvidia/spark/rapids/GpuBloomFilter.scala | 36 +++++++++--------- .../rapids/GpuBloomFilterMightContain.scala | 6 +-- .../BloomFilterAggregateQuerySuite.scala | 38 ++++++++++++++++++- 3 files changed, 58 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala index b8a66e74d5c..3f00ee82bba 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala +++ 
b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala @@ -29,13 +29,19 @@ package com.nvidia.spark.rapids import java.io.DataInputStream -import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DeviceMemoryBuffer, DType, HostMemoryBuffer} +import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DType, DeviceMemoryBuffer, HostMemoryBuffer} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.jni.BloomFilter -import org.apache.spark.sql.types.BinaryType +import org.apache.spark.sql.types.{BinaryType, NullType} -/** GPU version of Spark's BloomFilterImpl */ +/** + * GPU version of Spark's BloomFilterImpl. + * @param numHashes number of hash functions to use in the Bloom filter + * @param buffer device buffer containing the Bloom filter data in the Spark Bloom filter + * serialization format. The device buffer will be closed when this GpuBloomFilter + * instance is closed. + */ class GpuBloomFilter(val numHashes: Int, buffer: DeviceMemoryBuffer) extends AutoCloseable { private val spillableBuffer = SpillableBuffer(buffer, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) private val numFilterBits = buffer.getLength * 8 @@ -48,10 +54,7 @@ class GpuBloomFilter(val numHashes: Int, buffer: DeviceMemoryBuffer) extends Aut def mightContainLong(col: ColumnVector): ColumnVector = { require(col.getType == DType.INT64, s"expected longs, got ${col.getType}") withResource(spillableBuffer.getDeviceBuffer()) { buffer => - buffer.incRefCount() - withResource(new BloomFilter(numHashes, numFilterBits, buffer)) { filter => - filter.probe(col) - } + BloomFilter.probe(numHashes, numFilterBits, buffer, col) } } @@ -74,17 +77,14 @@ object GpuBloomFilter { private val VERSION_V1 = 1 def apply(s: GpuScalar): GpuBloomFilter = { - if (s == null) { - null - } else { - s.dataType match { - case BinaryType => - withResource(s.getBase.getListAsColumnView) { childView => - require(childView.getType == DType.UINT8, s"expected UINT8 got ${childView.getType}") - deserialize(childView.getData) - } - case t => throw new IllegalArgumentException(s"Expected binary scalar, found $t") - } + s.dataType match { + case BinaryType if s.isValid => + withResource(s.getBase.getListAsColumnView) { childView => + require(childView.getType == DType.UINT8, s"expected UINT8 got ${childView.getType}") + deserialize(childView.getData) + } + case BinaryType | NullType => null + case t => throw new IllegalArgumentException(s"Expected binary or null scalar, found $t") } } diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala index 3f0050f8706..361729879f5 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterMightContain.scala @@ -84,11 +84,11 @@ case class GpuBloomFilterMightContain( override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { if (bloomFilter == null) { - null + GpuColumnVector.fromNull(batch.numRows(), dataType) } else { withResource(valueExpression.columnarEval(batch)) { value => - if (value == null) { - null + if (value == null || value.dataType == NullType) { + GpuColumnVector.fromNull(batch.numRows(), dataType) } else { GpuColumnVector.from(bloomFilter.mightContainLong(value.getBase), BooleanType) } diff --git 
a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala index b9da935e4fd..7f252f31f23 100644 --- a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala +++ b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala @@ -60,7 +60,9 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { private def buildData(spark: SparkSession): DataFrame = { import spark.implicits._ - (Seq(Long.MinValue, 0, Long.MaxValue) ++ (1L to 10000L)).toDF("col") + (Seq(Some(Long.MinValue), Some(0L), Some(Long.MaxValue), None) ++ + (1L to 10000L).map(x => Some(x)) ++ + (1L to 100L).map(_ => None)).toDF("col") } for (numEstimated <- Seq(4096L, 4194304L, Long.MaxValue, @@ -99,4 +101,38 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { }) } } + + testSparkResultsAreEqual( + "might_contain with literal bloom filter buffer", + spark => spark.range(1, 1).asInstanceOf[DataFrame]) { + df => + try { + installSqlFuncs(df.sparkSession) + df.sparkSession.sql( + """SELECT might_contain( + |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', + |cast(201 as long))""".stripMargin) + } finally { + uninstallSqlFuncs(df.sparkSession) + } + } + + ALLOW_NON_GPU_testSparkResultsAreEqual( + "might_contain with all NULL inputs", + spark => spark.range(1, 1).asInstanceOf[DataFrame], + Seq("ObjectHashAggregateExec", "ShuffleExchangeExec")) { + df => + try { + installSqlFuncs(df.sparkSession) + df.sparkSession.sql( + """ + |SELECT might_contain(null, null) both_null, + | might_contain(null, 1L) null_bf, + | might_contain((SELECT bloom_filter_agg(cast(id as long)) from range(1, 10000)), + | null) null_value + """.stripMargin) + } finally { + uninstallSqlFuncs(df.sparkSession) + } + } } From 24a1831da8b4f988f649aa49238847a75a7f56fc Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 27 Jul 2023 17:06:47 -0500 Subject: [PATCH 03/12] scalastyle fixes --- .../nvidia/spark/rapids/GpuBloomFilter.scala | 2 +- .../BloomFilterAggregateQuerySuite.scala | 42 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala index 3f00ee82bba..cda30bbebdd 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala @@ -29,7 +29,7 @@ package com.nvidia.spark.rapids import java.io.DataInputStream -import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DType, DeviceMemoryBuffer, HostMemoryBuffer} +import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DeviceMemoryBuffer, DType, HostMemoryBuffer} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.jni.BloomFilter diff --git a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala index 7f252f31f23..cac6a9a55dc 100644 --- a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala +++ b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala @@ -106,15 +106,15 @@ class 
BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { "might_contain with literal bloom filter buffer", spark => spark.range(1, 1).asInstanceOf[DataFrame]) { df => - try { - installSqlFuncs(df.sparkSession) - df.sparkSession.sql( - """SELECT might_contain( - |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', - |cast(201 as long))""".stripMargin) - } finally { - uninstallSqlFuncs(df.sparkSession) - } + try { + installSqlFuncs(df.sparkSession) + df.sparkSession.sql( + """SELECT might_contain( + |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', + |cast(201 as long))""".stripMargin) + } finally { + uninstallSqlFuncs(df.sparkSession) + } } ALLOW_NON_GPU_testSparkResultsAreEqual( @@ -122,17 +122,17 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { spark => spark.range(1, 1).asInstanceOf[DataFrame], Seq("ObjectHashAggregateExec", "ShuffleExchangeExec")) { df => - try { - installSqlFuncs(df.sparkSession) - df.sparkSession.sql( - """ - |SELECT might_contain(null, null) both_null, - | might_contain(null, 1L) null_bf, - | might_contain((SELECT bloom_filter_agg(cast(id as long)) from range(1, 10000)), - | null) null_value - """.stripMargin) - } finally { - uninstallSqlFuncs(df.sparkSession) - } + try { + installSqlFuncs(df.sparkSession) + df.sparkSession.sql( + """ + |SELECT might_contain(null, null) both_null, + | might_contain(null, 1L) null_bf, + | might_contain((SELECT bloom_filter_agg(cast(id as long)) from range(1, 10000)), + | null) null_value + """.stripMargin) + } finally { + uninstallSqlFuncs(df.sparkSession) + } } } From 0c7d711dde0766bbaaf4b77bff0cdc7a20d35985 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 28 Jul 2023 13:41:48 -0500 Subject: [PATCH 04/12] Fix overrides --- .../nvidia/spark/rapids/GpuOverrides.scala | 3 +- .../spark/rapids/shims/BloomFilterShims.scala | 38 ++++++++++++++ .../spark/rapids/shims/BloomFilterShims.scala | 51 +++++++++++++++++++ .../rapids/shims/Spark330PlusShims.scala | 11 ---- 4 files changed, 91 insertions(+), 12 deletions(-) create mode 100644 sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 0365d5ab08a..38ac90390a0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3535,7 +3535,8 @@ object GpuOverrides extends Logging { // Shim expressions should be last to allow overrides with shim-specific versions val expressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = commonExpressions ++ TimeStamp.getExprs ++ GpuHiveOverrides.exprs ++ - ZOrderRules.exprs ++ DecimalArithmeticOverrides.exprs ++ SparkShimImpl.getExprs + ZOrderRules.exprs ++ DecimalArithmeticOverrides.exprs ++ + BloomFilterShims.exprs ++ SparkShimImpl.getExprs def wrapScan[INPUT <: Scan]( scan: INPUT, diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala new file mode 100644 index 00000000000..91649c64150 --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala @@ -0,0 +1,38 @@ +/* + * Copyright 
(c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "321db" } +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.ExprRule + +import org.apache.spark.sql.catalyst.expressions.Expression + + +object BloomFilterShims { + val exprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Map.empty +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala new file mode 100644 index 00000000000..bc69ab738d2 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions._ + + +object BloomFilterShims { + val exprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { + Seq( + GpuOverrides.expr[BloomFilterMightContain]( + "Bloom filter query", + ExprChecks.binaryProject( + TypeSig.BOOLEAN, + TypeSig.BOOLEAN, + ("lhs", TypeSig.BINARY + TypeSig.NULL, TypeSig.BINARY + TypeSig.NULL), + ("rhs", TypeSig.LONG + TypeSig.NULL, TypeSig.LONG + TypeSig.NULL)), + (a, conf, p, r) => new BinaryExprMeta[BloomFilterMightContain](a, conf, p, r) { + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = + GpuBloomFilterMightContain(lhs, rhs) + }) + ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap + } +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala index 7bf701ab1cf..604aae46e9d 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/Spark330PlusShims.scala @@ -91,17 +91,6 @@ trait Spark330PlusShims extends Spark321PlusShims with Spark320PlusNonDBShims { (a, conf, p, r) => new BinaryExprMeta[DivideYMInterval](a, conf, p, r) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuDivideYMInterval(lhs, rhs) - }), - GpuOverrides.expr[BloomFilterMightContain]( - "Bloom filter query", - ExprChecks.binaryProject( - TypeSig.BOOLEAN, - TypeSig.BOOLEAN, - ("lhs", TypeSig.BINARY + TypeSig.NULL, TypeSig.BINARY + TypeSig.NULL), - ("rhs", TypeSig.LONG + TypeSig.NULL, TypeSig.LONG + TypeSig.NULL)), - (a, conf, p, r) => new BinaryExprMeta[BloomFilterMightContain](a, conf, p, r) { - override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = - GpuBloomFilterMightContain(lhs, rhs) }) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap super.getExprs ++ map ++ DayTimeIntervalShims.exprs ++ RoundingShims.exprs From b7b4140dc114e6390dfe44f7a26c3b6cacf89ab5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 31 Jul 2023 14:24:31 -0500 Subject: [PATCH 05/12] Update to new spark-rapids-jni BloomFilter API --- .../nvidia/spark/rapids/GpuBloomFilter.scala | 51 ++++--------------- 1 file changed, 10 insertions(+), 41 deletions(-) diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala index cda30bbebdd..c96c4c3b5a4 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilter.scala @@ -27,9 +27,7 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids -import java.io.DataInputStream - -import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DeviceMemoryBuffer, DType, HostMemoryBuffer} +import ai.rapids.cudf.{BaseDeviceMemoryBuffer, ColumnVector, Cuda, DeviceMemoryBuffer, DType} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.jni.BloomFilter @@ -37,14 +35,12 @@ import 
org.apache.spark.sql.types.{BinaryType, NullType} /** * GPU version of Spark's BloomFilterImpl. - * @param numHashes number of hash functions to use in the Bloom filter * @param buffer device buffer containing the Bloom filter data in the Spark Bloom filter - * serialization format. The device buffer will be closed when this GpuBloomFilter - * instance is closed. + * serialization format, including the header. The buffer will be closed by + * this GpuBloomFilter instance. */ -class GpuBloomFilter(val numHashes: Int, buffer: DeviceMemoryBuffer) extends AutoCloseable { +class GpuBloomFilter(buffer: DeviceMemoryBuffer) extends AutoCloseable { private val spillableBuffer = SpillableBuffer(buffer, SpillPriorities.ACTIVE_ON_DECK_PRIORITY) - private val numFilterBits = buffer.getLength * 8 /** * Given an input column of longs, return a boolean column with the same row count where each @@ -54,7 +50,7 @@ class GpuBloomFilter(val numHashes: Int, buffer: DeviceMemoryBuffer) extends Aut def mightContainLong(col: ColumnVector): ColumnVector = { require(col.getType == DType.INT64, s"expected longs, got ${col.getType}") withResource(spillableBuffer.getDeviceBuffer()) { buffer => - BloomFilter.probe(numHashes, numFilterBits, buffer, col) + BloomFilter.probe(buffer, col) } } @@ -73,9 +69,6 @@ object GpuBloomFilter { // 12 N*8 Bloom filter data buffer as longs private val HEADER_SIZE = 12 - // version numbers from BloomFilter.Version enum - private val VERSION_V1 = 1 - def apply(s: GpuScalar): GpuBloomFilter = { s.dataType match { case BinaryType if s.isValid => @@ -91,37 +84,13 @@ object GpuBloomFilter { def deserialize(data: BaseDeviceMemoryBuffer): GpuBloomFilter = { // Sanity check bloom filter header val totalLen = data.getLength - val bufferLen = totalLen - HEADER_SIZE + val bitBufferLen = totalLen - HEADER_SIZE require(totalLen >= HEADER_SIZE, s"header size is $totalLen") - require(bufferLen % 8 == 0, "buffer length not a multiple of 8") - val numHashes = withResource(HostMemoryBuffer.allocate(HEADER_SIZE, false)) { hostHeader => - hostHeader.copyFromMemoryBuffer(0, data, 0, HEADER_SIZE, Cuda.DEFAULT_STREAM) - parseHeader(hostHeader, bufferLen) - } - // TODO: Can we avoid this copy? Would either need the ability to release data buffers - // from scalars or make scalars spillable. - val filterBuffer = DeviceMemoryBuffer.allocate(bufferLen) + require(bitBufferLen % 8 == 0, "buffer length not a multiple of 8") + val filterBuffer = DeviceMemoryBuffer.allocate(totalLen) closeOnExcept(filterBuffer) { buf => - buf.copyFromDeviceBufferAsync(0, data, HEADER_SIZE, buf.getLength, Cuda.DEFAULT_STREAM) + buf.copyFromDeviceBufferAsync(0, data, 0, buf.getLength, Cuda.DEFAULT_STREAM) } - new GpuBloomFilter(numHashes, filterBuffer) - } - - /** - * Parses the Spark Bloom filter serialization header performing sanity checks - * and retrieving the number of hash functions used for the filter. 
- * @param buffer serialized header data - * @param dataLen size of the serialized Bloom filter data without header - * @return number of hash functions used in the Bloom filter - */ - private def parseHeader(buffer: HostMemoryBuffer, dataLen: Long): Int = { - val in = new DataInputStream(new HostMemoryInputStream(buffer, buffer.getLength)) - val version = in.readInt - require(version == VERSION_V1, s"unsupported serialization format version $version") - val numHashes = in.readInt() - val sizeFromHeader = in.readInt() * 8L - require(dataLen == sizeFromHeader, - s"data size from header is $sizeFromHeader, received $dataLen") - numHashes + new GpuBloomFilter(filterBuffer) } } From fc5e3913e9cbd3923fbb6e3e5118373f51741763 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 28 Jul 2023 13:05:13 -0500 Subject: [PATCH 06/12] Support BloomFilterAggregate expression in a reduction context Signed-off-by: Jason Lowe --- .../src/main/python/join_test.py | 41 ++++- .../rapids/ExecutionPlanCaptureCallback.scala | 4 +- .../rapids/GpuBloomFilterAggregate.scala | 122 +++++++++++++ .../spark/rapids/shims/BloomFilterShims.scala | 20 ++- .../rapids/SparkQueryCompareTestSuite.scala | 13 ++ .../BloomFilterAggregateQuerySuite.scala | 166 +++++++++++++----- 6 files changed, 317 insertions(+), 49 deletions(-) create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 58eb8933dd5..19239d4a800 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -14,13 +14,13 @@ import pytest from _pytest.mark.structures import ParameterSet -from pyspark.sql.functions import broadcast +from pyspark.sql.functions import broadcast, col from pyspark.sql.types import * from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture from conftest import is_databricks_runtime, is_emr_runtime from data_gen import * from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan -from spark_session import with_cpu_session, with_spark_session, is_databricks113_or_later +from spark_session import with_cpu_session, is_before_spark_330 pytestmark = [pytest.mark.nightly_resource_consuming_test] @@ -929,3 +929,40 @@ def do_join(spark): "spark.rapids.sql.test.subPartitioning.enabled": True }) assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf) + + +def check_bloom_filter_join(confs): + def do_join(spark): + left = spark.range(100000) + right = spark.range(10).withColumn("id2", col("id").cast("string")) + return right.filter("cast(id2 as bigint) % 3 = 0").join(left, left.id == right.id, "inner") + all_confs = copy_and_update(confs, { "spark.sql.autoBroadcastJoinThreshold": "1", + "spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold": 1, + "spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold": "100GB", + "spark.sql.optimizer.runtime.bloomFilter.enabled": "true"}) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=all_confs) + +@ignore_order(local=True) +@pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") +def test_bloom_filter_join(): + check_bloom_filter_join(confs={}) + +@allow_non_gpu("FilterExec", "ShuffleExchangeExec") +@ignore_order(local=True) +@pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") +def 
test_bloom_filter_join_cpu_probe(): + check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterMightContain": "false"}) + +@allow_non_gpu("ObjectHashAggregateExec", "ShuffleExchangeExec") +@ignore_order(local=True) +@pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") +def test_bloom_filter_join_cpu_build(): + check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterAggregate": "false"}) + +@allow_non_gpu("ObjectHashAggregateExec", "ProjectExec", "ShuffleExchangeExec") +@ignore_order(local=True) +@pytest.mark.parametrize("agg_replace_mode", ["partial", "final"]) +@pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") +def test_bloom_filter_join_cpu_build(agg_replace_mode): + check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterAggregate": "false", + "spark.rapids.sql.hashAgg.replaceMode": agg_replace_mode}) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala index fa85f7612af..d3ea6d00809 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala @@ -138,13 +138,13 @@ object ExecutionPlanCaptureCallback { "Plan does not contain an ansi cast") } - private def didFallBack(exp: Expression, fallbackCpuClass: String): Boolean = { + def didFallBack(exp: Expression, fallbackCpuClass: String): Boolean = { !exp.getClass.getCanonicalName.equals("com.nvidia.spark.rapids.GpuExpression") && PlanUtils.getBaseNameFromClass(exp.getClass.getName) == fallbackCpuClass || exp.children.exists(didFallBack(_, fallbackCpuClass)) } - private def didFallBack(plan: SparkPlan, fallbackCpuClass: String): Boolean = { + def didFallBack(plan: SparkPlan, fallbackCpuClass: String): Boolean = { val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan(plan) !executedPlan.getClass.getCanonicalName.equals("com.nvidia.spark.rapids.GpuExec") && PlanUtils.sameClass(executedPlan, fallbackCpuClass) || diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala new file mode 100644 index 00000000000..191af74a9e5 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids + +import ai.rapids.cudf.{ColumnVector, GroupByAggregation, Scalar} +import com.nvidia.spark.rapids.Arm.closeOnExcept +import com.nvidia.spark.rapids.GpuBloomFilterAggregate.optimalNumOfHashFunctions +import com.nvidia.spark.rapids.jni.BloomFilter + +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.internal.SQLConf.{RUNTIME_BLOOM_FILTER_MAX_NUM_BITS, RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateFunction} +import org.apache.spark.sql.types.{BinaryType, DataType} + +case class GpuBloomFilterAggregate( + child: Expression, + estimatedNumItemsRequested: Long, + numBitsRequested: Long) extends GpuAggregateFunction { + + override def nullable: Boolean = true + + override def dataType: DataType = BinaryType + + override def prettyName: String = "bloom_filter_agg" + + private val estimatedNumItems: Long = + Math.min(estimatedNumItemsRequested, SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS)) + + private val numBits: Long = Math.min(numBitsRequested, + SQLConf.get.getConf(RUNTIME_BLOOM_FILTER_MAX_NUM_BITS)) + + private lazy val numHashes: Int = optimalNumOfHashFunctions(estimatedNumItems, numBits) + + override def children: Seq[Expression] = Seq(child) + + override lazy val initialValues: Seq[Expression] = Seq(GpuLiteral(null, BinaryType)) + + override val inputProjection: Seq[Expression] = Seq(child) + + override val updateAggregates: Seq[CudfAggregate] = Seq(GpuBloomFilterUpdate(numHashes, numBits)) + + override val mergeAggregates: Seq[CudfAggregate] = Seq(GpuBloomFilterMerge) + + private lazy val bloomAttr: AttributeReference = AttributeReference("bloomFilter", dataType)() + + override def aggBufferAttributes: Seq[AttributeReference] = Seq(bloomAttr) + + override val evaluateExpression: Expression = bloomAttr +} + +object GpuBloomFilterAggregate { + /** + * From Spark's BloomFilter.optimalNumOfHashFunctions + * + * Computes the optimal k (number of hashes per item inserted in Bloom filter), given the + * expected insertions and total number of bits in the Bloom filter. + * + * See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula. + * + * @param n expected insertions (must be positive) + * @param m total number of bits in Bloom filter (must be positive) + */ + private def optimalNumOfHashFunctions(n: Long, m: Long): Int = { + // (m / n) * log(2), but avoid truncation due to division! 
+ Math.max(1, Math.round(m.toDouble / n * Math.log(2)).toInt) + } +} + +case class GpuBloomFilterUpdate(numHashes: Int, numBits: Long) extends CudfAggregate { + override val reductionAggregate: ColumnVector => Scalar = (col: ColumnVector) => { + closeOnExcept(BloomFilter.create(numHashes, numBits)) { bloomFilter => + BloomFilter.put(bloomFilter, col) + bloomFilter + } + } + + override lazy val groupByAggregate: GroupByAggregation = + throw new UnsupportedOperationException("group by aggregations are not supported") + + override def dataType: DataType = BinaryType + + override val name: String = "gpu_bloom_filter_update" +} + +object GpuBloomFilterMerge extends CudfAggregate { + override val reductionAggregate: ColumnVector => Scalar = (col: ColumnVector) => { + BloomFilter.merge(col) + } + + override lazy val groupByAggregate: GroupByAggregation = + throw new UnsupportedOperationException("group by aggregations are not supported") + + override def dataType: DataType = BinaryType + + override val name: String = "gpu_bloom_filter_merge" +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala index bc69ab738d2..06463a033fd 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala @@ -30,6 +30,7 @@ package com.nvidia.spark.rapids.shims import com.nvidia.spark.rapids._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate object BloomFilterShims { @@ -45,7 +46,24 @@ object BloomFilterShims { (a, conf, p, r) => new BinaryExprMeta[BloomFilterMightContain](a, conf, p, r) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuBloomFilterMightContain(lhs, rhs) - }) + }), + GpuOverrides.expr[BloomFilterAggregate]( + "Bloom filter build", + ExprChecksImpl(Map( + (ReductionAggExprContext, + ContextChecks(TypeSig.BINARY, TypeSig.BINARY, + Seq(ParamCheck("child", TypeSig.LONG, TypeSig.LONG), + ParamCheck("estimatedItems", TypeSig.lit(TypeEnum.LONG), TypeSig.LONG), + ParamCheck("numBits", TypeSig.lit(TypeEnum.LONG), TypeSig.LONG)))))), + (a, conf, p, r) => new ExprMeta[BloomFilterAggregate](a, conf, p, r) { + override def convertToGpu(): GpuExpression = { + GpuBloomFilterAggregate( + childExprs.head.convertToGpu(), + a.estimatedNumItemsExpression.eval().asInstanceOf[Number].longValue, + a.numBitsExpression.eval().asInstanceOf[Number].longValue) + } + } + ) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index c283bd7afd1..c9a58051365 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.{CreateViewCommand, ExecutedCommandExec} import org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback import org.apache.spark.sql.rapids.execution.TrampolineUtil import 
org.apache.spark.sql.types._ @@ -283,6 +284,7 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite with BeforeAndAfterAll { } finally { cpuPlans = ExecutionPlanCaptureCallback.getResultsWithTimeout() } + cpuPlans = filterCapturedPlans(cpuPlans) assert(cpuPlans.nonEmpty, "Did not capture CPU plan") assert(cpuPlans.length == 1, s"Captured more than one CPU plan: ${cpuPlans.mkString("\n")}") @@ -301,12 +303,21 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite with BeforeAndAfterAll { } finally { gpuPlans = ExecutionPlanCaptureCallback.getResultsWithTimeout() } + gpuPlans = filterCapturedPlans(gpuPlans) assert(gpuPlans.nonEmpty, "Did not capture GPU plan") assert(gpuPlans.length == 1, s"Captured more than one GPU plan: ${gpuPlans.mkString("\n")}") (cpuPlans.head, gpuPlans.head) } + // filter out "uninteresting" plans like view creation, etc. + protected def filterCapturedPlans(plans: Array[SparkPlan]): Array[SparkPlan] = { + plans.filter { + case ExecutedCommandExec(_: CreateViewCommand) => false + case _ => true + } + } + def runOnCpuAndGpuWithCapture(df: SparkSession => DataFrame, fun: DataFrame => DataFrame, conf: SparkConf = new SparkConf(), @@ -332,6 +343,7 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite with BeforeAndAfterAll { } finally { cpuPlans = ExecutionPlanCaptureCallback.getResultsWithTimeout() } + cpuPlans = filterCapturedPlans(cpuPlans) assert(cpuPlans.nonEmpty, "Did not capture CPU plan") assert(cpuPlans.length == 1, s"Captured more than one CPU plan: ${cpuPlans.mkString("\n")}") @@ -351,6 +363,7 @@ trait SparkQueryCompareTestSuite extends AnyFunSuite with BeforeAndAfterAll { } finally { gpuPlans = ExecutionPlanCaptureCallback.getResultsWithTimeout() } + gpuPlans = filterCapturedPlans(gpuPlans) assert(gpuPlans.nonEmpty, "Did not capture GPU plan") assert(gpuPlans.length == 1, s"Captured more than one GPU plan: ${gpuPlans.mkString("\n")}") diff --git a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala index cac6a9a55dc..66ed7e40d59 100644 --- a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala +++ b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala @@ -27,11 +27,14 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids +import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.{BloomFilterMightContain, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg") @@ -65,40 +68,113 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { (1L to 100L).map(_ => None)).toDF("col") } - for (numEstimated <- Seq(4096L, 4194304L, Long.MaxValue, + private def withExposedSqlFuncs[T](spark: SparkSession)(func: SparkSession => T): T = { + try { + installSqlFuncs(spark) + func(spark) + } finally { + uninstallSqlFuncs(spark) + } + } + + private def doBloomFilterTest(numEstimated: Long, numBits: Long): DataFrame => DataFrame = { + df => + val table = 
"bloom_filter_test" + val sqlString = + s""" + |SELECT might_contain( + | (SELECT bloom_filter_agg(col, + | cast($numEstimated as long), + | cast($numBits as long)) + | FROM $table), + | col) positive_membership_test, + | might_contain( + | (SELECT bloom_filter_agg(col, + | cast($numEstimated as long), + | cast($numBits as long)) + | FROM values (-1L), (100001L), (20000L) as t(col)), + | col) negative_membership_test + |FROM $table + """.stripMargin + df.createOrReplaceTempView(table) + withExposedSqlFuncs(df.sparkSession) { spark => + spark.sql(sqlString) + } + } + + private def getPlanValidator(exec: String): (SparkPlan, SparkPlan) => Unit = { + def searchPlan(p: SparkPlan): Boolean = { + if (ExecutionPlanCaptureCallback.didFallBack(p, exec)) { + true + } else if (p.children.exists(searchPlan)) { + true + } else if (p.subqueries.exists(searchPlan)) { + true + } else { + false + } + } + (_, gpuPlan) => { + val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan(gpuPlan) + assert(searchPlan(executedPlan), s"Could not find $exec in the GPU plan:\n$executedPlan") + } + } + + // test with GPU bloom build, GPU bloom probe + for (numEstimated <- Seq(4096L, 4194304L, SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { - for (numBits <- Seq(4096L, 4194304L, Long.MaxValue, + for (numBits <- Seq(4096L, 4194304L, SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { - ALLOW_NON_GPU_testSparkResultsAreEqual( - s"might_contain estimated=$numEstimated numBits=$numBits", + testSparkResultsAreEqual( + s"might_contain GPU build GPU probe estimated=$numEstimated numBits=$numBits", + buildData + )(doBloomFilterTest(numEstimated, numBits)) + } + } + + // test with CPU bloom build, GPU bloom probe + for (numEstimated <- Seq(4096L, 4194304L, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { + for (numBits <- Seq(4096L, 4194304L, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { + ALLOW_NON_GPU_testSparkResultsAreEqualWithCapture( + s"might_contain CPU build GPU probe estimated=$numEstimated numBits=$numBits", buildData, - Seq("ObjectHashAggregateExec", "ShuffleExchangeExec"))(df => - { - val table = "bloom_filter_test" - val sqlString = - s""" - |SELECT might_contain( - | (SELECT bloom_filter_agg(col, - | cast($numEstimated as long), - | cast($numBits as long)) - | FROM $table), - | col) positive_membership_test, - | might_contain( - | (SELECT bloom_filter_agg(col, - | cast($numEstimated as long), - | cast($numBits as long)) - | FROM values (-1L), (100001L), (20000L) as t(col)), - | col) negative_membership_test - |FROM $table - """.stripMargin - df.createOrReplaceTempView(table) - try { - installSqlFuncs(df.sparkSession) - df.sparkSession.sql(sqlString) - } finally { - uninstallSqlFuncs(df.sparkSession) - } - }) + Seq("ObjectHashAggregateExec", "ShuffleExchangeExec"), + conf = new SparkConf().set("spark.rapids.sql.expression.BloomFilterAggregate", "false") + )(doBloomFilterTest(numEstimated, numBits))(getPlanValidator("ObjectHashAggregateExec")) + } + } + + // test with GPU bloom build, CPU bloom probe + for (numEstimated <- Seq(4096L, 4194304L, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { + for (numBits <- Seq(4096L, 4194304L, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { + ALLOW_NON_GPU_testSparkResultsAreEqualWithCapture( + s"might_contain GPU build CPU probe estimated=$numEstimated numBits=$numBits", + buildData, + Seq("LocalTableScanExec", "ProjectExec", "ShuffleExchangeExec"), + conf = new 
SparkConf().set("spark.rapids.sql.expression.BloomFilterMightContain", "false") + )(doBloomFilterTest(numEstimated, numBits))(getPlanValidator("ProjectExec")) + } + } + + // test with partial/final-only GPU bloom build, CPU bloom probe + for (mode <- Seq("partial", "final")) { + for (numEstimated <- Seq(4096L, 4194304L, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { + for (numBits <- Seq(4096L, 4194304L, + SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { + ALLOW_NON_GPU_testSparkResultsAreEqualWithCapture( + s"might_contain GPU $mode build CPU probe estimated=$numEstimated numBits=$numBits", + buildData, + Seq("ObjectHashAggregateExec", "ProjectExec", "ShuffleExchangeExec"), + conf = new SparkConf() + .set("spark.rapids.sql.expression.BloomFilterMightContain", "false") + .set("spark.rapids.sql.hashAgg.replaceMode", mode) + )(doBloomFilterTest(numEstimated, numBits))(getPlanValidator("ObjectHashAggregateExec")) + } } } @@ -106,33 +182,35 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { "might_contain with literal bloom filter buffer", spark => spark.range(1, 1).asInstanceOf[DataFrame]) { df => - try { - installSqlFuncs(df.sparkSession) - df.sparkSession.sql( + withExposedSqlFuncs(df.sparkSession) { spark => + spark.sql( """SELECT might_contain( |X'00000001000000050000000343A2EC6EA8C117E2D3CDB767296B144FC5BFBCED9737F267', |cast(201 as long))""".stripMargin) - } finally { - uninstallSqlFuncs(df.sparkSession) } } - ALLOW_NON_GPU_testSparkResultsAreEqual( + testSparkResultsAreEqual( "might_contain with all NULL inputs", - spark => spark.range(1, 1).asInstanceOf[DataFrame], - Seq("ObjectHashAggregateExec", "ShuffleExchangeExec")) { + spark => spark.range(1, 1).asInstanceOf[DataFrame]) { df => - try { - installSqlFuncs(df.sparkSession) - df.sparkSession.sql( + withExposedSqlFuncs(df.sparkSession) { spark => + spark.sql( """ |SELECT might_contain(null, null) both_null, | might_contain(null, 1L) null_bf, | might_contain((SELECT bloom_filter_agg(cast(id as long)) from range(1, 10000)), | null) null_value """.stripMargin) - } finally { - uninstallSqlFuncs(df.sparkSession) + } + } + + testSparkResultsAreEqual( + "bloom_filter_agg with empty input", + spark => spark.range(1, 1).asInstanceOf[DataFrame]) { + df => + withExposedSqlFuncs(df.sparkSession) { spark => + spark.sql("""SELECT bloom_filter_agg(cast(id as long)) from range(1, 1)""") } } } From 9202839e03e8cd3991768f369424c3f412d6ed0a Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 3 Aug 2023 11:26:29 -0500 Subject: [PATCH 07/12] Fix tests to skip on Databricks and check for specific classes --- .../src/main/python/join_test.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 19239d4a800..c065dd30301 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -20,7 +20,7 @@ from conftest import is_databricks_runtime, is_emr_runtime from data_gen import * from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan -from spark_session import with_cpu_session, is_before_spark_330 +from spark_session import with_cpu_session, is_before_spark_330, is_databricks_runtime pytestmark = [pytest.mark.nightly_resource_consuming_test] @@ -931,7 +931,7 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf=_all_conf) -def check_bloom_filter_join(confs): +def 
check_bloom_filter_join(confs, expected_classes): def do_join(spark): left = spark.range(100000) right = spark.range(10).withColumn("id2", col("id").cast("string")) @@ -940,29 +940,36 @@ def do_join(spark): "spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold": 1, "spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold": "100GB", "spark.sql.optimizer.runtime.bloomFilter.enabled": "true"}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=all_confs) + assert_cpu_and_gpu_are_equal_collect_with_capture(do_join, expected_classes, conf=all_confs) @ignore_order(local=True) +@pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") def test_bloom_filter_join(): - check_bloom_filter_join(confs={}) + check_bloom_filter_join(confs={}, + expected_classes="GpuBloomFilterMightContain,GpuBloomFilterAggregate") @allow_non_gpu("FilterExec", "ShuffleExchangeExec") @ignore_order(local=True) +@pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") def test_bloom_filter_join_cpu_probe(): - check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterMightContain": "false"}) + check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterMightContain": "false"}, + expected_classes="BloomFilterMightContain,GpuBloomFilterAggregate") @allow_non_gpu("ObjectHashAggregateExec", "ShuffleExchangeExec") @ignore_order(local=True) +@pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") def test_bloom_filter_join_cpu_build(): - check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterAggregate": "false"}) + check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterAggregate": "false"}, + expected_classes="GpuBloomFilterMightContain,BloomFilterAggregate") @allow_non_gpu("ObjectHashAggregateExec", "ProjectExec", "ShuffleExchangeExec") @ignore_order(local=True) @pytest.mark.parametrize("agg_replace_mode", ["partial", "final"]) +@pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") -def test_bloom_filter_join_cpu_build(agg_replace_mode): - check_bloom_filter_join(confs={"spark.rapids.sql.expression.BloomFilterAggregate": "false", - "spark.rapids.sql.hashAgg.replaceMode": agg_replace_mode}) +def test_bloom_filter_join_split_cpu_build(agg_replace_mode): + check_bloom_filter_join(confs={"spark.rapids.sql.hashAgg.replaceMode": agg_replace_mode}, + expected_classes="GpuBloomFilterMightContain,BloomFilterAggregate,GpuBloomFilterAggregate") From 2e5f43fb243f8de0bc6de00d9dd20d2d148b4225 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 7 Aug 2023 09:14:51 -0500 Subject: [PATCH 08/12] Reduce test case combinations, focus most tests on CPU/GPU interop --- .../BloomFilterAggregateQuerySuite.scala | 28 ++++++------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala 
index 66ed7e40d59..4cb65dde2a3 100644 --- a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala +++ b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala @@ -104,15 +104,9 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { private def getPlanValidator(exec: String): (SparkPlan, SparkPlan) => Unit = { def searchPlan(p: SparkPlan): Boolean = { - if (ExecutionPlanCaptureCallback.didFallBack(p, exec)) { - true - } else if (p.children.exists(searchPlan)) { - true - } else if (p.subqueries.exists(searchPlan)) { - true - } else { - false - } + ExecutionPlanCaptureCallback.didFallBack(p, exec) || + p.children.exists(searchPlan) || + p.subqueries.exists(searchPlan) } (_, gpuPlan) => { val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan(gpuPlan) @@ -121,10 +115,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { } // test with GPU bloom build, GPU bloom probe - for (numEstimated <- Seq(4096L, 4194304L, - SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { - for (numBits <- Seq(4096L, 4194304L, - SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { + for (numEstimated <- Seq(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { + for (numBits <- Seq(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { testSparkResultsAreEqual( s"might_contain GPU build GPU probe estimated=$numEstimated numBits=$numBits", buildData @@ -133,7 +125,7 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { } // test with CPU bloom build, GPU bloom probe - for (numEstimated <- Seq(4096L, 4194304L, + for (numEstimated <- Seq(4096L, 4194304L, Long.MaxValue, SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { for (numBits <- Seq(4096L, 4194304L, SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { @@ -147,7 +139,7 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { } // test with GPU bloom build, CPU bloom probe - for (numEstimated <- Seq(4096L, 4194304L, + for (numEstimated <- Seq(4096L, 4194304L, Long.MaxValue, SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { for (numBits <- Seq(4096L, 4194304L, SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { @@ -162,10 +154,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { // test with partial/final-only GPU bloom build, CPU bloom probe for (mode <- Seq("partial", "final")) { - for (numEstimated <- Seq(4096L, 4194304L, - SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { - for (numBits <- Seq(4096L, 4194304L, - SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { + for (numEstimated <- Seq(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.defaultValue.get)) { + for (numBits <- Seq(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { ALLOW_NON_GPU_testSparkResultsAreEqualWithCapture( s"might_contain GPU $mode build CPU probe estimated=$numEstimated numBits=$numBits", buildData, From b6d9b69600fce2076773e23beb765260e8441710 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Aug 2023 14:36:33 -0500 Subject: [PATCH 09/12] Disable Bloom filter join expressions by default --- .../spark/rapids/shims/BloomFilterShims.scala | 5 ++-- .../BloomFilterAggregateQuerySuite.scala | 23 +++++++++++++------ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala 
b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala index 06463a033fd..37206988cbc 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala @@ -46,7 +46,7 @@ object BloomFilterShims { (a, conf, p, r) => new BinaryExprMeta[BloomFilterMightContain](a, conf, p, r) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuBloomFilterMightContain(lhs, rhs) - }), + }).disabledByDefault("Bloom filter join acceleration is experimental"), GpuOverrides.expr[BloomFilterAggregate]( "Bloom filter build", ExprChecksImpl(Map( @@ -62,8 +62,7 @@ object BloomFilterShims { a.estimatedNumItemsExpression.eval().asInstanceOf[Number].longValue, a.numBitsExpression.eval().asInstanceOf[Number].longValue) } - } - ) + }).disabledByDefault("Bloom filter join acceleration is experimental") ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap } } diff --git a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala index 4cb65dde2a3..01dc4eafac1 100644 --- a/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala +++ b/tests/src/test/spark330/scala/com/nvidia/spark/rapids/BloomFilterAggregateQuerySuite.scala @@ -37,6 +37,9 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { + val bloomFilterEnabledConf = new SparkConf() + .set("spark.rapids.sql.expression.BloomFilterMightContain", "true") + .set("spark.rapids.sql.expression.BloomFilterAggregate", "true") val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg") val funcId_might_contain = new FunctionIdentifier("might_contain") @@ -119,7 +122,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { for (numBits <- Seq(SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.defaultValue.get)) { testSparkResultsAreEqual( s"might_contain GPU build GPU probe estimated=$numEstimated numBits=$numBits", - buildData + buildData, + conf = bloomFilterEnabledConf.clone() )(doBloomFilterTest(numEstimated, numBits)) } } @@ -133,7 +137,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { s"might_contain CPU build GPU probe estimated=$numEstimated numBits=$numBits", buildData, Seq("ObjectHashAggregateExec", "ShuffleExchangeExec"), - conf = new SparkConf().set("spark.rapids.sql.expression.BloomFilterAggregate", "false") + conf = bloomFilterEnabledConf.clone() + .set("spark.rapids.sql.expression.BloomFilterAggregate", "false") )(doBloomFilterTest(numEstimated, numBits))(getPlanValidator("ObjectHashAggregateExec")) } } @@ -147,7 +152,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { s"might_contain GPU build CPU probe estimated=$numEstimated numBits=$numBits", buildData, Seq("LocalTableScanExec", "ProjectExec", "ShuffleExchangeExec"), - conf = new SparkConf().set("spark.rapids.sql.expression.BloomFilterMightContain", "false") + conf = bloomFilterEnabledConf.clone() + .set("spark.rapids.sql.expression.BloomFilterMightContain", "false") )(doBloomFilterTest(numEstimated, numBits))(getPlanValidator("ProjectExec")) } } @@ -160,7 +166,7 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { 
s"might_contain GPU $mode build CPU probe estimated=$numEstimated numBits=$numBits", buildData, Seq("ObjectHashAggregateExec", "ProjectExec", "ShuffleExchangeExec"), - conf = new SparkConf() + conf = bloomFilterEnabledConf.clone() .set("spark.rapids.sql.expression.BloomFilterMightContain", "false") .set("spark.rapids.sql.hashAgg.replaceMode", mode) )(doBloomFilterTest(numEstimated, numBits))(getPlanValidator("ObjectHashAggregateExec")) @@ -170,7 +176,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { testSparkResultsAreEqual( "might_contain with literal bloom filter buffer", - spark => spark.range(1, 1).asInstanceOf[DataFrame]) { + spark => spark.range(1, 1).asInstanceOf[DataFrame], + conf=bloomFilterEnabledConf.clone()) { df => withExposedSqlFuncs(df.sparkSession) { spark => spark.sql( @@ -182,7 +189,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { testSparkResultsAreEqual( "might_contain with all NULL inputs", - spark => spark.range(1, 1).asInstanceOf[DataFrame]) { + spark => spark.range(1, 1).asInstanceOf[DataFrame], + conf=bloomFilterEnabledConf.clone()) { df => withExposedSqlFuncs(df.sparkSession) { spark => spark.sql( @@ -197,7 +205,8 @@ class BloomFilterAggregateQuerySuite extends SparkQueryCompareTestSuite { testSparkResultsAreEqual( "bloom_filter_agg with empty input", - spark => spark.range(1, 1).asInstanceOf[DataFrame]) { + spark => spark.range(1, 1).asInstanceOf[DataFrame], + conf=bloomFilterEnabledConf.clone()) { df => withExposedSqlFuncs(df.sparkSession) { spark => spark.sql("""SELECT bloom_filter_agg(cast(id as long)) from range(1, 1)""") From 707da456db8e0113b8654ef333c14edcf4987ff6 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Aug 2023 14:45:57 -0500 Subject: [PATCH 10/12] Add literal tags for Spark type --- .../com/nvidia/spark/rapids/shims/BloomFilterShims.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala index 37206988cbc..2e3573546ed 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/BloomFilterShims.scala @@ -53,8 +53,10 @@ object BloomFilterShims { (ReductionAggExprContext, ContextChecks(TypeSig.BINARY, TypeSig.BINARY, Seq(ParamCheck("child", TypeSig.LONG, TypeSig.LONG), - ParamCheck("estimatedItems", TypeSig.lit(TypeEnum.LONG), TypeSig.LONG), - ParamCheck("numBits", TypeSig.lit(TypeEnum.LONG), TypeSig.LONG)))))), + ParamCheck("estimatedItems", + TypeSig.lit(TypeEnum.LONG), TypeSig.lit(TypeEnum.LONG)), + ParamCheck("numBits", + TypeSig.lit(TypeEnum.LONG), TypeSig.lit(TypeEnum.LONG))))))), (a, conf, p, r) => new ExprMeta[BloomFilterAggregate](a, conf, p, r) { override def convertToGpu(): GpuExpression = { GpuBloomFilterAggregate( From 5c355569e392cac3d61683115ffbb39540152c88 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Aug 2023 18:36:20 -0500 Subject: [PATCH 11/12] Fix two-column Bloom filter joins --- integration_tests/src/main/python/join_test.py | 12 +++--------- .../spark/rapids/GpuBloomFilterAggregate.scala | 4 ++-- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index ea446cced91..b49e014e069 100644 --- a/integration_tests/src/main/python/join_test.py +++ 
b/integration_tests/src/main/python/join_test.py @@ -966,9 +966,7 @@ def test_bloom_filter_disabled_by_default(is_multi_column): is_multi_column=is_multi_column) @ignore_order(local=True) -@pytest.mark.parametrize("is_multi_column", [ - False, - pytest.param(True, marks=pytest.mark.xfail(reason="badness"))], ids=idfn) +@pytest.mark.parametrize("is_multi_column", [False, True], ids=idfn) @pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") def test_bloom_filter_join(is_multi_column): @@ -978,9 +976,7 @@ def test_bloom_filter_join(is_multi_column): @allow_non_gpu("FilterExec", "ShuffleExchangeExec") @ignore_order(local=True) -@pytest.mark.parametrize("is_multi_column", [ - False, - pytest.param(True, marks=pytest.mark.xfail(reason="badness"))], ids=idfn) +@pytest.mark.parametrize("is_multi_column", [False, True], ids=idfn) @pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") def test_bloom_filter_join_cpu_probe(is_multi_column): @@ -1005,9 +1001,7 @@ def test_bloom_filter_join_cpu_build(is_multi_column): @allow_non_gpu("ObjectHashAggregateExec", "ProjectExec", "ShuffleExchangeExec") @ignore_order(local=True) @pytest.mark.parametrize("agg_replace_mode", ["partial", "final"]) -@pytest.mark.parametrize("is_multi_column", [ - False, - pytest.param(True, marks=pytest.mark.xfail(reason="badness"))], ids=idfn) +@pytest.mark.parametrize("is_multi_column", [False, True], ids=idfn) @pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") def test_bloom_filter_join_split_cpu_build(agg_replace_mode, is_multi_column): diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala index 191af74a9e5..f791dca620a 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuBloomFilterAggregate.scala @@ -65,7 +65,7 @@ case class GpuBloomFilterAggregate( override val updateAggregates: Seq[CudfAggregate] = Seq(GpuBloomFilterUpdate(numHashes, numBits)) - override val mergeAggregates: Seq[CudfAggregate] = Seq(GpuBloomFilterMerge) + override val mergeAggregates: Seq[CudfAggregate] = Seq(GpuBloomFilterMerge()) private lazy val bloomAttr: AttributeReference = AttributeReference("bloomFilter", dataType)() @@ -108,7 +108,7 @@ case class GpuBloomFilterUpdate(numHashes: Int, numBits: Long) extends CudfAggre override val name: String = "gpu_bloom_filter_update" } -object GpuBloomFilterMerge extends CudfAggregate { +case class GpuBloomFilterMerge() extends CudfAggregate { override val reductionAggregate: ColumnVector => Scalar = (col: ColumnVector) => { BloomFilter.merge(col) } From b4af8ad10b07a3060d85f165684a3876ddd2f844 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 8 Aug 2023 18:42:10 -0500 Subject: [PATCH 12/12] Add batch size limit test --- integration_tests/src/main/python/join_test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py 
b/integration_tests/src/main/python/join_test.py index b49e014e069..8f9bffe82d7 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -966,11 +966,14 @@ def test_bloom_filter_disabled_by_default(is_multi_column): is_multi_column=is_multi_column) @ignore_order(local=True) +@pytest.mark.parametrize("batch_size", ['1g', '1000'], ids=idfn) @pytest.mark.parametrize("is_multi_column", [False, True], ids=idfn) @pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/8921") @pytest.mark.skipif(is_before_spark_330(), reason="Bloom filter joins added in Spark 3.3.0") -def test_bloom_filter_join(is_multi_column): - check_bloom_filter_join(confs=bloom_filter_exprs_enabled, +def test_bloom_filter_join(batch_size, is_multi_column): + conf = copy_and_update(bloom_filter_exprs_enabled, + {"spark.rapids.sql.batchSizeBytes": batch_size}) + check_bloom_filter_join(confs=conf, expected_classes="GpuBloomFilterMightContain,GpuBloomFilterAggregate", is_multi_column=is_multi_column)
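
For reference, a minimal PySpark sketch of how a user might opt in to the Bloom filter join expressions, which this series leaves disabled by default because the acceleration is experimental. The spark.rapids.sql.expression.* and spark.sql.optimizer.runtime.bloomFilter.* keys are the ones exercised by the tests above; the session setup, master URL, and threshold values are illustrative assumptions, and the RAPIDS Accelerator plugin is assumed to already be configured on the driver and executors.

    # Illustrative sketch only: opt in to the experimental GPU Bloom filter
    # expressions (off by default) and enable Spark's runtime Bloom filter join.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .master("local[*]")  # example master; normally supplied by spark-submit
             # GPU build (BloomFilterAggregate) and GPU probe (BloomFilterMightContain)
             .config("spark.rapids.sql.expression.BloomFilterAggregate", "true")
             .config("spark.rapids.sql.expression.BloomFilterMightContain", "true")
             # Spark-side runtime Bloom filter join; thresholds here are example values
             .config("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
             .config("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "1")
             .config("spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold", "100GB")
             .getOrCreate())

With both expressions enabled, a join such as the one in check_bloom_filter_join above should plan GpuBloomFilterAggregate on the build side and GpuBloomFilterMightContain on the probe side; disabling either key falls back to the corresponding CPU expression, which is the interop path the capture-based tests verify.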