From fd8f7c5f6c0bbfbed829f5d1a1d250ca4752d503 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 11 Jun 2022 14:46:43 -0700 Subject: [PATCH] [HUDI-4205] Fix NullPointerException in HFile reader creation (#5841) Replace SerializableConfiguration with SerializableWritable for broadcasting the hadoop configuration before initializing HFile readers --- .../main/scala/org/apache/hudi/HoodieBaseRelation.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 47e391a560a22..43a2d72733eb1 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -34,9 +34,10 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.common.util.ValidationUtils.checkState -import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter +import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.io.storage.HoodieHFileReader +import org.apache.spark.SerializableWritable import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -535,11 +536,10 @@ object HoodieBaseRelation extends SparkAdapterSupport { filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - val hadoopConfBroadcast = - spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) partitionedFile => { - val hadoopConf = hadoopConfBroadcast.value.get() + val hadoopConf = hadoopConfBroadcast.value.value val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath), new CacheConfig(hadoopConf))