Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48585][SQL] Make built-in JdbcDialect's method classifyException throw out the original exception #46937

Closed
wants to merge 10 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def testCreateTableWithProperty(tbl: String): Unit = {}

def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
checkError(
private def checkErrorFailedJDBC(
e: AnalysisException,
errorClass: String,
tbl: String): Unit = {
checkErrorMatchPVals(
exception = e,
errorClass = "FAILED_JDBC.UNCLASSIFIED",
errorClass = errorClass,
parameters = Map(
"url" -> "jdbc:",
"message" -> s"Failed to load table: $tbl"
)
"url" -> "jdbc:.*",
"tableName" -> s"`$tbl`")
)
}

Expand Down Expand Up @@ -132,7 +134,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... drop column") {
Expand All @@ -154,7 +156,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column type") {
Expand All @@ -170,7 +172,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... rename column") {
Expand Down Expand Up @@ -198,7 +200,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column nullability") {
Expand All @@ -209,7 +211,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("CREATE TABLE with table comment") {
Expand All @@ -231,7 +233,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
}
assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{DataType, MetadataBuilder}
*
* @param dialects List of dialects.
*/
private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
private class AggregatedDialect(dialects: List[JdbcDialect])
extends JdbcDialect with NoLegacyJDBCError {

require(dialects.nonEmpty)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._

private case class DB2Dialect() extends JdbcDialect with SQLConfHelper {
private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._

private case class DatabricksDialect() extends JdbcDialect {
private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCError {

override def canHandle(url: String): Boolean = {
url.startsWith("jdbc:databricks")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
import org.apache.spark.sql.types._


private case class DerbyDialect() extends JdbcDialect {
private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}

private[sql] case class H2Dialect() extends JdbcDialect {
private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,23 @@ abstract class JdbcDialect extends Serializable with Logging {
metadata: MetadataBuilder): Unit = {}
}

/**
* Make the `classifyException` method throw out the original exception
*/
trait NoLegacyJDBCError extends JdbcDialect {

override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
new AnalysisException(
errorClass = errorClass,
messageParameters = messageParameters,
cause = Some(e))
}
}

/**
* :: DeveloperApi ::
* Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
import org.apache.spark.sql.types._


private case class MsSqlServerDialect() extends JdbcDialect {
private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types._

private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {

override def canHandle(url : String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.jdbc.OracleDialect._
import org.apache.spark.sql.types._


private case class OracleDialect() extends JdbcDialect with SQLConfHelper {
private case class OracleDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
import org.apache.spark.sql.types._


private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
private case class PostgresDialect()
extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.{BooleanType, DataType}

private case class SnowflakeDialect() extends JdbcDialect {
private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types._


private case class TeradataDialect() extends JdbcDialect {
private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError {

override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,15 +619,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {

test("CREATE TABLE with table property") {
withTable("h2.test.new_table") {
checkError(
checkErrorMatchPVals(
exception = intercept[AnalysisException] {
sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" +
" TBLPROPERTIES('ENGINE'='tableEngineName')")
},
errorClass = "FAILED_JDBC.UNCLASSIFIED",
errorClass = "FAILED_JDBC.CREATE_TABLE",
parameters = Map(
"url" -> "jdbc:",
"message" -> "Failed table creation: test.new_table"))
"url" -> "jdbc:.*",
"tableName" -> "`test`.`new_table`"))
}
}

Expand All @@ -639,14 +639,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
}

test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") {
checkError(
checkErrorMatchPVals(
exception = intercept[AnalysisException]{
sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
},
errorClass = "FAILED_JDBC.UNCLASSIFIED",
errorClass = "FAILED_JDBC.CREATE_TABLE",
parameters = Map(
"url" -> "jdbc:",
"message" -> "Failed table creation: test.new_table"))
"url" -> "jdbc:.*",
"tableName" -> "`test`.`new_table`"))
}

test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") {
Expand Down