Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-33812][SQL] Split the histogram column stats when saving to hive metastore as table property #30809

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,51 @@ object CatalogTable {

val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"

def splitLargeTableProp(
key: String,
value: String,
addProp: (String, String) => Unit,
defaultThreshold: Int): Unit = {
val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD)
.getOrElse(defaultThreshold)
if (value.length <= threshold) {
addProp(key, value)
} else {
val parts = value.grouped(threshold).toSeq
addProp(s"$key.numParts", parts.length.toString)
parts.zipWithIndex.foreach { case (part, index) =>
addProp(s"$key.part.$index", part)
}
}
}

def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
props.get(key).orElse {
if (props.filterKeys(_.startsWith(key)).isEmpty) {
None
} else {
val numParts = props.get(s"$key.numParts")
val errorMessage = s"Cannot read table property '$key' as it's corrupted."
if (numParts.isEmpty) {
throw new AnalysisException(errorMessage)
} else {
val parts = (0 until numParts.get.toInt).map { index =>
props.getOrElse(s"$key.part.$index", {
throw new AnalysisException(
s"$errorMessage Missing part $index, ${numParts.get} parts are expected.")
})
}
Some(parts.mkString)
}
}
}
}

def isLargeTableProp(originalKey: String, propKey: String): Boolean = {
propKey == originalKey || propKey == s"$originalKey.numParts" ||
propKey.startsWith(s"$originalKey.part.")
}
}

