Revert hive #1

Closed · wants to merge 3 commits
DataTypeUtils.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT}
-import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StringType, StructField, StructType, UserDefinedType}
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StructField, StructType, UserDefinedType}
 import org.apache.spark.sql.types.DecimalType.{forType, fromDecimal}

 object DataTypeUtils {
@@ -47,31 +47,6 @@ object DataTypeUtils {
     DataType.equalsIgnoreCaseAndNullability(from, to)
   }

-  /**
-   * Compares two types, ignoring nullability of ArrayType, MapType, StructType, ignoring case
-   * sensitivity of field names in StructType as well as differences in collation for String types.
-   */
-  def equalsIgnoreCaseNullabilityAndCollation(from: DataType, to: DataType): Boolean = {
-    (from, to) match {
-      case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
-        equalsIgnoreCaseNullabilityAndCollation(fromElement, toElement)
-
-      case (MapType(fromKey, fromValue, _), MapType(toKey, toValue, _)) =>
-        equalsIgnoreCaseNullabilityAndCollation(fromKey, toKey) &&
-          equalsIgnoreCaseNullabilityAndCollation(fromValue, toValue)
-
-      case (StructType(fromFields), StructType(toFields)) =>
-        fromFields.length == toFields.length &&
-          fromFields.zip(toFields).forall { case (l, r) =>
-            l.name.equalsIgnoreCase(r.name) &&
-              equalsIgnoreCaseNullabilityAndCollation(l.dataType, r.dataType)
-          }
-
-      case (_: StringType, _: StringType) => true
-      case (fromDataType, toDataType) => fromDataType == toDataType
-    }
-  }
-
   private val SparkGeneratedName = """col\d+""".r
   private def isSparkGeneratedName(name: String): Boolean = name match {
     case SparkGeneratedName(_*) => true
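
For context on the behavior this revert removes: the deleted equalsIgnoreCaseNullabilityAndCollation helper matched any two string types, while the surviving equalsIgnoreCaseAndNullability falls back to plain equality for leaf types and therefore distinguishes collations. A minimal sketch, outside the diff; it only assumes Spark's collation-aware StringType(collationName) factory, the same one the test file below uses:

import org.apache.spark.sql.types._

// Two single-column schemas that differ only in string collation.
val collated = StructType(Seq(StructField("col1", StringType("UNICODE"))))
val plain = StructType(Seq(StructField("col1", StringType)))

// Plain equality treats the collated and non-collated string types as different,
// so after this revert the two schemas no longer compare as equal:
val sameType = collated == plain // false
// The removed helper ended in `case (_: StringType, _: StringType) => true`
// and would have accepted this pair as equal.
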
SchemaUtils.scala
@@ -303,21 +303,4 @@ private[spark] object SchemaUtils {
       case _ => false
     }
   }
-
-  /**
-   * Replaces any collated string type with non collated StringType
-   * recursively in the given data type.
-   */
-  def replaceCollatedStringWithString(dt: DataType): DataType = dt match {
-    case ArrayType(et, nullable) =>
-      ArrayType(replaceCollatedStringWithString(et), nullable)
-    case MapType(kt, vt, nullable) =>
-      MapType(replaceCollatedStringWithString(kt), replaceCollatedStringWithString(vt), nullable)
-    case StructType(fields) =>
-      StructType(fields.map { field =>
-        field.copy(dataType = replaceCollatedStringWithString(field.dataType))
-      })
-    case _: StringType => StringType
-    case _ => dt
-  }
 }
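
For context: the deleted replaceCollatedStringWithString walks arrays, maps, and structs recursively and replaces every collated string with the plain StringType. A minimal before/after sketch, outside the diff, under the same StringType(collationName) assumption as above:

import org.apache.spark.sql.types._

// A nested schema containing a collated string inside an array column.
val withCollation = StructType(Seq(
  StructField("names", ArrayType(StringType("UNICODE"), containsNull = true))))

// What the removed helper produced: the same shape with the collation stripped.
val withoutCollation = StructType(Seq(
  StructField("names", ArrayType(StringType, containsNull = true))))

// Before this revert, HiveExternalCatalog wrote the second form to the Hive
// metastore while keeping the original collated schema in the table properties.
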
HiveExternalCatalog.scala
@@ -49,7 +49,6 @@ import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.SchemaUtils

 /**
  * A persistent implementation of the system catalog using Hive.
@@ -234,39 +233,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       tableDefinition.storage.locationUri
     }

-    val hiveCompatibleSchema = tryGetHiveCompatibleSchema(tableDefinition.schema)
-
     if (DDLUtils.isDatasourceTable(tableDefinition)) {
-      // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
-      // support, no column nullability, etc., we should do some extra work before saving table
-      // metadata into Hive metastore:
-      //  1. Put table metadata like table schema, partition columns, etc. in table properties.
-      //  2. Check if this table is hive compatible.
-      //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
-      //         spec to empty and save table metadata to Hive.
-      //    2.2  If it's hive compatible, set serde information in table metadata and try to save
-      //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
-      val tableProperties = tableMetaToTableProps(tableDefinition)
-
-      // put table provider and partition provider in table properties.
-      tableProperties.put(DATASOURCE_PROVIDER, tableDefinition.provider.get)
-      if (tableDefinition.tracksPartitionsInCatalog) {
-        tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
-      }
-
-      // we have to set the table schema here so that the table schema JSON
-      // string in the table properties still uses the original schema
-      val hiveTable = tableDefinition.copy(
-        schema = hiveCompatibleSchema,
-        properties = tableDefinition.properties ++ tableProperties
-      )
-
       createDataSourceTable(
-        hiveTable.withNewStorage(locationUri = tableLocation),
+        tableDefinition.withNewStorage(locationUri = tableLocation),
         ignoreIfExists)
     } else {
       val tableWithDataSourceProps = tableDefinition.copy(
-        schema = hiveCompatibleSchema,
         // We can't leave `locationUri` empty and count on Hive metastore to set a default table
         // location, because Hive metastore uses hive.metastore.warehouse.dir to generate default
         // table location for tables in default database, while we expect to use the location of
@@ -296,6 +268,23 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val provider = table.provider.get
     val options = new SourceOptions(table.storage.properties)

+    // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
+    // support, no column nullability, etc., we should do some extra work before saving table
+    // metadata into Hive metastore:
+    //  1. Put table metadata like table schema, partition columns, etc. in table properties.
+    //  2. Check if this table is hive compatible.
+    //    2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
+    //         spec to empty and save table metadata to Hive.
+    //    2.2  If it's hive compatible, set serde information in table metadata and try to save
+    //         it to Hive. If it fails, treat it as not hive compatible and go back to 2.1
+    val tableProperties = tableMetaToTableProps(table)
+
+    // put table provider and partition provider in table properties.
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+    if (table.tracksPartitionsInCatalog) {
+      tableProperties.put(TABLE_PARTITION_PROVIDER, TABLE_PARTITION_PROVIDER_CATALOG)
+    }
+
     // Ideally we should also put `locationUri` in table properties like provider, schema, etc.
     // However, in older versions of Spark we already store table location in storage properties
     // with key "path". Here we keep this behaviour for backward compatibility.
@@ -314,7 +303,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           locationUri = None,
           properties = storagePropsWithLocation),
         schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema),
-        bucketSpec = None)
+        bucketSpec = None,
+        properties = table.properties ++ tableProperties)
     }

     // converts the table metadata to Hive compatible format, i.e. set the serde information.
@@ -336,8 +326,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           outputFormat = serde.outputFormat,
           serde = serde.serde,
           properties = storagePropsWithLocation
-        )
-      )
+        ),
+        properties = table.properties ++ tableProperties)
     }

     val qualifiedTableName = table.identifier.quotedString
@@ -679,16 +669,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schemaProps =
       tableMetaToTableProps(oldTable, StructType(newDataSchema ++ oldTable.partitionSchema)).toMap

-    val hiveSchema = tryGetHiveCompatibleSchema(newDataSchema)
-
     if (isDatasourceTable(oldTable)) {
       // For data source tables, first try to write it with the schema set; if that does not work,
       // try again with updated properties and the partition schema. This is a simplified version of
       // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive
       // (for example, the schema does not match the data source schema, or does not match the
       // storage descriptor).
       try {
-        client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
+        client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
       } catch {
         case NonFatal(e) =>
           val warningMessage = log"Could not alter schema of table " +
@@ -698,21 +686,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps)
       }
     } else {
-      client.alterTableDataSchema(db, table, hiveSchema, schemaProps)
+      client.alterTableDataSchema(db, table, newDataSchema, schemaProps)
     }
   }

-  /**
-   * Tries to fix the schema so that all column data types are Hive-compatible,
-   * i.e. the types are converted to the types that Hive supports.
-   */
-  private def tryGetHiveCompatibleSchema(schema: StructType): StructType = {
-    // Since collated strings do not exist in Hive as a type we need to replace them with
-    // the regular string type. However, as we save the original schema in the table
-    // properties we will be able to restore the original schema when reading back the table.
-    SchemaUtils.replaceCollatedStringWithString(schema).asInstanceOf[StructType]
-  }
-
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
   override def alterTableStats(
       db: String,
@@ -815,7 +792,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

-    if (DataTypeUtils.equalsIgnoreCaseNullabilityAndCollation(reorderedSchema, table.schema) ||
+    if (DataTypeUtils.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema) ||
         options.respectSparkSchema) {
       hiveTable.copy(
         schema = reorderedSchema,
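
For orientation, the numbered workaround comment that moves within this file (steps 1, 2, 2.1, 2.2) describes a save-then-fallback flow around the Hive client. A minimal sketch of that control flow; the helper names here (makeHiveCompatible, makeSparkSqlSpecific, saveToHive) are hypothetical stand-ins, not the actual private helpers in HiveExternalCatalog:

import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical stand-ins for the real private helpers.
def makeHiveCompatible(table: CatalogTable): CatalogTable = table   // step 2.2: attach serde info
def makeSparkSqlSpecific(table: CatalogTable): CatalogTable = table // step 2.1: empty schema, location, buckets
def saveToHive(table: CatalogTable): Unit = ()                      // would delegate to the Hive client

def saveTable(table: CatalogTable, hiveCompatible: Boolean): Unit = {
  if (hiveCompatible) {
    try {
      saveToHive(makeHiveCompatible(table))     // 2.2: try the Hive-compatible form first
    } catch {
      case NonFatal(_) =>
        saveToHive(makeSparkSqlSpecific(table)) // ...and fall back to 2.1 if Hive rejects it
    }
  } else {
    saveToHive(makeSparkSqlSpecific(table))     // 2.1: not Hive compatible to begin with
  }
}

Either way, the full table metadata (step 1) travels in the table properties, which is why the restore path above can compare the schema recorded in the properties against what Hive returns.
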
HiveExternalCatalogSuite.scala
@@ -202,17 +202,14 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     })
   }

-  test("write collated strings as regular strings in hive - but read them back as collated") {
+  test("table with collated strings in schema") {
     val catalog = newBasicCatalog()
     val tableName = "collation_tbl"
     val columnName = "col1"

     val collationsSchema = StructType(Seq(
       StructField(columnName, StringType("UNICODE"))
     ))
-    val noCollationsSchema = StructType(Seq(
-      StructField(columnName, StringType)
-    ))

     val tableDDL = CatalogTable(
       identifier = TableIdentifier(tableName, Some("db1")),
@@ -223,9 +220,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {

     catalog.createTable(tableDDL, ignoreIfExists = false)

-    val rawTable = externalCatalog.getRawTable("db1", tableName)
-    assert(DataTypeUtils.sameType(rawTable.schema, noCollationsSchema))
-
     val readBackTable = externalCatalog.getTable("db1", tableName)
     assert(DataTypeUtils.sameType(readBackTable.schema, collationsSchema))

@@ -235,9 +229,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
     ))
     catalog.alterTableDataSchema("db1", tableName, newSchema)

-    val alteredRawTable = externalCatalog.getRawTable("db1", tableName)
-    assert(DataTypeUtils.sameType(alteredRawTable.schema, noCollationsSchema))
-
     val alteredTable = externalCatalog.getTable("db1", tableName)
     assert(DataTypeUtils.sameType(alteredTable.schema, newSchema))
   }