diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index d1ede273963..d9fc64bd79a 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -73,6 +73,14 @@ mvn_verify() { # Triggering here until we change the jenkins file rapids_shuffle_smoke_test + + # non-caller classloader smoke test in pseudo-distributed + # standalone cluster + echo "Running test_cartesian_join_special_case_count with spark.rapids.force.caller.classloader=false" + PYSP_TEST_spark_rapids_force_caller_classloader=false \ + NUM_LOCAL_EXECS=1 \ + TEST_PARALLEL=0 \ + ./integration_tests/run_pyspark_from_build.sh -k 'test_cartesian_join_special_case_count[100]' } rapids_shuffle_smoke_test() { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index 3b4661027f7..91452ffc4e4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import java.util -import ai.rapids.cudf.{HostConcatResultUtil, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} +import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader} import com.nvidia.spark.rapids.shims.ShimUnaryExecNode @@ -103,7 +103,7 @@ class HostShuffleCoalesceIterator( val firstHeader = serializedTables.peekFirst().header if (firstHeader.getNumColumns == 0) { (0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst()) - HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch) + cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch) } else { val headers = new Array[SerializedTableHeader](numTablesInBatch) withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers => @@ -211,7 +211,7 @@ class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult], // generate GPU data from batches that are empty. GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime) withResource(new MetricRange(opTimeMetric)) { _ => - val batch = HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + val batch = cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) outputBatchesMetric += 1 outputRowsMetric += batch.numRows() batch diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala index 2412350faad..6cbc6f79dfd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{HostConcatResultUtil, NvtxColor, NvtxRange} +import ai.rapids.cudf.{NvtxColor, NvtxRange} import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode} @@ -364,7 +364,7 @@ object GpuShuffledHashJoinExec extends Arm { // we can bring the build batch to the GPU now withResource(hostConcatResult) { _ => buildTime.ns { - HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) + cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes) } } } diff --git a/sql-plugin/src/main/scala/ai/rapids/cudf/HostConcatResultUtil.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/cudf_utils/HostConcatResultUtil.scala similarity index 92% rename from sql-plugin/src/main/scala/ai/rapids/cudf/HostConcatResultUtil.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/cudf_utils/HostConcatResultUtil.scala index 30d7289c902..188b0b1d683 100644 --- a/sql-plugin/src/main/scala/ai/rapids/cudf/HostConcatResultUtil.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/cudf_utils/HostConcatResultUtil.scala @@ -14,8 +14,9 @@ * limitations under the License. */ -package ai.rapids.cudf +package com.nvidia.spark.rapids.cudf_utils +import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization} import ai.rapids.cudf.JCudfSerialization.HostConcatResult import com.nvidia.spark.rapids.{Arm, GpuColumnVectorFromBuffer} @@ -28,8 +29,7 @@ object HostConcatResultUtil extends Arm { */ def rowsOnlyHostConcatResult(numRows: Int): HostConcatResult = { new HostConcatResult( - new JCudfSerialization.SerializedTableHeader( - Array.empty, numRows, 0L), + new JCudfSerialization.SerializedTableHeader(numRows), HostMemoryBuffer.allocate(0, false)) }