Commit: Address comments

root committed Mar 7, 2024
1 parent 502cf62 commit 9891359

Showing 16 changed files with 68 additions and 61 deletions.
@@ -382,8 +382,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       s"WriteFilesTransformer metrics update is not supported in CH backend")
   }
 
-  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
+  override def genVanillaColumnarToNativeColumnarMetrics(
+      sparkContext: SparkContext): Map[String, SQLMetric] = {
     throw new UnsupportedOperationException(
-      s"ColumnarToColumnar metrics update is not supported in CH backend")
+      s"VanillaColumnarToNativeColumnar metrics update is not supported in CH backend")
   }
 }

@@ -736,8 +736,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     }
   }
 
-  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase = {
+  override def genVanillaColumnarToNativeColumnarExec(
+      child: SparkPlan): VanillaColumnarToNativeColumnarExecBase = {
     throw new UnsupportedOperationException(
-      "ColumnarToColumnarExec is not supported in ch backend.")
+      "VanillaColumnarToNativeColumnarExec is not supported in ch backend.")
   }
 }

@@ -313,7 +313,8 @@ class MetricsApiImpl extends MetricsApi with Logging {
       "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")
     )
 
-  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+  override def genVanillaColumnarToNativeColumnarMetrics(
+      sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
       "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),

@@ -159,13 +159,14 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
     RowToVeloxColumnarExec(child)
 
   /**
-   * Generate ColumnarToColumnarExec.
+   * Generate VanillaColumnarToNativeColumnarExec.
    *
    * @param child
    * @return
    */
-  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase =
-    ColumnarToVeloxColumnarExec(child)
+  override def genVanillaColumnarToNativeColumnarExec(
+      child: SparkPlan): VanillaColumnarToNativeColumnarExecBase =
+    VanillaColumnarToVeloxColumnarExec(child)
 
   /**
    * Generate FilterExecTransformer.

@@ -481,5 +481,5 @@ object BackendSettings extends BackendSettingsApi {
     true
   }
 
-  override def supportColumnarToColumnarExec(): Boolean = true
+  override def supportVanillaColumnarToNativeColumnarExec(): Boolean = true
 }

@@ -23,7 +23,7 @@ import io.glutenproject.extension.ValidationResult
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
 import io.glutenproject.utils.{ArrowAbiUtil, Iterators}
-import io.glutenproject.vectorized.ColumnarToNativeColumnarJniWrapper
+import io.glutenproject.vectorized.VanillaColumnarToNativeColumnarJniWrapper
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.SparkPlan
@@ -37,7 +37,8 @@ import org.apache.spark.util.TaskResources
 
 import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
 
-case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColumnarExecBase(child) {
+case class VanillaColumnarToVeloxColumnarExec(child: SparkPlan)
+  extends VanillaColumnarToNativeColumnarExecBase(child) {
 
   override protected def doValidateInternal(): ValidationResult = {
     BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema) match {
@@ -61,7 +62,7 @@ case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColum
     val localSchema = schema
     child.executeColumnar().mapPartitions {
       rowIterator =>
-        ColumnarToVeloxColumnarExec.toColumnarBatchIterator(
+        VanillaColumnarToVeloxColumnarExec.toColumnarBatchIterator(
           rowIterator,
           localSchema,
          numInputBatches,
@@ -75,7 +76,7 @@ case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColum
   }
 }
 
-object ColumnarToVeloxColumnarExec {
+object VanillaColumnarToVeloxColumnarExec {
 
   def toColumnarBatchIterator(
       it: Iterator[ColumnarBatch],
@@ -89,40 +90,40 @@ object ColumnarToVeloxColumnarExec {
 
     val arrowSchema =
       SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
-    val jniWrapper = ColumnarToNativeColumnarJniWrapper.create()
+    val jniWrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
     val allocator = ArrowBufferAllocators.contextInstance()
     val cSchema = ArrowSchema.allocateNew(allocator)
     ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
     val c2cHandle = jniWrapper.init(
       cSchema.memoryAddress(),
       NativeMemoryManagers
-        .contextInstance("ColumnarToColumnar")
+        .contextInstance("VanillaColumnarToNativeColumnar")
         .getNativeInstanceHandle)
 
-    val converter = ArrowColumnarBatchConverter.create(arrowSchema, allocator)
+    val arrowConverter = ArrowColumnarBatchConverter.create(arrowSchema, allocator)
 
-    TaskResources.addRecycler("ColumnarToColumnar_resourceClean", 100) {
+    TaskResources.addRecycler("VanillaColumnarToNativeColumnar_resourceClean", 100) {
       jniWrapper.close(c2cHandle)
-      converter.close()
+      arrowConverter.close()
       cSchema.close()
     }
 
     val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
 
       var arrowArray: ArrowArray = null
-      TaskResources.addRecycler("ColumnarToColumnar_arrowArray", 100) {
+      TaskResources.addRecycler("VanillaColumnarToNativeColumnar_arrowArray", 100) {
         if (arrowArray != null) {
           arrowArray.release()
           arrowArray.close()
-          converter.reset()
+          arrowConverter.reset()
         }
       }
 
       override def hasNext: Boolean = {
         if (arrowArray != null) {
           arrowArray.release()
           arrowArray.close()
-          converter.reset()
+          arrowConverter.reset()
           arrowArray = null
         }
         it.hasNext
@@ -131,11 +132,10 @@ object ColumnarToVeloxColumnarExec {
       def nativeConvert(cb: ColumnarBatch): ColumnarBatch = {
         numInputBatches += 1
         arrowArray = ArrowArray.allocateNew(allocator)
-        converter.write(cb)
-        converter.finish()
-        Data.exportVectorSchemaRoot(allocator, converter.root, null, arrowArray)
-        val handle = jniWrapper
-          .nativeConvertColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
+        arrowConverter.write(cb)
+        arrowConverter.finish()
+        Data.exportVectorSchemaRoot(allocator, arrowConverter.root, null, arrowArray)
+        val handle = jniWrapper.nativeConvert(c2cHandle, arrowArray.memoryAddress())
         ColumnarBatches.create(Runtimes.contextInstance(), handle)
       }
 
@@ -153,7 +153,7 @@ object ColumnarToVeloxColumnarExec {
       .wrap(res)
       .recycleIterator {
         jniWrapper.close(c2cHandle)
-        converter.close()
+        arrowConverter.close()
         cSchema.close()
       }
       .recyclePayload(_.close())

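A note on the ownership discipline above: each vanilla batch is written into an Arrow VectorSchemaRoot, exported through Arrow's C data interface, and handed to the native side by address; the JVM side then releases the exported structure before producing the next batch (hence the release()/close()/reset() dance in hasNext). For reference, the underlying Arrow Java export call looks like the following — a minimal sketch, assuming an already populated root; allocator lifecycle and error handling are elided:

    import org.apache.arrow.c.{ArrowArray, Data}
    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.VectorSchemaRoot

    // `root` is assumed to be populated upstream (here, by ArrowColumnarBatchConverter).
    def exportForNative(root: VectorSchemaRoot): Long = {
      val allocator = new RootAllocator()
      val arrowArray = ArrowArray.allocateNew(allocator)
      // Transfers buffer ownership to whoever imports this address; the
      // importer (the native side) must invoke the release callback.
      Data.exportVectorSchemaRoot(allocator, root, null, arrowArray)
      arrowArray.memoryAddress()
    }
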
@@ -74,15 +74,17 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
 
   test("c2c") {
     withSQLConf(
-      "spark.gluten.sql.columnar.columnarToColumnar" -> "true",
-      "spark.gluten.sql.columnar.batchscan" -> "false") {
+      "spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar" -> "true",
+      "spark.gluten.sql.columnar.filescan" -> "false") {
       // TODO Include decimal types as well
       runQueryAndCompare(
         "select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_returnflag," +
           "l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode," +
           "l_comment from lineitem where l_shipdate < '1998-09-02'") {
         df =>
-          assert(getExecutedPlan(df).exists(plan => plan.isInstanceOf[ColumnarToColumnarExecBase]))
+          assert(
+            getExecutedPlan(df).exists(
+              plan => plan.isInstanceOf[VanillaColumnarToNativeColumnarExecBase]))
       }
     }
   }

6 changes: 3 additions & 3 deletions cpp/core/jni/JniWrapper.cc
@@ -663,7 +663,7 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniWr
   JNI_METHOD_END()
 }
 
-JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_init( // NOLINT
+JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_init( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong cSchema,
@@ -678,7 +678,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarToNativeColumna
 }
 
 JNIEXPORT jlong JNICALL
-Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_nativeConvertColumnarToColumnar( // NOLINT
+Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_nativeConvert( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong c2cHandle,
@@ -694,7 +694,7 @@ Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_nativeConver
   JNI_METHOD_END(kInvalidResourceHandle)
 }
 
-JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_close( // NOLINT
+JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_close( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong c2cHandle) {

@@ -133,5 +133,5 @@ trait BackendSettingsApi {
 
   def shouldRewriteTypedImperativeAggregate(): Boolean = false
 
-  def supportColumnarToColumnarExec(): Boolean = false
+  def supportVanillaColumnarToNativeColumnarExec(): Boolean = false
 }

@@ -83,7 +83,7 @@ trait MetricsApi extends Serializable {
 
   def genRowToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
-  def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
+  def genVanillaColumnarToNativeColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
   def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 

@@ -75,7 +75,8 @@ trait SparkPlanExecApi {
    * @param child
    * @return
    */
-  def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase
+  def genVanillaColumnarToNativeColumnarExec(
+      child: SparkPlan): VanillaColumnarToNativeColumnarExecBase
 
   /**
    * Generate FilterExecTransformer.

@@ -30,11 +30,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  * Provides a common executor to translate an [[RDD]] of Vanilla [[ColumnarBatch]] into an [[RDD]]
  * of native [[ColumnarBatch]].
  */
-abstract class ColumnarToColumnarExecBase(child: SparkPlan) extends GlutenPlan with UnaryExecNode {
+abstract class VanillaColumnarToNativeColumnarExecBase(child: SparkPlan)
+  extends GlutenPlan
+  with UnaryExecNode {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
   @transient override lazy val metrics =
-    BackendsApiManager.getMetricsApiInstance.genColumnarToColumnarMetrics(sparkContext)
+    BackendsApiManager.getMetricsApiInstance.genVanillaColumnarToNativeColumnarMetrics(sparkContext)
 
   override def supportsColumnar: Boolean = true
 

@@ -547,25 +547,26 @@ case class InsertColumnarToColumnarTransitions(session: SparkSession) extends Ru
     })
   }
 
-  private def replaceWithVanillaColumnarToColumnar(plan: SparkPlan): SparkPlan = {
+  private def replaceWithVanillaColumnarToNativeColumnar(plan: SparkPlan): SparkPlan = {
     plan match {
       case p: RowToColumnarExecBase if p.child.isInstanceOf[ColumnarToRowExec] =>
-        val replacedChild = replaceWithVanillaColumnarToColumnar(
+        val replacedChild = replaceWithVanillaColumnarToNativeColumnar(
          p.child.asInstanceOf[ColumnarToRowExec].child)
-        BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToColumnarExec(replacedChild)
+        BackendsApiManager.getSparkPlanExecApiInstance.genVanillaColumnarToNativeColumnarExec(
+          replacedChild)
      case _ =>
-        plan.withNewChildren(plan.children.map(replaceWithVanillaColumnarToColumnar))
+        plan.withNewChildren(plan.children.map(replaceWithVanillaColumnarToNativeColumnar))
     }
   }
 
   def apply(plan: SparkPlan): SparkPlan = {
     val transformedPlan = replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
     val newPlan =
       if (
-        GlutenConfig.getConf.enableNativeColumnarToColumnar && BackendsApiManager.getSettings
-          .supportColumnarToColumnarExec()
+        GlutenConfig.getConf.enableVanillaColumnarToNativeColumnar && BackendsApiManager.getSettings
+          .supportVanillaColumnarToNativeColumnarExec()
       ) {
-        replaceWithVanillaColumnarToColumnar(transformedPlan)
+        replaceWithVanillaColumnarToNativeColumnar(transformedPlan)
       } else {
        transformedPlan
      }

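In plan-shape terms, the rule collapses a vanilla-columnar → row → native-columnar round trip into a single transition. A sketch of the rewrite (node names from this diff; the child node is a placeholder for any vanilla columnar plan):

    Before:
      RowToColumnarExecBase
      +- ColumnarToRowExec
         +- <vanilla columnar child>

    After:
      VanillaColumnarToNativeColumnarExec
      +- <vanilla columnar child>

The rewrite is only applied when both the config flag and the backend capability check in apply() above agree; otherwise the row round trip is left in place.
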
@@ -21,15 +21,15 @@
 import io.glutenproject.exec.Runtimes;
 
 /** JniWrapper used to convert spark vanilla columnar data to native columnar data. */
-public class ColumnarToNativeColumnarJniWrapper implements RuntimeAware {
+public class VanillaColumnarToNativeColumnarJniWrapper implements RuntimeAware {
   private final Runtime runtime;
 
-  private ColumnarToNativeColumnarJniWrapper(Runtime runtime) {
+  private VanillaColumnarToNativeColumnarJniWrapper(Runtime runtime) {
     this.runtime = runtime;
   }
 
-  public static ColumnarToNativeColumnarJniWrapper create() {
-    return new ColumnarToNativeColumnarJniWrapper(Runtimes.contextInstance());
+  public static VanillaColumnarToNativeColumnarJniWrapper create() {
+    return new VanillaColumnarToNativeColumnarJniWrapper(Runtimes.contextInstance());
   }
 
   @Override
@@ -39,7 +39,7 @@ public long handle() {
 
   public native long init(long cSchema, long memoryManagerHandle);
 
-  public native long nativeConvertColumnarToColumnar(long c2cHandle, long memoryAddress);
+  public native long nativeConvert(long c2cHandle, long memoryAddress);
 
   public native void close(long c2cHandle);
 }

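Seen from the JVM side, the wrapper's lifecycle is three calls: init once per partition with the exported Arrow schema and a native memory-manager handle, nativeConvert once per batch, and close when the task ends. A minimal sketch in Scala, assuming cSchema, arrowArray, and nmmHandle are prepared as in the Velox operator earlier in this diff:

    val jniWrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
    // One-time native init: Arrow C schema address + memory manager handle.
    val c2cHandle = jniWrapper.init(cSchema.memoryAddress(), nmmHandle)
    try {
      // Once per vanilla batch: hand over the exported ArrowArray address,
      // get back a handle to the freshly built native batch.
      val nativeBatchHandle =
        jniWrapper.nativeConvert(c2cHandle, arrowArray.memoryAddress())
      // ... wrap nativeBatchHandle via ColumnarBatches.create(...) ...
    } finally {
      jniWrapper.close(c2cHandle)
    }

In the actual operator the close() calls are registered as TaskResources recyclers rather than a try/finally, so cleanup also runs when a task is torn down early.
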
@@ -19,12 +19,9 @@ package org.apache.spark.sql.execution.arrow
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.vectorized.ColumnVector
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 import org.apache.arrow.memory.BufferAllocator
@@ -79,10 +76,9 @@ object ArrowColumnarBatchConverter {
       }
       new StructWriter(vector, children.toArray)
     case (NullType, vector: NullVector) => new NullWriter(vector)
-    case (_: YearMonthIntervalType, vector: IntervalYearVector) => new IntervalYearWriter(vector)
-    case (_: DayTimeIntervalType, vector: IntervalDayVector) => new IntervalDayWriter(vector)
+    // TODO support YearMonthIntervalType, DayTimeIntervalType
     case (dt, _) =>
-      throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString)
+      throw new UnsupportedOperationException("Unsupported data type: " + dt)
     }
   }
 }

@@ -70,7 +70,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableNativeColumnarToRow: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
 
-  def enableNativeColumnarToColumnar: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED)
+  def enableVanillaColumnarToNativeColumnar: Boolean =
+    conf.getConf(VANILLA_COLUMNAR_TO_NATIVE_COLUMNAR_ENABLED)
 
   def forceShuffledHashJoin: Boolean = conf.getConf(COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED)
 
@@ -779,10 +780,10 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED =
-    buildConf("spark.gluten.sql.columnar.columnarToColumnar")
+  val VANILLA_COLUMNAR_TO_NATIVE_COLUMNAR_ENABLED =
+    buildConf("spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar")
       .internal()
-      .doc("Enable or disable columnar columnarToColumnar.")
+      .doc("Enable or disable native VanillaColumnarToNativeColumnar.")
       .booleanConf
       .createWithDefault(false)
 

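Both the key and its default come from this diff: the feature stays off unless spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar is set to true, and the backend must also report supportVanillaColumnarToNativeColumnarExec(). A minimal sketch of enabling it for a session — the plugin class and offheap settings are assumptions from standard Gluten setup, not part of this diff:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("c2c-demo")
      // Standard Gluten-on-Velox setup (assumed, not shown in this diff).
      .config("spark.plugins", "io.glutenproject.GlutenPlugin")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "2g")
      // The switch introduced in this diff; defaults to false.
      .config("spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar", "true")
      .getOrCreate()
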
