Fixing tests for Spark 2.4;
Limiting scope of tests to Spark >= 3.1
Alexey Kudinkin committed Jul 16, 2022
1 parent d819cb8 commit cc5155c
Showing 1 changed file with 69 additions and 57 deletions.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hudi

-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
@@ -39,62 +39,74 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp

test("Test NestedSchemaPruning optimization (COW/MOR)") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"

spark.sql(
s"""
|CREATE TABLE $tableName (
| id int,
| item STRUCT<name: string, price: double>,
| ts long
|) USING HUDI TBLPROPERTIES (
| type = '$tableType',
| primaryKey = 'id',
| preCombineField = 'ts',
| hoodie.populate.meta.fields = 'false'
|)
|LOCATION '$tablePath'
""".stripMargin)

spark.sql(
s"""
|INSERT INTO $tableName
|SELECT 1 AS id, named_struct('name', 'a1', 'price', 10) AS item, 123456 AS ts
""".stripMargin)

val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName")

val expectedSchema = StructType(Seq(
StructField("id", IntegerType),
StructField("item" , StructType(Seq(StructField("name", StringType))))
))

spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)

val hint =
"""
|Following is expected to be present in the plan (where ReadSchema has properly pruned nested structs, which
|is an optimization performed by NestedSchemaPruning rule):
|
|== Physical Plan ==
|*(1) Project [id#45, item#46.name AS name#55]
|+- FileScan parquet default.h0[id#45,item#46] Batched: false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/private/var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/spark-7137..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,item:struct<name:string>>
|]
|""".stripMargin

val executedPlan = executePlan(selectDF.logicalPlan)
executedPlan match {
// COW
case ProjectExec(_, FileSourceScanExec(_, _, requiredSchema, _, _, _, _, tableIdentifier, _)) =>
assertEquals(tableName, tableIdentifier.get.table)
assertEquals(expectedSchema, requiredSchema, hint)

// MOR
case ProjectExec(_, RowDataSourceScanExec(_, requiredSchema, _, _, _, _, _, tableIdentifier)) =>
assertEquals(tableName, tableIdentifier.get.table)
assertEquals(expectedSchema, requiredSchema, hint)
+      // NOTE: This tests are only relevant for Spark >= 3.1
+      // TODO extract tests into a separate spark-version-specific module
+      if (HoodieSparkUtils.gteqSpark3_1) {
+        Seq("cow", "mor").foreach { tableType =>
+          val tableName = generateTableName
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+
+          spark.sql(
+            s"""
+               |CREATE TABLE $tableName (
+               |  id int,
+               |  item STRUCT<name: string, price: double>,
+               |  ts long
+               |) USING HUDI TBLPROPERTIES (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.populate.meta.fields = 'false'
+               |)
+               |LOCATION '$tablePath'
+             """.stripMargin)
+
+          spark.sql(
+            s"""
+               |INSERT INTO $tableName
+               |SELECT 1 AS id, named_struct('name', 'a1', 'price', 10) AS item, 123456 AS ts
+             """.stripMargin)
+
+          val selectDF = spark.sql(s"SELECT id, item.name FROM $tableName")
+
+          val expectedSchema = StructType(Seq(
+            StructField("id", IntegerType),
+            StructField("item" , StructType(Seq(StructField("name", StringType))))
+          ))
+
+          spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
+
+          val hint =
+            """
+              |Following is expected to be present in the plan (where ReadSchema has properly pruned nested structs, which
+              |is an optimization performed by NestedSchemaPruning rule):
+              |
+              |== Physical Plan ==
+              |*(1) Project [id#45, item#46.name AS name#55]
+              |+- FileScan parquet default.h0[id#45,item#46] Batched: false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/private/var/folders/kb/cnff55vj041g2nnlzs5ylqk00000gn/T/spark-7137..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int,item:struct<name:string>>
+              |]
+              |""".stripMargin
+
+          val executedPlan = executePlan(selectDF.logicalPlan)
+          // NOTE: Unfortunately, we can't use pattern-matching to extract required fields, due to a need to maintain
+          //       compatibility w/ Spark 2.4
+          executedPlan match {
+            // COW
+            case ProjectExec(_, fileScan: FileSourceScanExec) =>
+              val tableIdentifier = fileScan.tableIdentifier
+              val requiredSchema = fileScan.requiredSchema
+
+              assertEquals(tableName, tableIdentifier.get.table)
+              assertEquals(expectedSchema, requiredSchema, hint)
+
+            // MOR
+            case ProjectExec(_, dataScan: RowDataSourceScanExec) =>
+              val tableIdentifier = dataScan.tableIdentifier
+              val requiredSchema = dataScan.requiredSchema
+
+              assertEquals(tableName, tableIdentifier.get.table)
+              assertEquals(expectedSchema, requiredSchema, hint)
+          }
        }
      }
    }
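
For context on what the assertions check: Spark's NestedSchemaPruning rule narrows a Parquet scan's ReadSchema to just the nested fields a query touches, so selecting only item.name should leave struct<id:int,item:struct<name:string>> in the scan rather than the full item struct. The same behavior can be reproduced against a plain Parquet table outside Hudi; the sketch below assumes a local session and a scratch path of its own choosing, and is not part of the commit:

import org.apache.spark.sql.SparkSession

// Hypothetical records mirroring the table schema in the test above.
case class Item(name: String, price: Double)
case class Record(id: Int, item: Item, ts: Long)

object NestedPruningDemo extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("nested-pruning-demo")
    // The rule is on by default since Spark 3.0; set explicitly for clarity.
    .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
    .getOrCreate()

  import spark.implicits._

  // Write one row with a nested struct column, mirroring the test's INSERT.
  Seq(Record(1, Item("a1", 10.0), 123456L))
    .toDS()
    .write.mode("overwrite").parquet("/tmp/nested_pruning_demo")

  // Project only the nested item.name field.
  val df = spark.read.parquet("/tmp/nested_pruning_demo")
    .selectExpr("id", "item.name")

  // The FileScan node should report ReadSchema: struct<id:int,item:struct<name:string>>
  df.explain()
}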
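The HoodieSparkUtils.gteqSpark3_1 guard introduced above skips the body entirely on Spark 2.4, where these plan shapes differ. A guard like this boils down to comparing the running major.minor version; the helper below is a hypothetical stand-in for illustration, not Hudi's actual implementation:

import org.apache.spark.SPARK_VERSION

// Hypothetical re-creation of a "Spark >= 3.1" check; HoodieSparkUtils
// may compute this differently.
object SparkVersionGate {
  // Parse "major.minor" out of a version string such as "3.1.2" or "2.4.8".
  private def majorMinor(version: String): (Int, Int) = {
    val parts = version.split('.')
    (parts(0).toInt, parts(1).toInt)
  }

  def gteq(major: Int, minor: Int): Boolean = {
    val (maj, min) = majorMinor(SPARK_VERSION)
    maj > major || (maj == major && min >= minor)
  }

  def gteqSpark3_1: Boolean = gteq(3, 1)
}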
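The other half of the fix replaces constructor patterns with type patterns. An extractor pattern such as case FileSourceScanExec(_, _, requiredSchema, _, _, _, _, tableIdentifier, _) bakes the case class's field count into the code, and FileSourceScanExec's constructor arity differs between Spark 2.4 and 3.x, so the extractor form cannot compile against both. Binding the scan node by type and reading its named accessors depends only on names that exist in both. A self-contained illustration with a hypothetical case class, not Spark's:

// Hypothetical stand-in for an evolving case class like FileSourceScanExec.
// In "v1" of a library it might be:  case class Scan(schema: String)
// In "v2" it grows a field:          case class Scan(schema: String, filters: Seq[String])
object ArityDemo {
  case class Scan(schema: String, filters: Seq[String])

  def schemaOf(plan: Any): Option[String] = plan match {
    // An extractor pattern like `case Scan(schema)` compiles only against v1,
    // and `case Scan(schema, _)` only against v2: the arity is baked in.
    // A type pattern plus accessor access compiles against both:
    case s: Scan => Some(s.schema)
    case _       => None
  }

  def main(args: Array[String]): Unit =
    println(schemaOf(Scan("struct<id:int>", Nil))) // Some(struct<id:int>)
}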
