From 9891359f4bbc87d2ad81539b6e68a105be292f02 Mon Sep 17 00:00:00 2001
From: root
Date: Wed, 6 Mar 2024 22:04:10 -0500
Subject: [PATCH] Address comments

---
 .../backendsapi/clickhouse/CHMetricsApi.scala |  5 +--
 .../clickhouse/CHSparkPlanExecApi.scala       |  5 +--
 .../backendsapi/velox/MetricsApiImpl.scala    |  3 +-
 .../velox/SparkPlanExecApiImpl.scala          |  7 ++--
 .../backendsapi/velox/VeloxBackend.scala      |  2 +-
 ... VanillaColumnarToVeloxColumnarExec.scala} | 36 +++++++++----------
 .../execution/TestOperator.scala              |  8 +++--
 cpp/core/jni/JniWrapper.cc                    |  6 ++--
 .../backendsapi/BackendSettingsApi.scala      |  2 +-
 .../backendsapi/MetricsApi.scala              |  2 +-
 .../backendsapi/SparkPlanExecApi.scala        |  3 +-
 ...llaColumnarToNativeColumnarExecBase.scala} |  6 ++--
 .../extension/ColumnarOverrides.scala         | 15 ++++----
 ...laColumnarToNativeColumnarJniWrapper.java} | 10 +++---
 .../arrow/ArrowColumnarBatchConverter.scala   | 10 ++----
 .../scala/io/glutenproject/GlutenConfig.scala |  9 ++---
 16 files changed, 68 insertions(+), 61 deletions(-)
 rename backends-velox/src/main/scala/io/glutenproject/execution/{ColumnarToVeloxColumnarExec.scala => VanillaColumnarToVeloxColumnarExec.scala} (81%)
 rename gluten-core/src/main/scala/io/glutenproject/execution/{ColumnarToColumnarExecBase.scala => VanillaColumnarToNativeColumnarExecBase.scala} (90%)
 rename gluten-data/src/main/java/io/glutenproject/vectorized/{ColumnarToNativeColumnarJniWrapper.java => VanillaColumnarToNativeColumnarJniWrapper.java} (77%)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index b05aa45e41d6..18025211d7d6 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -382,8 +382,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       s"WriteFilesTransformer metrics update is not supported in CH backend")
   }
 
-  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
+  override def genVanillaColumnarToNativeColumnarMetrics(
+      sparkContext: SparkContext): Map[String, SQLMetric] = {
     throw new UnsupportedOperationException(
-      s"ColumnarToColumnar metrics update is not supported in CH backend")
+      s"VanillaColumnarToNativeColumnar metrics update is not supported in CH backend")
   }
 }
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 7a24d233bcd8..7630f735fdbf 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -736,8 +736,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     }
   }
 
-  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase = {
+  override def genVanillaColumnarToNativeColumnarExec(
+      child: SparkPlan): VanillaColumnarToNativeColumnarExecBase = {
     throw new UnsupportedOperationException(
-      "ColumnarToColumnarExec is not supported in ch backend.")
+      "VanillaColumnarToNativeColumnarExec is not supported in ch backend.")
   }
 }
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
index 7b0350e849d8..bc360236e4d9 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
@@ -313,7 +313,8 @@ class MetricsApiImpl extends MetricsApi with Logging {
       "convertTime" -> SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")
     )
 
-  override def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+  override def genVanillaColumnarToNativeColumnarMetrics(
+      sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
       "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
index 7fb27d288c8c..785d24e97a0b 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/SparkPlanExecApiImpl.scala
@@ -159,13 +159,14 @@ class SparkPlanExecApiImpl extends SparkPlanExecApi {
     RowToVeloxColumnarExec(child)
 
   /**
-   * Generate ColumnarToColumnarExec.
+   * Generate VanillaColumnarToNativeColumnarExec.
    *
    * @param child
    * @return
    */
-  override def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase =
-    ColumnarToVeloxColumnarExec(child)
+  override def genVanillaColumnarToNativeColumnarExec(
+      child: SparkPlan): VanillaColumnarToNativeColumnarExecBase =
+    VanillaColumnarToVeloxColumnarExec(child)
 
   /**
    * Generate FilterExecTransformer.
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 3661daa07162..2f39af0d2a3e 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -481,5 +481,5 @@ object BackendSettings extends BackendSettingsApi {
     true
   }
 
-  override def supportColumnarToColumnarExec(): Boolean = true
+  override def supportVanillaColumnarToNativeColumnarExec(): Boolean = true
 }
diff --git a/backends-velox/src/main/scala/io/glutenproject/execution/ColumnarToVeloxColumnarExec.scala b/backends-velox/src/main/scala/io/glutenproject/execution/VanillaColumnarToVeloxColumnarExec.scala
similarity index 81%
rename from backends-velox/src/main/scala/io/glutenproject/execution/ColumnarToVeloxColumnarExec.scala
rename to backends-velox/src/main/scala/io/glutenproject/execution/VanillaColumnarToVeloxColumnarExec.scala
index c5acca462577..5f2c197b0938 100644
--- a/backends-velox/src/main/scala/io/glutenproject/execution/ColumnarToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/execution/VanillaColumnarToVeloxColumnarExec.scala
@@ -23,7 +23,7 @@ import io.glutenproject.extension.ValidationResult
 import io.glutenproject.memory.arrowalloc.ArrowBufferAllocators
 import io.glutenproject.memory.nmm.NativeMemoryManagers
 import io.glutenproject.utils.{ArrowAbiUtil, Iterators}
-import io.glutenproject.vectorized.ColumnarToNativeColumnarJniWrapper
+import io.glutenproject.vectorized.VanillaColumnarToNativeColumnarJniWrapper
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.SparkPlan
@@ -37,7 +37,8 @@ import org.apache.spark.util.TaskResources
 
 import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
 
-case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColumnarExecBase(child) {
+case class VanillaColumnarToVeloxColumnarExec(child: SparkPlan)
+  extends VanillaColumnarToNativeColumnarExecBase(child) {
 
   override protected def doValidateInternal(): ValidationResult = {
     BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema) match {
@@ -61,7 +62,7 @@ case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColum
     val localSchema = schema
 
     child.executeColumnar().mapPartitions { rowIterator =>
-      ColumnarToVeloxColumnarExec.toColumnarBatchIterator(
+      VanillaColumnarToVeloxColumnarExec.toColumnarBatchIterator(
        rowIterator,
        localSchema,
        numInputBatches,
@@ -75,7 +76,7 @@ case class ColumnarToVeloxColumnarExec(child: SparkPlan) extends ColumnarToColum
   }
 }
 
-object ColumnarToVeloxColumnarExec {
+object VanillaColumnarToVeloxColumnarExec {
 
   def toColumnarBatchIterator(
       it: Iterator[ColumnarBatch],
@@ -89,32 +90,32 @@ object ColumnarToVeloxColumnarExec {
 
     val arrowSchema = SparkArrowUtil.toArrowSchema(schema, SQLConf.get.sessionLocalTimeZone)
-    val jniWrapper = ColumnarToNativeColumnarJniWrapper.create()
+    val jniWrapper = VanillaColumnarToNativeColumnarJniWrapper.create()
     val allocator = ArrowBufferAllocators.contextInstance()
     val cSchema = ArrowSchema.allocateNew(allocator)
     ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
     val c2cHandle = jniWrapper.init(
       cSchema.memoryAddress(),
       NativeMemoryManagers
-        .contextInstance("ColumnarToColumnar")
+        .contextInstance("VanillaColumnarToNativeColumnar")
         .getNativeInstanceHandle)
-    val converter = ArrowColumnarBatchConverter.create(arrowSchema, allocator)
+    val arrowConverter = ArrowColumnarBatchConverter.create(arrowSchema, allocator)
 
-    TaskResources.addRecycler("ColumnarToColumnar_resourceClean", 100) {
+    TaskResources.addRecycler("VanillaColumnarToNativeColumnar_resourceClean", 100) {
       jniWrapper.close(c2cHandle)
-      converter.close()
+      arrowConverter.close()
       cSchema.close()
     }
 
     val res: Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
       var arrowArray: ArrowArray = null
-      TaskResources.addRecycler("ColumnarToColumnar_arrowArray", 100) {
+      TaskResources.addRecycler("VanillaColumnarToNativeColumnar_arrowArray", 100) {
         if (arrowArray != null) {
           arrowArray.release()
           arrowArray.close()
-          converter.reset()
+          arrowConverter.reset()
         }
       }
@@ -122,7 +123,7 @@
         if (arrowArray != null) {
           arrowArray.release()
           arrowArray.close()
-          converter.reset()
+          arrowConverter.reset()
           arrowArray = null
         }
         it.hasNext
@@ -131,11 +132,10 @@
       def nativeConvert(cb: ColumnarBatch): ColumnarBatch = {
         numInputBatches += 1
         arrowArray = ArrowArray.allocateNew(allocator)
-        converter.write(cb)
-        converter.finish()
-        Data.exportVectorSchemaRoot(allocator, converter.root, null, arrowArray)
-        val handle = jniWrapper
-          .nativeConvertColumnarToColumnar(c2cHandle, arrowArray.memoryAddress())
+        arrowConverter.write(cb)
+        arrowConverter.finish()
+        Data.exportVectorSchemaRoot(allocator, arrowConverter.root, null, arrowArray)
+        val handle = jniWrapper.nativeConvert(c2cHandle, arrowArray.memoryAddress())
         ColumnarBatches.create(Runtimes.contextInstance(), handle)
       }
 
@@ -153,7 +153,7 @@
       .wrap(res)
       .recycleIterator {
         jniWrapper.close(c2cHandle)
-        converter.close()
+        arrowConverter.close()
         cSchema.close()
       }
       .recyclePayload(_.close())
diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
index 038c659763be..f0bbb1a8a27f 100644
--- a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala
@@ -74,15 +74,17 @@ class TestOperator extends VeloxWholeStageTransformerSuite {
 
   test("c2c") {
     withSQLConf(
-      "spark.gluten.sql.columnar.columnarToColumnar" -> "true",
-      "spark.gluten.sql.columnar.batchscan" -> "false") {
+      "spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar" -> "true",
+      "spark.gluten.sql.columnar.filescan" -> "false") {
       // TODO Include decimal types as well
       runQueryAndCompare(
         "select l_orderkey, l_partkey, l_suppkey, l_linenumber, l_returnflag," +
          "l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode," +
          "l_comment from lineitem where l_shipdate < '1998-09-02'") {
        df =>
-         assert(getExecutedPlan(df).exists(plan => plan.isInstanceOf[ColumnarToColumnarExecBase]))
+         assert(
+           getExecutedPlan(df).exists(
+             plan => plan.isInstanceOf[VanillaColumnarToNativeColumnarExecBase]))
      }
     }
   }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 1d355c6a271f..c5cd05298cae 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -663,7 +663,7 @@ JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_NativeRowToColumnarJniWr
   JNI_METHOD_END()
 }
 
-JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_init( // NOLINT
+JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_init( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong cSchema,
@@ -678,7 +678,7 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ColumnarToNativeColumna
 }
 
 JNIEXPORT jlong JNICALL
-Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_nativeConvertColumnarToColumnar( // NOLINT
+Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_nativeConvert( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong c2cHandle,
@@ -694,7 +694,7 @@ Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_nativeConver
   JNI_METHOD_END(kInvalidResourceHandle)
 }
 
-JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_ColumnarToNativeColumnarJniWrapper_close( // NOLINT
+JNIEXPORT void JNICALL Java_io_glutenproject_vectorized_VanillaColumnarToNativeColumnarJniWrapper_close( // NOLINT
     JNIEnv* env,
     jobject wrapper,
     jlong c2cHandle) {
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 9b5a080b6973..62d2b3278df4 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -133,5 +133,5 @@ trait BackendSettingsApi {
 
   def shouldRewriteTypedImperativeAggregate(): Boolean = false
 
-  def supportColumnarToColumnarExec(): Boolean = false
+  def supportVanillaColumnarToNativeColumnarExec(): Boolean = false
 }
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala
index a8c460fb18b9..40a4e2a041d8 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/MetricsApi.scala
@@ -83,7 +83,7 @@ trait MetricsApi extends Serializable {
 
   def genRowToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
-  def genColumnarToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
+  def genVanillaColumnarToNativeColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
   def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
index 3135aece1190..3d103771e63e 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala
@@ -75,7 +75,8 @@ trait SparkPlanExecApi {
    * @param child
    * @return
    */
-  def genColumnarToColumnarExec(child: SparkPlan): ColumnarToColumnarExecBase
+  def genVanillaColumnarToNativeColumnarExec(
+      child: SparkPlan): VanillaColumnarToNativeColumnarExecBase
 
   /**
    * Generate FilterExecTransformer.
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/ColumnarToColumnarExecBase.scala b/gluten-core/src/main/scala/io/glutenproject/execution/VanillaColumnarToNativeColumnarExecBase.scala
similarity index 90%
rename from gluten-core/src/main/scala/io/glutenproject/execution/ColumnarToColumnarExecBase.scala
rename to gluten-core/src/main/scala/io/glutenproject/execution/VanillaColumnarToNativeColumnarExecBase.scala
index 27b908d37304..700a334c7905 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/ColumnarToColumnarExecBase.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/VanillaColumnarToNativeColumnarExecBase.scala
@@ -30,11 +30,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  * Provides a common executor to translate an [[RDD]] of Vanilla [[ColumnarBatch]] into an [[RDD]]
  * of native [[ColumnarBatch]].
  */
-abstract class ColumnarToColumnarExecBase(child: SparkPlan) extends GlutenPlan with UnaryExecNode {
+abstract class VanillaColumnarToNativeColumnarExecBase(child: SparkPlan)
+  extends GlutenPlan
+  with UnaryExecNode {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
   @transient override lazy val metrics =
-    BackendsApiManager.getMetricsApiInstance.genColumnarToColumnarMetrics(sparkContext)
+    BackendsApiManager.getMetricsApiInstance.genVanillaColumnarToNativeColumnarMetrics(sparkContext)
 
   override def supportsColumnar: Boolean = true
 
diff --git a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
index 7e07594779c9..4a339cd9cbcd 100644
--- a/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala
@@ -547,14 +547,15 @@ case class InsertColumnarToColumnarTransitions(session: SparkSession) extends Ru
     })
   }
 
-  private def replaceWithVanillaColumnarToColumnar(plan: SparkPlan): SparkPlan = {
+  private def replaceWithVanillaColumnarToNativeColumnar(plan: SparkPlan): SparkPlan = {
     plan match {
       case p: RowToColumnarExecBase if p.child.isInstanceOf[ColumnarToRowExec] =>
-        val replacedChild = replaceWithVanillaColumnarToColumnar(
+        val replacedChild = replaceWithVanillaColumnarToNativeColumnar(
           p.child.asInstanceOf[ColumnarToRowExec].child)
-        BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToColumnarExec(replacedChild)
+        BackendsApiManager.getSparkPlanExecApiInstance.genVanillaColumnarToNativeColumnarExec(
+          replacedChild)
       case _ =>
-        plan.withNewChildren(plan.children.map(replaceWithVanillaColumnarToColumnar))
+        plan.withNewChildren(plan.children.map(replaceWithVanillaColumnarToNativeColumnar))
     }
   }
 
@@ -562,10 +563,10 @@ case class InsertColumnarToColumnarTransitions(session: SparkSession) extends Ru
     val transformedPlan = replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
     val newPlan =
       if (
-        GlutenConfig.getConf.enableNativeColumnarToColumnar && BackendsApiManager.getSettings
-          .supportColumnarToColumnarExec()
+        GlutenConfig.getConf.enableVanillaColumnarToNativeColumnar && BackendsApiManager.getSettings
+          .supportVanillaColumnarToNativeColumnarExec()
       ) {
-        replaceWithVanillaColumnarToColumnar(transformedPlan)
+        replaceWithVanillaColumnarToNativeColumnar(transformedPlan)
       } else {
        transformedPlan
      }
diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarToNativeColumnarJniWrapper.java b/gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java
similarity index 77%
rename from gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarToNativeColumnarJniWrapper.java
rename to gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java
index 6670d0565ec5..e74b7e2d19bf 100644
--- a/gluten-data/src/main/java/io/glutenproject/vectorized/ColumnarToNativeColumnarJniWrapper.java
+++ b/gluten-data/src/main/java/io/glutenproject/vectorized/VanillaColumnarToNativeColumnarJniWrapper.java
@@ -21,15 +21,15 @@
 import io.glutenproject.exec.Runtimes;
 
 /** JniWrapper used to convert spark vanilla columnar data to native columnar data. */
-public class ColumnarToNativeColumnarJniWrapper implements RuntimeAware {
+public class VanillaColumnarToNativeColumnarJniWrapper implements RuntimeAware {
   private final Runtime runtime;
 
-  private ColumnarToNativeColumnarJniWrapper(Runtime runtime) {
+  private VanillaColumnarToNativeColumnarJniWrapper(Runtime runtime) {
     this.runtime = runtime;
   }
 
-  public static ColumnarToNativeColumnarJniWrapper create() {
-    return new ColumnarToNativeColumnarJniWrapper(Runtimes.contextInstance());
+  public static VanillaColumnarToNativeColumnarJniWrapper create() {
+    return new VanillaColumnarToNativeColumnarJniWrapper(Runtimes.contextInstance());
   }
 
   @Override
@@ -39,7 +39,7 @@ public long handle() {
 
   public native long init(long cSchema, long memoryManagerHandle);
 
-  public native long nativeConvertColumnarToColumnar(long c2cHandle, long memoryAddress);
+  public native long nativeConvert(long c2cHandle, long memoryAddress);
 
   public native void close(long c2cHandle);
 }
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala
index 1ac1c34c121c..0772916e709b 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowColumnarBatchConverter.scala
@@ -19,12 +19,9 @@ package org.apache.spark.sql.execution.arrow
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.sql.vectorized.ColumnVector
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 import org.apache.arrow.memory.BufferAllocator
@@ -79,10 +76,9 @@ object ArrowColumnarBatchConverter {
         }
         new StructWriter(vector, children.toArray)
       case (NullType, vector: NullVector) => new NullWriter(vector)
-      case (_: YearMonthIntervalType, vector: IntervalYearVector) => new IntervalYearWriter(vector)
-      case (_: DayTimeIntervalType, vector: IntervalDayVector) => new IntervalDayWriter(vector)
+      // TODO support YearMonthIntervalType, DayTimeIntervalType
       case (dt, _) =>
-        throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString)
+        throw new UnsupportedOperationException("Unsupported data type: " + dt)
     }
   }
 }
diff --git a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
index 7dac2df8228a..79559040b18e 100644
--- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
+++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala
@@ -70,7 +70,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def enableNativeColumnarToRow: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_ROW_ENABLED)
 
-  def enableNativeColumnarToColumnar: Boolean = conf.getConf(COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED)
+  def enableVanillaColumnarToNativeColumnar: Boolean =
+    conf.getConf(VANILLA_COLUMNAR_TO_NATIVE_COLUMNAR_ENABLED)
 
   def forceShuffledHashJoin: Boolean = conf.getConf(COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED)
 
@@ -779,10 +780,10 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
-  val COLUMNAR_COLUMNAR_TO_COLUMNAR_ENABLED =
-    buildConf("spark.gluten.sql.columnar.columnarToColumnar")
+  val VANILLA_COLUMNAR_TO_NATIVE_COLUMNAR_ENABLED =
+    buildConf("spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar")
       .internal()
-      .doc("Enable or disable columnar columnarToColumnar.")
+      .doc("Enable or disable native VanillaColumnarToNativeColumnar.")
       .booleanConf
       .createWithDefault(false)
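
A minimal usage sketch of the renamed transition outside the test suite is given below. The config key `spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar`, the `spark.gluten.sql.columnar.filescan` toggle, and the `VanillaColumnarToNativeColumnarExecBase` / `VanillaColumnarToVeloxColumnarExec` operators come from this patch; the SparkSession setup, the `lineitem` table, and the AQE toggle are illustrative assumptions, not part of the change.

```scala
// Sketch only, not part of the patch. Assumes a running SparkSession with the Gluten
// Velox backend configured and a TPC-H "lineitem" table registered (both assumed here).
import io.glutenproject.execution.VanillaColumnarToNativeColumnarExecBase
import org.apache.spark.sql.SparkSession

object VanillaToNativeColumnarExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()

    // Enable the transition introduced by this patch; disabling the native file scan
    // mirrors the "c2c" test above so the scan stays vanilla columnar and the new
    // transition is actually inserted. AQE is turned off only to keep the plan simple
    // to inspect from user code.
    spark.conf.set("spark.gluten.sql.columnar.vanillaColumnarToNativeColumnar", "true")
    spark.conf.set("spark.gluten.sql.columnar.filescan", "false")
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    val df = spark.sql(
      "select l_orderkey, l_partkey from lineitem where l_shipdate < '1998-09-02'")
    df.collect()

    // On the Velox backend the planner should now place VanillaColumnarToVeloxColumnarExec
    // (a VanillaColumnarToNativeColumnarExecBase) between the vanilla scan and the native
    // operators.
    val hasTransition = df.queryExecution.executedPlan.collect {
      case p: VanillaColumnarToNativeColumnarExecBase => p
    }.nonEmpty
    println(s"vanilla-to-native columnar transition present: $hasTransition")
  }
}
```

The check mirrors the assertion in TestOperator's "c2c" test: InsertColumnarToColumnarTransitions collapses a vanilla ColumnarToRowExec followed by a RowToColumnarExecBase into the single vanilla-columnar-to-native-columnar operator when the config and backend support flags are both on.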