Skip to content

Commit

Permalink
Fixed reading through globbed paths to properly handle case of partitione…
Browse files Browse the repository at this point in the history
…d tables not using Hive-style partitioning
  • Loading branch information
Alexey Kudinkin committed Jul 19, 2022
1 parent 0caadf9 commit 10bc802
Showing 1 changed file with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -148,6 +149,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key)
val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq())

// NOTE: Spark is able to infer partitioning values from partition path only when Hive-style partitioning
// scheme is used. Therefore, we fallback to reading the table as non-partitioned (specifying
// partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved
val partitionColumns: Seq[String] = if (tableConfig.getHiveStylePartitioningEnable.toBoolean) {
this.partitionColumns
} else {
Seq.empty
}

DataSource.apply(
sparkSession = sparkSession,
paths = extraReadPaths,
Expand All @@ -162,9 +172,15 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
//
// We rely on [[HoodieROTablePathFilter]], to do proper filtering to assure that
"mapreduce.input.pathFilter.class" -> classOf[HoodieROTablePathFilter].getName,

// We have to override [[EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH]] setting, since
// the relation might have this setting overridden
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key -> shouldExtractPartitionValuesFromPartitionPath.toString,

// NOTE: We have to specify table's base-path explicitly, since we're requesting Spark to read it as a
// list of globbed paths which complicates partitioning discovery for Spark.
// Please check [[PartitioningAwareFileIndex#basePaths]] comment for more details.
PartitioningAwareFileIndex.BASE_PATH_PARAM -> metaClient.getBasePathV2.toString
),
partitionColumns = partitionColumns
)
Expand Down

0 comments on commit 10bc802

Please sign in to comment.