[HUDI-2456] support 'show partitions' sql #3693

Merged: 6 commits, Oct 6, 2021
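The feature in one line: Spark SQL's SHOW PARTITIONS statement now works against Hudi tables. A minimal usage sketch in Scala (the table name h0 and partition column dt are illustrative, not taken from this PR):

// Assumes a partitioned Hudi table created through Spark SQL, e.g.
//   create table h0 (id int, name string, dt string) using hudi partitioned by (dt)
spark.sql("show partitions h0").show(false)
// One row per partition path is returned, for example:
//   dt=2021-10-01
//   dt=2021-10-02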
@@ -40,7 +40,7 @@ public class KeyGenUtils {
protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";

protected static final String DEFAULT_PARTITION_PATH = "default";
protected static final String HUDI_DEFAULT_PARTITION_PATH = PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";

/**
@@ -121,8 +121,8 @@ public static String getRecordPartitionPath(GenericRecord record, List<String> p
for (String partitionPathField : partitionPathFields) {
String fieldVal = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
if (fieldVal == null || fieldVal.isEmpty()) {
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + DEFAULT_PARTITION_PATH
: DEFAULT_PARTITION_PATH);
partitionPath.append(hiveStylePartitioning ? partitionPathField + "=" + HUDI_DEFAULT_PARTITION_PATH
: HUDI_DEFAULT_PARTITION_PATH);
} else {
if (encodePartitionPath) {
fieldVal = PartitionPathEncodeUtils.escapePathName(fieldVal);
@@ -147,7 +147,7 @@ public static String getPartitionPath(GenericRecord record, String partitionPath
boolean hiveStylePartitioning, boolean encodePartitionPath) {
String partitionPath = HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathField, true);
if (partitionPath == null || partitionPath.isEmpty()) {
partitionPath = DEFAULT_PARTITION_PATH;
partitionPath = HUDI_DEFAULT_PARTITION_PATH;
}
if (encodePartitionPath) {
partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath);
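The change above makes KeyGenUtils delegate its default partition value to PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH (exposed here as HUDI_DEFAULT_PARTITION_PATH), so the key generators and the new SQL command agree on the fallback used for null or empty partition fields. A rough Scala sketch of the resulting behaviour, not the actual Hudi API:

// Minimal sketch of the fallback logic, assuming the constant resolves to "default".
def partitionPathFor(field: String, value: String, hiveStyle: Boolean): String = {
  val v = if (value == null || value.isEmpty) "default" else value // HUDI_DEFAULT_PARTITION_PATH
  if (hiveStyle) s"$field=$v" else v
}

partitionPathFor("dt", null, hiveStyle = true)          // "dt=default"
partitionPathFor("dt", "2021-10-01", hiveStyle = true)  // "dt=2021-10-01"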
@@ -38,7 +38,7 @@

import scala.Option;

import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
@@ -104,11 +104,11 @@ public static String getPartitionPathFromRow(Row row, List<String> partitionPath
Integer fieldPos = fieldPositions.get(0);
// for partition path, if field is not found, index will be set to -1
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
val = DEFAULT_PARTITION_PATH;
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
val = row.getAs(field).toString();
if (val.isEmpty()) {
val = DEFAULT_PARTITION_PATH;
val = HUDI_DEFAULT_PARTITION_PATH;
}
}
if (hiveStylePartitioning) {
@@ -117,7 +117,7 @@ public static String getPartitionPathFromRow(Row row, List<String> partitionPath
} else { // nested
Object nestedVal = getNestedFieldVal(row, partitionPathPositions.get(field));
if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : DEFAULT_PARTITION_PATH;
val = hiveStylePartitioning ? field + "=" + HUDI_DEFAULT_PARTITION_PATH : HUDI_DEFAULT_PARTITION_PATH;
} else {
val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : nestedVal.toString();
}
@@ -137,11 +137,11 @@ public static String getPartitionPathFromInternalRow(InternalRow row, List<Strin
Integer fieldPos = fieldPositions.get(0);
// for partition path, if field is not found, index will be set to -1
if (fieldPos == -1 || row.isNullAt(fieldPos)) {
val = DEFAULT_PARTITION_PATH;
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
Object value = row.get(fieldPos, partitionPathDataTypes.get(field).get(0));
if (value == null || value.toString().isEmpty()) {
val = DEFAULT_PARTITION_PATH;
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
val = value.toString();
}
@@ -161,11 +161,11 @@ public static Object getFieldValFromInternalRow(InternalRow internalRow,
DataType partitionPathDataType) {
Object val = null;
if (internalRow.isNullAt(partitionPathPosition)) {
return DEFAULT_PARTITION_PATH;
return HUDI_DEFAULT_PARTITION_PATH;
} else {
Object value = partitionPathDataType == DataTypes.StringType ? internalRow.getString(partitionPathPosition) : internalRow.get(partitionPathPosition, partitionPathDataType);
if (value == null || value.toString().isEmpty()) {
val = DEFAULT_PARTITION_PATH;
val = HUDI_DEFAULT_PARTITION_PATH;
} else {
val = value;
}
@@ -197,7 +197,7 @@ public static Object getFieldValFromInternalRow(InternalRow internalRow,
*/
public static Object getNestedFieldVal(Row row, List<Integer> positions) {
if (positions.size() == 1 && positions.get(0) == -1) {
return DEFAULT_PARTITION_PATH;
return HUDI_DEFAULT_PARTITION_PATH;
}
int index = 0;
int totalCount = positions.size();
@@ -29,7 +29,7 @@

import java.io.IOException;

import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;

@@ -85,7 +85,7 @@ public String getPartitionPath(InternalRow internalRow, StructType structType) {
private String getTimestampBasedPartitionPath(Object partitionPathFieldVal) {
Object fieldVal = null;
try {
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(HUDI_DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
} else {
@@ -34,7 +34,7 @@

import java.util.stream.Stream;

import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;

public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps() {
@@ -108,9 +108,9 @@ public void testWrongRecordKeyField() {
public void testWrongPartitionPathField() {
SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getWrongPartitionPathFieldProps());
GenericRecord record = getRecord();
Assertions.assertEquals(keyGenerator.getPartitionPath(record), KeyGenUtils.DEFAULT_PARTITION_PATH);
Assertions.assertEquals(keyGenerator.getPartitionPath(record), KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH);
Assertions.assertEquals(keyGenerator.getPartitionPath(KeyGeneratorTestUtilities.getRow(record)),
KeyGenUtils.DEFAULT_PARTITION_PATH);
KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH);
}

@Test
@@ -151,7 +151,7 @@ public void testNestedPartitionPathField(GenericRecord nestedColRecord) {
partitionPathFieldValue = (String) nestedColRecord.get("prop1");
}
String expectedPartitionPath = "nested_col.prop1="
+ (partitionPathFieldValue != null && !partitionPathFieldValue.isEmpty() ? partitionPathFieldValue : DEFAULT_PARTITION_PATH);
+ (partitionPathFieldValue != null && !partitionPathFieldValue.isEmpty() ? partitionPathFieldValue : HUDI_DEFAULT_PARTITION_PATH);
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals("key1", key.getRecordKey());
Assertions.assertEquals(expectedPartitionPath, key.getPartitionPath());
@@ -25,6 +25,8 @@
*/
public class PartitionPathEncodeUtils {

public static final String DEFAULT_PARTITION_PATH = "default";

static BitSet charToEscape = new BitSet(128);
static {
for (char c = 0; c < ' '; c++) {
@@ -64,14 +66,11 @@ public static String escapePathName(String path) {
* @return An escaped path name.
*/
public static String escapePathName(String path, String defaultPath) {

// __HIVE_DEFAULT_NULL__ is the system default value for null and empty string.
// TODO: we should allow user to specify default partition or HDFS file location.
if (path == null || path.length() == 0) {
if (defaultPath == null) {
//previously, when path is empty or null and no default path is specified,
// __HIVE_DEFAULT_PARTITION__ was the return value for escapePathName
return "__HIVE_DEFAULT_PARTITION__";
// previously, when path is empty or null and no default path is specified,
// "default" was the return value for escapePathName
return DEFAULT_PARTITION_PATH;
} else {
return defaultPath;
}
@@ -111,4 +110,12 @@ public static String unescapePathName(String path) {
}
return sb.toString();
}

public static String escapePartitionValue(String value) {
if (value == null || value.isEmpty()) {
return DEFAULT_PARTITION_PATH;
} else {
return escapePathName(value);
}
}
}
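The new escapePartitionValue helper centralises two rules: a null or empty value falls back to DEFAULT_PARTITION_PATH, and anything else goes through escapePathName. A short usage sketch (values are illustrative):

import org.apache.hudi.common.util.PartitionPathEncodeUtils

PartitionPathEncodeUtils.escapePartitionValue(null)         // "default"
PartitionPathEncodeUtils.escapePartitionValue("")           // "default"
PartitionPathEncodeUtils.escapePartitionValue("2021-10-01") // "2021-10-01" (nothing to escape)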
@@ -19,10 +19,14 @@ package org.apache.spark.sql.hudi

import scala.collection.JavaConverters._
import java.net.URI
import java.util.{Date, Locale}
import java.util.{Date, Locale, Properties}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
@@ -36,6 +40,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expressi
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}

import java.text.SimpleDateFormat
@@ -80,6 +85,16 @@ object HoodieSqlUtils extends SparkAdapterSupport {
.asInstanceOf[StructType]).map(removeMetaFields)
}

def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, HoodieSqlUtils.getTableLocation(table, spark)).asScala
}

private def tripAlias(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
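getAllPartitionPaths is moved into HoodieSqlUtils above so other commands can reuse it; it lists partition paths through FSUtils with a HoodieMetadataConfig built from the session conf and the table's storage properties, so the metadata table is consulted when enabled. A rough usage sketch (the catalog lookup and table name are illustrative):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hudi.HoodieSqlUtils

val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("h0"))
val partitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, catalogTable)
// e.g. Seq("dt=2021-10-01", "dt=2021-10-02") for a hive-style partitioned table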
@@ -29,13 +29,13 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, CompactionPath, CompactionShowOnPath, CompactionShowOnTable, CompactionTable, DeleteAction, DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRenameCommand, CreateDataSourceTableCommand, TruncateTableCommand}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.hudi.command.{AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, AlterHoodieTableRenameCommand, CompactionHoodiePathCommand, CompactionHoodieTableCommand, CompactionShowHoodiePathCommand, CompactionShowHoodieTableCommand, CreateHoodieTableAsSelectCommand, CreateHoodieTableCommand, DeleteHoodieTableCommand, InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, TruncateHoodieTableCommand, UpdateHoodieTableCommand}
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.types.StringType

object HoodieAnalysis {
@@ -417,6 +417,9 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case AlterTableChangeColumnCommand(tableName, columnName, newColumn)
if isHoodieTable(tableName, sparkSession) =>
AlterHoodieTableChangeColumnCommand(tableName, columnName, newColumn)
case ShowPartitionsCommand(tableName, specOpt)
if isHoodieTable(tableName, sparkSession) =>
ShowHoodieTablePartitionsCommand(tableName, specOpt)
// Rewrite TruncateTableCommand to TruncateHoodieTableCommand
case TruncateTableCommand(tableName, partitionSpec)
if isHoodieTable(tableName, sparkSession) =>
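The new ShowPartitionsCommand case in HoodiePostAnalysisRule only takes effect when Hudi's Spark SQL extension is installed, since that is what registers these analysis rules with the session. A minimal setup sketch (standard Hudi quickstart configuration, not part of this diff):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-show-partitions")
  .master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()

// With the extension enabled, SHOW PARTITIONS on a Hudi table resolves to
// ShowHoodieTablePartitionsCommand instead of Spark's built-in command.
spark.sql("show partitions h0").show(false)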
@@ -19,17 +19,13 @@ package org.apache.spark.sql.hudi.command

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ValidationUtils
import org.apache.hudi.hadoop.HoodieParquetInputFormat
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -129,9 +125,9 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
(addMetaFields(tableSchema.get), options)
} else if (userSpecifiedSchema.nonEmpty) {
(addMetaFields(userSpecifiedSchema), options)
} else {
} else {
throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName")
}
}
} else {
assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName")
// SPARK-19724: the default location of a managed table should be non-existent or empty.
@@ -319,16 +315,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
}
}

private def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = {
val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
val metadataConfig = {
val properties = new Properties()
properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties).asJava)
HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
}
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
}

/**
* This method is used to compatible with the old non-hive-styled partition table.
* By default we enable the "hoodie.datasource.write.hive_style_partitioning"
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hudi.command

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.types.StringType

/**
* Command to show a Hudi table's partitions.
*/
case class ShowHoodieTablePartitionsCommand(
tableName: TableIdentifier,
specOpt: Option[TablePartitionSpec])
extends RunnableCommand {

override val output: Seq[Attribute] = {
AttributeReference("partition", StringType, nullable = false)() :: Nil
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val resolver = sparkSession.sessionState.conf.resolver
val catalogTable = catalog.getTableMetadata(tableName)
val tablePath = getTableLocation(catalogTable, sparkSession)

val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
.setConf(hadoopConf).build()
val schemaOpt = getTableSqlSchema(metaClient)
val partitionColumnNamesOpt = metaClient.getTableConfig.getPartitionFields
if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty
&& schemaOpt.isDefined && schemaOpt.nonEmpty) {

val partitionColumnNames = partitionColumnNamesOpt.get
val schema = schemaOpt.get
val allPartitionPaths: Seq[String] = getAllPartitionPaths(sparkSession, catalogTable)

if (specOpt.isEmpty) {
allPartitionPaths.map(Row(_))
} else {
val spec = specOpt.get
allPartitionPaths.filter { partitionPath =>
val part = PartitioningUtils.parsePathFragment(partitionPath)
spec.forall { case (col, value) =>
PartitionPathEncodeUtils.escapePartitionValue(value) == part.getOrElse(col, null)
}
}.map(Row(_))
}
} else {
Seq.empty[Row]
}
}
}
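ShowHoodieTablePartitionsCommand lists every partition path of the table and, when a PARTITION spec is supplied, keeps only the paths whose parsed column/value pairs match the escaped spec values. A hedged example of the filtered form (table and column names are illustrative):

// Only partitions whose dt value matches the spec are returned.
spark.sql("show partitions h0 partition(dt='2021-10-01')").show(false)
// +-------------+
// |partition    |
// +-------------+
// |dt=2021-10-01|
// +-------------+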
@@ -77,7 +77,7 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
}

protected def checkAnswer(sql: String)(expects: Seq[Any]*): Unit = {
assertResult(expects.map(row => Row(row: _*)).toArray)(spark.sql(sql).collect())
assertResult(expects.map(row => Row(row: _*)).toArray.sortBy(_.toString()))(spark.sql(sql).collect().sortBy(_.toString()))
}

protected def checkException(sql: String)(errorMsg: String): Unit = {
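The checkAnswer change sorts both the expected and the actual rows, making assertions independent of the order in which partitions are listed. A sketch of how a SHOW PARTITIONS test could use it (table name and partition values are illustrative):

// Rows may come back in any order; the sorted comparison still passes.
checkAnswer("show partitions h0")(
  Seq("dt=2021-10-01"),
  Seq("dt=2021-10-02"),
  Seq("dt=2021-10-03")
)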