Move GpuPartitioning to a separate file #338

Merged
1 commit merged on Jul 9, 2020
@@ -0,0 +1,118 @@
/*
 * Copyright (c) 2020, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{ContiguousTable, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.vectorized.ColumnarBatch

trait GpuPartitioning extends Partitioning {

  def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, end: Int): ColumnarBatch = {
    var ret: ColumnarBatch = null
    val count = end - start
    if (count > 0) {
      ret = new ColumnarBatch(vectors.map(vec => new SlicedGpuColumnVector(vec, start, end)))
      ret.setNumRows(count)
    }
    ret
  }

  def sliceInternalOnGpu(batch: ColumnarBatch, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    // The first index will always be 0, so we need to skip it.
    val batches = if (batch.numRows > 0) {
      val parts = partitionIndexes.slice(1, partitionIndexes.length)
      val splits = new ArrayBuffer[ColumnarBatch](numPartitions)
      val table = new Table(partitionColumns.map(_.getBase).toArray: _*)
      val contiguousTables: Array[ContiguousTable] = try {
        table.contiguousSplit(parts: _*)
      } finally {
        table.close()
      }
      var succeeded = false
      try {
        contiguousTables.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) }
        succeeded = true
      } finally {
        contiguousTables.foreach(_.close())
        if (!succeeded) {
          splits.foreach(_.close())
        }
      }
      splits.toArray
    } else {
      Array[ColumnarBatch]()
    }

    GpuSemaphore.releaseIfNecessary(TaskContext.get())
    batches
  }

  def sliceInternalOnCpu(batch: ColumnarBatch, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    // We need to make sure that we have a null count calculated ahead of time.
    // This should be a temporary workaround.
    partitionColumns.foreach(_.getBase.getNullCount)

    val hostPartColumns = partitionColumns.map(_.copyToHost())
    try {
      // Leaving the GPU for a while
      GpuSemaphore.releaseIfNecessary(TaskContext.get())

      // partitionIndexes(0) is always 0, so slice between consecutive cut points and
      // let the last partition run to the end of the batch.
      val ret = new Array[ColumnarBatch](numPartitions)
      var start = 0
      for (i <- 1 until Math.min(numPartitions, partitionIndexes.length)) {
        val idx = partitionIndexes(i)
        ret(i - 1) = sliceBatch(hostPartColumns, start, idx)
        start = idx
      }
      ret(numPartitions - 1) = sliceBatch(hostPartColumns, start, batch.numRows())
      ret
    } finally {
      hostPartColumns.safeClose()
    }
  }

  def sliceInternalGpuOrCpu(batch: ColumnarBatch, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    val rapidsShuffleEnabled = GpuShuffleEnv.isRapidsShuffleEnabled
    val nvtxRangeKey = if (rapidsShuffleEnabled) {
      "sliceInternalOnGpu"
    } else {
      "sliceInternalOnCpu"
    }
    // If we are not using the RAPIDS shuffle, we fall back to doing the splits on the
    // CPU to avoid the overhead of a large number of small splits.
    val sliceRange = new NvtxRange(nvtxRangeKey, NvtxColor.CYAN)
    try {
      if (rapidsShuffleEnabled) {
        sliceInternalOnGpu(batch, partitionIndexes, partitionColumns)
      } else {
        sliceInternalOnCpu(batch, partitionIndexes, partitionColumns)
      }
    } finally {
      sliceRange.close()
    }
  }
}
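
For context when reading the new trait above, here is a minimal usage sketch. It is hypothetical and not part of this PR: GpuEvenSlicePartitioning and its slice helper are invented for illustration, and only GpuPartitioning, GpuColumnVector, and sliceInternalGpuOrCpu come from the plugin code shown in this diff.

// Hypothetical sketch (not part of this PR): a toy partitioning that mixes in the
// GpuPartitioning trait above and cuts an incoming batch into numPartitions even slices.
package com.nvidia.spark.rapids

import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuEvenSlicePartitioning(numPartitions: Int) extends GpuPartitioning {

  // Build cut points [0, rows/k, 2*rows/k, ...] and let the trait pick the GPU or
  // CPU slicing path depending on whether the RAPIDS shuffle is enabled.
  def slice(batch: ColumnarBatch, columns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    val rows = batch.numRows()
    val cutPoints = (0 until numPartitions).map(i => i * rows / numPartitions).toArray
    sliceInternalGpuOrCpu(batch, cutPoints, columns)
  }
}

The plugin's real partitionings compute their cut points from the data rather than evenly, but the hand-off to sliceInternalGpuOrCpu has the same shape.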
108 changes: 1 addition & 107 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
@@ -20,116 +20,18 @@ import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf._
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext}
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.vectorized.ColumnarBatch

trait GpuPartitioning extends Partitioning {

  def sliceBatch(vectors: Array[RapidsHostColumnVector], start: Int, end: Int): ColumnarBatch = {
    var ret: ColumnarBatch = null
    val count = end - start
    if (count > 0) {
      ret = new ColumnarBatch(vectors.map(vec => new SlicedGpuColumnVector(vec, start, end)))
      ret.setNumRows(count)
    }
    ret
  }

  def sliceInternalOnGpu(batch: ColumnarBatch, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    // The first index will always be 0, so we need to skip it.
    val batches = if (batch.numRows > 0) {
      val parts = partitionIndexes.slice(1, partitionIndexes.length)
      val splits = new ArrayBuffer[ColumnarBatch](numPartitions)
      val table = new Table(partitionColumns.map(_.getBase).toArray: _*)
      val contiguousTables: Array[ContiguousTable] = try {
        table.contiguousSplit(parts: _*)
      } finally {
        table.close()
      }
      var succeeded = false
      try {
        contiguousTables.foreach { ct => splits.append(GpuColumnVectorFromBuffer.from(ct)) }
        succeeded = true
      } finally {
        contiguousTables.foreach(_.close())
        if (!succeeded) {
          splits.foreach(_.close())
        }
      }
      splits.toArray
    } else {
      Array[ColumnarBatch]()
    }

    GpuSemaphore.releaseIfNecessary(TaskContext.get())
    batches
  }

  def sliceInternalOnCpu(batch: ColumnarBatch, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    // We need to make sure that we have a null count calculated ahead of time.
    // This should be a temporary workaround.
    partitionColumns.foreach(_.getBase.getNullCount)

    val hostPartColumns = partitionColumns.map(_.copyToHost())
    try {
      // Leaving the GPU for a while
      GpuSemaphore.releaseIfNecessary(TaskContext.get())

      val ret = new Array[ColumnarBatch](numPartitions)
      var start = 0
      for (i <- 1 until Math.min(numPartitions, partitionIndexes.length)) {
        val idx = partitionIndexes(i)
        ret(i - 1) = sliceBatch(hostPartColumns, start, idx)
        start = idx
      }
      ret(numPartitions - 1) = sliceBatch(hostPartColumns, start, batch.numRows())
      ret
    } finally {
      hostPartColumns.safeClose()
    }
  }

  def sliceInternalGpuOrCpu(batch: ColumnarBatch, partitionIndexes: Array[Int],
      partitionColumns: Array[GpuColumnVector]): Array[ColumnarBatch] = {
    val rapidsShuffleEnabled = GpuShuffleEnv.isRapidsShuffleEnabled
    val nvtxRangeKey = if (rapidsShuffleEnabled) {
      "sliceInternalOnGpu"
    } else {
      "sliceInternalOnCpu"
    }
    // If we are not using the RAPIDS shuffle, we fall back to doing the splits on the
    // CPU to avoid the overhead of a large number of small splits.
    val sliceRange = new NvtxRange(nvtxRangeKey, NvtxColor.CYAN)
    try {
      if (rapidsShuffleEnabled) {
        sliceInternalOnGpu(batch, partitionIndexes, partitionColumns)
      } else {
        sliceInternalOnCpu(batch, partitionIndexes, partitionColumns)
      }
    } finally {
      sliceRange.close()
    }
  }
}

case class ColumnarOverrideRules() extends ColumnarRule with Logging {
  val overrides: Rule[SparkPlan] = GpuOverrides()
@@ -151,14 +53,6 @@ class SQLExecPlugin extends (SparkSessionExtensions => Unit) with Logging {
}
}

trait GpuSpillable {
  /**
   * Spill GPU memory if possible
   * @param target the amount of memory that we want to try and spill.
   */
  def spill(target: Long): Unit
}

object RapidsPluginUtils extends Logging {
  private val SQL_PLUGIN_NAME = classOf[SQLExecPlugin].getName
  private val SQL_PLUGIN_CONF_KEY = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key