Skip to content

Commit

Permalink
[SPARK-33812][SQL] Split the histogram column stats when saving to hi…
Browse files Browse the repository at this point in the history
…ve metastore as table property

### What changes were proposed in this pull request?

Hive metastore has a limitation for the table property length. To work around it, Spark split the schema json string into several parts when saving to hive metastore as table properties. We need to do the same for histogram column stats as it can go very big.

This PR refactors the table property splitting code, so that we can share it between the schema json string and histogram column stats.

### Why are the changes needed?

To be able to analyze table when histogram data is big.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing test and new tests

Closes #30809 from cloud-fan/cbo.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
cloud-fan authored and HyukjinKwon committed Dec 19, 2020
1 parent 554600c commit de234ee
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,51 @@ object CatalogTable {

val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

def splitLargeTableProp(
key: String,
value: String,
addProp: (String, String) => Unit,
defaultThreshold: Int): Unit = {
val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD)
.getOrElse(defaultThreshold)
if (value.length <= threshold) {
addProp(key, value)
} else {
val parts = value.grouped(threshold).toSeq
addProp(s"$key.numParts", parts.length.toString)
parts.zipWithIndex.foreach { case (part, index) =>
addProp(s"$key.part.$index", part)
}
}
}

def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
props.get(key).orElse {
if (props.filterKeys(_.startsWith(key)).isEmpty) {
None
} else {
val numParts = props.get(s"$key.numParts")
val errorMessage = s"Cannot read table property '$key' as it's corrupted."
if (numParts.isEmpty) {
throw new AnalysisException(errorMessage)
} else {
val parts = (0 until numParts.get.toInt).map { index =>
props.getOrElse(s"$key.part.$index", {
throw new AnalysisException(
s"$errorMessage Missing part $index, ${numParts.get} parts are expected.")
})
}
Some(parts.mkString)
}
}
}
}

def isLargeTableProp(originalKey: String, propKey: String): Boolean = {
propKey == originalKey || propKey == s"$originalKey.numParts" ||
propKey.startsWith(s"$originalKey.part.")
}
}