/**
Expand Down Expand Up @@ -545,7 +590,11 @@ case class CatalogColumnStat(
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
histogram.foreach { h =>
map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
CatalogTable.splitLargeTableProp(
s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}",
HistogramSerializer.serialize(h),
map.put,
4000)
}
map.toMap
}
Expand Down Expand Up @@ -649,7 +698,8 @@ object CatalogColumnStat extends Logging {
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
.map(HistogramSerializer.deserialize),
version = map(s"${colName}.${KEY_VERSION}").toInt
))
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,16 @@ object SQLConf {
.checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
.createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)

val HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD =
buildConf("spark.sql.hive.tablePropertyLengthThreshold")
.internal()
.doc("The maximum length allowed in a single cell when storing Spark-specific information " +
"in Hive's metastore as table properties. Currently it covers 2 things: the schema's " +
"JSON string, the histogram of column statistics.")
.version("3.2.0")
.intConf
.createOptional

val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.internal()
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
Expand Down Expand Up @@ -3029,7 +3039,9 @@ object SQLConf {
"Avoid to depend on this optimization to prevent a potential correctness issue. " +
"If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule."),
DeprecatedConfig(CONVERT_CTAS.key, "3.1",
s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead.")
s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead."),
DeprecatedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2",
s"Use '${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}' instead.")
)

Map(configs.map { cfg => cfg.key -> cfg } : _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.config
import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE

class RuntimeConfigSuite extends SparkFunSuite {

Expand Down Expand Up @@ -62,7 +62,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
val conf = newConf()

// SQL configs
assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
assert(!conf.isModifiable(GLOBAL_TEMP_DATABASE.key))
assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
// Core configs
assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}

test("SPARK-33812: column stats round trip serialization with splitting histogram property") {
withSQLConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key -> "10") {
statsWithHgms.foreach { case (k, v) =>
val roundtrip = CatalogColumnStat.fromMap("t", k, v.toMap(k))
assert(roundtrip == Some(v))
}
}
}

test("analyze column command - result verification") {
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,23 +282,23 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}

test("static SQL conf comes from SparkConf") {
val previousValue = sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val previousValue = sparkContext.conf.get(GLOBAL_TEMP_DATABASE)
try {
sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, 2000)
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a")
val newSession = new SparkSession(sparkContext)
assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 2000)
assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a")
checkAnswer(
newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"),
Row(GLOBAL_TEMP_DATABASE.key, "a"))
} finally {
sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, previousValue)
sparkContext.conf.set(GLOBAL_TEMP_DATABASE, previousValue)
}
}

test("cannot set/unset static SQL conf") {
val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
val e1 = intercept[AnalysisException](sql(s"SET ${GLOBAL_TEMP_DATABASE.key}=10"))
assert(e1.message.contains("Cannot modify the value of a static config"))
val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
val e2 = intercept[AnalysisException](spark.conf.unset(GLOBAL_TEMP_DATABASE.key))
assert(e2.message.contains("Cannot modify the value of a static config"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,18 +429,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val properties = new mutable.HashMap[String, String]

properties.put(CREATED_SPARK_VERSION, table.createVersion)

// Serialized JSON schema string may be too long to be stored into a single metastore table
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
parts.zipWithIndex.foreach { case (part, index) =>
properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
}
CatalogTable.splitLargeTableProp(
DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD))

if (partitionColumns.nonEmpty) {
properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
Expand Down Expand Up @@ -744,8 +734,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
case None if table.tableType == VIEW =>
// If this is a view created by Spark 2.2 or higher versions, we should restore its schema
// from table properties.
if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
table = table.copy(schema = getSchemaFromTableProperties(table))
CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
}

// No provider in table properties, which means this is a Hive serde table.
Expand Down Expand Up @@ -795,8 +785,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat

// If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
// schema from table properties.
if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
val schemaFromTableProps = getSchemaFromTableProperties(table)
val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
if (schemaJson.isDefined) {
val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

Expand Down Expand Up @@ -836,7 +827,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)

val schemaFromTableProps = getSchemaFromTableProperties(table)
val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
.map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
val partColumnNames = getPartitionColumnsFromTableProperties(table)
val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

Expand Down Expand Up @@ -1340,7 +1332,6 @@ object HiveExternalCatalog {
val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
Expand Down Expand Up @@ -1373,40 +1364,6 @@ object HiveExternalCatalog {
val EMPTY_DATA_SCHEMA = new StructType()
.add("col", "array<string>", nullable = true, comment = "from deserializer")

// A persisted data source table always store its schema in the catalog.
private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
val props = metadata.properties
val schema = props.get(DATASOURCE_SCHEMA)
if (schema.isDefined) {
// Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
// After SPARK-6024, we removed this flag.
// Although we are not using `spark.sql.sources.schema` any more, we need to still support.
DataType.fromJson(schema.get).asInstanceOf[StructType]
} else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) {
// If there is no schema information in table properties, it means the schema of this table
// was empty when saving into metastore, which is possible in older version(prior to 2.1) of
// Spark. We should respect it.
new StructType()
} else {
val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
if (numSchemaParts.isDefined) {
val parts = (0 until numSchemaParts.get.toInt).map { index =>
val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
if (part == null) {
throw new AnalysisException(errorMessage +
s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
}
part
}
// Stick all parts back to a single schema string.
DataType.fromJson(parts.mkString).asInstanceOf[StructType]
} else {
throw new AnalysisException(errorMessage)
}
}
}

private def getColumnNamesByType(
props: Map[String, String],
colType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
Expand Down Expand Up @@ -580,9 +580,7 @@ private[hive] class HiveClientImpl(
val it = oldTable.getParameters.entrySet.iterator
while (it.hasNext) {
val entry = it.next()
val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS
if (isSchemaProp) {
if (CatalogTable.isLargeTableProp(DATASOURCE_SCHEMA, entry.getKey)) {
it.remove()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val e = intercept[AnalysisException] {
sharedState.externalCatalog.getTable("default", "t")
}.getMessage
assert(e.contains(s"Could not read schema from the hive metastore because it is corrupted"))
assert(e.contains("Cannot read table property 'spark.sql.sources.schema' as it's corrupted"))

withDebugMode {
val tableMeta = sharedState.externalCatalog.getTable("default", "t")
Expand All @@ -1355,7 +1355,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val newSession = sparkSession.newSession()
newSession.sql("CREATE TABLE abc(i int) USING json")
val tableMeta = newSession.sessionState.catalog.getTableMetadata(TableIdentifier("abc"))
assert(tableMeta.properties(DATASOURCE_SCHEMA_NUMPARTS).toInt == 1)
assert(tableMeta.properties.contains(DATASOURCE_SCHEMA))
assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
}
}
Expand Down