Support columnar processing for mapInArrow[databricks] (#6823)
* Support columnar processing for mapInArrow

Signed-off-by: Liangcai Li <[email protected]>
firestarman authored Oct 20, 2022
1 parent 1b7f437 commit 1223370
Showing 9 changed files with 283 additions and 108 deletions.
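
For context, mapInArrow was added in PySpark 3.3.0: it maps a user function over an iterator of pyarrow.RecordBatch objects and returns a new DataFrame with the declared result schema, skipping the Arrow-to-pandas conversion that mapInPandas performs. A minimal sketch of the API this commit accelerates (not part of the diff below; the column names and the doubling logic are illustrative only):

    import pyarrow as pa

    def double_a(batches):
        # batches is an iterator of pyarrow.RecordBatch sent from the JVM side
        for batch in batches:
            pdf = batch.to_pandas()
            pdf['a'] = pdf['a'] * 2
            yield pa.RecordBatch.from_pandas(pdf)

    # df is assumed to be a DataFrame with two long columns, a and b
    result = df.mapInArrow(double_a, schema='a long, b long')
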
20 changes: 18 additions & 2 deletions integration_tests/src/main/python/udf_test.py
@@ -15,7 +15,7 @@
import pytest

from conftest import is_at_least_precommit_run
from spark_session import is_databricks91_or_later
from spark_session import is_databricks91_or_later, is_before_spark_330

from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version
try:
@@ -34,11 +34,12 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from data_gen import *
from marks import incompat, approximate_float, allow_non_gpu, ignore_order
from marks import approximate_float, allow_non_gpu, ignore_order
from pyspark.sql import Window
from pyspark.sql.types import *
import pyspark.sql.functions as f
import pandas as pd
import pyarrow
from typing import Iterator, Tuple

arrow_udf_conf = {
@@ -349,3 +350,18 @@ def do_it(spark):
right.groupby('a')).applyInPandas(
asof_join, schema="a int, b int")
assert_gpu_fallback_collect(do_it, 'FlatMapCoGroupsInPandasExec', conf=arrow_udf_conf)


@ignore_order
@pytest.mark.parametrize('data_gen', [LongGen(nullable=False)], ids=idfn)
@pytest.mark.skipif(is_before_spark_330(), reason='mapInArrow is introduced in Pyspark 3.3.0')
def test_map_arrow_apply_udf(data_gen):
def filter_func(iterator):
for batch in iterator:
pdf = batch.to_pandas()
yield pyarrow.RecordBatch.from_pandas(pdf[pdf.b <= pdf.a])

assert_gpu_and_cpu_are_equal_collect(
lambda spark: binary_op_df(spark, data_gen, num_slices=4) \
.mapInArrow(filter_func, schema="a long, b long"),
conf=arrow_udf_conf)
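
The filter above round-trips each batch through pandas. mapInArrow also allows working on the Arrow data directly; a hedged variant of the same b <= a predicate using pyarrow compute kernels (assuming a reasonably recent pyarrow; not part of this commit) could look like:

    import pyarrow as pa
    import pyarrow.compute as pc

    def filter_func_arrow(iterator):
        for batch in iterator:
            # b <= a, evaluated on the Arrow arrays without a pandas conversion
            mask = pc.less_equal(batch.column(1), batch.column(0))
            yield pa.RecordBatch.from_arrays(
                [col.filter(mask) for col in batch.columns], schema=batch.schema)
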
@@ -55,7 +55,7 @@ class GpuGroupUDFArrowPythonRunner(
conf: Map[String, String],
batchSize: Long,
val semWait: GpuMetric,
val pythonOutSchema: StructType)
pythonOutSchema: StructType)
extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets)
with GpuPythonArrowOutput {

@@ -123,4 +123,8 @@ class GpuGroupUDFArrowPythonRunner(
}
}
}

def toBatch(table: Table): ColumnarBatch = {
GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
}
}
@@ -28,8 +28,10 @@ import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.python.PythonMapInArrowExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta
import org.apache.spark.sql.rapids.shims.{GpuDivideDTInterval, GpuDivideYMInterval, GpuMultiplyDTInterval, GpuMultiplyYMInterval, GpuTimeAdd}
import org.apache.spark.sql.types.{CalendarIntervalType, DayTimeIntervalType, DecimalType, StructType}
import org.apache.spark.unsafe.types.CalendarInterval
@@ -250,7 +252,14 @@ trait Spark330PlusShims extends Spark321PlusShims with Spark320PlusNonDBShims {
TypeSig.ARRAY + TypeSig.DECIMAL_128 + TypeSig.BINARY +
GpuTypeShims.additionalCommonOperatorSupportedTypes).nested(),
TypeSig.all),
(fsse, conf, p, r) => new FileSourceScanExecMeta(fsse, conf, p, r))
(fsse, conf, p, r) => new FileSourceScanExecMeta(fsse, conf, p, r)),
GpuOverrides.exec[PythonMapInArrowExec](
"The backend for Map Arrow Iterator UDF. Accelerates the data transfer between the" +
" Java process and the Python process. It also supports scheduling GPU resources" +
" for the Python process when enabled.",
ExecChecks((TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(),
TypeSig.all),
(mapPy, conf, p, r) => new GpuPythonMapInArrowExecMeta(mapPy, conf, p, r))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
super.getExecs ++ map
}
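
With this registration in place, the plugin can replace PythonMapInArrowExec with GpuPythonMapInArrowExec when the expressions and data types are supported. A rough way to verify the replacement from PySpark, assuming the RAPIDS Accelerator jar is on the classpath with spark.plugins=com.nvidia.spark.SQLPlugin and spark.rapids.sql.enabled=true (exact configuration keys may differ by release):

    # identity pass-through, just to put a mapInArrow node in the plan
    def passthrough(batches):
        for batch in batches:
            yield batch

    df = spark.range(0, 1000).selectExpr('id as a', 'id % 7 as b')
    df.mapInArrow(passthrough, schema='a long, b long').explain()
    # With the plugin enabled, the physical plan should show the Gpu version of
    # the map-in-arrow node in place of PythonMapInArrowExec.
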
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.execution.python

import com.nvidia.spark.rapids._

import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python._

class GpuPythonMapInArrowExecMeta(
mapArrow: PythonMapInArrowExec,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: DataFromReplacementRule)
extends SparkPlanMeta[PythonMapInArrowExec](mapArrow, conf, parent, rule) {

override def replaceMessage: String = "partially run on GPU"
override def noReplacementPossibleMessage(reasons: String): String =
s"cannot run even partially on the GPU because $reasons"

private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr(
mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this))
private val resultAttrs: Seq[BaseExprMeta[Attribute]] =
mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf

override def convertToGpu(): GpuExec =
GpuPythonMapInArrowExec(
udf.convertToGpu(),
resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]],
childPlans.head.convertIfNeeded()
)
}

/*
* A relation produced by applying a function that takes an iterator of PyArrow's record
* batches and outputs an iterator of PyArrow's record batches.
*
* This GpuPythonMapInArrowExec aims at accelerating the data transfer between
* JVM and Python, and scheduling GPU resources for its Python processes.
*
*/
case class GpuPythonMapInArrowExec(
func: Expression,
output: Seq[Attribute],
child: SparkPlan) extends GpuMapInBatchExec {

override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF
}
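
As the comment above notes, this exec mirrors GpuMapInPandasExec; from the user's point of view the difference is only the batch type handed to the Python function. A small illustration (the pass-through functions and schema reuse are made up for this example):

    def keep_all_pandas(dfs):
        for pdf in dfs:          # each pdf is a pandas.DataFrame
            yield pdf

    def keep_all_arrow(batches):
        for batch in batches:    # each batch is a pyarrow.RecordBatch
            yield batch

    # df is any existing DataFrame
    df.mapInPandas(keep_all_pandas, schema=df.schema)   # SQL_MAP_PANDAS_ITER_UDF
    df.mapInArrow(keep_all_arrow, schema=df.schema)     # SQL_MAP_ARROW_ITER_UDF
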
@@ -338,25 +338,41 @@ private[python] object BatchGroupedIterator extends Arm {
}

/**
* Extract the children columns from a batch consisting of only one struct column, and
* wrap them up by a ColumnarBatch where they become the top-level children.
* Extract the first N children columns from a batch consisting of only one struct column,
* and wrap them up by a ColumnarBatch where they become the top-level children.
* N is equal to the size of the given children attributes.
*
* @param batch The input batch
* @param childrenAttrs The attributes of the children columns to be pulled out
* @return a columnar batch with the children pulled out.
*/
def extractChildren(batch: ColumnarBatch, childrenAttrs: Seq[Attribute]): ColumnarBatch = {
assert(batch.numCols() == 1, "Expect only one struct column")
assert(batch.column(0).dataType().isInstanceOf[StructType],
withResource(GpuColumnVector.from(batch)) { table =>
extractChildren(table, childrenAttrs)
}
}

/**
* Extract the first N children columns from a table consisting of only one struct column,
* and wrap them up by a ColumnarBatch where they become the top-level children.
* N is equal to the size of the given children attributes.
*
* @param table The input table
* @param childrenAttrs The attributes of the children columns to be pulled out
* @return a columnar batch with the children pulled out.
*/
def extractChildren(table: cudf.Table, childrenAttrs: Seq[Attribute]): ColumnarBatch = {
assert(table.getNumberOfColumns == 1, "Expect only one struct column")
assert(table.getColumn(0).getType == cudf.DType.STRUCT,
"Expect a struct column")
val structColumn = batch.column(0).asInstanceOf[GpuColumnVector].getBase
val structColumn = table.getColumn(0)
val outputColumns = childrenAttrs.zipWithIndex.safeMap {
case (attr, i) =>
withResource(structColumn.getChildColumnView(i)) { childView =>
GpuColumnVector.from(childView.copyToColumnVector(), attr.dataType)
}
}
new ColumnarBatch(outputColumns.toArray, batch.numRows())
new ColumnarBatch(outputColumns.toArray, table.getRowCount.toInt)
}

}
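
The extractChildren overloads above pull the child columns of a single struct column up to become the top-level columns of a batch. The same idea can be sketched with pyarrow on the CPU side for illustration (the table, column, and field names are made up):

    import pyarrow as pa

    tbl = pa.table({'s': [{'a': 1, 'b': 10}, {'a': 2, 'b': 20}]})
    children = tbl.column('s').flatten()          # one array per struct field
    flat = pa.table(children, names=['a', 'b'])   # the children become top-level columns
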
@@ -94,10 +94,11 @@ trait GpuPythonArrowOutput extends Arm { _: GpuPythonRunnerBase[_] =>
minReadTargetBatchSize = size
}

def pythonOutSchema: StructType

def semWait: GpuMetric

/** Convert the table received from the Python side to a batch. */
protected def toBatch(table: Table): ColumnarBatch

protected def newReaderIterator(
stream: DataInputStream,
writerThread: WriterThread,
@@ -158,9 +159,9 @@ trait GpuPythonArrowOutput extends Arm { _: GpuPythonRunnerBase[_] =>
arrowReader = null
read()
} else {
withResource(table) { table =>
withResource(table) { _ =>
batchLoaded = true
GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
toBatch(table)
}
}
} else {
@@ -201,7 +202,7 @@ abstract class GpuPythonRunnerBase[IN](
/**
* Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream.
*/
class GpuArrowPythonRunner(
abstract class GpuArrowPythonRunnerBase(
funcs: Seq[ChainedPythonFunctions],
evalType: Int,
argOffsets: Array[Array[Int]],
@@ -210,7 +211,6 @@ class GpuArrowPythonRunner(
conf: Map[String, String],
batchSize: Long,
val semWait: GpuMetric,
val pythonOutSchema: StructType,
onDataWriteFinished: () => Unit = null)
extends GpuPythonRunnerBase[ColumnarBatch](funcs, evalType, argOffsets)
with GpuPythonArrowOutput {
@@ -282,6 +282,33 @@
}
}

class GpuArrowPythonRunner(
funcs: Seq[ChainedPythonFunctions],
evalType: Int,
argOffsets: Array[Array[Int]],
pythonInSchema: StructType,
timeZoneId: String,
conf: Map[String, String],
batchSize: Long,
override val semWait: GpuMetric,
pythonOutSchema: StructType,
onDataWriteFinished: () => Unit = null)
extends GpuArrowPythonRunnerBase(
funcs,
evalType,
argOffsets,
pythonInSchema,
timeZoneId,
conf,
batchSize,
semWait,
onDataWriteFinished) {

def toBatch(table: Table): ColumnarBatch = {
GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
}
}

object GpuArrowPythonRunner {
def flattenNames(d: DataType, nullable: Boolean = true): Seq[(String, Boolean)] =
d match {
@@ -32,7 +32,7 @@ import org.apache.spark.util.Utils
/**
* Python UDF Runner for cogrouped UDFs, designed for `GpuFlatMapCoGroupsInPandasExec` only.
*
* It sends Arrow bathes from two different DataFrames, groups them in Python,
* It sends Arrow batches from two different DataFrames, groups them in Python,
* and receive it back in JVM as batches of single DataFrame.
*/
class GpuCoGroupedArrowPythonRunner(
@@ -45,7 +45,7 @@ class GpuCoGroupedArrowPythonRunner(
conf: Map[String, String],
batchSize: Int,
val semWait: GpuMetric,
val pythonOutSchema: StructType)
pythonOutSchema: StructType)
extends GpuPythonRunnerBase[(ColumnarBatch, ColumnarBatch)](funcs, evalType, argOffsets)
with GpuPythonArrowOutput {

Expand Down Expand Up @@ -117,4 +117,8 @@ class GpuCoGroupedArrowPythonRunner(
} // end of writeGroup
}
} // end of newWriterThread

def toBatch(table: Table): ColumnarBatch = {
GpuColumnVector.from(table, GpuColumnVector.extractTypes(pythonOutSchema))
}
}