Skip to content

Commit

Permalink
[HUDI-4487] support to create ro/rt table by spark sql
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed Aug 1, 2022
1 parent c72d895 commit 1e8bae7
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* Table definition for SQL funcitonalities. Depending on the way of data generation,
* Table definition for SQL functionalities. Depending on the way of data generation,
* meta of Hudi table can be from Spark catalog or meta directory on filesystem.
* [[HoodieCatalogTable]] takes both meta sources into consideration when handling
* EXTERNAL and MANAGED tables.
*
* NOTE: all the meta should be retrieved from meta directory on filesystem first.
*/
class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {

Expand All @@ -53,7 +55,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
/**
* database.table in catalog
*/
val catalogTableName = table.qualifiedName
val catalogTableName: String = table.qualifiedName

/**
* properties defined in catalog.
Expand Down Expand Up @@ -122,7 +124,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
/**
* Table schema
*/
lazy val tableSchema: StructType = table.schema
lazy val tableSchema: StructType = loadTableSchemaByMetaClient().getOrElse(table.schema)

/**
* The schema without hoodie meta fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat

import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.hadoop.HoodieParquetInputFormat
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}

import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
Expand All @@ -33,7 +36,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -62,12 +65,22 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)

// init hoodie table
hoodieCatalogTable.initHoodieTable()
val queryAsProp = hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
if (queryAsProp.isEmpty) {
// init hoodie table for a normal table (not a ro/rt table)
hoodieCatalogTable.initHoodieTable()
} else {
if (!hoodieCatalogTable.hoodieTableExists) {
throw new AnalysisException("Creating ro/rt table need the existence of the base table.")
}
if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) {
throw new AnalysisException("Creating ro/rt table should only apply to a mor table.")
}
}

try {
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists)
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}")
Expand All @@ -92,8 +105,11 @@ object CreateHoodieTableCommand {
}
}

def createTableInCatalog(sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = {
def createTableInCatalog(
sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
ignoreIfExists: Boolean,
queryAsProp: Option[String] = None): Unit = {
val table = hoodieCatalogTable.table
assert(table.tableType != CatalogTableType.VIEW)

Expand Down Expand Up @@ -121,7 +137,8 @@ object CreateHoodieTableCommand {
Some(outputFormat),
Some(serdeFormat),
table.storage.compressed,
storageProperties + ("path" -> path))
storageProperties + ("path" -> path) ++ queryAsProp.map(ConfigUtils.IS_QUERY_AS_RO_TABLE -> _)
)

val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils

import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

import scala.collection.JavaConverters._

Expand All @@ -45,6 +47,11 @@ case class CreateHoodieTableAsSelectCommand(
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)

val hasQueryAsProp = (table.storage.properties ++ table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
if (hasQueryAsProp) {
throw new AnalysisException("Not support CTAS for the ro/rt table")
}

val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,138 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}

test("Test Create ro/rt Table In The Right Way") {
  withTempDir { tmp =>
    val parentPath = tmp.getCanonicalPath

    // Create a MOR base table: ro/rt tables may only be derived from a MOR table.
    val tableName1 = generateTableName
    spark.sql(
      s"""
         |create table $tableName1 (
         | id int,
         | name string,
         | ts long
         |) using hudi
         | tblproperties (
         | primaryKey = 'id',
         | preCombineField = 'ts',
         | type = 'mor'
         | )
         | location '$parentPath/$tableName1'
       """.stripMargin)
    // Two upserts on the same key: the ro view should see only the first
    // (base-file) record, while the rt view merges in the second update.
    spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
    spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")

    // Recreate the ro (read-optimized) and rt (real-time) tables over the
    // same base-table location, distinguished only by 'hoodie.query.as.ro.table'.
    val roTableName1 = tableName1 + "_ro"
    val rtTableName1 = tableName1 + "_rt"
    spark.sql(
      s"""
         |create table $roTableName1
         |using hudi
         |tblproperties (
         | 'hoodie.query.as.ro.table' = 'true'
         |)
         |location '$parentPath/$tableName1'
         |""".stripMargin
    )
    // The ro table must inherit the base table's configs from the meta directory.
    // NOTE: ScalaTest's assertResult takes the EXPECTED value in the first
    // parameter list — keep the literal first so failure messages read correctly.
    val roCatalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(roTableName1))
    assertResult("mor")(roCatalogTable.properties("type"))
    assertResult("id")(roCatalogTable.properties("primaryKey"))
    assertResult("ts")(roCatalogTable.properties("preCombineField"))
    assertResult("true")(roCatalogTable.storage.properties("hoodie.query.as.ro.table"))
    // Read-optimized view: only the first insert is visible.
    checkAnswer(s"select id, name, ts from $roTableName1")(
      Seq(1, "a1", 1000)
    )

    spark.sql(
      s"""
         |create table $rtTableName1
         |using hudi
         |tblproperties (
         | 'hoodie.query.as.ro.table' = 'false'
         |)
         |location '$parentPath/$tableName1'
         |""".stripMargin
    )
    val rtCatalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier(rtTableName1))
    assertResult("mor")(rtCatalogTable.properties("type"))
    assertResult("id")(rtCatalogTable.properties("primaryKey"))
    assertResult("ts")(rtCatalogTable.properties("preCombineField"))
    assertResult("false")(rtCatalogTable.storage.properties("hoodie.query.as.ro.table"))
    // Real-time view: log files are merged, so the updated record is visible.
    checkAnswer(s"select id, name, ts from $rtTableName1")(
      Seq(1, "a2", 1100)
    )
  }
}

// Negative cases: all the ways a ro/rt table creation should be rejected.
test("Test Create ro/rt Table In The Wrong Way") {
withTempDir { tmp =>
val parentPath = tmp.getCanonicalPath

// Case 1: creating a ro/rt table on top of a COW base table must fail —
// read-optimized/real-time views only make sense for a MOR table.
val tableName1 = generateTableName
spark.sql(
s"""
|create table $tableName1 (
| id int,
| name string,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
| location '$parentPath/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")

val roTableName1 = tableName1 + "_ro"
checkExceptionContain(
s"""
|create table $roTableName1
|using hudi
|tblproperties (
| 'hoodie.query.as.ro.table' = 'true'
|)
|location '$parentPath/$tableName1'
|""".stripMargin
)("Creating ro/rt table should only apply to a mor table.")

// Case 2: creating a ro/rt table over a location with no existing Hudi
// base table must fail — the base table supplies the meta directory.
val tableName2 = generateTableName
val rtTableName2 = tableName2 + "_rt"
checkExceptionContain(
s"""
|create table $rtTableName2
|using hudi
|tblproperties (
| 'hoodie.query.as.ro.table' = 'true'
|)
|location '$parentPath/$tableName2'
|""".stripMargin
)("Creating ro/rt table need the existence of the base table.")

// Case 3: CTAS combined with 'hoodie.query.as.ro.table' must fail —
// a ro/rt table is a view over existing data, not a write target.
val tableName3 = generateTableName
checkExceptionContain(
s"""
| create table $tableName3 using hudi
| tblproperties(
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'mor',
| 'hoodie.query.as.ro.table' = 'true'
| )
| location '$parentPath/$tableName3'
| AS
| select 1 as id, 'a1' as name, 1000 as ts
| """.stripMargin
)("Not support CTAS for the ro/rt table")
}
}

test("Test Create Table As Select With Tblproperties For Filter Props") {
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
Expand Down

0 comments on commit 1e8bae7

Please sign in to comment.