Support Spark 3.3
CTTY committed Jul 14, 2022
1 parent 51244eb commit 925cca4
Showing 78 changed files with 10,404 additions and 88 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/bot.yml
@@ -29,11 +29,7 @@ jobs:
flinkProfile: "flink1.13"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.1"
flinkProfile: "flink1.14"

- scalaProfile: "scala-2.12"
sparkProfile: "spark3.2"
sparkProfile: "spark3.3"
flinkProfile: "flink1.14"

steps:
10 changes: 10 additions & 0 deletions hudi-cli/pom.xml
@@ -246,6 +246,16 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -130,6 +130,12 @@ public class HoodieStorageConfig extends HoodieConfig {
.defaultValue("TIMESTAMP_MICROS")
.withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");

public static final ConfigProperty<String> PARQUET_FIELD_ID_WRITE_ENABLED = ConfigProperty
.key("hoodie.parquet.fieldId.write.enabled")
.defaultValue("true")
.withDocumentation("Sets spark.sql.parquet.fieldId.write.enabled. "
+ "If enabled, Spark will write out parquet native field ids that are stored inside StructField's metadata as parquet.field.id to parquet files.");

public static final ConfigProperty<String> HFILE_COMPRESSION_ALGORITHM_NAME = ConfigProperty
.key("hoodie.hfile.compression.algorithm")
.defaultValue("GZ")
@@ -337,6 +343,11 @@ public Builder parquetOutputTimestampType(String parquetOutputTimestampType) {
return this;
}

public Builder parquetFieldIdWrite(String parquetFieldIdWrite) {
storageConfig.setValue(PARQUET_FIELD_ID_WRITE_ENABLED, parquetFieldIdWrite);
return this;
}

public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) {
storageConfig.setValue(HFILE_COMPRESSION_ALGORITHM_NAME, hfileCompressionAlgorithm);
return this;
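The new hoodie.parquet.fieldId.write.enabled option is passed straight through to Spark's spark.sql.parquet.fieldId.write.enabled flag when Hudi writes parquet files (see the HoodieRowParquetWriteSupport change further below). A minimal usage sketch, assuming the newBuilder()/build() pattern that HoodieStorageConfig already exposes; the "false" value is purely illustrative:

    // Illustrative only: turning parquet field-id writing off through the new builder option.
    import org.apache.hudi.config.HoodieStorageConfig

    val storageConfig: HoodieStorageConfig = HoodieStorageConfig.newBuilder()
      .parquetFieldIdWrite("false") // forwarded to spark.sql.parquet.fieldId.write.enabled; the new default is "true"
      .build()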
@@ -1682,6 +1682,10 @@ public String parquetOutputTimestampType() {
return getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE);
}

public String parquetFieldIdWriteEnabled() {
return getString(HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED);
}

public Option<HoodieLogBlock.HoodieLogBlockType> getLogDataBlockFormat() {
return Option.ofNullable(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT))
.map(HoodieLogBlock.HoodieLogBlockType::fromId);
10 changes: 10 additions & 0 deletions hudi-client/hudi-spark-client/pom.xml
@@ -48,6 +48,16 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -18,6 +18,7 @@

package org.apache.hudi.client.bootstrap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
@@ -71,11 +72,20 @@ protected Schema getBootstrapSourceSchema(HoodieEngineContext context, List<Pair
}

private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
MessageType parquetSchema = new ParquetUtils().readSchema(context.getHadoopConf().get(), filePath);
Configuration hadoopConf = context.getHadoopConf().get();
MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);

hadoopConf.set(
SQLConf.PARQUET_BINARY_AS_STRING().key(),
SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
hadoopConf.set(
SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
hadoopConf.set(
SQLConf.CASE_SENSITIVE().key(),
SQLConf.CASE_SENSITIVE().defaultValueString());
ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf);

ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(
Boolean.parseBoolean(SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString()),
Boolean.parseBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString()));
StructType sparkSchema = converter.convert(parquetSchema);
String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
String structName = tableName + "_record";
@@ -48,6 +48,7 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, B
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", writeConfig.parquetFieldIdWriteEnabled());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilter;
@@ -64,6 +64,10 @@ object HoodieSparkUtils extends SparkAdapterSupport {

def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"

def isSpark3_3: Boolean = SPARK_VERSION.startsWith("3.3")

def gteqSpark3_3_0: Boolean = SPARK_VERSION >= "3.3.0"

def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
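The two new predicates gate Spark 3.3-only code paths at runtime; note that they compare SPARK_VERSION lexicographically as a string, which works for the single-digit Spark 3.x lines these checks target. A small, purely illustrative sketch of how such a gate is consumed (the branch values are placeholders):

    // Illustrative only: selecting behaviour based on the new version predicates.
    import org.apache.hudi.HoodieSparkUtils

    val adapterHint: String =
      if (HoodieSparkUtils.gteqSpark3_3_0) "use the Spark 3.3 code path" // e.g. the Spark3_3Adapter selected below
      else if (HoodieSparkUtils.isSpark3_2) "use the Spark 3.2 code path"
      else "use the pre-3.2 code path"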
@@ -27,7 +27,9 @@ import org.apache.spark.sql.hudi.SparkAdapter
trait SparkAdapterSupport {

lazy val sparkAdapter: SparkAdapter = {
val adapterClass = if (HoodieSparkUtils.isSpark3_2) {
val adapterClass = if (HoodieSparkUtils.gteqSpark3_3_0) {
"org.apache.spark.sql.adapter.Spark3_3Adapter"
} else if (HoodieSparkUtils.isSpark3_2) {
"org.apache.spark.sql.adapter.Spark3_2Adapter"
} else if (HoodieSparkUtils.isSpark3_0 || HoodieSparkUtils.isSpark3_1) {
"org.apache.spark.sql.adapter.Spark3_1Adapter"
@@ -24,16 +24,15 @@ import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConver
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}

import java.util.Locale
@@ -179,4 +178,24 @@ trait SparkAdapter extends Serializable {
* Create instance of [[InterpretedPredicate]]
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate

/**
* Create instance of [[HoodieFileScanRDD]]
* */
def createHoodieFileScanRDD(@transient sparkSession: SparkSession,
readFunction: PartitionedFile => Iterator[InternalRow],
@transient filePartitions: Seq[FilePartition],
readDataSchema: StructType,
metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD

/**
* Get [[DeleteFromTable]]
* */
def getDeleteFromTable(table: LogicalPlan, condition: Option[Expression]): LogicalPlan

/**
* Get parseQuery from ExtendedSqlParser, only Spark 3.3+ use this
* */
def getQueryParserFromExtendedSqlParser(session: SparkSession, delegate: ParserInterface,
sqlText: String): LogicalPlan
}
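These three additions are the version-bridging points of this change: DeleteFromTable's constructor, the parser interface, and FileScanRDD construction all differ between Spark 3.2 and 3.3, so callers go through the adapter instead of touching the version-specific classes directly. A hedged caller-side sketch, assuming only the trait methods shown above (the object and method names are illustrative, not part of the codebase):

    // Illustrative caller: building a DeleteFromTable node without referencing its
    // version-specific constructor keeps this code source-compatible with Spark 3.2 and 3.3.
    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object ExampleDeletePlanBuilder extends SparkAdapterSupport {
      def buildDeletePlan(table: LogicalPlan, condition: Option[Expression]): LogicalPlan =
        sparkAdapter.getDeleteFromTable(table, condition)
    }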
10 changes: 10 additions & 0 deletions hudi-examples/hudi-examples-spark/pom.xml
@@ -184,6 +184,16 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -30,6 +30,7 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.Utils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

4 changes: 3 additions & 1 deletion hudi-spark-datasource/README.md
@@ -22,6 +22,7 @@ This repo contains the code that integrate Hudi with Spark. The repo is split in
`hudi-spark`
`hudi-spark2`
`hudi-spark3`
`hudi-spark3.2`
`hudi-spark3.1.x`
`hudi-spark2-common`
`hudi-spark3-common`
@@ -30,7 +31,8 @@ This repo contains the code that integrate Hudi with Spark. The repo is split in
* hudi-spark is the module that contains the code that both spark2 & spark3 version would share, also contains the antlr4
file that supports spark sql on spark 2.x version.
* hudi-spark2 is the module that contains the code that compatible with spark 2.x versions.
* hudi-spark3 is the module that contains the code that compatible with spark 3.2.0(and above) versions。
* hudi-spark3 is the module that contains the code that compatible with spark 3.3.x+ versions.
* hudi-spark3.2 is the module that contains the code that compatible with spark 3.2.x versions.
* hudi-spark3.1.x is the module that contains the code that compatible with spark3.1.x and spark3.0.x version.
* hudi-spark2-common is the module that contains the code that would be reused between spark2.x versions, right now the module
has no class since hudi only supports spark 2.4.4 version, and it acts as the placeholder when packaging hudi-spark-bundle module.
8 changes: 8 additions & 0 deletions hudi-spark-datasource/hudi-spark-common/pom.xml
@@ -176,6 +176,14 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
@@ -52,6 +52,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {

case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit

override type FileSplit = HoodieBaseFileSplit

// TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract
@@ -91,7 +93,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
hadoopConf = HoodieDataSourceHelper.getConfigurationWithInternalSchema(new Configuration(conf), requiredSchema.internalSchema, metaClient.getBasePath, validCommits)
)

new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
// TODO: which schema to use here?
sparkAdapter.createHoodieFileScanRDD(sparkSession, baseFileReader, fileSplits.map(_.filePartition), dataSchema.structTypeSchema)
.asInstanceOf[HoodieUnsafeRDD]
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
8 changes: 8 additions & 0 deletions hudi-spark-datasource/hudi-spark/pom.xml
@@ -306,6 +306,14 @@
<groupId>javax.servlet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -64,15 +64,18 @@ object HoodieAnalysis {
val spark3ResolveReferences: RuleBuilder =
session => ReflectionUtils.loadClass(spark3ResolveReferencesClass, session).asInstanceOf[Rule[LogicalPlan]]

val spark32ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
val spark32ResolveAlterTableCommands: RuleBuilder =
session => ReflectionUtils.loadClass(spark32ResolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]
val resolveAlterTableCommandsClass =
if (HoodieSparkUtils.gteqSpark3_3_0)
"org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark33"
else "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommandSpark32"
val resolveAlterTableCommands: RuleBuilder =
session => ReflectionUtils.loadClass(resolveAlterTableCommandsClass, session).asInstanceOf[Rule[LogicalPlan]]

// NOTE: PLEASE READ CAREFULLY
//
// It's critical for this rules to follow in this order, so that DataSource V2 to V1 fallback
// is performed prior to other rules being evaluated
rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, spark32ResolveAlterTableCommands)
rules ++= Seq(dataSourceV2ToV1Fallback, spark3Analysis, spark3ResolveReferences, resolveAlterTableCommands)

} else if (HoodieSparkUtils.gteqSpark3_1) {
val spark31ResolveAlterTableCommandsClass = "org.apache.spark.sql.hudi.ResolveHudiAlterTableCommand312"
@@ -413,10 +416,13 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
// Resolve Delete Table
case DeleteFromTable(table, condition)
if sparkAdapter.isHoodieTable(table, sparkSession) && table.resolved =>
// Resolve condition
val resolvedCondition = condition.map(resolveExpressionFrom(table)(_))
val unwrappedCondition: Expression = condition match {
case option: Option[Expression] => option.getOrElse(null)
case expr: Expression => expr
case _ => throw new IllegalArgumentException(s"condition has to be either Option[Expression] or Expression")
}
// Return the resolved DeleteTable
DeleteFromTable(table, resolvedCondition)
sparkAdapter.getDeleteFromTable(table, Option(resolveExpressionFrom(table)(unwrappedCondition)))

// Append the meta field to the insert query to walk through the validate for the
// number of insert fields with the number of the target table fields.
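The condition match above exists because SPARK-38626 changed DeleteFromTable.condition from Option[Expression] to Expression in Spark 3.3, and this shared source has to compile and behave correctly against either binding. A standalone sketch of the same normalisation as a hypothetical helper (not part of the codebase):

    // Hypothetical helper, illustrative only: normalise a condition that is
    // Option[Expression] on Spark <= 3.2 and a bare Expression on Spark 3.3+.
    import org.apache.spark.sql.catalyst.expressions.Expression

    def normalizeCondition(condition: Any): Option[Expression] = condition match {
      case opt: Option[_]   => opt.collect { case e: Expression => e } // Spark 3.2 and earlier
      case expr: Expression => Some(expr)                              // Spark 3.3 and later
      case null             => None                                    // defensive: treat a missing condition as no filter
      case other            => throw new IllegalArgumentException(
        s"condition has to be either Option[Expression] or Expression, got ${other.getClass}")
    }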
@@ -21,6 +21,7 @@ import org.apache.hudi.SparkAdapterSupport
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig

@@ -36,9 +37,13 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Hoodie

// Remove meta fields from the data frame
var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
if (deleteTable.condition.isDefined) {
df = df.filter(Column(deleteTable.condition.get))
// SPARK-38626 DeleteFromTable.condition is changed from Option[Expression] to Expression in Spark 3.3
val condition: Expression = deleteTable.condition match {
case option: Option[Expression] => option.get
case expr: Expression => expr
case _ => throw new IllegalArgumentException(s"DeleteFromTable.condition has to be either Option[Expression] or Expression")
}
df = df.filter(Column(condition))

val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId)
val config = buildHoodieDeleteTableConfig(hoodieCatalogTable, sparkSession)
@@ -57,6 +57,13 @@ class HoodieCommonSqlParser(session: SparkSession, delegate: ParserInterface)

override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

// SPARK-37266 Added parseQuery to ParserInterface in Spark 3.3.0
// Don't mark this as override for backward compatibility
// Can't use sparkExtendedParser directly here due to the same reason
def parseQuery(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
sparkAdapter.getQueryParserFromExtendedSqlParser(session, delegate, sqlText)
}

def parseRawDataType(sqlText : String) : DataType = {
throw new UnsupportedOperationException(s"Unsupported parseRawDataType method")
}
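parseQuery is intentionally not marked override: ParserInterface only declares it from Spark 3.3.0 on (SPARK-37266), and this parser still has to compile against Spark 3.2 and earlier. A minimal standalone sketch of that trick with stand-in names (nothing below is Spark or Hudi API):

    // Stand-in trait playing the role of ParserInterface; illustrative only.
    trait VersionedParser {
      def parsePlan(sqlText: String): String
      def parseQuery(sqlText: String): String // only declared by the "Spark 3.3" flavour of the trait
    }

    class CompatParser extends VersionedParser {
      override def parsePlan(sqlText: String): String = sqlText.trim
      // No `override` modifier: this implements the abstract method when the trait declares it,
      // and would still compile as an ordinary extra method against a trait that does not.
      def parseQuery(sqlText: String): String = parsePlan(sqlText)
    }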
@@ -140,8 +140,11 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
spark.sql(sql)
} catch {
case e: Throwable =>
assertResult(true)(e.getMessage.contains(errorMsg))
hasException = true
if (e.getMessage.contains(errorMsg)) {
hasException = true
} else {
fail("Exception should contain: " + errorMsg + ", error message: " + e.getMessage, e)
}
}
assertResult(true)(hasException)
}
@@ -211,7 +211,7 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {

// specify duplicate partition columns
checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01', dt='2021-10-02')")(
"Found duplicate keys 'dt'")
"Found duplicate keys ")

// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")