[HUDI-4205] Fix NullPointerException in HFile reader creation (apache#5841)

Replace SerializableConfiguration with SerializableWritable for broadcasting the Hadoop configuration before initializing HFile readers.

(cherry picked from commit fd8f7c5)
yihua authored and XuQianJin-Stars committed Jun 13, 2022
1 parent 1c46c72 commit 4925ba2
Showing 1 changed file with 36 additions and 23 deletions.
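
For context before the diff, here is a minimal, self-contained sketch (not taken from the Hudi codebase) of the broadcast pattern this commit switches to: wrapping the Hadoop Configuration in Spark's SerializableWritable, broadcasting it, and unwrapping it on the executors. Configuration implements Hadoop's Writable interface, which is what makes the wrapper applicable. The object name, the local master, and the fs.defaultFS lookup are illustrative only.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable
import org.apache.spark.sql.SparkSession

object BroadcastHadoopConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("broadcast-hadoop-conf")
      .getOrCreate()

    // Configuration implements Writable, so it can be wrapped in SerializableWritable
    // and shipped to executors as a broadcast variable.
    val hadoopConf: Configuration = spark.sparkContext.hadoopConfiguration
    val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))

    val defaults = spark.sparkContext.parallelize(Seq("a", "b"), 2).map { _ =>
      // On the executor side: unwrap the broadcast to get a usable Configuration.
      val conf = confBroadcast.value.value
      conf.get("fs.defaultFS", "file:///")
    }.collect()

    defaults.foreach(println)
    spark.stop()
  }
}
```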
@@ -34,9 +34,10 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.SerializableWritable
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
@@ -74,7 +75,7 @@ case class HoodieTableState(tablePath: String,
abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient,
val optParams: Map[String, String],
userSchema: Option[StructType])
schemaSpec: Option[StructType])
extends BaseRelation
with FileRelation
with PrunedFilteredScan
@@ -128,24 +129,28 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected lazy val (tableAvroSchema: Schema, internalSchema: InternalSchema) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val avroSchema = Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logWarning("Failed to fetch schema from the table", e)
// If there is no commit in the table, we can't get the schema
// t/h [[TableSchemaResolver]], fallback to the provided [[userSchema]] instead.
userSchema match {
case Some(s) => convertToAvroSchema(s)
case _ => throw new IllegalArgumentException("User-provided schema is required in case the table is empty")
}
val avroSchema: Schema = schemaSpec.map(convertToAvroSchema).getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
logError("Failed to fetch schema from the table", e)
throw new HoodieSchemaException("Failed to fetch schema from the table")
}
}
// try to find internalSchema
val internalSchemaFromMeta = try {
schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
} catch {
case _: Exception => InternalSchema.getEmptyInternalSchema

val internalSchema: InternalSchema = if (!isSchemaEvolutionEnabled) {
InternalSchema.getEmptyInternalSchema
} else {
Try(schemaResolver.getTableInternalSchemaFromCommitMetadata) match {
case Success(internalSchemaOpt) =>
toScalaOption(internalSchemaOpt).getOrElse(InternalSchema.getEmptyInternalSchema)
case Failure(e) =>
logWarning("Failed to fetch internal-schema from the table", e)
InternalSchema.getEmptyInternalSchema
}
}
(avroSchema, internalSchemaFromMeta)

(avroSchema, internalSchema)
}

protected lazy val tableStructSchema: StructType = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
@@ -175,7 +180,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected val shouldExtractPartitionValuesFromPartitionPath: Boolean = {
// Controls whether partition columns (which are the source for the partition path values) should
// be omitted from persistence in the data files. On the read path it affects whether partition values (values
// of partition columns) will be read from the data file ot extracted from partition path
// of partition columns) will be read from the data file or extracted from partition path
val shouldOmitPartitionColumns = metaClient.getTableConfig.shouldDropPartitionColumns && partitionColumns.nonEmpty
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
@@ -428,7 +433,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
}
} catch {
case NonFatal(e) =>
logWarning(s"Failed to get the right partition InternalRow for file : ${file.toString}")
logWarning(s"Failed to get the right partition InternalRow for file: ${file.toString}", e)
InternalRow.empty
}
}
@@ -503,6 +508,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

private def prunePartitionColumns(dataStructSchema: StructType): StructType =
StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name)))

private def isSchemaEvolutionEnabled = {
// NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as
// t/h Spark Session configuration (for ex, for Spark SQL)
optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean ||
sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
}
}

object HoodieBaseRelation extends SparkAdapterSupport {
@@ -522,11 +536,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))

partitionedFile => {
val hadoopConf = hadoopConfBroadcast.value.get()
val hadoopConf = hadoopConfBroadcast.value.value
val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
new CacheConfig(hadoopConf))

