diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 4f82e2595c353..d1dcbbcf6ced6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -17,12 +17,15 @@ package org.apache.spark.sql.errors
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, InvalidUDFClassException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, LogicalPlan, SerdeInfo, Window}
@@ -1696,6 +1699,369 @@ private[spark] object QueryCompilationErrors {
       s"Found duplicate column(s) $colType: ${duplicateCol.sorted.mkString(", ")}")
   }
 
+  def noSuchTableError(db: String, table: String): Throwable = {
+    new NoSuchTableException(db = db, table = table)
+  }
+
+  def tempViewNotCachedForAnalyzingColumnsError(tableIdent: TableIdentifier): Throwable = {
+    new AnalysisException(s"Temporary view $tableIdent is not cached for analyzing columns.")
+  }
+
+  def columnTypeNotSupportStatisticsCollectionError(
+      name: String,
+      tableIdent: TableIdentifier,
+      dataType: DataType): Throwable = {
+    new AnalysisException(s"Column $name in table $tableIdent is of type $dataType, " +
+      "and Spark does not support statistics collection on this column type.")
+  }
+
+  def analyzeTableNotSupportedOnViewsError(): Throwable = {
+    new AnalysisException("ANALYZE TABLE is not supported on views.")
+  }
+
+  def unexpectedPartitionColumnPrefixError(
+      table: String,
+      database: String,
+      schemaColumns: String,
+      specColumns: String): Throwable = {
+    new AnalysisException(
+      s"""
+         |The list of partition columns with values
+         |in partition specification for table '${table}'
+         |in database '${database}' is not a prefix of the list of
+         |partition columns defined in the table schema.
+         |Expected a prefix of [${schemaColumns}], but got [${specColumns}].
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def noSuchPartitionError(
+      db: String,
+      table: String,
+      partition: TablePartitionSpec): Throwable = {
+    new NoSuchPartitionException(db, table, partition)
+  }
+
+  def analyzingColumnStatisticsNotSupportedForColumnTypeError(
+      name: String,
+      dataType: DataType): Throwable = {
+    new AnalysisException("Analyzing column statistics is not supported for column " +
+      s"$name of data type: $dataType.")
+  }
+
+  def tableAlreadyExistsError(table: String, guide: String = ""): Throwable = {
+    new AnalysisException(s"Table $table already exists." + guide)
+  }
+
+  def createTableAsSelectWithNonEmptyDirectoryError(tablePath: String): Throwable = {
+    new AnalysisException(
+      s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
+        s"${tablePath} . To allow overwriting the existing non-empty directory, " +
+        s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.")
+  }
+
+  def tableOrViewNotFoundError(table: String): Throwable = {
+    new AnalysisException(s"Table or view not found: $table")
+  }
+
+  def unsetNonExistentPropertyError(property: String, table: TableIdentifier): Throwable = {
+    new AnalysisException(s"Attempted to unset non-existent property '$property' in table '$table'")
+  }
+
+  def alterTableChangeColumnNotSupportedForColumnTypeError(
+      originColumn: StructField,
+      newColumn: StructField): Throwable = {
+    new AnalysisException("ALTER TABLE CHANGE COLUMN is not supported for changing column " +
+      s"'${originColumn.name}' with type '${originColumn.dataType}' to " +
+      s"'${newColumn.name}' with type '${newColumn.dataType}'")
+  }
+
+  def cannotFindColumnError(name: String, fieldNames: Array[String]): Throwable = {
+    new AnalysisException(s"Can't find column `$name` given table data columns " +
+      s"${fieldNames.mkString("[`", "`, `", "`]")}")
+  }
+
+  def alterTableSetSerdeForSpecificPartitionNotSupportedError(): Throwable = {
+    new AnalysisException("Operation not allowed: ALTER TABLE SET " +
+      "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
+      "for tables created with the datasource API")
+  }
+
+  def alterTableSetSerdeNotSupportedError(): Throwable = {
+    new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
+      "not supported for tables created with the datasource API")
+  }
+
+  def cmdOnlyWorksOnPartitionedTablesError(cmd: String, tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+  }
+
+  def cmdOnlyWorksOnTableWithLocationError(cmd: String, tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
+      s"location provided: $tableIdentWithDB")
+  }
+
+  def actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(
+      action: String,
+      tableName: String): Throwable = {
+    new AnalysisException(
+      s"$action is not allowed on $tableName since filesource partition management is " +
+        "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+  }
+
+  def actionNotAllowedOnTableSincePartitionMetadataNotStoredError(
+      action: String,
+      tableName: String): Throwable = {
+    new AnalysisException(
+      s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+        "the Hive metastore. To import this information into the metastore, run " +
+        s"`msck repair table $tableName`")
+  }
+
+  def cannotAlterViewWithAlterTableError(): Throwable = {
+    new AnalysisException(
+      "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
+  }
+
+  def cannotAlterTableWithAlterViewError(): Throwable = {
+    new AnalysisException(
+      "Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
+  }
+
+  def cannotOverwritePathBeingReadFromError(): Throwable = {
+    new AnalysisException("Cannot overwrite a path that is also being read from.")
+  }
+
+  def createFuncWithBothIfNotExistsAndReplaceError(): Throwable = {
+    new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE is not allowed.")
+  }
+
+  def defineTempFuncWithIfNotExistsError(): Throwable = {
+    new AnalysisException("It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
+  }
+
+  def specifyingDBInCreateTempFuncError(databaseName: String): Throwable = {
+    new AnalysisException(
+      s"Specifying a database in CREATE TEMPORARY FUNCTION is not allowed: '$databaseName'")
+  }
+
+  def specifyingDBInDropTempFuncError(databaseName: String): Throwable = {
+    new AnalysisException(
+      s"Specifying a database in DROP TEMPORARY FUNCTION is not allowed: '$databaseName'")
+  }
+
+  def cannotDropNativeFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot drop native function '$functionName'")
+  }
+
+  def cannotRefreshBuiltInFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot refresh built-in function $functionName")
+  }
+
+  def cannotRefreshTempFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot refresh temporary function $functionName")
+  }
+
+  def noSuchFunctionError(identifier: FunctionIdentifier): Throwable = {
+    new NoSuchFunctionException(identifier.database.get, identifier.funcName)
+  }
+
+  def alterAddColNotSupportViewError(table: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"""
+        |ALTER ADD COLUMNS does not support views.
+        |You must drop and re-create the views for adding the new columns. Views: $table
+       """.stripMargin)
+  }
+
+  def alterAddColNotSupportDatasourceTableError(
+      tableType: Any,
+      table: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"""
+        |ALTER ADD COLUMNS does not support datasource table with type $tableType.
+        |You must drop and re-create the table for adding the new columns. Tables: $table
+       """.stripMargin)
+  }
+
+  def loadDataNotSupportedForDatasourceTablesError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"LOAD DATA is not supported for datasource tables: $tableIdentWithDB")
+  }
+
+  def loadDataWithoutPartitionSpecProvidedError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
+      s"but no partition spec is provided")
+  }
+
+  def loadDataPartitionSizeNotMatchNumPartitionColumnsError(
+      tableIdentWithDB: String,
+      partitionSize: Int,
+      targetTableSize: Int): Throwable = {
+    new AnalysisException(
+      s"""
+         |LOAD DATA target table $tableIdentWithDB is partitioned,
+         |but number of columns in provided partition spec ($partitionSize)
+         |do not match number of partitioned columns in table ($targetTableSize)
+       """.stripMargin.replaceAll("\n", " "))
+  }
+
+  def loadDataTargetTableNotPartitionedButPartitionSpecWasProvidedError(
+      tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is not " +
+      s"partitioned, but a partition spec was provided.")
+  }
+
+  def loadDataInputPathNotExistError(path: String): Throwable = {
+    new AnalysisException(s"LOAD DATA input path does not exist: $path")
+  }
+
+  def truncateTableOnExternalTablesError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
+  }
+
+  def truncateTablePartitionNotSupportedForNotPartitionedTablesError(
+      tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported" +
+      s" for tables that are not partitioned: $tableIdentWithDB")
+  }
+
+  def failToTruncateTableWhenRemovingDataError(
+      tableIdentWithDB: String,
+      path: Path,
+      e: Throwable): Throwable = {
+    new AnalysisException(s"Failed to truncate table $tableIdentWithDB when " +
+      s"removing data of the path: $path because of ${e.toString}")
+  }
+
+  def descPartitionNotAllowedOnTempView(table: String): Throwable = {
+    new AnalysisException(s"DESC PARTITION is not allowed on a temporary view: $table")
+  }
+
+  def descPartitionNotAllowedOnView(table: String): Throwable = {
+    new AnalysisException(s"DESC PARTITION is not allowed on a view: $table")
+  }
+
+  def showPartitionNotAllowedOnTableNotPartitionedError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
+  }
+
+  def showCreateTableNotSupportedOnTempView(table: String): Throwable = {
+    new AnalysisException(s"SHOW CREATE TABLE is not supported on a temporary view: $table")
+  }
+
+  def showCreateTableFailToExecuteUnsupportedFeatureError(table: CatalogTable): Throwable = {
+    new AnalysisException("Failed to execute SHOW CREATE TABLE against table " +
+      s"${table.identifier}, which is created by Hive and uses the " +
+      s"following unsupported feature(s)\n" +
+      table.unsupportedFeatures.map(" - " + _).mkString("\n") + ". " +
+      s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` to show Hive DDL instead.")
+  }
+
+  def showCreateTableNotSupportTransactionalHiveTableError(table: CatalogTable): Throwable = {
+    new AnalysisException("SHOW CREATE TABLE doesn't support transactional Hive table. " +
+      s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` " +
+      "to show Hive DDL instead.")
+  }
+
+  def showCreateTableFailToExecuteUnsupportedConfError(
+      table: TableIdentifier,
+      builder: mutable.StringBuilder): Throwable = {
+    new AnalysisException("Failed to execute SHOW CREATE TABLE against table " +
+      s"${table.identifier}, which is created by Hive and uses the " +
+      "following unsupported serde configuration\n" +
+      builder.toString()
+    )
+  }
+
+  def descPartitionNotAllowedOnViewError(table: String): Throwable = {
+    new AnalysisException(s"DESC PARTITION is not allowed on a view: $table")
+  }
+
+  def showCreateTableAsSerdeNotAllowedOnSparkDataSourceTableError(
+      table: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"$table is a Spark data source table. Use `SHOW CREATE TABLE` without `AS SERDE` instead.")
+  }
+
+  def showCreateTableOrViewFailToExecuteUnsupportedFeatureError(
+      table: CatalogTable,
+      features: Seq[String]): Throwable = {
+    new AnalysisException(
+      s"Failed to execute SHOW CREATE TABLE against table/view ${table.identifier}, " +
+        "which is created by Hive and uses the following unsupported feature(s)\n" +
+        features.map(" - " + _).mkString("\n"))
+  }
+
+  def createViewWithBothIfNotExistsAndReplaceError(): Throwable = {
+    new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
+  }
+
+  def defineTempViewWithIfNotExistsError(): Throwable = {
+    new AnalysisException("It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
+  }
+
+  def notAllowedToAddDBPrefixForTempViewError(database: String): Throwable = {
+    new AnalysisException(
+      s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
+  }
+
+  def logicalPlanForViewNotAnalyzedError(): Throwable = {
+    new AnalysisException("The logical plan that represents the view is not analyzed.")
+  }
+
+  def createViewNumColumnsMismatchUserSpecifiedColumnLengthError(
+      analyzedPlanLength: Int,
+      userSpecifiedColumnsLength: Int): Throwable = {
+    new AnalysisException(s"The number of columns produced by the SELECT clause " +
+      s"(num: `$analyzedPlanLength`) does not match the number of column names " +
+      s"specified by CREATE VIEW (num: `$userSpecifiedColumnsLength`).")
+  }
+
+  def tableIsNotViewError(name: TableIdentifier): Throwable = {
+    new AnalysisException(s"$name is not a view")
+  }
+
+  def viewAlreadyExistsError(name: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"View $name already exists. If you want to update the view definition, " +
+        "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+  }
+
+  def createPersistedViewFromDatasetAPINotAllowedError(): Throwable = {
+    new AnalysisException("It is not allowed to create a persisted view from the Dataset API")
+  }
+
+  def recursiveViewDetectedError(
+      viewIdent: TableIdentifier,
+      newPath: Seq[TableIdentifier]): Throwable = {
+    new AnalysisException(s"Recursive view $viewIdent detected " +
+      s"(cycle: ${newPath.mkString(" -> ")})")
+  }
+
+  def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(
+      name: TableIdentifier,
+      attrName: String): Throwable = {
+    new AnalysisException(s"Not allowed to create a permanent view $name without " +
+      s"explicitly assigning an alias for expression $attrName")
+  }
+
+  def notAllowedToCreatePermanentViewByReferencingTempViewError(
+      name: TableIdentifier,
+      nameParts: String): Throwable = {
+    new AnalysisException(s"Not allowed to create a permanent view $name by " +
+      s"referencing a temporary view $nameParts. " +
+      "Please create a temp view instead by CREATE TEMP VIEW")
+  }
+
+  def notAllowedToCreatePermanentViewByReferencingTempFuncError(
+      name: TableIdentifier,
+      funcName: String): Throwable = {
+    new AnalysisException(s"Not allowed to create a permanent view $name by " +
+      s"referencing a temporary function `$funcName`")
+  }
+
   def queryFromRawFilesIncludeCorruptRecordColumnError(): Throwable = {
     new AnalysisException(
       """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index a57acf2800aad..7f77243af8a88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException
 
 import com.fasterxml.jackson.core.JsonToken
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.InternalCompilerException
 
@@ -1544,6 +1545,23 @@ object QueryExecutionErrors {
     new NullPointerException(s"Value at index $index is null")
   }
 
+  def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): Throwable = {
+    new SparkException(s"Only Data Sources providing FileFormat are supported: $providingClass")
+  }
+
+  def failToSetOriginalPermissionBackError(
+      permission: FsPermission,
+      path: Path,
+      e: Throwable): Throwable = {
+    new SecurityException(s"Failed to set original permission $permission back to " +
+      s"the created path: $path. Exception: ${e.getMessage}")
+  }
+
+  def failToSetOriginalACLBackError(aclEntries: String, path: Path, e: Throwable): Throwable = {
+    new SecurityException(s"Failed to set original ACL $aclEntries back to " +
+      s"the created path: $path. Exception: ${e.getMessage}")
+  }
+
   def multiFailuresInStageMaterializationError(error: Throwable): Throwable = {
     new SparkException("Multiple failures in stage materialization.", error)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index e3c2e90a42dec..5cb347868b164 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
 
@@ -44,7 +44,7 @@ case class AnalyzeColumnCommand(
     tableIdent.database match {
       case Some(db) if db == sparkSession.sharedState.globalTempViewManager.database =>
         val plan = sessionState.catalog.getGlobalTempView(tableIdent.identifier).getOrElse {
-          throw new NoSuchTableException(db = db, table = tableIdent.identifier)
+          throw QueryCompilationErrors.noSuchTableError(db, tableIdent.identifier)
         }
         analyzeColumnInTempView(plan, sparkSession)
       case Some(_) =>
@@ -72,8 +72,7 @@ case class AnalyzeColumnCommand(
 
   private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
     if (!analyzeColumnInCachedData(plan, sparkSession)) {
-      throw new AnalysisException(
-        s"Temporary view $tableIdent is not cached for analyzing columns.")
+      throw QueryCompilationErrors.tempViewNotCachedForAnalyzingColumnsError(tableIdent)
     }
   }
 
@@ -87,15 +86,14 @@ case class AnalyzeColumnCommand(
     } else {
       columnNames.get.map { col =>
         val exprOption = relation.output.find(attr => conf.resolver(attr.name, col))
-        exprOption.getOrElse(throw new AnalysisException(s"Column $col does not exist."))
+        exprOption.getOrElse(throw QueryCompilationErrors.columnDoesNotExistError(col))
       }
     }
 
     // Make sure the column types are supported for stats gathering.
columnsToAnalyze.foreach { attr => if (!supportsType(attr.dataType)) { - throw new AnalysisException( - s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " + - "and Spark does not support statistics collection on this column type.") + throw QueryCompilationErrors.columnTypeNotSupportStatisticsCollectionError( + attr.name, tableIdent, attr.dataType) } } columnsToAnalyze @@ -108,7 +106,7 @@ case class AnalyzeColumnCommand( // Analyzes a catalog view if the view is cached val plan = sparkSession.table(tableIdent.quotedString).logicalPlan if (!analyzeColumnInCachedData(plan, sparkSession)) { - throw new AnalysisException("ANALYZE TABLE is not supported on views.") + throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError() } } else { val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 5b3cb7476608b..38d92ba752ad7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} +import org.apache.spark.sql.{Column, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.util.PartitioningUtils /** @@ -58,11 +59,8 @@ case class AnalyzePartitionCommand( val tableId = table.identifier val schemaColumns = table.partitionColumnNames.mkString(",") val specColumns = normalizedPartitionSpec.keys.mkString(",") - throw new AnalysisException("The list of partition columns with values " + - s"in partition specification for table '${tableId.table}' " + - s"in database '${tableId.database.get}' is not a prefix of the list of " + - "partition columns defined in the table schema. 
" + - s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].") + throw QueryCompilationErrors.unexpectedPartitionColumnPrefixError( + tableId.table, tableId.database.get, schemaColumns, specColumns) } val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get) @@ -79,7 +77,7 @@ case class AnalyzePartitionCommand( val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) if (tableMeta.tableType == CatalogTableType.VIEW) { - throw new AnalysisException("ANALYZE TABLE is not supported on views.") + throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError() } val partitionValueSpec = getPartitionSpec(tableMeta) @@ -88,7 +86,8 @@ case class AnalyzePartitionCommand( if (partitions.isEmpty) { if (partitionValueSpec.isDefined) { - throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get) + throw QueryCompilationErrors.noSuchPartitionError( + db, tableIdent.table, partitionValueSpec.get) } else { // the user requested to analyze all partitions for a table which has no partitions // return normally, since there is nothing to do diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index da5d00c595cb7..312f17543ce26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -25,13 +25,14 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.{SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex} import org.apache.spark.sql.internal.{SessionState, SQLConf} @@ -217,7 +218,7 @@ object CommandUtils extends Logging { table.count() } } else { - throw new AnalysisException("ANALYZE TABLE is not supported on views.") + throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError() } } else { // Compute stats for the whole table @@ -381,8 +382,8 @@ object CommandUtils extends Logging { Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)), nullArray) case _ => - throw new AnalysisException("Analyzing column statistics is not supported for column " + - s"${col.name} of data type: ${col.dataType}.") + throw QueryCompilationErrors.analyzingColumnStatisticsNotSupportedForColumnTypeError( + col.name, col.dataType) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index ca51b88de0d88..338ce8cac420f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -22,9 +22,10 @@ import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext -import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} +import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -114,10 +115,8 @@ object DataWritingCommand { if (fs.exists(filePath) && fs.getFileStatus(filePath).isDirectory && fs.listStatus(filePath).length != 0) { - throw new AnalysisException( - s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " + - s"${tablePath} . To allow overwriting the existing non-empty directory, " + - s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.") + throw QueryCompilationErrors.createTableAsSelectWithNonEmptyDirectoryError( + tablePath.toString) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala index be680a733eac9..35c8bec371624 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ /** @@ -61,8 +61,8 @@ case class InsertIntoDataSourceDirCommand( val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass) if (!isFileFormat) { - throw new SparkException( - "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass) + throw QueryExecutionErrors.onlySupportDataSourcesProvidingFileFormatError( + dataSource.providingClass.toString) } val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 3caf850bfb07f..fcad25d1937e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.{CommandExecutionMode, SparkPlan} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.BaseRelation @@ -53,7 +54,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, 
ignoreIfExists: Boo if (ignoreIfExists) { return Seq.empty[Row] } else { - throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.") + throw QueryCompilationErrors.tableAlreadyExistsError(table.identifier.unquotedString) } } @@ -156,7 +157,8 @@ case class CreateDataSourceTableAsSelectCommand( s"Expect the table $tableName has been dropped when the save mode is Overwrite") if (mode == SaveMode.ErrorIfExists) { - throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.") + throw QueryCompilationErrors.tableAlreadyExistsError( + tableName, " You need to drop it first.") } if (mode == SaveMode.Ignore) { // Since the table already exists and the save mode is Ignore, we will just return. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 06c684783a51e..605d98ee54f44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog._ @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog} import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter @@ -222,11 +223,9 @@ case class DropTableCommand( // issue an exception. catalog.getTableMetadata(tableName).tableType match { case CatalogTableType.VIEW if !isView => - throw new AnalysisException( - "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") + throw QueryCompilationErrors.cannotDropViewWithDropTableError() case o if o != CatalogTableType.VIEW && isView => - throw new AnalysisException( - s"Cannot drop a table with DROP VIEW. 
Please use DROP TABLE instead") + throw QueryCompilationErrors.cannotDropViewWithDropTableError() case _ => } } @@ -245,7 +244,7 @@ case class DropTableCommand( } else if (ifExists) { // no-op } else { - throw new AnalysisException(s"Table or view not found: ${tableName.identifier}") + throw QueryCompilationErrors.tableOrViewNotFoundError(tableName.identifier) } Seq.empty[Row] } @@ -303,8 +302,7 @@ case class AlterTableUnsetPropertiesCommand( if (!ifExists) { propKeys.foreach { k => if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) { - throw new AnalysisException( - s"Attempted to unset non-existent property '$k' in table '${table.identifier}'") + throw QueryCompilationErrors.unsetNonExistentPropertyError(k, table.identifier) } } } @@ -346,10 +344,8 @@ case class AlterTableChangeColumnCommand( val originColumn = findColumnByName(table.dataSchema, columnName, resolver) // Throw an AnalysisException if the column name/dataType is changed. if (!columnEqual(originColumn, newColumn, resolver)) { - throw new AnalysisException( - "ALTER TABLE CHANGE COLUMN is not supported for changing column " + - s"'${originColumn.name}' with type '${originColumn.dataType}' to " + - s"'${newColumn.name}' with type '${newColumn.dataType}'") + throw QueryCompilationErrors.alterTableChangeColumnNotSupportedForColumnTypeError( + originColumn, newColumn) } val newDataSchema = table.dataSchema.fields.map { field => @@ -371,9 +367,7 @@ case class AlterTableChangeColumnCommand( schema: StructType, name: String, resolver: Resolver): StructField = { schema.fields.collectFirst { case field if resolver(field.name, name) => field - }.getOrElse(throw new AnalysisException( - s"Can't find column `$name` given table data columns " + - s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) + }.getOrElse(throw QueryCompilationErrors.cannotFindColumnError(name, schema.fieldNames)) } // Add the comment to a column, if comment is empty, return the original column. 
@@ -413,13 +407,10 @@ case class AlterTableSerDePropertiesCommand( val table = catalog.getTableRawMetadata(tableName) // For datasource tables, disallow setting serde or specifying partition if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) { - throw new AnalysisException("Operation not allowed: ALTER TABLE SET " + - "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " + - "for tables created with the datasource API") + throw QueryCompilationErrors.alterTableSetSerdeForSpecificPartitionNotSupportedError() } if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) { - throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " + - "not supported for tables created with the datasource API") + throw QueryCompilationErrors.alterTableSetSerdeNotSupportedError() } if (partSpec.isEmpty) { val newTable = table.withNewStorage( @@ -629,13 +620,11 @@ case class RepairTableCommand( val table = catalog.getTableRawMetadata(tableName) val tableIdentWithDB = table.identifier.quotedString if (table.partitionColumnNames.isEmpty) { - throw new AnalysisException( - s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB") + throw QueryCompilationErrors.cmdOnlyWorksOnPartitionedTablesError(cmd, tableIdentWithDB) } if (table.storage.locationUri.isEmpty) { - throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " + - s"location provided: $tableIdentWithDB") + throw QueryCompilationErrors.cmdOnlyWorksOnTableWithLocationError(cmd, tableIdentWithDB) } val root = new Path(table.location) @@ -901,15 +890,12 @@ object DDLUtils { spark: SparkSession, table: CatalogTable, action: String): Unit = { val tableName = table.identifier.table if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) { - throw new AnalysisException( - s"$action is not allowed on $tableName since filesource partition management is " + - "disabled (spark.sql.hive.manageFilesourcePartitions = false).") + throw QueryCompilationErrors + .actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(action, tableName) } if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) { - throw new AnalysisException( - s"$action is not allowed on $tableName since its partition metadata is not stored in " + - "the Hive metastore. To import this information into the metastore, run " + - s"`msck repair table $tableName`") + throw QueryCompilationErrors.actionNotAllowedOnTableSincePartitionMetadataNotStoredError( + action, tableName) } } @@ -929,11 +915,9 @@ object DDLUtils { if (!catalog.isTempView(tableMetadata.identifier)) { tableMetadata.tableType match { case CatalogTableType.VIEW if !isView => - throw new AnalysisException( - "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead") + throw QueryCompilationErrors.cannotAlterViewWithAlterTableError() case o if o != CatalogTableType.VIEW && isView => - throw new AnalysisException( - s"Cannot alter a table with ALTER VIEW. 
Please use ALTER TABLE instead") + throw QueryCompilationErrors.cannotAlterTableWithAlterViewError() case _ => } } @@ -972,8 +956,7 @@ object DDLUtils { }.flatten if (inputPaths.contains(outputPath)) { - throw new AnalysisException( - "Cannot overwrite a path that is also being read from.") + throw QueryCompilationErrors.cannotOverwritePathBeingReadFromError() } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala index 0eda90a596999..ae9a77d56c367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import java.util.Locale -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException} import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource} @@ -58,20 +58,17 @@ case class CreateFunctionCommand( extends LeafRunnableCommand { if (ignoreIfExists && replace) { - throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" + - " is not allowed.") + throw QueryCompilationErrors.createFuncWithBothIfNotExistsAndReplaceError() } // Disallow to define a temporary function with `IF NOT EXISTS` if (ignoreIfExists && isTemp) { - throw new AnalysisException( - "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.") + throw QueryCompilationErrors.defineTempFuncWithIfNotExistsError() } // Temporary function names should not contain database prefix like "database.function" if (databaseName.isDefined && isTemp) { - throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " + - s"is not allowed: '${databaseName.get}'") + throw QueryCompilationErrors.specifyingDBInCreateTempFuncError(databaseName.get) } override def run(sparkSession: SparkSession): Seq[Row] = { @@ -183,11 +180,10 @@ case class DropFunctionCommand( val catalog = sparkSession.sessionState.catalog if (isTemp) { if (databaseName.isDefined) { - throw new AnalysisException(s"Specifying a database in DROP TEMPORARY FUNCTION " + - s"is not allowed: '${databaseName.get}'") + throw QueryCompilationErrors.specifyingDBInDropTempFuncError(databaseName.get) } if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { - throw new AnalysisException(s"Cannot drop native function '$functionName'") + throw QueryCompilationErrors.cannotDropNativeFuncError(functionName) } catalog.dropTempFunction(functionName, ifExists) } else { @@ -260,10 +256,10 @@ case class RefreshFunctionCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName, databaseName))) { - throw new AnalysisException(s"Cannot refresh built-in function $functionName") + throw QueryCompilationErrors.cannotRefreshBuiltInFuncError(functionName) } if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { - throw new AnalysisException(s"Cannot refresh temporary function $functionName") + throw QueryCompilationErrors.cannotRefreshTempFuncError(functionName) } val identifier = FunctionIdentifier( @@ -276,7 +272,7 @@ case class 
RefreshFunctionCommand( } else { // clear cached function and throw exception catalog.unregisterFunction(identifier) - throw new NoSuchFunctionException(identifier.database.get, identifier.funcName) + throw QueryCompilationErrors.noSuchFunctionError(identifier) } Seq.empty[Row] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 7d4d227ad55f2..059962192c594 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -26,9 +26,9 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileContext, FsConstants, Path} import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, FsAction, FsPermission} -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier, CaseInsensitiveMap, CharVarcharUtils} -import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat @@ -255,11 +255,7 @@ case class AlterTableAddColumnsCommand( val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) if (catalogTable.tableType == CatalogTableType.VIEW) { - throw new AnalysisException( - s""" - |ALTER ADD COLUMNS does not support views. - |You must drop and re-create the views for adding the new columns. Views: $table - """.stripMargin) + throw QueryCompilationErrors.alterAddColNotSupportViewError(table) } if (DDLUtils.isDatasourceTable(catalogTable)) { @@ -274,11 +270,7 @@ case class AlterTableAddColumnsCommand( _: OrcDataSourceV2 | _: ParquetDataSourceV2 => case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") => case s => - throw new AnalysisException( - s""" - |ALTER ADD COLUMNS does not support datasource table with type $s. - |You must drop and re-create the table for adding the new columns. 
Tables: $table - """.stripMargin) + throw QueryCompilationErrors.alterAddColNotSupportDatasourceTableError(s, table) } } catalogTable @@ -305,34 +297,30 @@ case class LoadDataCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val targetTable = catalog.getTableMetadata(table) - val tableIdentwithDB = targetTable.identifier.quotedString + val tableIdentWithDB = targetTable.identifier.quotedString val normalizedSpec = partition.map { spec => PartitioningUtils.normalizePartitionSpec( spec, targetTable.partitionSchema, - tableIdentwithDB, + tableIdentWithDB, sparkSession.sessionState.conf.resolver) } if (DDLUtils.isDatasourceTable(targetTable)) { - throw new AnalysisException( - s"LOAD DATA is not supported for datasource tables: $tableIdentwithDB") + throw QueryCompilationErrors.loadDataNotSupportedForDatasourceTablesError(tableIdentWithDB) } if (targetTable.partitionColumnNames.nonEmpty) { if (partition.isEmpty) { - throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " + - s"but no partition spec is provided") + throw QueryCompilationErrors.loadDataWithoutPartitionSpecProvidedError(tableIdentWithDB) } if (targetTable.partitionColumnNames.size != partition.get.size) { - throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " + - s"but number of columns in provided partition spec (${partition.get.size}) " + - s"do not match number of partitioned columns in table " + - s"(${targetTable.partitionColumnNames.size})") + throw QueryCompilationErrors.loadDataPartitionSizeNotMatchNumPartitionColumnsError( + tableIdentWithDB, partition.get.size, targetTable.partitionColumnNames.size) } } else { if (partition.nonEmpty) { - throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " + - s"partitioned, but a partition spec was provided.") + throw QueryCompilationErrors + .loadDataTargetTableNotPartitionedButPartitionSpecWasProvidedError(tableIdentWithDB) } } val loadPath = { @@ -367,12 +355,12 @@ case class LoadDataCommand( try { val fileStatus = fs.globStatus(loadPath) if (fileStatus == null || fileStatus.isEmpty) { - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + throw QueryCompilationErrors.loadDataInputPathNotExistError(path) } } catch { case e: IllegalArgumentException => log.warn(s"Exception while validating the load path $path ", e) - throw new AnalysisException(s"LOAD DATA input path does not exist: $path") + throw QueryCompilationErrors.loadDataInputPathNotExistError(path) } if (partition.nonEmpty) { catalog.loadPartition( @@ -391,7 +379,7 @@ case class LoadDataCommand( } // Refresh the data and metadata cache to ensure the data visible to the users - sparkSession.catalog.refreshTable(tableIdentwithDB) + sparkSession.catalog.refreshTable(tableIdentWithDB) CommandUtils.updateTableStats(sparkSession, targetTable) Seq.empty[Row] @@ -449,13 +437,11 @@ case class TruncateTableCommand( val tableIdentWithDB = table.identifier.quotedString if (table.tableType == CatalogTableType.EXTERNAL) { - throw new AnalysisException( - s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB") + throw QueryCompilationErrors.truncateTableOnExternalTablesError(tableIdentWithDB) } if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) { - throw new AnalysisException( - s"Operation not allowed: TRUNCATE TABLE ... 
PARTITION is not supported " + - s"for tables that are not partitioned: $tableIdentWithDB") + throw QueryCompilationErrors.truncateTablePartitionNotSupportedForNotPartitionedTablesError( + tableIdentWithDB) } if (partitionSpec.isDefined) { DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION") @@ -479,7 +465,8 @@ case class TruncateTableCommand( // Fail if the partition spec is fully specified (not partial) and the partition does not // exist. for (spec <- partitionSpec if partLocations.isEmpty && spec.size == partCols.length) { - throw new NoSuchPartitionException(table.database, table.identifier.table, spec) + throw QueryCompilationErrors.noSuchPartitionError(table.database, + table.identifier.table, spec) } partLocations @@ -522,9 +509,8 @@ case class TruncateTableCommand( fs.setPermission(path, permission) } catch { case NonFatal(e) => - throw new SecurityException( - s"Failed to set original permission $permission back to " + - s"the created path: $path. Exception: ${e.getMessage}") + throw QueryExecutionErrors.failToSetOriginalPermissionBackError( + permission, path, e) } } optAcls.foreach { acls => @@ -547,17 +533,15 @@ case class TruncateTableCommand( fs.setAcl(path, aclEntries) } catch { case NonFatal(e) => - throw new SecurityException( - s"Failed to set original ACL $aclEntries back to " + - s"the created path: $path. Exception: ${e.getMessage}") + throw QueryExecutionErrors.failToSetOriginalACLBackError(aclEntries.toString, + path, e) } } } } catch { case NonFatal(e) => - throw new AnalysisException( - s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " + - s"because of ${e.toString}") + throw QueryCompilationErrors.failToTruncateTableWhenRemovingDataError(tableIdentWithDB, + path, e) } } } @@ -617,8 +601,7 @@ case class DescribeTableCommand( if (catalog.isTempView(table)) { if (partitionSpec.nonEmpty) { - throw new AnalysisException( - s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}") + throw QueryCompilationErrors.descPartitionNotAllowedOnTempView(table.identifier) } val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema describeSchema(schema, result, header = false) @@ -672,8 +655,7 @@ case class DescribeTableCommand( metadata: CatalogTable, result: ArrayBuffer[Row]): Unit = { if (metadata.tableType == CatalogTableType.VIEW) { - throw new AnalysisException( - s"DESC PARTITION is not allowed on a view: ${table.identifier}") + throw QueryCompilationErrors.descPartitionNotAllowedOnView(table.identifier) } DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION") val partition = catalog.getPartition(table, partitionSpec) @@ -970,8 +952,8 @@ case class ShowPartitionsCommand( */ if (table.partitionColumnNames.isEmpty) { - throw new AnalysisException( - s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB") + throw QueryCompilationErrors.showPartitionNotAllowedOnTableNotPartitionedError( + tableIdentWithDB) } DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS") @@ -1086,8 +1068,7 @@ case class ShowCreateTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog if (catalog.isTempView(table)) { - throw new AnalysisException( - s"SHOW CREATE TABLE is not supported on a temporary view: ${table.identifier}") + throw QueryCompilationErrors.showCreateTableNotSupportedOnTempView(table.identifier) } else { val tableMetadata = 
catalog.getTableRawMetadata(table) @@ -1098,21 +1079,13 @@ case class ShowCreateTableCommand( } else { // For a Hive serde table, we try to convert it to Spark DDL. if (tableMetadata.unsupportedFeatures.nonEmpty) { - throw new AnalysisException( - "Failed to execute SHOW CREATE TABLE against table " + - s"${tableMetadata.identifier}, which is created by Hive and uses the " + - "following unsupported feature(s)\n" + - tableMetadata.unsupportedFeatures.map(" - " + _).mkString("\n") + ". " + - s"Please use `SHOW CREATE TABLE ${tableMetadata.identifier} AS SERDE` " + - "to show Hive DDL instead." - ) + throw QueryCompilationErrors.showCreateTableFailToExecuteUnsupportedFeatureError( + tableMetadata) } if ("true".equalsIgnoreCase(tableMetadata.properties.getOrElse("transactional", "false"))) { - throw new AnalysisException( - "SHOW CREATE TABLE doesn't support transactional Hive table. " + - s"Please use `SHOW CREATE TABLE ${tableMetadata.identifier} AS SERDE` " + - "to show Hive DDL instead.") + throw QueryCompilationErrors.showCreateTableNotSupportTransactionalHiveTableError( + tableMetadata) } if (tableMetadata.tableType == VIEW) { @@ -1160,12 +1133,7 @@ case class ShowCreateTableCommand( hiveSerde.outputFormat.foreach { format => builder ++= s" OUTPUTFORMAT: $format" } - throw new AnalysisException( - "Failed to execute SHOW CREATE TABLE against table " + - s"${tableMetadata.identifier}, which is created by Hive and uses the " + - "following unsupported serde configuration\n" + - builder.toString() - ) + throw QueryCompilationErrors.showCreateTableFailToExecuteUnsupportedConfError(table, builder) } else { // TODO: should we keep Hive serde properties? val newStorage = tableMetadata.storage.copy(properties = Map.empty) @@ -1242,8 +1210,8 @@ case class ShowCreateTableAsSerdeCommand( val tableMetadata = catalog.getTableRawMetadata(table) val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) { - throw new AnalysisException( - s"$table is a Spark data source table. 
Use `SHOW CREATE TABLE` without `AS SERDE` instead.") + throw QueryCompilationErrors.showCreateTableAsSerdeNotAllowedOnSparkDataSourceTableError( + table) } else { showCreateHiveTable(tableMetadata) } @@ -1253,11 +1221,8 @@ case class ShowCreateTableAsSerdeCommand( private def showCreateHiveTable(metadata: CatalogTable): String = { def reportUnsupportedError(features: Seq[String]): Unit = { - throw new AnalysisException( - s"Failed to execute SHOW CREATE TABLE against table/view ${metadata.identifier}, " + - "which is created by Hive and uses the following unsupported feature(s)\n" + - features.map(" - " + _).mkString("\n") - ) + throw QueryCompilationErrors.showCreateTableOrViewFailToExecuteUnsupportedFeatureError( + metadata, features) } if (metadata.unsupportedFeatures.nonEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 5e92ce2195382..2eb5d76f02437 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -23,7 +23,7 @@ import org.json4s.JsonAST.{JArray, JString} import org.json4s.jackson.JsonMethods._ import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation} @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpr import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.{MetadataBuilder, StructType} import org.apache.spark.sql.util.SchemaUtils @@ -86,35 +87,32 @@ case class CreateViewCommand( } if (allowExisting && replace) { - throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") + throw QueryCompilationErrors.createViewWithBothIfNotExistsAndReplaceError() } private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE' if (allowExisting && isTemporary) { - throw new AnalysisException( - "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.") + throw QueryCompilationErrors.defineTempViewWithIfNotExistsError() } // Temporary view names should NOT contain database prefix like "database.table" if (isTemporary && name.database.isDefined) { val database = name.database.get - throw new AnalysisException( - s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.") + throw QueryCompilationErrors.notAllowedToAddDBPrefixForTempViewError(database) } override def run(sparkSession: SparkSession): Seq[Row] = { if (!isAnalyzed) { - throw new AnalysisException("The logical plan that represents the view is not analyzed.") + throw 
QueryCompilationErrors.logicalPlanForViewNotAnalyzedError() } val analyzedPlan = plan if (userSpecifiedColumns.nonEmpty && userSpecifiedColumns.length != analyzedPlan.output.length) { - throw new AnalysisException(s"The number of columns produced by the SELECT clause " + - s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " + - s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).") + throw QueryCompilationErrors.createViewNumColumnsMismatchUserSpecifiedColumnLengthError( + analyzedPlan.output.length, userSpecifiedColumns.length) } val catalog = sparkSession.sessionState.catalog @@ -154,7 +152,7 @@ case class CreateViewCommand( // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. } else if (tableMetadata.tableType != CatalogTableType.VIEW) { - throw new AnalysisException(s"$name is not a view") + throw QueryCompilationErrors.tableIsNotViewError(name) } else if (replace) { // Detect cyclic view reference on CREATE OR REPLACE VIEW. val viewIdent = tableMetadata.identifier @@ -171,9 +169,7 @@ case class CreateViewCommand( } else { // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already // exists. - throw new AnalysisException( - s"View $name already exists. If you want to update the view definition, " + - "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") + throw QueryCompilationErrors.viewAlreadyExistsError(name) } } else { // Create the view if it doesn't exist. @@ -207,8 +203,7 @@ case class CreateViewCommand( */ private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = { if (originalText.isEmpty) { - throw new AnalysisException( - "It is not allowed to create a persisted view from the Dataset API") + throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError() } val aliasedSchema = CharVarcharUtils.getRawSchema( aliasPlan(session, analyzedPlan).schema) @@ -517,8 +512,7 @@ object ViewHelper extends SQLConfHelper with Logging { // If the table identifier equals to the `viewIdent`, current view node is the same with // the altered view. We detect a view reference cycle, should throw an AnalysisException. if (ident == viewIdent) { - throw new AnalysisException(s"Recursive view $viewIdent detected " + - s"(cycle: ${newPath.mkString(" -> ")})") + throw QueryCompilationErrors.recursiveViewDetectedError(viewIdent, newPath) } else { v.children.foreach { child => checkCyclicViewReference(child, newPath, viewIdent) @@ -543,8 +537,9 @@ object ViewHelper extends SQLConfHelper with Logging { if (!isTemporary && !conf.allowAutoGeneratedAliasForView) { child.output.foreach { attr => if (attr.metadata.contains("__autoGeneratedAlias")) { - throw new AnalysisException(s"Not allowed to create a permanent view $name without " + - s"explicitly assigning an alias for expression ${attr.name}") + throw QueryCompilationErrors + .notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(name, + attr.name) } } } @@ -562,13 +557,12 @@ object ViewHelper extends SQLConfHelper with Logging { if (!isTemporary) { val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, child) tempViews.foreach { nameParts => - throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temporary view ${nameParts.quoted}. 
" + - "Please create a temp view instead by CREATE TEMP VIEW") + throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempViewError( + name, nameParts.quoted) } tempFunctions.foreach { funcName => - throw new AnalysisException(s"Not allowed to create a permanent view $name by " + - s"referencing a temporary function `${funcName}`") + throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempFuncError( + name, funcName) } } }