Support Spark 3.3
CTTY committed Jul 17, 2022
1 parent 80368a0 commit 4eff788
Showing 72 changed files with 10,361 additions and 69 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/bot.yml
@@ -36,6 +36,10 @@ jobs:
sparkProfile: "spark3.2"
flinkProfile: "flink1.14"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.3"
flinkProfile: "flink1.14"

steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -130,6 +130,12 @@ public class HoodieStorageConfig extends HoodieConfig {
.defaultValue("TIMESTAMP_MICROS")
.withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");

public static final ConfigProperty<String> PARQUET_FIELD_ID_WRITE_ENABLED = ConfigProperty
.key("hoodie.parquet.fieldId.write.enabled")
.defaultValue("true")
.withDocumentation("Sets spark.sql.parquet.fieldId.write.enabled. "
+ "If enabled, Spark will write out parquet native field ids that are stored inside StructField's metadata as parquet.field.id to parquet files.");

public static final ConfigProperty<String> HFILE_COMPRESSION_ALGORITHM_NAME = ConfigProperty
.key("hoodie.hfile.compression.algorithm")
.defaultValue("GZ")
@@ -337,6 +343,11 @@ public Builder parquetOutputTimestampType(String parquetOutputTimestampType) {
return this;
}

public Builder parquetFieldIdWrite(String parquetFieldIdWrite) {
storageConfig.setValue(PARQUET_FIELD_ID_WRITE_ENABLED, parquetFieldIdWrite);
return this;
}

public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) {
storageConfig.setValue(HFILE_COMPRESSION_ALGORITHM_NAME, hfileCompressionAlgorithm);
return this;
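For context on the new hoodie.parquet.fieldId.write.enabled property above: it behaves like any other Hudi write config, so it can be overridden per write through the DataSource options. A minimal sketch (the table name, path, and columns are hypothetical, and it assumes a local Spark 3.x session with the Hudi Spark bundle on the classpath):

import org.apache.spark.sql.{SaveMode, SparkSession}

object FieldIdWriteSketch extends App {
  // Hypothetical local session; Hudi generally requires the Kryo serializer
  val spark = SparkSession.builder()
    .appName("field-id-write-sketch")
    .master("local[2]")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
  import spark.implicits._

  val df = Seq((1, "a", 1000L), (2, "b", 2000L)).toDF("id", "name", "ts")

  df.write.format("hudi")
    .option("hoodie.table.name", "field_id_demo")             // hypothetical table name
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.parquet.fieldId.write.enabled", "false")  // the new storage config; defaults to "true"
    .mode(SaveMode.Overwrite)
    .save("/tmp/hudi/field_id_demo")                          // hypothetical path
}

On Spark 3.3 this value is forwarded to spark.sql.parquet.fieldId.write.enabled by HoodieRowParquetWriteSupport, as shown further down in this commit.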
@@ -1682,6 +1682,10 @@ public String parquetOutputTimestampType() {
return getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE);
}

public String parquetFieldIdWriteEnabled() {
return getString(HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED);
}

public Option<HoodieLogBlock.HoodieLogBlockType> getLogDataBlockFormat() {
return Option.ofNullable(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT))
.map(HoodieLogBlock.HoodieLogBlockType::fromId);
@@ -18,6 +18,7 @@

package org.apache.hudi.client.bootstrap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
@@ -71,11 +72,20 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair
}

private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
-    MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
+    Configuration hadoopConf = context.getHadoopConf().get();
+    MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);

+    hadoopConf.set(
+        SQLConf.PARQUET_BINARY_AS_STRING().key(),
+        SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
+    hadoopConf.set(
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
+        SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
+    hadoopConf.set(
+        SQLConf.CASE_SENSITIVE().key(),
+        SQLConf.CASE_SENSITIVE().defaultValueString());
+    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf);

-    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
-        Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
-        Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
StructType sparkSchema = converter.convert(parquetSchema);
String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
String structName = tableName + "_record";
@@ -48,6 +48,7 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, B
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilter;
@@ -64,6 +64,10 @@ object HoodieSparkUtils extends SparkAdapterSupport {

def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"

def isSpark3_3: Boolean = SPARK_VERSION.startsWith("3.3")

def gteqSpark3_3_0: Boolean = SPARK_VERSION >= "3.3.0"

def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
@@ -27,7 +27,9 @@ import org.apache.spark.sql.hudi.SparkAdapter
trait SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
-    val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
+    val adapterClass = if (HoodieSparkUtils.gteqSpark3_3_0) {
+      "org.apache.spark.sql.adapter.Spark3_3Adapter"
+    } else if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
} else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
"org.apache.spark.sql.adapter.Spark3_1Adapter"
@@ -24,16 +24,15 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}

import java.util.Locale
@@ -171,12 +170,32 @@ trait SparkAdapter extends Serializable {
}

  /**
   * Create instance of [[ParquetFileFormat]]
   */
def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat]

/**
* Create instance of [[InterpretedPredicate]]
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate

/**
* Create instance of [[HoodieFileScanRDD]]
*/
def createHoodieFileScanRDD(@transient sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient filePartitions: Seq[FilePartition],
readDataSchema: StructType,
metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD

/**
* Get [[DeleteFromTable]]
*/
def getDeleteFromTable(table: LogicalPlan, condition: Option[Expression]): LogicalPlan

/**
* Get parseQuery from ExtendedSqlParser, only Spark 3.3+ use this
*/
def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
sqlText: String): LogicalPlan
}
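To make the new hooks concrete, below is a rough sketch of how a Spark 3.3 adapter could implement the two constructor-sensitive ones. It is illustrative only: it assumes Spark 3.3's FileScanRDD constructor shape (SPARK-37273) and the mandatory DeleteFromTable condition (SPARK-38626), and is not the actual Spark3_3Adapter source.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.types.StructType

trait Spark33AdapterSketch {

  // Spark 3.3's FileScanRDD also takes the read schema and metadata columns (SPARK-37273)
  def createHoodieFileScanRDD(sparkSession: SparkSession,
                              readFunction: PartitionedFile => Iterator[InternalRow],
                              filePartitions: Seq[FilePartition],
                              readDataSchema: StructType,
                              metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD =
    new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns)

  // In Spark 3.3 DeleteFromTable.condition is mandatory (SPARK-38626); a missing predicate becomes literal true
  def getDeleteFromTable(table: LogicalPlan, condition: Option[Expression]): LogicalPlan =
    DeleteFromTable(table, condition.getOrElse(Literal.TrueLiteral))
}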
6 changes: 6 additions & 0 deletions hudi-examples/hudi-examples-spark/pom.xml
@@ -190,6 +190,12 @@
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>

<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
</dependency>

<!-- Parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -30,6 +30,7 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.Utils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

8 changes: 5 additions & 3 deletions hudi-spark-datasource/README.md
@@ -21,17 +21,19 @@ This repo contains the code that integrate Hudi with Spark. The repo is split in

`hudi-spark`
`hudi-spark2`
-`hudi-spark3`
`hudi-spark3.1.x`
+`hudi-spark3.2.x`
+`hudi-spark3`
`hudi-spark2-common`
`hudi-spark3-common`
`hudi-spark-common`

* hudi-spark is the module that contains the code that both spark2 & spark3 version would share, also contains the antlr4
file that supports spark sql on spark 2.x version.
* hudi-spark2 is the module that contains the code that compatible with spark 2.x versions.
-* hudi-spark3 is the module that contains the code that compatible with spark 3.2.0(and above) versions。
* hudi-spark3.1.x is the module that contains the code that compatible with spark3.1.x and spark3.0.x version.
+* hudi-spark3.2 is the module that contains the code that compatible with spark 3.2.x versions.
+* hudi-spark3 is the module that contains the code that compatible with spark 3.3.x+ versions.
* hudi-spark2-common is the module that contains the code that would be reused between spark2.x versions, right now the module
has no class since hudi only supports spark 2.4.4 version, and it acts as the placeholder when packaging hudi-spark-bundle module.
* hudi-spark3-common is the module that contains the code that would be reused between spark3.x versions.
@@ -52,6 +52,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit

override type FileSplit = HoodieBaseFileSplit

// TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract
@@ -91,7 +93,10 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
)

-    new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
+    // SPARK-37273 FileScanRDD constructor changed in SPARK 3.3
+    // TODO: Critical change introduced in Spark 3.3, need to test manually
+    sparkAdapter.createHoodieFileScanRDD(sparkSession, baseFileReader, fileSplits.map(_.filePartition), requiredSchema.structTypeSchema)
+      .asInstanceOf[HoodieUnsafeRDD]
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
@@ -64,15 +64,18 @@ object HoodieAnalysis {
val spark3ResolveReferences: RuleBuilder =
session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]

-      val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
-      val spark32ResolveAlterTableCommands: RuleBuilder =
-        session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]
+      val resolveAlterTableCommandsClass =
+        if (HoodieSparkUtils.gteqSpark3_3_0)
+          "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark33"
+        else "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
+      val resolveAlterTableCommands: RuleBuilder =
+        session => ReflectionUtils.loadClass(resolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]

// NOTE: PLEASE READ CAREFULLY
//
// It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback
// is performed prior to other rules being evaluated
-      rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands)
+      rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, resolveAlterTableCommands)

} else if (HoodieSparkUtils.gteqSpark3_1) {
val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
@@ -413,10 +416,19 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Resolve Delete Table
case DeleteFromTable(table, condition)
if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
-        // Resolve condition
-        val resolvedCondition = condition.map(resolveExpressionFrom(table)(_))
-        // Return the resolved DeleteTable
-        DeleteFromTable(table, resolvedCondition)
+        // SPARK-38626 condition is no longer Option in Spark 3.3
+        // unwrap Option[Expression] to Expression for earlier versions of Spark
+        val unwrappedCondition: Expression = condition match {
+          case option: Option[Expression] => option.getOrElse(null)
+          case expr: Expression => expr
+          case _ => throw new IllegalArgumentException(s"condition has to be either Option[Expression] or Expression")
+        }
+        if (unwrappedCondition == null) {
+          sparkAdapter.getDeleteFromTable(table, None)
+        } else {
+          // Return the resolved DeleteTable
+          sparkAdapter.getDeleteFromTable(table, Option(resolveExpressionFrom(table)(unwrappedCondition)))
+        }

// Append the meta field to the insert query to walk through the validate for the
// number of insert fields with the number of the target table fields.
@@ -21,6 +21,7 @@ import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig

@@ -36,9 +37,13 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie

// Remove meta fields from the data frame
var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
-    if (deleteTable.condition.isDefined) {
-      df = df.filter(Column(deleteTable.condition.get))
+    // SPARK-38626 DeleteFromTable.condition is changed from Option[Expression] to Expression in Spark 3.3
+    val condition: Expression = deleteTable.condition match {
+      case option: Option[Expression] => option.getOrElse(null)
+      case expr: Expression => expr
+      case _ => throw new IllegalArgumentException(s"DeleteFromTable.condition has to be either Option[Expression] or Expression")
    }
+    if (condition != null) df = df.filter(Column(condition))

val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
val config = buildHoodieDeleteTableConfig(hoodieCatalogTable, sparkSession)
@@ -57,6 +57,13 @@ class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface)

override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

// SPARK-37266 Added parseQuery to ParserInterface in Spark 3.3.0
// Don't mark this as override for backward compatibility
// Can't use sparkExtendedParser directly here due to the same reason
def parseQuery(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
sparkAdapter.getQueryParserFromExtendedSqlParser(session, delegate, sqlText)
}

def parseRawDataType(sqlText : String) : DataType = {
throw new UnsupportedOperationException(s"Unsupported parseRawDataType method")
}
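For reference, here is a sketch of what the version-specific side of that hook could look like: on Spark 3.3 the ParserInterface actually exposes parseQuery (SPARK-37266), while older lines have nothing to delegate to. This is illustrative and not the actual adapter implementations.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Spark 3.3+: the delegate parser has parseQuery, so the adapter can simply forward to it
class Spark33QueryParserSketch {
  def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
                                          sqlText: String): LogicalPlan =
    delegate.parseQuery(sqlText)
}

// Pre-3.3: there is no parseQuery to forward to, so refusing is a reasonable fallback
class PreSpark33QueryParserSketch {
  def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
                                          sqlText: String): LogicalPlan =
    throw new UnsupportedOperationException("parseQuery is only supported on Spark 3.3 and above")
}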
@@ -140,8 +140,11 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
spark.sql(sql)
} catch {
case e: Throwable =>
-        assertResult(true)(e.getMessage.contains(errorMsg))
-        hasException = true
+        if (e.getMessage.contains(errorMsg)) {
+          hasException = true
+        } else {
+          fail("Exception should contain: " + errorMsg + ", error message: " + e.getMessage, e)
+        }
}
assertResult(true)(hasException)
}
@@ -211,7 +211,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {

// specify duplicate partition columns
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
"Found duplicate keys 'dt'")
"Found duplicate keys ")

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
@@ -82,7 +82,11 @@ class TestCallCommandParser extends HoodieSparkSqlTestBase {
}

test("Test Call Parse Error") {
-    checkParseExceptionContain("CALL cat.system radish kebab")("mismatched input 'CALL' expecting")
+    if (HoodieSparkUtils.gteqSpark3_3_0) {
+      checkParseExceptionContain("CALL cat.system radish kebab")("Syntax error at or near 'CALL'")
+    } else {
+      checkParseExceptionContain("CALL cat.system radish kebab")("mismatched input 'CALL' expecting")
+    }
}

test("Test Call Produce with semicolon") {
@@ -111,8 +115,11 @@ class TestCallCommandParser extends HoodieSparkSqlTestBase {
parser.parsePlan(sql)
} catch {
case e: Throwable =>
-        assertResult(true)(e.getMessage.contains(errorMsg))
-        hasException = true
+        if (e.getMessage.contains(errorMsg)) {
+          hasException = true
+        } else {
+          fail("Exception should contain: " + errorMsg + ", error message: " + e.getMessage, e)
+        }
}
assertResult(true)(hasException)
}