Skip to content

Commit

Permalink
Move HostConcatResultUtil out of unshimmed classes (#5614)
Browse files Browse the repository at this point in the history
Don't use the ai.rapids.cudf package for spark-rapids Scala classes. Otherwise such a class is loaded by the conventional classloader and fails to load classes it references from the shimmed areas.
- Move the class 
- add a smoke test to prevent this sort of regression in premerge

Closes #5513. 

Depends on rapidsai/cudf#10949
    
Signed-off-by: Gera Shegalov <[email protected]>
  • Loading branch information
gerashegalov authored May 25, 2022
1 parent 808bcc2 commit c588a1f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 8 deletions.
8 changes: 8 additions & 0 deletions jenkins/spark-premerge-build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ mvn_verify() {

# Triggering here until we change the jenkins file
rapids_shuffle_smoke_test

# non-caller classloader smoke test in pseudo-distributed
# standalone cluster
echo "Running test_cartesian_join_special_case_count with spark.rapids.force.caller.classloader=false"
PYSP_TEST_spark_rapids_force_caller_classloader=false \
NUM_LOCAL_EXECS=1 \
TEST_PARALLEL=0 \
./integration_tests/run_pyspark_from_build.sh -k 'test_cartesian_join_special_case_count[100]'
}

rapids_shuffle_smoke_test() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import java.util

import ai.rapids.cudf.{HostConcatResultUtil, HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization, NvtxColor, NvtxRange}
import ai.rapids.cudf.JCudfSerialization.{HostConcatResult, SerializedTableHeader}
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode

Expand Down Expand Up @@ -103,7 +103,7 @@ class HostShuffleCoalesceIterator(
val firstHeader = serializedTables.peekFirst().header
if (firstHeader.getNumColumns == 0) {
(0 until numTablesInBatch).foreach(_ => serializedTables.removeFirst())
HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch)
cudf_utils.HostConcatResultUtil.rowsOnlyHostConcatResult(numRowsInBatch)
} else {
val headers = new Array[SerializedTableHeader](numTablesInBatch)
withResource(new Array[HostMemoryBuffer](numTablesInBatch)) { buffers =>
Expand Down Expand Up @@ -211,7 +211,7 @@ class GpuShuffleCoalesceIterator(iter: Iterator[HostConcatResult],
// generate GPU data from batches that are empty.
GpuSemaphore.acquireIfNecessary(TaskContext.get(), semWaitTime)
withResource(new MetricRange(opTimeMetric)) { _ =>
val batch = HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
val batch = cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
outputBatchesMetric += 1
outputRowsMetric += batch.numRows()
batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.nvidia.spark.rapids

import ai.rapids.cudf.{HostConcatResultUtil, NvtxColor, NvtxRange}
import ai.rapids.cudf.{NvtxColor, NvtxRange}
import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, ShimBinaryExecNode}

Expand Down Expand Up @@ -364,7 +364,7 @@ object GpuShuffledHashJoinExec extends Arm {
// we can bring the build batch to the GPU now
withResource(hostConcatResult) { _ =>
buildTime.ns {
HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
cudf_utils.HostConcatResultUtil.getColumnarBatch(hostConcatResult, dataTypes)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
* limitations under the License.
*/

package ai.rapids.cudf
package com.nvidia.spark.rapids.cudf_utils

import ai.rapids.cudf.{HostMemoryBuffer, JCudfSerialization}
import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import com.nvidia.spark.rapids.{Arm, GpuColumnVectorFromBuffer}

Expand All @@ -28,8 +29,7 @@ object HostConcatResultUtil extends Arm {
*/
def rowsOnlyHostConcatResult(numRows: Int): HostConcatResult = {
new HostConcatResult(
new JCudfSerialization.SerializedTableHeader(
Array.empty, numRows, 0L),
new JCudfSerialization.SerializedTableHeader(numRows),
HostMemoryBuffer.allocate(0, false))
}

Expand Down

0 comments on commit c588a1f

Please sign in to comment.