[HUDI-4315] Do not throw exception in BaseSpark3Adapter#toTableIdentifier
leesf committed Jun 26, 2022
1 parent 1c43c59 commit af61da1
Showing 2 changed files with 56 additions and 5 deletions.
org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_NAME, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
@@ -696,4 +697,47 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Insert Into With Catalog Identifier for spark >= 3.2.0") {
Seq("hudi", "parquet").foreach { format =>
withTempDir { tmp =>
val tableName = s"spark_catalog.default.$generateTableName"
// Create a partitioned table
if (HoodieSparkUtils.gteqSpark3_2) {
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using $format
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}'
""".stripMargin)
// Insert into dynamic partition
spark.sql(
s"""
| insert into $tableName
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-01-05' as dt
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
// Insert into static partition
spark.sql(
s"""
| insert into $tableName partition(dt = '2021-01-05')
| select 2 as id, 'a2' as name, 10 as price, 1000 as ts
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-05")
)
}
}
}
}
}
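
For context: the new test exercises a catalog-qualified, three-part table name (spark_catalog.default.<table>), while Catalyst's TableIdentifier only models a two-part database.table name, so resolving such a name is a plausible way to make toTableIdentifier throw; the change in the next file stops that exception from escaping. The following standalone sketch is not code from this commit; it assumes a local SparkSession and uses Spark's public parseMultipartIdentifier parser API to show the mismatch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

// Standalone illustration: a catalog-qualified name parses into three
// parts, but TableIdentifier only carries a table plus an optional database.
object MultipartIdentifierSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("multipart-identifier-sketch")
      .getOrCreate()

    // Spark's parser splits the name into its parts.
    val parts = spark.sessionState.sqlParser
      .parseMultipartIdentifier("spark_catalog.default.t")
    println(parts.mkString(".")) // spark_catalog.default.t (three parts)

    // TableIdentifier (two-part, as of Spark 3.2) has no slot for the
    // catalog, so the name above has no lossless TableIdentifier form.
    println(TableIdentifier("t", Some("default"))) // `default`.`t`

    spark.stop()
  }
}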
org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -19,30 +19,31 @@ package org.apache.spark.sql.adapter

import org.apache.hudi.Spark3RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.SPARK_VERSION
import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Like, Predicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{Row, SparkSession}

import scala.util.control.NonFatal

/**
* Base implementation of [[SparkAdapter]] for Spark 3.x branch
*/
abstract class BaseSpark3Adapter extends SparkAdapter {
abstract class BaseSpark3Adapter extends SparkAdapter with Logging {

override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
new Spark3RowSerDe(encoder)
@@ -115,7 +116,13 @@ abstract class BaseSpark3Adapter extends SparkAdapter {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
case relation: UnresolvedRelation =>
isHoodieTable(toTableIdentifier(relation), spark)
try {
isHoodieTable(toTableIdentifier(relation), spark)
} catch {
case NonFatal(e) =>
logWarning("Failed to determine whether the table is a hoodie table", e)
false
}
case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
case _=> false
}
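The core of the fix is Scala's scala.util.control.NonFatal extractor: any non-fatal failure while resolving the relation is logged via the newly mixed-in Logging trait and treated as "not a Hoodie table", instead of aborting query analysis. Below is a minimal, dependency-free sketch of the same pattern; the helper name orFalse is hypothetical, not part of the commit.

import scala.util.control.NonFatal

object NonFatalFallbackSketch {
  // Hypothetical helper mirroring the commit's pattern: evaluate a boolean
  // check that may throw, and fall back to false on any non-fatal exception.
  def orFalse(check: => Boolean): Boolean =
    try check
    catch {
      case NonFatal(e) =>
        // The commit itself calls logWarning from Spark's Logging trait here.
        Console.err.println(s"Failed to determine whether the table is a hoodie table: $e")
        false
    }

  def main(args: Array[String]): Unit = {
    // A check that throws, as the identifier lookup could for catalog-qualified names.
    println(orFalse(throw new IllegalArgumentException("three-part identifier"))) // false
    // A successful check passes through unchanged.
    println(orFalse(true)) // true
  }
}

Note that NonFatal deliberately does not match VirtualMachineError, InterruptedException, LinkageError, or control-flow throwables, so genuinely unrecoverable errors still propagate.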
