
Commit

Merge branch 'main' into enable_from_json
zhli1142015 authored Jan 9, 2025
2 parents cc68e23 + add2542 commit c158887
Showing 76 changed files with 2,287 additions and 875 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/clickhouse_be_trigger.yml
@@ -48,7 +48,11 @@ jobs:
script: |
const issueNumber = context.payload.number;
let body;
body = "Run Gluten Clickhouse CI on x86";
if (issueNumber % 10 === 0) {
body = "Run Gluten ClickHouse CI on ARM";
} else {
body = "Run Gluten Clickhouse CI on x86";
}
await github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
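As a rough illustration of the routing added to clickhouse_be_trigger.yml above (a sketch, not part of the commit): every pull request whose number is a multiple of ten gets the ARM comment, and all others get the x86 comment.

// Illustrative Scala sketch mirroring the modulo routing in the workflow script.
def ciCommentFor(prNumber: Int): String =
  if (prNumber % 10 == 0) "Run Gluten ClickHouse CI on ARM"
  else "Run Gluten Clickhouse CI on x86"

// e.g. ciCommentFor(8450) returns the ARM comment; ciCommentFor(8451) returns the x86 comment.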
@@ -227,6 +227,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = {

def validateCompressionCodec(): Option[String] = {
@@ -17,6 +17,7 @@
package org.apache.gluten.columnarbatch;

import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators;
import org.apache.gluten.runtime.Runtime;
import org.apache.gluten.runtime.Runtimes;

@@ -56,6 +57,7 @@ public static void checkNonVeloxBatch(ColumnarBatch batch) {
}

public static ColumnarBatch toVeloxBatch(ColumnarBatch input) {
ColumnarBatches.checkOffloaded(input);
if (ColumnarBatches.isZeroColumnBatch(input)) {
return input;
}
@@ -86,6 +88,26 @@ public static ColumnarBatch toVeloxBatch(ColumnarBatch input) {
return input;
}

/**
* Checks whether a columnar batch is in Velox format. If not, converts it to Velox format and
* returns the converted batch; if it is already in Velox format, returns the batch directly.
*
* <p>Should only be used in cases where explicit to-Velox transitions cannot be inserted
* through the query planner.
*
* <p>For example, it is used by {@link org.apache.spark.sql.execution.ColumnarCachedBatchSerializer},
* since Spark calls ColumnarCachedBatchSerializer#convertColumnarBatchToCachedBatch directly for a
* query plan that reports supportsColumnar=true, without generating a cache-write query plan node.
*/
public static ColumnarBatch ensureVeloxBatch(ColumnarBatch input) {
final ColumnarBatch light =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), input);
if (isVeloxBatch(light)) {
return light;
}
return toVeloxBatch(light);
}

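For context, a minimal caller-side sketch (hypothetical, not part of this commit) of how ensureVeloxBatch can be used from Scala before handing a batch to native code:

// Hypothetical helper: accepts a ColumnarBatch of arbitrary origin and guarantees Velox format.
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.spark.sql.vectorized.ColumnarBatch

def toNativeFriendly(batch: ColumnarBatch): ColumnarBatch = {
  // Offloads heavy (Arrow-backed) batches if necessary, then converts to Velox format.
  VeloxColumnarBatches.ensureVeloxBatch(batch)
}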
/**
* Combine multiple columnar batches horizontally, assuming each of them is already offloaded.
* Otherwise {@link UnsupportedOperationException} will be thrown.
@@ -155,18 +155,12 @@ object VeloxBackendSettings extends BackendSettingsApi {

format match {
case ParquetReadFormat =>
val typeValidator: PartialFunction[StructField, String] = {
// Parquet timestamp is not fully supported yet
case StructField(_, TimestampType, _, _)
if GlutenConfig.get.forceParquetTimestampTypeScanFallbackEnabled =>
"TimestampType(force fallback)"
}
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
if (parquetOptions.mergeSchema) {
// https://github.com/apache/incubator-gluten/issues/7174
Some(s"not support when merge schema is true")
} else {
validateTypes(typeValidator)
None
}
case DwrfReadFormat => None
case OrcReadFormat =>
@@ -238,6 +232,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = {

// Validate if HiveFileFormat write is supported based on output file type
@@ -337,10 +332,17 @@
}

def validateBucketSpec(): Option[String] = {
if (bucketSpec.nonEmpty) {
Some("Unsupported native write: bucket write is not supported.")
} else {
val isHiveCompatibleBucketTable = bucketSpec.nonEmpty && options
.getOrElse("__hive_compatible_bucketed_table_insertion__", "false")
.equals("true")
// Currently, the Velox backend only supports bucketed tables that are Hive-compatible,
// and only when the table is partitioned, hence this restriction.
// Once Velox supports bucketed non-partitioned tables, the restriction to
// partitioned tables can be removed.
if (bucketSpec.isEmpty || (isHiveCompatibleBucketTable && isPartitionedTable)) {
None
} else {
Some("Unsupported native write: non-compatible hive bucket write is not supported.")
}
}

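Stated compactly, the decision encoded by the new check is sketched below (an illustration, not the actual implementation; the names are hypothetical):

// A bucketed write passes validation only when there is no bucket spec at all, or when the
// insertion is Hive-compatible and the table is partitioned.
def bucketWriteSupported(
    hasBucketSpec: Boolean,
    hiveCompatibleInsertion: Boolean,
    isPartitionedTable: Boolean): Boolean =
  !hasBucketSpec || (hiveCompatibleInsertion && isPartitionedTable)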
@@ -67,10 +67,11 @@ class VeloxListenerApi extends ListenerApi with Logging {
ByteUnit.BYTE.toMiB(desiredOverheadSize).toString)
}
val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
if (overheadSize < desiredOverheadSize) {
if (ByteUnit.BYTE.toMiB(overheadSize) < ByteUnit.BYTE.toMiB(desiredOverheadSize)) {
logWarning(
s"Memory overhead is set to $overheadSize which is smaller than the recommended size" +
s" $desiredOverheadSize. This may cause OOM.")
s"Memory overhead is set to ${ByteUnit.BYTE.toMiB(overheadSize)}MiB which is smaller than" +
s" the recommended size ${ByteUnit.BYTE.toMiB(desiredOverheadSize)}MiB." +
s" This may cause OOM.")
}
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)

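For reference, a rough sketch (hypothetical sizes) of why comparing at MiB granularity matters: two values that differ by less than one MiB now compare equal, so the warning is no longer emitted for negligible differences.

import org.apache.spark.network.util.ByteUnit

// Hypothetical sizes: the desired overhead is only 512 bytes larger than the configured one.
val overheadSize: Long = 1024L * 1024 * 1024
val desiredOverheadSize: Long = 1024L * 1024 * 1024 + 512

// A raw-byte comparison would trigger the warning; the MiB comparison does not.
assert(overheadSize < desiredOverheadSize)
assert(ByteUnit.BYTE.toMiB(overheadSize) == ByteUnit.BYTE.toMiB(desiredOverheadSize))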
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.iterator.Iterators
@@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
import org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer
@@ -76,9 +76,11 @@ case class CachedColumnarBatch(
* -> Convert DefaultCachedBatch to InternalRow using vanilla Spark serializer
*/
// format: on
class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHelper with Logging {
class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
private lazy val rowBasedCachedBatchSerializer = new DefaultCachedBatchSerializer

private def glutenConf: GlutenConfig = GlutenConfig.get

private def toStructType(schema: Seq[Attribute]): StructType = {
StructType(schema.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
}
@@ -108,14 +110,14 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
// `convertColumnarBatchToCachedBatch`, but the inside ColumnarBatch is not arrow-based.
// See: `InMemoryRelation.apply()`.
// So we should disallow columnar input if using vanilla Spark columnar scan.
val noVanillaSparkColumnarScan = conf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
!conf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
val noVanillaSparkColumnarScan = glutenConf.getConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED) ||
!glutenConf.getConf(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED)
glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(
schema) && noVanillaSparkColumnarScan
}

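In other words (an illustrative sketch under the config entries shown above, not the actual method body), columnar input is accepted only when Gluten is enabled, the schema validates, and a vanilla Spark columnar scan is ruled out:

def acceptsColumnarInput(
    glutenEnabled: Boolean,
    schemaOk: Boolean,
    columnarFileScanEnabled: Boolean,
    vanillaVectorizedReadersEnabled: Boolean): Boolean = {
  // Vanilla Spark columnar scan cannot apply if Gluten's columnar file scan is on,
  // or if vanilla vectorized readers are disabled.
  val noVanillaSparkColumnarScan = columnarFileScanEnabled || !vanillaVectorizedReadersEnabled
  glutenEnabled && schemaOk && noVanillaSparkColumnarScan
}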
override def supportsColumnarOutput(schema: StructType): Boolean = {
conf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
glutenConf.getConf(GlutenConfig.GLUTEN_ENABLED) && validateSchema(schema)
}

override def convertInternalRowToCachedBatch(
@@ -169,11 +171,16 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with SQLConfHe
conf: SQLConf): RDD[CachedBatch] = {
input.mapPartitions {
it =>
val veloxBatches = it.map {
/* Native code needs an offloaded Velox batch, so make sure to offload
if a heavy batch is encountered. */
batch => VeloxColumnarBatches.ensureVeloxBatch(batch)
}
new Iterator[CachedBatch] {
override def hasNext: Boolean = it.hasNext
override def hasNext: Boolean = veloxBatches.hasNext

override def next(): CachedBatch = {
val batch = it.next()
val batch = veloxBatches.next()
val results =
ColumnarBatchSerializerJniWrapper
.create(
@@ -17,6 +17,7 @@
package org.apache.gluten.test;

import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.config.GlutenConfig$;

import com.codahale.metrics.MetricRegistry;
import org.apache.spark.SparkConf;
@@ -71,7 +72,7 @@ public Object ask(Object message) throws Exception {
private static SparkConf newSparkConf() {
final SparkConf conf = new SparkConf();
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY(), "1g");
conf.set(GlutenConfig.COLUMNAR_VELOX_CONNECTOR_IO_THREADS(), "0");
conf.set(GlutenConfig$.MODULE$.COLUMNAR_VELOX_CONNECTOR_IO_THREADS().key(), "0");
return conf;
}
}
@@ -67,33 +67,26 @@ class ArrowCsvScanSuiteV2 extends ArrowCsvScanSuite {
}
}

/** Since https://github.com/apache/incubator-gluten/pull/5850. */
abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "N/A"
override protected val fileFormat: String = "N/A"

protected val rootPath: String = getClass.getResource("/").getPath

override def beforeAll(): Unit = {
super.beforeAll()
createCsvTables()
}

override def afterAll(): Unit = {
super.afterAll()
}

class ArrowCsvScanWithTableCacheSuite extends ArrowCsvScanSuiteBase {
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
.set("spark.sql.sources.useV1SourceList", "csv")
.set(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
}

/**
* Test for GLUTEN-8453: https://github.com/apache/incubator-gluten/issues/8453. Ensures that no
* error is thrown when caching an Arrow Java query plan.
*/
test("csv scan v1 with table cache") {
val df = spark.sql("select * from student")
df.cache()
assert(df.collect().length == 3)
}
}

/** Since https://github.com/apache/incubator-gluten/pull/5850. */
abstract class ArrowCsvScanSuite extends ArrowCsvScanSuiteBase {
test("csv scan with option string as null") {
val df = runAndCompare("select * from student_option_str")()
val plan = df.queryExecution.executedPlan
@@ -152,6 +145,33 @@ abstract class ArrowCsvScanSuite extends VeloxWholeStageTransformerSuite {
val df = runAndCompare("select count(1) from student")()
checkLengthAndPlan(df, 1)
}
}

abstract class ArrowCsvScanSuiteBase extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "N/A"
override protected val fileFormat: String = "N/A"

protected val rootPath: String = getClass.getResource("/").getPath

override def beforeAll(): Unit = {
super.beforeAll()
createCsvTables()
}

override def afterAll(): Unit = {
super.afterAll()
}

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.sql.files.maxPartitionBytes", "1g")
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set(GlutenConfig.NATIVE_ARROW_READER_ENABLED.key, "true")
}

private def createCsvTables(): Unit = {
spark.read
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.{DataFrame, GlutenQueryTest}
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression, HiveHash, Literal, Pmod, UnsafeProjection}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SQLTestUtils

import java.io.File

trait BucketWriteUtils extends GlutenQueryTest with SQLTestUtils {

def tableDir(table: String): File = {
val identifier = spark.sessionState.sqlParser.parseTableIdentifier(table)
new File(spark.sessionState.catalog.defaultTablePath(identifier))
}

protected def testBucketing(
dataDir: File,
source: String = "parquet",
numBuckets: Int,
bucketCols: Seq[String],
sortCols: Seq[String] = Nil,
inputDF: DataFrame,
bucketIdExpression: (Seq[Expression], Int) => Expression,
getBucketIdFromFileName: String => Option[Int]): Unit = {
val allBucketFiles =
dataDir.listFiles().filterNot(f => f.getName.startsWith(".") || f.getName.startsWith("_"))

for (bucketFile <- allBucketFiles) {
val bucketId = getBucketIdFromFileName(bucketFile.getName).getOrElse {
fail(s"Unable to find the related bucket files.")
}

// Remove the duplicate columns in bucketCols and sortCols;
// otherwise, we would get analysis errors due to duplicate names.
val selectedColumns = (bucketCols ++ sortCols).distinct
// We may lose the type information after the write (e.g. the JSON format doesn't keep schema
// information), so here we get the types from the original dataframe.
val types = inputDF.select(selectedColumns.map(col): _*).schema.map(_.dataType)
val columns = selectedColumns.zip(types).map { case (colName, dt) => col(colName).cast(dt) }

// Read the bucket file into a dataframe, so that it's easier to test.
val readBack = spark.read
.format(source)
.load(bucketFile.getAbsolutePath)
.select(columns: _*)

// If we specified sort columns while writing bucket table, make sure the data in this
// bucket file is already sorted.
if (sortCols.nonEmpty) {
checkAnswer(readBack.sort(sortCols.map(col): _*), readBack.collect())
}

// Go through all rows in this bucket file, calculate the bucket id according to the bucket
// column values, and make sure it equals the expected bucket id inferred from the file name.
val qe = readBack.select(bucketCols.map(col): _*).queryExecution
val rows = qe.toRdd.map(_.copy()).collect()
val getBucketId = UnsafeProjection.create(
bucketIdExpression(qe.analyzed.output, numBuckets) :: Nil,
qe.analyzed.output)

for (row <- rows) {
val actualBucketId = getBucketId(row).getInt(0)
assert(actualBucketId == bucketId)
}
}
}

def bucketIdExpression(expressions: Seq[Expression], numBuckets: Int): Expression =
Pmod(BitwiseAnd(HiveHash(expressions), Literal(Int.MaxValue)), Literal(numBuckets))

def getBucketIdFromFileName(fileName: String): Option[Int] = {
val hiveBucketedFileName = """^(\d+)_0_.*$""".r
fileName match {
case hiveBucketedFileName(bucketId) => Some(bucketId.toInt)
case _ => None
}
}
}
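A brief standalone sketch of the file-name convention handled by getBucketIdFromFileName above (the file names are hypothetical, chosen only to match or miss the regex):

val hiveBucketedFileName = """^(\d+)_0_.*$""".r
def bucketIdOf(fileName: String): Option[Int] = fileName match {
  case hiveBucketedFileName(bucketId) => Some(bucketId.toInt)
  case _ => None
}
assert(bucketIdOf("00002_0_part-xyz.parquet") == Some(2)) // Hive-style bucket file, bucket id 2
assert(bucketIdOf("part-00000-xyz.parquet").isEmpty)      // not a Hive-bucketed file name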
