Split the histogram column stats when saving to hive metastore as table property
cloud-fan committed Dec 17, 2020
1 parent ef7f690 commit 4e8e984
Showing 9 changed files with 90 additions and 86 deletions.
@@ -471,6 +471,46 @@ object CatalogTable {

val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

def splitLargeTableProp(key: String, value: String, addProp: (String, String) => Unit): Unit = {
val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD)
if (value.length <= threshold) {
addProp(key, value)
} else {
val parts = value.grouped(threshold).toSeq
addProp(s"$key.numParts", parts.length.toString)
parts.zipWithIndex.foreach { case (part, index) =>
addProp(s"$key.part.$index", part)
}
}
}

def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
props.get(key).orElse {
if (props.filterKeys(_.startsWith(key)).isEmpty) {
None
} else {
val numParts = props.get(s"$key.numParts")
val errorMessage = s"Cannot read table property '$key' as it's corrupted."
if (numParts.isEmpty) {
throw new AnalysisException(errorMessage)
} else {
val parts = (0 until numParts.get.toInt).map { index =>
props.getOrElse(s"$key.part.$index", {
throw new AnalysisException(
s"$errorMessage Missing part $index, ${numParts.get} parts are expected.")
})
}
Some(parts.mkString)
}
}
}
}

def isLargeTableProp(originalKey: String, propKey: String): Boolean = {
propKey == originalKey || propKey == s"$originalKey.numParts" ||
propKey.startsWith(s"$originalKey.part.")
}
}

/**
@@ -545,7 +585,10 @@ case class CatalogColumnStat(
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
histogram.foreach { h =>
map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
CatalogTable.splitLargeTableProp(
s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}",
HistogramSerializer.serialize(h),
map.put)
}
map.toMap
}
@@ -649,7 +692,8 @@ object CatalogColumnStat extends Logging {
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
.map(HistogramSerializer.deserialize),
version = map(s"${colName}.${KEY_VERSION}").toInt
))
} catch {
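The new helpers above are the heart of the change: a value longer than the threshold is stored as a `<key>.numParts` count plus numbered `<key>.part.N` pieces, and read back by reassembling those pieces, with a fallback to the unsplit key for properties written as a single cell. Below is a minimal self-contained sketch of that round trip; unlike the real `CatalogTable.splitLargeTableProp`/`readLargeTableProp` it takes the threshold as a plain parameter instead of reading `SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD`, omits the corruption checks, and uses `c1.histogram` as an assumed key name mirroring the `$colName.$KEY_HISTOGRAM` pattern in `toMap`.

```scala
import scala.collection.mutable

// Standalone sketch only: the real CatalogTable.splitLargeTableProp/readLargeTableProp
// take the threshold from SQLConf and raise AnalysisException on corrupted properties.
object SplitPropSketch extends App {
  def split(key: String, value: String, threshold: Int,
      addProp: (String, String) => Unit): Unit = {
    if (value.length <= threshold) {
      addProp(key, value)  // short values keep a single, unsplit property
    } else {
      val parts = value.grouped(threshold).toSeq
      addProp(s"$key.numParts", parts.length.toString)
      parts.zipWithIndex.foreach { case (part, index) =>
        addProp(s"$key.part.$index", part)
      }
    }
  }

  def read(props: Map[String, String], key: String): Option[String] = {
    // Prefer the unsplit property, then fall back to reassembling the numbered parts.
    props.get(key).orElse {
      props.get(s"$key.numParts").map { n =>
        (0 until n.toInt).map(i => props(s"$key.part.$i")).mkString
      }
    }
  }

  val props = mutable.HashMap[String, String]()
  val histogramBlob = "0123456789" * 3 + "abcde"  // 35 chars, stands in for a serialized histogram
  split("c1.histogram", histogramBlob, threshold = 10, addProp = props.put)
  // props now holds c1.histogram.numParts -> "4" and c1.histogram.part.0 through .part.3
  assert(read(props.toMap, "c1.histogram").contains(histogramBlob))
}
```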
@@ -882,6 +882,16 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)

val HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD =
buildConf("spark.sql.hive.tablePropertyLengthThreshold")
.internal()
.doc("The maximum length allowed in a single cell when storing Spark-specific information " +
"in Hive's metastore as table properties. Currently it covers 2 things: the schema's " +
"JSON string, the histogram of column statistics.")
.version("3.2.0")
.intConf
.createWithDefault(4000)

val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.internal()
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
@@ -3075,7 +3085,9 @@ object SQLConf {
RemovedConfig("spark.sql.optimizer.planChangeLog.rules", "3.1.0", "",
s"Please use `${PLAN_CHANGE_LOG_RULES.key}` instead."),
RemovedConfig("spark.sql.optimizer.planChangeLog.batches", "3.1.0", "",
s"Please use `${PLAN_CHANGE_LOG_BATCHES.key}` instead.")
s"Please use `${PLAN_CHANGE_LOG_BATCHES.key}` instead."),
RemovedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2.0", "4000",
s"Please use `${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}` instead.")
)

Map(configs.map { cfg => cfg.key -> cfg } : _*)
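Because the threshold is now an ordinary (if internal) SQL conf rather than the static conf removed below, it can be adjusted per session. A hypothetical usage sketch, assuming a Hive-enabled SparkSession named `spark`, a placeholder table `t` with column `c1`, and `spark.sql.statistics.histogram.enabled` turned on so histograms are collected at all:

```scala
// Hypothetical per-session tuning; table and column names are placeholders.
spark.conf.set("spark.sql.hive.tablePropertyLengthThreshold", "2000")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1")
// Any histogram (or schema JSON) longer than 2000 characters is now written to the
// metastore as <key>.numParts plus <key>.part.N properties instead of a single cell.
```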
@@ -54,21 +54,6 @@ object StaticSQLConf {
.transform(_.toLowerCase(Locale.ROOT))
.createWithDefault("global_temp")

// This is used to control when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
// value of this property). We will split the JSON string of a schema to its length exceeds the
// threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
// that's why this conf has to be a static SQL conf.
val SCHEMA_STRING_LENGTH_THRESHOLD =
buildStaticConf("spark.sql.sources.schemaStringLengthThreshold")
.internal()
.doc("The maximum length allowed in a single cell when " +
"storing additional schema information in Hive's metastore.")
.version("1.3.1")
.intConf
.createWithDefault(4000)

val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
.internal()
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config
import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE

class RuntimeConfigSuite extends SparkFunSuite {

@@ -62,7 +62,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
val conf = newConf()

// SQL configs
assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
assert(!conf.isModifiable(GLOBAL_TEMP_DATABASE.key))
assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
// Core configs
assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
@@ -174,6 +174,15 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}

test("SPARK-33812: column stats round trip serialization with splitting histogram property") {
withSQLConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key -> "10") {
statsWithHgms.foreach { case (k, v) =>
val roundtrip = CatalogColumnStat.fromMap("t", k, v.toMap(k))
assert(roundtrip == Some(v))
}
}
}

test("analyze column command - result verification") {
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
@@ -282,23 +282,23 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}

test("static SQL conf comes from SparkConf") {
val previousValue = sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val previousValue = sparkContext.conf.get(GLOBAL_TEMP_DATABASE)
try {
sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, 2000)
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a")
val newSession = new SparkSession(sparkContext)
assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 2000)
assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a")
checkAnswer(
newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"),
Row(GLOBAL_TEMP_DATABASE.key, "a"))
} finally {
sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, previousValue)
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, previousValue)
}
}

test("cannot set/unset static SQL conf") {
val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
val e1 = intercept[AnalysisException](sql(s"SET ${GLOBAL_TEMP_DATABASE.key}=10"))
assert(e1.message.contains("Cannot modify the value of a static config"))
val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
val e2 = intercept[AnalysisException](spark.conf.unset(GLOBAL_TEMP_DATABASE.key))
assert(e2.message.contains("Cannot modify the value of a static config"))
}

@@ -429,18 +429,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val properties = new mutable.HashMap[String, String]

properties.put(CREATED_SPARK_VERSION, table.createVersion)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
CatalogTable.splitLargeTableProp(DATASOURCE_SCHEMA, schema.json, properties.put)

if (partitionColumns.nonEmpty) {
properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
@@ -744,8 +733,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case None if table.tableType == VIEW =>
// If this is a view created by Spark 2.2 or higher versions, we should restore its schema
// from table properties.
if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
table = table.copy(schema = getSchemaFromTableProperties(table))
CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
}

// No provider in table properties, which means this is a Hive serde table.
@@ -795,8 +784,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

// If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
// schema from table properties.
if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
val schemaFromTableProps = getSchemaFromTableProperties(table)
val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
if (schemaJson.isDefined) {
val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

@@ -836,7 +826,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)

val schemaFromTableProps = getSchemaFromTableProperties(table)
val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
.map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

@@ -1340,7 +1331,6 @@ object HiveExternalCatalog {
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
@@ -1373,40 +1363,6 @@ object HiveExternalCatalog {
val EMPTY_DATA_SCHEMA = new StructType()
.add("col", "array<string>", nullable = true, comment = "from deserializer")

// A persisted data source table always store its schema in the catalog.
private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
val props = metadata.properties
val schema = props.get(DATASOURCE_SCHEMA)
if (schema.isDefined) {
// Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using `spark.sql.sources.schema` any more, we need to still support.
DataType.fromJson(schema.get).asInstanceOf[StructType]
} else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) {
// If there is no schema information in table properties, it means the schema of this table
// was empty when saving into metastore, which is possible in older version(prior to 2.1) of
// Spark. We should respect it.
new StructType()
} else {
val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
if (numSchemaParts.isDefined) {
val parts = (0 until numSchemaParts.get.toInt).map { index =>
val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(errorMessage +
s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
}
part
}
// Stick all parts back to a single schema string.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
} else {
throw new AnalysisException(errorMessage)
}
}
}

private def getColumnNamesByType(
props: Map[String, String],
colType: String,
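The schema-writing change above, `CatalogTable.splitLargeTableProp(DATASOURCE_SCHEMA, schema.json, properties.put)`, yields the property layout sketched below. Since `s"$DATASOURCE_SCHEMA.numParts"` and `s"$DATASOURCE_SCHEMA.part.N"` spell the same strings as the removed `DATASOURCE_SCHEMA_NUMPARTS` and `DATASOURCE_SCHEMA_PART_PREFIX` keys, tables written by older Spark versions remain readable through `readLargeTableProp`. The snippet is illustrative only, with a tiny hypothetical threshold so the split is visible; with the real default of 4000 this short schema would stay in one property.

```scala
import scala.collection.mutable

// Illustrative only: reproduces the split layout for the schema key by hand.
object SchemaPropLayout extends App {
  val schemaKey = "spark.sql.sources.schema"
  val schemaJson =
    """{"type":"struct","fields":[{"name":"i","type":"integer","nullable":true,"metadata":{}}]}"""

  val props = mutable.LinkedHashMap[String, String]()
  val threshold = 40  // hypothetical, just to force a split for the demo
  val parts = schemaJson.grouped(threshold).toSeq
  props(s"$schemaKey.numParts") = parts.length.toString
  parts.zipWithIndex.foreach { case (p, i) => props(s"$schemaKey.part.$i") = p }

  props.foreach { case (k, v) => println(s"$k -> $v") }
  // Prints spark.sql.sources.schema.numParts -> 3, then .part.0 / .part.1 / .part.2,
  // i.e. the same keys older Spark versions wrote via DATASOURCE_SCHEMA_NUMPARTS and
  // DATASOURCE_SCHEMA_PART_PREFIX. A schema shorter than the threshold is now stored
  // whole under spark.sql.sources.schema, the legacy single-property layout that
  // readLargeTableProp falls back to.
}
```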
@@ -57,7 +57,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -580,9 +580,7 @@ private[hive] class HiveClientImpl(
val it = oldTable.getParameters.entrySet.iterator
while (it.hasNext) {
val entry = it.next()
val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS
if (isSchemaProp) {
if (CatalogTable.isLargeTableProp(DATASOURCE_SCHEMA, entry.getKey)) {
it.remove()
}
}
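The alterTable path above now recognizes stale schema properties with the shared matcher instead of spelling out the three key shapes by hand. An illustrative check of what it matches, assuming a Spark build that contains the new `CatalogTable` helper on the classpath:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Sketch of the matcher's behaviour; "spark.sql.sources.schema" is DATASOURCE_SCHEMA
// per the HiveExternalCatalog constants.
object IsLargeTablePropCheck extends App {
  val schemaKey = "spark.sql.sources.schema"
  assert(CatalogTable.isLargeTableProp(schemaKey, schemaKey))                  // legacy unsplit schema string
  assert(CatalogTable.isLargeTableProp(schemaKey, s"$schemaKey.numParts"))     // part count
  assert(CatalogTable.isLargeTableProp(schemaKey, s"$schemaKey.part.12"))      // an individual part
  assert(!CatalogTable.isLargeTableProp(schemaKey, s"$schemaKey.numPartCols")) // partition metadata is left alone
}
```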
@@ -701,7 +701,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("SPARK-6024 wide schema support") {
assert(spark.sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 4000)
assert(conf.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD) == 4000)
withTable("wide_schema") {
withTempDir { tempDir =>
// We will need 80 splits for this schema if the threshold is 4000.
@@ -1355,7 +1355,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val newSession = sparkSession.newSession()
newSession.sql("CREATE TABLE abc(i int) USING json")
val tableMeta = newSession.sessionState.catalog.getTableMetadata(TableIdentifier("abc"))
assert(tableMeta.properties(DATASOURCE_SCHEMA_NUMPARTS).toInt == 1)
assert(tableMeta.properties.contains(DATASOURCE_SCHEMA))
assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
}
}