/**
Expand Down Expand Up @@ -546,7 +591,11 @@ case class CatalogColumnStat(
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
histogram.foreach { h =>
map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
CatalogTable.splitLargeTableProp(
s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}",
HistogramSerializer.serialize(h),
map.put,
4000)
}
map.toMap
}
Expand Down Expand Up @@ -650,7 +699,8 @@ object CatalogColumnStat extends Logging {
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
.map(HistogramSerializer.deserialize),
version = map(s"${colName}.${KEY_VERSION}").toInt
))
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,16 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)

val HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD =
buildConf("spark.sql.hive.tablePropertyLengthThreshold")
.internal()
.doc("The maximum length allowed in a single cell when storing Spark-specific information " +
"in Hive's metastore as table properties. Currently it covers 2 things: the schema's " +
"JSON string, the histogram of column statistics.")
.version("3.2.0")
.intConf
.createOptional

val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.internal()
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
Expand Down Expand Up @@ -3052,7 +3062,9 @@ object SQLConf {
"Avoid to depend on this optimization to prevent a potential correctness issue. " +
"If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule."),
DeprecatedConfig(CONVERT_CTAS.key, "3.1",
s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead.")
s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead."),
DeprecatedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2",
s"Use '${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}' instead.")
)

Map(configs.map { cfg => cfg.key -> cfg } : _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config
import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE

class RuntimeConfigSuite extends SparkFunSuite {

Expand Down Expand Up @@ -62,7 +62,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
val conf = newConf()

// SQL configs
assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
assert(!conf.isModifiable(GLOBAL_TEMP_DATABASE.key))
assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
// Core configs
assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}

test("SPARK-33812: column stats round trip serialization with splitting histogram property") {
withSQLConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key -> "10") {
statsWithHgms.foreach { case (k, v) =>
val roundtrip = CatalogColumnStat.fromMap("t", k, v.toMap(k))
assert(roundtrip == Some(v))
}
}
}

test("analyze column command - result verification") {
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,23 +282,23 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}

test("static SQL conf comes from SparkConf") {
val previousValue = sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val previousValue = sparkContext.conf.get(GLOBAL_TEMP_DATABASE)
try {
sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, 2000)
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a")
val newSession = new SparkSession(sparkContext)
assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 2000)
assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a")
checkAnswer(
newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"),
Row(GLOBAL_TEMP_DATABASE.key, "a"))
} finally {
sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, previousValue)
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, previousValue)
}
}

test("cannot set/unset static SQL conf") {
val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
val e1 = intercept[AnalysisException](sql(s"SET ${GLOBAL_TEMP_DATABASE.key}=10"))
assert(e1.message.contains("Cannot modify the value of a static config"))
val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
val e2 = intercept[AnalysisException](spark.conf.unset(GLOBAL_TEMP_DATABASE.key))
assert(e2.message.contains("Cannot modify the value of a static config"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,18 +429,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val properties = new mutable.HashMap[String, String]

properties.put(CREATED_SPARK_VERSION, table.createVersion)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
CatalogTable.splitLargeTableProp(
DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))

if (partitionColumns.nonEmpty) {
properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
Expand Down Expand Up @@ -744,8 +734,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case None if table.tableType == VIEW =>
// If this is a view created by Spark 2.2 or higher versions, we should restore its schema
// from table properties.
if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
table = table.copy(schema = getSchemaFromTableProperties(table))
CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
}

// No provider in table properties, which means this is a Hive serde table.
Expand Down Expand Up @@ -795,8 +785,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

// If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
// schema from table properties.
if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
val schemaFromTableProps = getSchemaFromTableProperties(table)
val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
if (schemaJson.isDefined) {
val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

Expand Down Expand Up @@ -836,7 +827,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)

val schemaFromTableProps = getSchemaFromTableProperties(table)
val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
.map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

Expand Down Expand Up @@ -1340,7 +1332,6 @@ object HiveExternalCatalog {
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
Expand Down Expand Up @@ -1373,40 +1364,6 @@ object HiveExternalCatalog {
val EMPTY_DATA_SCHEMA = new StructType()
.add("col", "array<string>", nullable = true, comment = "from deserializer")

// A persisted data source table always store its schema in the catalog.
private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
val props = metadata.properties
val schema = props.get(DATASOURCE_SCHEMA)
if (schema.isDefined) {
// Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using `spark.sql.sources.schema` any more, we need to still support.
DataType.fromJson(schema.get).asInstanceOf[StructType]
} else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) {
// If there is no schema information in table properties, it means the schema of this table
// was empty when saving into metastore, which is possible in older version(prior to 2.1) of
// Spark. We should respect it.
new StructType()
} else {
val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
if (numSchemaParts.isDefined) {
val parts = (0 until numSchemaParts.get.toInt).map { index =>
val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(errorMessage +
s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
}
part
}
// Stick all parts back to a single schema string.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
} else {
throw new AnalysisException(errorMessage)
}
}
}

private def getColumnNamesByType(
props: Map[String, String],
colType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
Expand Down Expand Up @@ -580,9 +580,7 @@ private[hive] class HiveClientImpl(
val it = oldTable.getParameters.entrySet.iterator
while (it.hasNext) {
val entry = it.next()
val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS
if (isSchemaProp) {
if (CatalogTable.isLargeTableProp(DATASOURCE_SCHEMA, entry.getKey)) {
it.remove()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val e = intercept[AnalysisException] {
sharedState.externalCatalog.getTable("default", "t")
}.getMessage
assert(e.contains(s"Could not read schema from the hive metastore because it is corrupted"))
assert(e.contains("Cannot read table property 'spark.sql.sources.schema' as it's corrupted"))

withDebugMode {
val tableMeta = sharedState.externalCatalog.getTable("default", "t")
Expand All @@ -1355,7 +1355,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val newSession = sparkSession.newSession()
newSession.sql("CREATE TABLE abc(i int) USING json")
val tableMeta = newSession.sessionState.catalog.getTableMetadata(TableIdentifier("abc"))
assert(tableMeta.properties(DATASOURCE_SCHEMA_NUMPARTS).toInt == 1)
assert(tableMeta.properties.contains(DATASOURCE_SCHEMA))
assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
}
}
Expand Down

0 comments on commit de234ee

Please sign in to comment.