Skip to content

Commit

Permalink
Add Support for Renaming of PythonMapInArrow [databricks] (NVIDIA#10931)
Browse files Browse the repository at this point in the history
* Add support for the renaming of PythonMapInArrow to MapInArrow

* Signing off

Signed-off-by: Raza Jafri <[email protected]>

* Removed the unnecessary base class from 400

* addressed review comments

---------

Signed-off-by: Raza Jafri <[email protected]>
  • Loading branch information
razajafri authored and SurajAralihalli committed Jul 12, 2024
1 parent 02497b4 commit e1db179
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
{"spark": "343"}
{"spark": "350"}
{"spark": "351"}
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.shims

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
/*** spark-rapids-shim-json-lines
{"spark": "350"}
{"spark": "351"}
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
/*** spark-rapids-shim-json-lines
{"spark": "350"}
{"spark": "351"}
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.shims

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.MapInArrowExec
import org.apache.spark.sql.rapids.shims.GpuMapInArrowExecMeta

object PythonMapInArrowExecShims {

def execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = Seq(
GpuOverrides.exec[MapInArrowExec](
"The backend for Map Arrow Iterator UDF. Accelerates the data transfer between the" +
" Java process and the Python process. It also supports scheduling GPU resources" +
" for the Python process when enabled.",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
TypeSig.all),
(mapPy, conf, p, r) => new GpuMapInArrowExecMeta(mapPy, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "400"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids._

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.MapInArrowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.execution.python.GpuMapInBatchExec
import org.apache.spark.sql.types.{BinaryType, StringType}

class GpuMapInArrowExecMeta(
mapArrow: MapInArrowExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends SparkPlanMeta[MapInArrowExec](mapArrow, conf, parent, rule) {
override def replaceMessage: String = "partially run on GPU"

override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"

protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this))
protected val resultAttrs: Seq[BaseExprMeta[Attribute]] =
mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf

override def tagPlanForGpu(): Unit = {
super.tagPlanForGpu()
if (SQLConf.get.getConf(SQLConf.ARROW_EXECUTION_USE_LARGE_VAR_TYPES)) {

val inputTypes = mapArrow.child.schema.fields.map(_.dataType)
val outputTypes = mapArrow.output.map(_.dataType)

val hasStringOrBinaryTypes = (inputTypes ++ outputTypes).exists(dataType =>
TrampolineUtil.dataTypeExistsRecursively(dataType,
dt => dt == StringType || dt == BinaryType))

if (hasStringOrBinaryTypes) {
willNotWorkOnGpu(s"${SQLConf.ARROW_EXECUTION_USE_LARGE_VAR_TYPES.key} is " +
s"enabled and the schema contains string or binary types. This is not " +
s"supported on the GPU.")
}
}
}

override def convertToGpu(): GpuExec =
GpuMapInArrowExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded(),
isBarrier = mapArrow.isBarrier,
)
}

/*
* A relation produced by applying a function that takes an iterator of PyArrow's record
* batches and outputs an iterator of PyArrow's record batches.
*
* This GpuMapInPandasExec aims at accelerating the data transfer between
* JVM and Python, and scheduling GPU resources for its Python processes.
*
*/
case class GpuMapInArrowExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan,
override val isBarrier: Boolean) extends GpuMapInBatchExec {

override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
}

0 comments on commit e1db179

Please sign in to comment.