
[SPARK-46410][SQL] Assign error classes/subclasses to JdbcUtils.classifyException #44358

Closed
wants to merge 15 commits
78 changes: 73 additions & 5 deletions common/utils/src/main/resources/error/error-classes.json
@@ -1096,6 +1096,79 @@
],
"sqlState" : "38000"
},
"FAILED_JDBC" : {
Contributor: FAILED_JDBC -> FAILED_JDBC_OPERATION

Member Author: Too long. Does _OPERATION bring any benefit?

Contributor: Just because FAILED_JDBC looks confusing.

Member Author: JDBC means Java Database Connectivity, and connectivity might fail at any time: during operations, while establishing a connection, or while keeping it alive. For instance, here is a real example:

org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)

Contributor: I guess connectivity is not the issue if CREATE INDEX fails.

"message" : [
"Failed the JDBC operation:"
],
"subClass" : {
"ALTER_TABLE" : {
"message" : [
"Alter the table <tableName>."
]
},
"CREATE_INDEX" : {
"message" : [
"Create the index <indexName> in the <tableName> table."
]
},
"CREATE_NAMESPACE" : {
"message" : [
"Create the namespace <namespace>."
]
},
"CREATE_NAMESPACE_COMMENT" : {
"message" : [
"Create a comment on the namespace: <namespace>."
]
},
"CREATE_TABLE" : {
"message" : [
"Create the table <tableName>."
]
},
"DROP_INDEX" : {
"message" : [
"Drop the index <indexName> in the <tableName> table."
]
},
"DROP_NAMESPACE" : {
"message" : [
"Drop the namespace <namespace>."
]
},
"GET_TABLES" : {
"message" : [
"Get tables from the namespace: <namespace>."
]
},
"LIST_NAMESPACES" : {
"message" : [
"List namespaces."
]
},
"NAMESPACE_EXISTS" : {
"message" : [
"Check that the namespace <namespace> exists."
]
},
"REMOVE_NAMESPACE_COMMENT" : {
"message" : [
"Remove a comment on the namespace: <namespace>."
]
},
"RENAME_TABLE" : {
"message" : [
"Rename the table <oldName> to <newName>."
]
},
"TABLE_EXISTS" : {
"message" : [
"Check that the table <tableName> exists."
]
}
},
"sqlState" : "40000"
Contributor (@srielau, Dec 15, 2023): That would be "transaction rollback".
This is all about reaching out to foreign data, right? HV000, the FDW-specific condition?

Is it obvious from the context what we are connecting to? Should we add some JDBC-level context (the URL?)

Member Author:

> This is all about reaching out to foreign data, right?

Right.

> HV000 FDW-specific condition?

Seems like it, but what do you propose? To get the sqlState from the SQLException?

> Is it obvious from the context what we are connecting to?

It should be, if a user reads data from a single JDBC data source.

> Should we add some JDBC-level context (the URL?)

I can extract and pass option.url to AnalysisException with the new error class FAILED_JDBC, but what about other (classified) exceptions like:

          case "42P07" =>
            if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
              throw new IndexAlreadyExistsException(
                indexName = messageParameters("indexName"),
                tableName = messageParameters("tableName"),
                cause = Some(e))

The IndexAlreadyExistsException has its own error class INDEX_ALREADY_EXISTS without any parameters related to the JDBC URL.

Contributor: I was proposing to replace 40000 with HV004.

This example is interesting. How do we get a 42P07 with the FAILED_JDBC error class?

Member Author (@MaxGekk, Dec 16, 2023):

> I was proposing to replace 40000 with HV004.

@srielau I didn't get the logic: why HV004 and not HV000? In PostgreSQL, for instance, HV004 is related to fdw_invalid_data_type.

> How do we get a 42P07 with the FAILED_JDBC error class?

From the doc https://www.postgresql.org/docs/14/errcodes-appendix.html and from experiments.

Contributor: Sorry, I fat-fingered. Yes, HV000.

},
"FAILED_PARSE_STRUCT_TYPE" : {
"message" : [
"Failed parsing struct: <raw>."
@@ -6778,11 +6851,6 @@
"pivot is not supported on a streaming DataFrames/Datasets"
]
},
-    "_LEGACY_ERROR_TEMP_3064" : {
-      "message" : [
-        "<msg>"
-      ]
-    },
"_LEGACY_ERROR_TEMP_3065" : {
"message" : [
"<clazz>: <msg>"
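Taken together, the FAILED_JDBC entry works like any other error class: Spark picks a subclass, substitutes the angle-bracket parameters, and appends the subclass message to the main message. The sketch below is a minimal, dependency-free approximation of that substitution; the object and method names are hypothetical, not Spark's API, and only two of the subclass templates are copied in.

```scala
// Standalone sketch of error-class message formatting, using two of the
// FAILED_JDBC subclass templates from the JSON. Not Spark's implementation.
object ErrorFormatSketch {
  private val mainMessage = "Failed the JDBC operation:"
  private val subMessages = Map(
    "CREATE_TABLE" -> "Create the table <tableName>.",
    "CREATE_INDEX" -> "Create the index <indexName> in the <tableName> table.")

  def format(errorClass: String, params: Map[String, String]): String = {
    val template = subMessages(errorClass.stripPrefix("FAILED_JDBC."))
    // Substitute each <name> placeholder with its (already quoted) value.
    val filled = params.foldLeft(template) { case (msg, (k, v)) =>
      msg.replace(s"<$k>", v)
    }
    s"[$errorClass] $mainMessage $filled"
  }
}
```

Under these assumptions, formatting FAILED_JDBC.CREATE_INDEX with indexName and tableName parameters produces the combined "Failed the JDBC operation: Create the index ..." text that the test assertions later in this PR check against.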
@@ -221,10 +221,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

test("CREATE TABLE with table property") {
withTable(s"$catalogName.new_table") {
-      val m = intercept[AnalysisException] {
+      val e = intercept[AnalysisException] {
         sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
-      }.message
-      assert(m.contains("Failed table creation"))
+      }
+      assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
-      parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+      parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)

sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
-      parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+      parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
}
}
80 changes: 80 additions & 0 deletions docs/sql-error-conditions-failed-jdbc-error-class.md
@@ -0,0 +1,80 @@
---
layout: global
title: FAILED_JDBC error class
displayTitle: FAILED_JDBC error class
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

SQLSTATE: 40000

Failed the JDBC operation:

This error class has the following derived error classes:

## ALTER_TABLE

Alter the table `<tableName>`.

## CREATE_INDEX

Create the index `<indexName>` in the `<tableName>` table.

## CREATE_NAMESPACE

Create the namespace `<namespace>`.

## CREATE_NAMESPACE_COMMENT

Create a comment on the namespace: `<namespace>`.

## CREATE_TABLE

Create the table `<tableName>`.

## DROP_INDEX

Drop the index `<indexName>` in the `<tableName>` table.

## DROP_NAMESPACE

Drop the namespace `<namespace>`.

## GET_TABLES

Get tables from the namespace: `<namespace>`.

## LIST_NAMESPACES

List namespaces.

## NAMESPACE_EXISTS

Check that the namespace `<namespace>` exists.

## REMOVE_NAMESPACE_COMMENT

Remove a comment on the namespace: `<namespace>`.

## RENAME_TABLE

Rename the table `<oldName>` to `<newName>`.

## TABLE_EXISTS

Check that the table `<tableName>` exists.


8 changes: 8 additions & 0 deletions docs/sql-error-conditions.md
@@ -665,6 +665,14 @@ User defined function (`<functionName>`: (`<signature>`) => `<result>`) failed d

Failed preparing of the function `<funcName>` for call. Please, double check function's arguments.

### [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)

SQLSTATE: 40000

Failed the JDBC operation:

For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)

### FAILED_PARSE_STRUCT_TYPE

[SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception)
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -100,6 +100,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.CacheId$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"),

// SPARK-46410: Assign error classes/subclasses to JdbcUtils.classifyException
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcDialect.classifyException"),

(problem: Problem) => problem match {
case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") &&
!cls.fullName.startsWith("org.sparkproject.dmg.pmml")
@@ -36,7 +36,4 @@ case class NonEmptyNamespaceException(
"details" -> details)) {

def this(namespace: Array[String]) = this(namespace, "", None)

-  def this(details: String, cause: Option[Throwable]) =
-    this(Array.empty, details, cause)
}
@@ -1180,12 +1180,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}

-  def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = {
+  def classifyException[T](
Contributor: This is a breaking change, isn't it? JdbcDialect is a public developer API.

Contributor: When was this API added? @beliefer do you know?

Contributor: Perhaps we should keep two versions.

Member Author: And how will you combine exceptions from both versions?

Member Author: There is no nested exception here. We pass a default error class to classifyException, and if a dialect cannot classify an exception from the JDBC driver, it throws the default one; otherwise it forms and throws another one. The exception from the JDBC driver is added as the cause.
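The flow described in this comment can be sketched without Spark: the dialect inspects the driver's SQLException (typically its SQLState) and either throws a more specific exception or falls back to the default error class, attaching the driver exception as the cause. The class and method names below are simplified stand-ins, not Spark's actual API.

```scala
import java.sql.SQLException

// Simplified stand-ins for Spark's classified exception types.
class IndexAlreadyExistsSketch(cause: Throwable)
  extends RuntimeException("INDEX_ALREADY_EXISTS", cause)
class DefaultJdbcFailure(val errorClass: String, cause: Throwable)
  extends RuntimeException(errorClass, cause)

object DialectSketch {
  // PostgreSQL SQLState 42P07 (duplicate_table) raised during CREATE INDEX
  // is reclassified; everything else falls back to the default error class.
  def classify(e: Throwable, errorClass: String): Throwable = e match {
    case se: SQLException
        if se.getSQLState == "42P07" && errorClass == "FAILED_JDBC.CREATE_INDEX" =>
      new IndexAlreadyExistsSketch(se)
    case _ =>
      new DefaultJdbcFailure(errorClass, e) // driver exception kept as cause
  }
}
```

In both branches the original driver exception survives as the cause, which is the behaviour the author describes; what does not survive, as the next comment points out, is the default error class once a dialect reclassifies.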

Contributor: So if a dialect can classify an exception, we will lose the error class, which is actually worse?

Member: Can we also enable MiMa for this? I believe this is being skipped because it's under the execution package.

Member Author:

> So if a dialect can classify an exception, we will lose the error class which is actually worse?

@cloud-fan A dialect can override the proposed error class and make our default error class more precise using driver-specific info. And yes, the original error class will be lost.

> Can we also enable MiMa for this? I believe this is being skipped because it's under execution package.

@HyukjinKwon I made the modification because MiMa complained. Did you change MiMa to expect different behaviour?

Contributor: I think losing the original error class is bad. We added classifyException to get a more precise Java exception type, but I don't think this is useful anymore, as we have error classes. My suggestion is to deprecate classifyException, saying that Spark never calls it anymore.

errorClass: String,
messageParameters: Map[String, String],
dialect: JdbcDialect)(f: => T): T = {
try {
f
} catch {
case e: SparkThrowable with Throwable => throw e
-      case e: Throwable => throw dialect.classifyException(message, e)
+      case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters)
}
}

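The pass-through in the catch block above matters: exceptions that already carry an error class (SparkThrowable) are rethrown untouched, and only unclassified ones are handed to the dialect. A self-contained sketch of that control flow, with illustrative names standing in for SparkThrowable and JdbcDialect:

```scala
object ClassifyFlowSketch {
  // Stand-in for SparkThrowable: an error that is already classified.
  trait Classified { def errorClass: String }

  def classifyException[T](errorClass: String,
      classify: (Throwable, String) => Throwable)(f: => T): T = {
    try f catch {
      case e: Classified with Throwable => throw e       // already classified
      case e: Throwable => throw classify(e, errorClass) // dialect decides
    }
  }
}
```

Matching on `Classified with Throwable` first is what prevents double-classification: a precise error thrown deep inside `f` is never downgraded to the default class.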
@@ -26,13 +26,18 @@ import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
-  extends Table with SupportsRead with SupportsWrite with SupportsIndex {
+  extends Table
+    with SupportsRead
+    with SupportsWrite
+    with SupportsIndex
+    with DataTypeErrorsBase {

override def name(): String = ident.toString

@@ -58,8 +63,12 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.classifyException(s"Failed to create index $indexName in ${name()}",
-        JdbcDialects.get(jdbcOptions.url)) {
+      JdbcUtils.classifyException(
+        errorClass = "FAILED_JDBC.CREATE_INDEX",
+        messageParameters = Map(
+          "indexName" -> toSQLId(indexName),
+          "tableName" -> toSQLId(name)),
+        dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
}
@@ -74,8 +83,12 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt

override def dropIndex(indexName: String): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
-      JdbcUtils.classifyException(s"Failed to drop index $indexName in ${name()}",
-        JdbcDialects.get(jdbcOptions.url)) {
+      JdbcUtils.classifyException(
+        errorClass = "FAILED_JDBC.DROP_INDEX",
+        messageParameters = Map(
+          "indexName" -> toSQLId(indexName),
+          "tableName" -> toSQLId(name)),
+        dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}
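The toSQLId calls above (mixed in via DataTypeErrorsBase) are why the V2JDBCTest expectations changed from "i1" to "`i1`": identifier parts are backtick-quoted before landing in messageParameters. A rough standalone approximation of that quoting, assuming the behaviour shown by the tests (the real helper also escapes embedded backticks, which this sketch omits):

```scala
object QuoteSketch {
  // Approximation of identifier quoting for error messages: each
  // dot-separated part of the identifier is wrapped in backticks.
  def toSQLId(id: String): String =
    id.split('.').map(part => s"`$part`").mkString(".")
}
```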