Commit

Merge remote-tracking branch 'origin/master' into revert
cloud-fan committed Jul 12, 2021
2 parents 736e6df + d03f716 commit c730be7
Showing 12 changed files with 505 additions and 184 deletions.

Large diffs are not rendered by default.

@@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException

import com.fasterxml.jackson.core.JsonToken
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException

@@ -1544,6 +1545,23 @@ object QueryExecutionErrors {
new NullPointerException(s"Value at index $index is null")
}

def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): Throwable = {
new SparkException(s"Only Data Sources providing FileFormat are supported: $providingClass")
}

def failToSetOriginalPermissionBackError(
permission: FsPermission,
path: Path,
e: Throwable): Throwable = {
new SecurityException(s"Failed to set original permission $permission back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}

def failToSetOriginalACLBackError(aclEntries: String, path: Path, e: Throwable): Throwable = {
new SecurityException(s"Failed to set original ACL $aclEntries back to " +
s"the created path: $path. Exception: ${e.getMessage}")
}

def multiFailuresInStageMaterializationError(error: Throwable): Throwable = {
new SparkException("Multiple failures in stage materialization.", error)
}
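For illustration only, a minimal sketch of how a write-path command might call the new permission helper added above; the enclosing object, method, and variable names here are hypothetical and not taken from this commit.

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.sql.errors.QueryExecutionErrors

object PermissionRestoreSketch {
  // Try to restore the permission captured before a path was recreated; on failure,
  // surface it through the centralized error helper instead of an ad-hoc exception.
  def restorePermission(fs: FileSystem, createdPath: Path, originalPerm: FsPermission): Unit = {
    try {
      fs.setPermission(createdPath, originalPerm)
    } catch {
      case NonFatal(e) =>
        throw QueryExecutionErrors.failToSetOriginalPermissionBackError(
          originalPerm, createdPath, e)
    }
  }
}
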
@@ -19,10 +19,10 @@ package org.apache.spark.sql.execution.command

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._


@@ -44,7 +44,7 @@ case class AnalyzeColumnCommand(
tableIdent.database match {
case Some(db) if db == sparkSession.sharedState.globalTempViewManager.database =>
val plan = sessionState.catalog.getGlobalTempView(tableIdent.identifier).getOrElse {
throw new NoSuchTableException(db = db, table = tableIdent.identifier)
throw QueryCompilationErrors.noSuchTableError(db, tableIdent.identifier)
}
analyzeColumnInTempView(plan, sparkSession)
case Some(_) =>
@@ -72,8 +72,7 @@ case class AnalyzeColumnCommand(

private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
if (!analyzeColumnInCachedData(plan, sparkSession)) {
throw new AnalysisException(
s"Temporary view $tableIdent is not cached for analyzing columns.")
throw QueryCompilationErrors.tempViewNotCachedForAnalyzingColumnsError(tableIdent)
}
}

@@ -87,15 +86,14 @@ case class AnalyzeColumnCommand(
} else {
columnNames.get.map { col =>
val exprOption = relation.output.find(attr => conf.resolver(attr.name, col))
exprOption.getOrElse(throw new AnalysisException(s"Column $col does not exist."))
exprOption.getOrElse(throw QueryCompilationErrors.columnDoesNotExistError(col))
}
}
// Make sure the column types are supported for stats gathering.
columnsToAnalyze.foreach { attr =>
if (!supportsType(attr.dataType)) {
throw new AnalysisException(
s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
"and Spark does not support statistics collection on this column type.")
throw QueryCompilationErrors.columnTypeNotSupportStatisticsCollectionError(
attr.name, tableIdent, attr.dataType)
}
}
columnsToAnalyze
@@ -108,7 +106,7 @@ case class AnalyzeColumnCommand(
// Analyzes a catalog view if the view is cached
val plan = sparkSession.table(tableIdent.quotedString).logicalPlan
if (!analyzeColumnInCachedData(plan, sparkSession)) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
}
} else {
val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
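The QueryCompilationErrors helpers called in this file have definitions that are not shown in this view. A plausible sketch, assuming each helper simply wraps the inline message it replaced; the enclosing object name and parameter names are assumptions.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.types.DataType

object QueryCompilationErrorsSketch {
  // Each method mirrors the exception construction it replaced in the diff above.
  def noSuchTableError(db: String, table: String): Throwable = {
    new NoSuchTableException(db = db, table = table)
  }

  def tempViewNotCachedForAnalyzingColumnsError(tableIdent: TableIdentifier): Throwable = {
    new AnalysisException(s"Temporary view $tableIdent is not cached for analyzing columns.")
  }

  def columnDoesNotExistError(colName: String): Throwable = {
    new AnalysisException(s"Column $colName does not exist.")
  }

  def columnTypeNotSupportStatisticsCollectionError(
      name: String,
      tableIdent: TableIdentifier,
      dataType: DataType): Throwable = {
    new AnalysisException(s"Column $name in table $tableIdent is of type $dataType, " +
      "and Spark does not support statistics collection on this column type.")
  }

  def analyzeTableNotSupportedOnViewsError(): Throwable = {
    new AnalysisException("ANALYZE TABLE is not supported on views.")
  }
}
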
@@ -17,12 +17,13 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.util.PartitioningUtils

/**
@@ -58,11 +59,8 @@ case class AnalyzePartitionCommand(
val tableId = table.identifier
val schemaColumns = table.partitionColumnNames.mkString(",")
val specColumns = normalizedPartitionSpec.keys.mkString(",")
throw new AnalysisException("The list of partition columns with values " +
s"in partition specification for table '${tableId.table}' " +
s"in database '${tableId.database.get}' is not a prefix of the list of " +
"partition columns defined in the table schema. " +
s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
throw QueryCompilationErrors.unexpectedPartitionColumnPrefixError(
tableId.table, tableId.database.get, schemaColumns, specColumns)
}

val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
@@ -79,7 +77,7 @@ case class AnalyzePartitionCommand(
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
}

val partitionValueSpec = getPartitionSpec(tableMeta)
@@ -88,7 +86,8 @@

if (partitions.isEmpty) {
if (partitionValueSpec.isDefined) {
throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
throw QueryCompilationErrors.noSuchPartitionError(
db, tableIdent.table, partitionValueSpec.get)
} else {
// the user requested to analyze all partitions for a table which has no partitions
// return normally, since there is nothing to do
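Similarly, the partition-related helpers referenced in this file are not shown; a hypothetical sketch that reuses the removed messages, with signatures inferred from the call sites above.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

object PartitionErrorsSketch {
  // Wraps the removed inline message about a partition spec that is not a schema prefix.
  def unexpectedPartitionColumnPrefixError(
      table: String,
      database: String,
      schemaColumns: String,
      specColumns: String): Throwable = {
    new AnalysisException("The list of partition columns with values " +
      s"in partition specification for table '$table' in database '$database' " +
      "is not a prefix of the list of partition columns defined in the table schema. " +
      s"Expected a prefix of [$schemaColumns], but got [$specColumns].")
  }

  // Mirrors the NoSuchPartitionException previously constructed inline.
  def noSuchPartitionError(db: String, table: String, partition: TablePartitionSpec): Throwable = {
    new NoSuchPartitionException(db, table, partition)
  }
}
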
@@ -25,13 +25,14 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -217,7 +218,7 @@ object CommandUtils extends Logging {
table.count()
}
} else {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
}
} else {
// Compute stats for the whole table
@@ -381,8 +382,8 @@ object CommandUtils extends Logging {
Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
nullArray)
case _ =>
throw new AnalysisException("Analyzing column statistics is not supported for column " +
s"${col.name} of data type: ${col.dataType}.")
throw QueryCompilationErrors.analyzingColumnStatisticsNotSupportedForColumnTypeError(
col.name, col.dataType)
}
}

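Again for illustration, the column-statistics helper called above might be defined as follows; this is a sketch based on the removed message, and the parameter names are assumptions.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.DataType

object ColumnStatsErrorSketch {
  // Wraps the removed message about unsupported column types for statistics.
  def analyzingColumnStatisticsNotSupportedForColumnTypeError(
      name: String,
      dataType: DataType): Throwable = {
    new AnalysisException("Analyzing column statistics is not supported for column " +
      s"$name of data type: $dataType.")
  }
}
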
@@ -22,9 +22,10 @@ import java.net.URI
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -114,10 +115,8 @@ object DataWritingCommand {
if (fs.exists(filePath) &&
fs.getFileStatus(filePath).isDirectory &&
fs.listStatus(filePath).length != 0) {
throw new AnalysisException(
s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
s"${tablePath} . To allow overwriting the existing non-empty directory, " +
s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.")
throw QueryCompilationErrors.createTableAsSelectWithNonEmptyDirectoryError(
tablePath.toString)
}
}
}
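A sketch of the CTAS helper referenced above, assuming it keeps the removed message including the configuration hint; the signature is inferred from the call site and is not copied from the commit.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.internal.SQLConf

object CtasErrorSketch {
  // Reuses the removed non-empty-directory message, including the config key hint.
  def createTableAsSelectWithNonEmptyDirectoryError(tablePath: String): Throwable = {
    new AnalysisException(
      s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
      s"$tablePath . To allow overwriting the existing non-empty directory, " +
      s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.")
  }
}
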
@@ -17,10 +17,10 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._

/**
@@ -61,8 +61,8 @@ case class InsertIntoDataSourceDirCommand(

val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
if (!isFileFormat) {
throw new SparkException(
"Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
throw QueryExecutionErrors.onlySupportDataSourcesProvidingFileFormatError(
dataSource.providingClass.toString)
}

val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
@@ -23,6 +23,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{CommandExecutionMode, SparkPlan}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
@@ -53,7 +54,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
throw QueryCompilationErrors.tableAlreadyExistsError(table.identifier.unquotedString)
}
}

@@ -156,7 +157,8 @@ case class CreateDataSourceTableAsSelectCommand(
s"Expect the table $tableName has been dropped when the save mode is Overwrite")

if (mode == SaveMode.ErrorIfExists) {
throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
throw QueryCompilationErrors.tableAlreadyExistsError(
tableName, " You need to drop it first.")
}
if (mode == SaveMode.Ignore) {
// Since the table already exists and the save mode is Ignore, we will just return.
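Finally, the two call sites above suggest tableAlreadyExistsError has a one-argument form plus an overload that appends an extra message suffix; a hypothetical sketch, not copied from the commit.

import org.apache.spark.sql.AnalysisException

object TableExistsErrorSketch {
  def tableAlreadyExistsError(table: String): Throwable = {
    new AnalysisException(s"Table $table already exists.")
  }

  // The second argument is appended verbatim, e.g. " You need to drop it first."
  def tableAlreadyExistsError(table: String, guide: String): Throwable = {
    new AnalysisException(s"Table $table already exists.$guide")
  }
}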
