[HUDI-3896] Porting Nested Schema Pruning optimization for Hudi's custom Relations (#5428)

Currently, all Hudi Relations bear a performance gap relative to Spark's HadoopFsRelation,
and the reason for that is the SchemaPruning optimization rule (which prunes nested schemas):
it is unfortunately predicated on the use of HadoopFsRelation, meaning it is not applied
whenever any other relation is used.

This change ports that rule to Hudi's relations (MOR, Incremental, etc.) by leveraging
the HoodieSparkSessionExtensions mechanism to inject a modified version of the original
SchemaPruning rule, adapted to work with Hudi's custom relations (see the sketch after
the change list below).

- Added customOptimizerRules to HoodieAnalysis
- Added NestedSchemaPrunning rule to Spark's Optimizer
- Handled Spark Optimizer's pruned data schema (to effectively prune nested schemas)
- Enabled HoodieClientTestHarness to inject HoodieSparkSessionExtensions
- Injected Spark Session extensions for TestMORDataSource and TestCOWDataSource
- Disabled fallback to HadoopFsRelation
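
For illustration, here is a minimal sketch of how an optimizer rule can be wired in through
Spark's SparkSessionExtensions mechanism (the class and rule names below are hypothetical
stand-ins, not the actual Hudi classes; the change itself registers the rule through
HoodieAnalysis' customOptimizerRules):

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical placeholder for the pruning rule: a real rule would rewrite Hudi
// relations in the plan so they carry the pruned (nested) data schema.
case class ExampleNestedSchemaPruning(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Extensions entry point of the kind referenced by the spark.sql.extensions config.
class ExampleExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Spark calls the injected builder once per session and runs the rule in its Optimizer.
    extensions.injectOptimizerRule(session => ExampleNestedSchemaPruning(session))
  }
}
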
Alexey Kudinkin authored Jul 21, 2022
1 parent 2394c62 commit de37774
Showing 42 changed files with 1,221 additions and 501 deletions.
@@ -44,25 +44,24 @@ import org.apache.hudi.avro.HoodieAvroUtils

import scala.collection.JavaConverters._

object HoodieSparkUtils extends SparkAdapterSupport {

def isSpark2: Boolean = SPARK_VERSION.startsWith("2.")

def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")

def isSpark3_0: Boolean = SPARK_VERSION.startsWith("3.0")

def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")

def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"

def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"

def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
private[hudi] trait SparkVersionsSupport {
def getSparkVersion: String

def isSpark2: Boolean = getSparkVersion.startsWith("2.")
def isSpark3: Boolean = getSparkVersion.startsWith("3.")
def isSpark3_0: Boolean = getSparkVersion.startsWith("3.0")
def isSpark3_1: Boolean = getSparkVersion.startsWith("3.1")
def isSpark3_2: Boolean = getSparkVersion.startsWith("3.2")

def gteqSpark3_1: Boolean = getSparkVersion >= "3.1"
def gteqSpark3_1_3: Boolean = getSparkVersion >= "3.1.3"
def gteqSpark3_2: Boolean = getSparkVersion >= "3.2"
def gteqSpark3_2_1: Boolean = getSparkVersion >= "3.2.1"
}

def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {

def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
override def getSparkVersion: String = SPARK_VERSION

def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
@@ -268,15 +267,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
case StringStartsWith(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"$value%")
sparkAdapter.createLike(leftExp, rightExp)
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case StringEndsWith(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"%$value")
sparkAdapter.createLike(leftExp, rightExp)
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case StringContains(attribute, value) =>
val leftExp = toAttribute(attribute, tableSchema)
val rightExp = Literal.create(s"%$value%")
sparkAdapter.createLike(leftExp, rightExp)
sparkAdapter.getCatalystPlanUtils.createLike(leftExp, rightExp)
case _ => null
}
)
@@ -318,38 +317,4 @@ object HoodieSparkUtils extends SparkAdapterSupport {
s"${tableSchema.fieldNames.mkString(",")}")
AttributeReference(columnName, field.get.dataType, field.get.nullable)()
}

def getRequiredSchema(tableAvroSchema: Schema, requiredColumns: Array[String], internalSchema: InternalSchema = InternalSchema.getEmptyInternalSchema): (Schema, StructType, InternalSchema) = {
if (internalSchema.isEmptySchema || requiredColumns.isEmpty) {
// First get the required avro-schema, then convert the avro-schema to spark schema.
val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
// Here have to create a new Schema.Field object
// to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used".
val requiredFields = requiredColumns.map(c => name2Fields(c))
.map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, internalSchema)
} else {
// now we support nested project
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName)
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
(requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
}
}

def toAttribute(tableSchema: StructType): Seq[AttributeReference] = {
tableSchema.map { field =>
AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()
}
}

def collectFieldIndexes(projectedSchema: StructType, originalSchema: StructType): Seq[Int] = {
val nameToIndex = originalSchema.fields.zipWithIndex.map{ case (field, index) =>
field.name -> index
}.toMap
projectedSchema.map(field => nameToIndex(field.name))
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

trait HoodieCatalystPlansUtils {

def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan

/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier

/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier

/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join

/**
* Test if the logical plan is a Insert Into LogicalPlan.
*/
def isInsertInto(plan: LogicalPlan): Boolean

/**
* Get the member of the Insert Into LogicalPlan.
*/
def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]

/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
def isRelationTimeTravel(plan: LogicalPlan): Boolean

/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]

/**
* Create a Insert Into LogicalPlan.
*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan

/**
* Create Like expression.
*/
def createLike(left: Expression, right: Expression): Expression

}
@@ -21,20 +21,18 @@ package org.apache.spark.sql.hudi
import org.apache.avro.Schema
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, Row, SparkSession}
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession}

import java.util.Locale

@@ -45,9 +43,15 @@ trait SparkAdapter extends Serializable {

/**
* Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating
* on Catalyst Expressions
* on Catalyst [[Expression]]s
*/
def createCatalystExpressionUtils(): HoodieCatalystExpressionUtils
def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils

/**
* Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating
* on Catalyst [[LogicalPlan]]s
*/
def getCatalystPlanUtils: HoodieCatalystPlansUtils

/**
* Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize
@@ -71,48 +75,6 @@ trait SparkAdapter extends Serializable {
*/
def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe

/**
* Convert a AliasIdentifier to TableIdentifier.
*/
def toTableIdentifier(aliasId: AliasIdentifier): TableIdentifier

/**
* Convert a UnresolvedRelation to TableIdentifier.
*/
def toTableIdentifier(relation: UnresolvedRelation): TableIdentifier

/**
* Create Join logical plan.
*/
def createJoin(left: LogicalPlan, right: LogicalPlan, joinType: JoinType): Join

/**
* Test if the logical plan is a Insert Into LogicalPlan.
*/
def isInsertInto(plan: LogicalPlan): Boolean

/**
* Get the member of the Insert Into LogicalPlan.
*/
def getInsertIntoChildren(plan: LogicalPlan):
Option[(LogicalPlan, Map[String, Option[String]], LogicalPlan, Boolean, Boolean)]

/**
* if the logical plan is a TimeTravelRelation LogicalPlan.
*/
def isRelationTimeTravel(plan: LogicalPlan): Boolean

/**
* Get the member of the TimeTravelRelation LogicalPlan.
*/
def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])]

/**
* Create a Insert Into LogicalPlan.
*/
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan

/**
* Create the hoodie's extended spark sql parser.
*/
@@ -123,11 +85,6 @@ trait SparkAdapter extends Serializable {
*/
def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil

/**
* Create Like expression.
*/
def createLike(left: Expression, right: Expression): Expression

/**
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
*/
@@ -143,7 +100,7 @@ trait SparkAdapter extends Serializable {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
isHoodieTable(toTableIdentifier(relation), spark)
isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
case _=> false
}
}
@@ -177,6 +134,8 @@ trait SparkAdapter extends Serializable {

/**
* Create instance of [[InterpretedPredicate]]
*
* TODO move to HoodieCatalystExpressionUtils
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate
}
@@ -199,7 +199,7 @@ protected void initSparkContexts(String appName) {

if (sparkSessionExtensionsInjector.isPresent()) {
// In case we need to inject extensions into Spark Session, we have
// to stop any session that might still be active and since Spark will try
// to stop any session that might still be active, since Spark will try
// to re-use it
HoodieConversionUtils.toJavaOption(SparkSession.getActiveSession())
.ifPresent(SparkSession::stop);
@@ -33,6 +33,7 @@ private static Map<String, String> defaultConf() {
Map<String, String> additionalConfigs = new HashMap<>();
additionalConfigs.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
additionalConfigs.put("spark.kryoserializer.buffer.max", "512m");
additionalConfigs.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
return additionalConfigs;
}
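
As a usage note, a minimal sketch (a local-mode example under assumed defaults, not part of
this change) of how such configs take effect when a test SparkSession is built, causing Spark
to load HoodieSparkSessionExtension and apply its injected rules:

import org.apache.spark.sql.SparkSession

// Builds a local session with the Hudi extensions enabled via spark.sql.extensions.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("hudi-extensions-example")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()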
