Commit fb4b473

cleaner code
CTTY committed Jul 26, 2022
1 parent 979eda6 commit fb4b473
Showing 11 changed files with 32 additions and 28 deletions.

@@ -59,7 +59,6 @@ private[hudi] trait SparkVersionsSupport {
   def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
   def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
   def gteqSpark3_3: Boolean = getSparkVersion >= "3.3"
-  def gteqSpark3_3_0: Boolean = getSparkVersion >= "3.3.0"
 }

 object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
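
Note on the removed helper: these predicates compare version strings, so `gteqSpark3_3` already covers every 3.3.x release and `gteqSpark3_3_0` was redundant. A minimal sketch of the overlap (hypothetical value, not Hudi code):

    // With a hypothetical version string, both predicates agree:
    val getSparkVersion = "3.3.0"
    val gteqSpark3_3   = getSparkVersion >= "3.3"   // true
    val gteqSpark3_3_0 = getSparkVersion >= "3.3.0" // true as well, hence redundant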

@@ -27,7 +27,7 @@ import org.apache.spark.sql.hudi.SparkAdapter
 trait SparkAdapterSupport {

   lazy val sparkAdapter: SparkAdapter = {
-    val adapterClass = if (HoodieSparkUtils.gteqSpark3_3_0) {
+    val adapterClass = if (HoodieSparkUtils.isSpark3_3) {
       "org.apache.spark.sql.adapter.Spark3_3Adapter"
     } else if (HoodieSparkUtils.isSpark3_2) {
       "org.apache.spark.sql.adapter.Spark3_2Adapter"

@@ -28,9 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.JoinType
-import org.apache.spark.sql.catalyst.plans.logical.{Command, Join, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
 import org.apache.spark.sql.internal.SQLConf

@@ -157,7 +155,7 @@ trait SparkAdapter extends Serializable {
    * Resolve [[DeleteFromTable]]
    * SPARK-38626 condition is no longer Option in Spark 3.3
    */
-  def resolveDeleteFromTable(dft: Command,
+  def resolveDeleteFromTable(deleteFromTable: Command,
                              resolveExpression: Expression => Expression): LogicalPlan

   /**
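
For context on the comment above: SPARK-38626 changed `DeleteFromTable.condition` from `Option[Expression]` to a plain `Expression` in Spark 3.3, which is why the per-version adapters further down unwrap it differently. A simplified sketch of the two shapes (stand-in case classes, not Spark's actual definitions):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Stand-ins for illustration only:
    case class DeleteBefore33(table: LogicalPlan, condition: Option[Expression])
    case class DeleteFrom33(table: LogicalPlan, condition: Expression)

    // Resolving the condition therefore differs per version:
    //   before 3.3: dft.condition.map(resolveExpression)
    //   3.3+:       resolveExpression(dft.condition)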
2 changes: 1 addition & 1 deletion hudi-spark-datasource/README.md

@@ -55,7 +55,7 @@ has no class since hudi only supports spark 2.4.4 version, and it acts as the pl
 ### To improve:
 Spark3.3 support time travel syntax link [SPARK-37219](https://issues.apache.org/jira/browse/SPARK-37219).
 Once Spark 3.3 released. The files in the following list will be removed:
-* hudi-spark3.3.x's `HoodieSpark3_3ExtendedSqlAstBuilder.scala``HoodieSpark3_3ExtendedSqlParser.scala``TimeTravelRelation.scala``SqlBase.g4``HoodieSqlBase.g4`
+* hudi-spark3.3.x's `HoodieSpark3_3ExtendedSqlAstBuilder.scala`, `HoodieSpark3_3ExtendedSqlParser.scala`, `TimeTravelRelation.scala`, `SqlBase.g4`, `HoodieSqlBase.g4`
 Tracking Jira: [HUDI-4468](https://issues.apache.org/jira/browse/HUDI-4468)

 Some other improvement undergoing:

@@ -46,15 +46,15 @@ object HoodieAnalysis {

   def customOptimizerRules: Seq[RuleBuilder] = {
     if (HoodieSparkUtils.gteqSpark3_1) {
-      var nestedSchemaPruningClass = "ClassName"
-      if (HoodieSparkUtils.gteqSpark3_3) {
-        nestedSchemaPruningClass = "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
-      } else if (HoodieSparkUtils.gteqSpark3_2) {
-        nestedSchemaPruningClass = "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
-      } else {
-        // spark 3.1
-        nestedSchemaPruningClass = "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
-      }
+      val nestedSchemaPruningClass =
+        if (HoodieSparkUtils.gteqSpark3_3) {
+          "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
+        } else if (HoodieSparkUtils.gteqSpark3_2) {
+          "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
+        } else {
+          // spark 3.1
+          "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
+        }

       val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
       Seq(_ => nestedSchemaPruningRule)

@@ -84,7 +84,7 @@ object HoodieAnalysis {
       session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]

     val resolveAlterTableCommandsClass =
-      if (HoodieSparkUtils.gteqSpark3_3_0)
+      if (HoodieSparkUtils.gteqSpark3_3)
         "org.apache.spark.sql.hudi.Spark33ResolveHudiAlterTableCommand"
       else "org.apache.spark.sql.hudi.Spark32ResolveHudiAlterTableCommand"
     val resolveAlterTableCommands: RuleBuilder =
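
Both hunks in this file follow the same pattern: pick a version-specific rule class by name, then instantiate it reflectively so the module compiles against any supported Spark version. A rough sketch of that pattern using plain Java reflection (Hudi's own `ReflectionUtils` helper may differ in detail):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    // Class name taken from the diff above; the loading code is illustrative.
    val ruleClassName = "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
    val nestedSchemaPruningRule = Class.forName(ruleClassName)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[Rule[LogicalPlan]]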

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi

 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}

@@ -210,8 +211,14 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
     spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""")

     // specify duplicate partition columns
-    checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
-      "Found duplicate keys ")
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
+        "Found duplicate keys `dt`")
+    } else {
+      checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
+        "Found duplicate keys 'dt'")
+    }
+

     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
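
The two branches assert the same failure and differ only in how Spark quotes the duplicate key (backticks in 3.3, single quotes earlier). The expected message could also be computed once; a hypothetical helper, not part of the commit:

    // Hypothetical refactor of the version-gated assertion above:
    def expectedDuplicateKeyMsg(key: String): String =
      if (HoodieSparkUtils.gteqSpark3_3) s"Found duplicate keys `$key`"
      else s"Found duplicate keys '$key'"

    checkExceptionContain(
      s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
      expectedDuplicateKeyMsg("dt"))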

@@ -82,7 +82,7 @@ class TestCallCommandParser extends HoodieSparkSqlTestBase {
   }

   test("Test Call Parse Error") {
-    if (HoodieSparkUtils.gteqSpark3_3_0) {
+    if (HoodieSparkUtils.gteqSpark3_3) {
       checkParseExceptionContain("CALL cat.system radish kebab")("Syntax error at or near 'CALL'")
     } else {
       checkParseExceptionContain("CALL cat.system radish kebab")("mismatched input 'CALL' expecting")

@@ -131,9 +131,9 @@ class Spark2Adapter extends SparkAdapter {
     new Spark2HoodieFileScanRDD(sparkSession, readFunction, filePartitions)
   }

-  override def resolveDeleteFromTable(dft: Command,
+  override def resolveDeleteFromTable(deleteFromTable: Command,
                                       resolveExpression: Expression => Expression): DeleteFromTable = {
-    val deleteFromTableCommand = dft.asInstanceOf[DeleteFromTable]
+    val deleteFromTableCommand = deleteFromTable.asInstanceOf[DeleteFromTable]
     val resolvedCondition = deleteFromTableCommand.condition.map(resolveExpression)
     DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
   }

@@ -66,9 +66,9 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
     new Spark31HoodieFileScanRDD(sparkSession, readFunction, filePartitions)
   }

-  override def resolveDeleteFromTable(dft: Command,
+  override def resolveDeleteFromTable(deleteFromTable: Command,
                                       resolveExpression: Expression => Expression): DeleteFromTable = {
-    val deleteFromTableCommand = dft.asInstanceOf[DeleteFromTable]
+    val deleteFromTableCommand = deleteFromTable.asInstanceOf[DeleteFromTable]
     val resolvedCondition = deleteFromTableCommand.condition.map(resolveExpression)
     DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
   }

@@ -63,9 +63,9 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
     new Spark32HoodieFileScanRDD(sparkSession, readFunction, filePartitions)
   }

-  override def resolveDeleteFromTable(dft: Command,
+  override def resolveDeleteFromTable(deleteFromTable: Command,
                                       resolveExpression: Expression => Expression): DeleteFromTable = {
-    val deleteFromTableCommand = dft.asInstanceOf[DeleteFromTable]
+    val deleteFromTableCommand = deleteFromTable.asInstanceOf[DeleteFromTable]
     val resolvedCondition = deleteFromTableCommand.condition.map(resolveExpression)
     DeleteFromTable(deleteFromTableCommand.table, resolvedCondition)
   }

@@ -64,9 +64,9 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   }


-  override def resolveDeleteFromTable(dft: Command,
+  override def resolveDeleteFromTable(deleteFromTable: Command,
                                       resolveExpression: Expression => Expression): DeleteFromTable = {
-    val deleteFromTableCommand = dft.asInstanceOf[DeleteFromTable]
+    val deleteFromTableCommand = deleteFromTable.asInstanceOf[DeleteFromTable]
     DeleteFromTable(deleteFromTableCommand.table, resolveExpression(deleteFromTableCommand.condition))
   }

