Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4487] support to create ro/rt table by spark sql #6262

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* Table definition for SQL funcitonalities. Depending on the way of data generation,
* Table definition for SQL functionalities. Depending on the way of data generation,
* meta of Hudi table can be from Spark catalog or meta directory on filesystem.
* [[HoodieCatalogTable]] takes both meta sources into consideration when handling
* EXTERNAL and MANAGED tables.
*
* NOTE: all the meta should be retrieved from meta directory on filesystem first.
*/
class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {

Expand All @@ -53,7 +55,7 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
/**
* database.table in catalog
*/
val catalogTableName = table.qualifiedName
val catalogTableName: String = table.qualifiedName

/**
* properties defined in catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.spark.sql.hudi.command

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat

import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.hadoop.HoodieParquetInputFormat
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
import org.apache.hudi.sync.common.util.ConfigUtils
import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport}

import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
Expand All @@ -33,7 +36,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isUsingHiveCatalog
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlCommonUtils}
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.{SPARK_VERSION, SparkConf}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -62,12 +65,22 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
// check if there are conflict between table configs defined in hoodie table and properties defined in catalog.
CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable)

// init hoodie table
hoodieCatalogTable.initHoodieTable()
val queryAsProp = hoodieCatalogTable.catalogProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)
if (queryAsProp.isEmpty) {
// init hoodie table for a normal table (not a ro/rt table)
hoodieCatalogTable.initHoodieTable()
} else {
if (!hoodieCatalogTable.hoodieTableExists) {
throw new AnalysisException("Creating ro/rt table need the existence of the base table.")
}
if (HoodieTableType.MERGE_ON_READ != hoodieCatalogTable.tableType) {
throw new AnalysisException("Creating ro/rt table should only apply to a mor table.")
}
}

try {
// create catalog table for this hoodie table
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists)
CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists, queryAsProp)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}")
Expand All @@ -92,8 +105,11 @@ object CreateHoodieTableCommand {
}
}

def createTableInCatalog(sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = {
def createTableInCatalog(
sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
ignoreIfExists: Boolean,
queryAsProp: Option[String] = None): Unit = {
val table = hoodieCatalogTable.table
assert(table.tableType != CatalogTableType.VIEW)

Expand Down Expand Up @@ -121,7 +137,8 @@ object CreateHoodieTableCommand {
Some(outputFormat),
Some(serdeFormat),
table.storage.compressed,
storageProperties + ("path" -> path))
storageProperties + ("path" -> path) ++ queryAsProp.map(ConfigUtils.IS_QUERY_AS_RO_TABLE -> _)
)

val tableName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.table)
val newDatabaseName = HoodieSqlCommonUtils.formatName(sparkSession, table.identifier.database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ package org.apache.spark.sql.hudi.command

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils

import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.needFilterProps
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HoodieCatalogTable}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

import scala.collection.JavaConverters._

Expand All @@ -45,6 +47,11 @@ case class CreateHoodieTableAsSelectCommand(
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)

val hasQueryAsProp = (table.storage.properties ++ table.properties).contains(ConfigUtils.IS_QUERY_AS_RO_TABLE)
if (hasQueryAsProp) {
throw new AnalysisException("Not support CTAS for the ro/rt table")
}

val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = table.identifier.copy(database = Some(db))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,138 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}

test("Test Create ro/rt Table In The Right Way") {
  withTempDir { tmp =>
    val basePath = tmp.getCanonicalPath
    val baseTableName = generateTableName

    // Create a MOR base table and upsert the same key twice, so the
    // read-optimized and real-time views observe different snapshots.
    spark.sql(
      s"""
         |create table $baseTableName (
         | id int,
         | name string,
         | ts long
         |) using hudi
         | tblproperties (
         | primaryKey = 'id',
         | preCombineField = 'ts',
         | type = 'mor'
         | )
         | location '$basePath/$baseTableName'
       """.stripMargin)
    spark.sql(s"insert into $baseTableName values (1, 'a1', 1000)")
    spark.sql(s"insert into $baseTableName values (1, 'a2', 1100)")

    // drop ro and rt table, and recreate them
    val roTable = baseTableName + "_ro"
    val rtTable = baseTableName + "_rt"

    // Recreate the read-optimized view over the existing base path and
    // verify the catalog metadata is inherited from the hoodie table config.
    spark.sql(
      s"""
         |create table $roTable
         |using hudi
         |tblproperties (
         | 'hoodie.query.as.ro.table' = 'true'
         |)
         |location '$basePath/$baseTableName'
         |""".stripMargin
    )
    val roMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(roTable))
    assertResult(roMeta.properties("type"))("mor")
    assertResult(roMeta.properties("primaryKey"))("id")
    assertResult(roMeta.properties("preCombineField"))("ts")
    assertResult(roMeta.storage.properties("hoodie.query.as.ro.table"))("true")
    // RO view skips the latest log data: only the first write is visible.
    checkAnswer(s"select id, name, ts from $roTable")(
      Seq(1, "a1", 1000)
    )

    // Recreate the real-time view ('hoodie.query.as.ro.table' = 'false')
    // and check it reflects the merged, latest state of the record.
    spark.sql(
      s"""
         |create table $rtTable
         |using hudi
         |tblproperties (
         | 'hoodie.query.as.ro.table' = 'false'
         |)
         |location '$basePath/$baseTableName'
         |""".stripMargin
    )
    val rtMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier(rtTable))
    assertResult(rtMeta.properties("type"))("mor")
    assertResult(rtMeta.properties("primaryKey"))("id")
    assertResult(rtMeta.properties("preCombineField"))("ts")
    assertResult(rtMeta.storage.properties("hoodie.query.as.ro.table"))("false")
    checkAnswer(s"select id, name, ts from $rtTable")(
      Seq(1, "a2", 1100)
    )
  }
}

// Negative cases for creating ro/rt views: each statement below must fail
// with the specific AnalysisException message raised by the create commands.
test("Test Create ro/rt Table In The Wrong Way") {
withTempDir { tmp =>
val parentPath = tmp.getCanonicalPath

// test the case that creating a ro/rt table on a cow table fails:
// read-optimized/real-time views only apply to mor tables
val tableName1 = generateTableName
spark.sql(
s"""
|create table $tableName1 (
| id int,
| name string,
| ts long
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'cow'
| )
| location '$parentPath/$tableName1'
""".stripMargin)
spark.sql(s"insert into $tableName1 values (1, 'a1', 1000)")
spark.sql(s"insert into $tableName1 values (1, 'a2', 1100)")

val roTableName1 = tableName1 + "_ro"
checkExceptionContain(
s"""
|create table $roTableName1
|using hudi
|tblproperties (
| 'hoodie.query.as.ro.table' = 'true'
|)
|location '$parentPath/$tableName1'
|""".stripMargin
)("Creating ro/rt table should only apply to a mor table.")

// test the case that creating a ro/rt table on a nonexistent table fails:
// the base table's meta directory must already exist at the location
val tableName2 = generateTableName
val rtTableName2 = tableName2 + "_rt"
checkExceptionContain(
s"""
|create table $rtTableName2
|using hudi
|tblproperties (
| 'hoodie.query.as.ro.table' = 'true'
|)
|location '$parentPath/$tableName2'
|""".stripMargin
)("Creating ro/rt table need the existence of the base table.")

// test the case that CTAS with the ro/rt property fails:
// CTAS is rejected outright for ro/rt tables
val tableName3 = generateTableName
checkExceptionContain(
s"""
| create table $tableName3 using hudi
| tblproperties(
| primaryKey = 'id',
| preCombineField = 'ts',
| type = 'mor',
| 'hoodie.query.as.ro.table' = 'true'
| )
| location '$parentPath/$tableName3'
| AS
| select 1 as id, 'a1' as name, 1000 as ts
| """.stripMargin
)("Not support CTAS for the ro/rt table")
}
}

test("Test Create Table As Select With Tblproperties For Filter Props") {
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
Expand Down