From de234eec8febce99ede5ef9ae2301e36739a0f85 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 19 Dec 2020 14:35:28 +0900 Subject: [PATCH] [SPARK-33812][SQL] Split the histogram column stats when saving to hive metastore as table property ### What changes were proposed in this pull request? Hive metastore has a limitation for the table property length. To work around it, Spark split the schema json string into several parts when saving to hive metastore as table properties. We need to do the same for histogram column stats as it can go very big. This PR refactors the table property splitting code, so that we can share it between the schema json string and histogram column stats. ### Why are the changes needed? To be able to analyze table when histogram data is big. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing test and new tests Closes #30809 from cloud-fan/cbo. Authored-by: Wenchen Fan Signed-off-by: HyukjinKwon --- .../sql/catalyst/catalog/interface.scala | 54 +++++++++++++++- .../apache/spark/sql/internal/SQLConf.scala | 14 ++++- .../apache/spark/sql/RuntimeConfigSuite.scala | 4 +- .../spark/sql/StatisticsCollectionSuite.scala | 9 +++ .../spark/sql/internal/SQLConfSuite.scala | 16 ++--- .../spark/sql/hive/HiveExternalCatalog.scala | 61 +++---------------- .../sql/hive/client/HiveClientImpl.scala | 6 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 4 +- 8 files changed, 97 insertions(+), 71 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 9876ee375cfa6..5cb237688f875 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -472,6 +472,51 @@ object CatalogTable { val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames" val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames" + + def splitLargeTableProp( + key: String, + value: String, + addProp: (String, String) => Unit, + defaultThreshold: Int): Unit = { + val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD) + .getOrElse(defaultThreshold) + if (value.length <= threshold) { + addProp(key, value) + } else { + val parts = value.grouped(threshold).toSeq + addProp(s"$key.numParts", parts.length.toString) + parts.zipWithIndex.foreach { case (part, index) => + addProp(s"$key.part.$index", part) + } + } + } + + def readLargeTableProp(props: Map[String, String], key: String): Option[String] = { + props.get(key).orElse { + if (props.filterKeys(_.startsWith(key)).isEmpty) { + None + } else { + val numParts = props.get(s"$key.numParts") + val errorMessage = s"Cannot read table property '$key' as it's corrupted." 
+ if (numParts.isEmpty) { + throw new AnalysisException(errorMessage) + } else { + val parts = (0 until numParts.get.toInt).map { index => + props.getOrElse(s"$key.part.$index", { + throw new AnalysisException( + s"$errorMessage Missing part $index, ${numParts.get} parts are expected.") + }) + } + Some(parts.mkString) + } + } + } + } + + def isLargeTableProp(originalKey: String, propKey: String): Boolean = { + propKey == originalKey || propKey == s"$originalKey.numParts" || + propKey.startsWith(s"$originalKey.part.") + } } /** @@ -546,7 +591,11 @@ case class CatalogColumnStat( min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) } max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) } histogram.foreach { h => - map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h)) + CatalogTable.splitLargeTableProp( + s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}", + HistogramSerializer.serialize(h), + map.put, + 4000) } map.toMap } @@ -650,7 +699,8 @@ object CatalogColumnStat extends Logging { nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)), avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong), maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong), - histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize), + histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM") + .map(HistogramSerializer.deserialize), version = map(s"${colName}.${KEY_VERSION}").toInt )) } catch { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3f0fd70a6eae6..b5547319f0ab3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -905,6 +905,16 @@ object SQLConf { .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString)) .createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString) + val HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD = + buildConf("spark.sql.hive.tablePropertyLengthThreshold") + .internal() + .doc("The maximum length allowed in a single cell when storing Spark-specific information " + + "in Hive's metastore as table properties. Currently it covers 2 things: the schema's " + + "JSON string, the histogram of column statistics.") + .version("3.2.0") + .intConf + .createOptional + val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly") .internal() .doc("When true, enable the metadata-only query optimization that use the table's metadata " + @@ -3052,7 +3062,9 @@ object SQLConf { "Avoid to depend on this optimization to prevent a potential correctness issue. 
" + "If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule."), DeprecatedConfig(CONVERT_CTAS.key, "3.1", - s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead.") + s"Set '${LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key}' to false instead."), + DeprecatedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2", + s"Use '${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}' instead.") ) Map(configs.map { cfg => cfg.key -> cfg } : _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala index 720d570ca8384..4052130720811 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import org.apache.spark.SparkFunSuite import org.apache.spark.internal.config import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION -import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD +import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE class RuntimeConfigSuite extends SparkFunSuite { @@ -62,7 +62,7 @@ class RuntimeConfigSuite extends SparkFunSuite { val conf = newConf() // SQL configs - assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key)) + assert(!conf.isModifiable(GLOBAL_TEMP_DATABASE.key)) assert(conf.isModifiable(CHECKPOINT_LOCATION.key)) // Core configs assert(!conf.isModifiable(config.CPUS_PER_TASK.key)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 3b53a5324445b..cc3d8375db32f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -174,6 +174,15 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } } + test("SPARK-33812: column stats round trip serialization with splitting histogram property") { + withSQLConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key -> "10") { + statsWithHgms.foreach { case (k, v) => + val roundtrip = CatalogColumnStat.fromMap("t", k, v.toMap(k)) + assert(roundtrip == Some(v)) + } + } + } + test("analyze column command - result verification") { // (data.head.productArity - 1) because the last column does not support stats collection. 
assert(stats.size == data.head.productArity - 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index 1ea2d4fd0b32c..e699c972268a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -282,23 +282,23 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { } test("static SQL conf comes from SparkConf") { - val previousValue = sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) + val previousValue = sparkContext.conf.get(GLOBAL_TEMP_DATABASE) try { - sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, 2000) + sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a") val newSession = new SparkSession(sparkContext) - assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 2000) + assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a") checkAnswer( - newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"), - Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000")) + newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"), + Row(GLOBAL_TEMP_DATABASE.key, "a")) } finally { - sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, previousValue) + sparkContext.conf.set(GLOBAL_TEMP_DATABASE, previousValue) } } test("cannot set/unset static SQL conf") { - val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10")) + val e1 = intercept[AnalysisException](sql(s"SET ${GLOBAL_TEMP_DATABASE.key}=10")) assert(e1.message.contains("Cannot modify the value of a static config")) - val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key)) + val e2 = intercept[AnalysisException](spark.conf.unset(GLOBAL_TEMP_DATABASE.key)) assert(e2.message.contains("Cannot modify the value of a static config")) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 54c237f78cb9c..b4aa073893df8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -429,18 +429,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val properties = new mutable.HashMap[String, String] properties.put(CREATED_SPARK_VERSION, table.createVersion) - - // Serialized JSON schema string may be too long to be stored into a single metastore table - // property. In this case, we split the JSON string and store each part as a separate table - // property. - val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) - val schemaJsonString = schema.json - // Split the JSON string. 
- val parts = schemaJsonString.grouped(threshold).toSeq - properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) - parts.zipWithIndex.foreach { case (part, index) => - properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part) - } + CatalogTable.splitLargeTableProp( + DATASOURCE_SCHEMA, schema.json, properties.put, conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)) if (partitionColumns.nonEmpty) { properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString) @@ -744,8 +734,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case None if table.tableType == VIEW => // If this is a view created by Spark 2.2 or higher versions, we should restore its schema // from table properties. - if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { - table = table.copy(schema = getSchemaFromTableProperties(table)) + CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson => + table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]) } // No provider in table properties, which means this is a Hive serde table. @@ -795,8 +785,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its // schema from table properties. - if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) { - val schemaFromTableProps = getSchemaFromTableProperties(table) + val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA) + if (schemaJson.isDefined) { + val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType] val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) @@ -836,7 +827,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap) val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) - val schemaFromTableProps = getSchemaFromTableProperties(table) + val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA) + .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType()) val partColumnNames = getPartitionColumnsFromTableProperties(table) val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames) @@ -1340,7 +1332,6 @@ object HiveExternalCatalog { val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider" val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema" val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "." - val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts" val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols" val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols" val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets" @@ -1373,40 +1364,6 @@ object HiveExternalCatalog { val EMPTY_DATA_SCHEMA = new StructType() .add("col", "array", nullable = true, comment = "from deserializer") - // A persisted data source table always store its schema in the catalog. - private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = { - val errorMessage = "Could not read schema from the hive metastore because it is corrupted." 
- val props = metadata.properties - val schema = props.get(DATASOURCE_SCHEMA) - if (schema.isDefined) { - // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table. - // After SPARK-6024, we removed this flag. - // Although we are not using `spark.sql.sources.schema` any more, we need to still support. - DataType.fromJson(schema.get).asInstanceOf[StructType] - } else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) { - // If there is no schema information in table properties, it means the schema of this table - // was empty when saving into metastore, which is possible in older version(prior to 2.1) of - // Spark. We should respect it. - new StructType() - } else { - val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS) - if (numSchemaParts.isDefined) { - val parts = (0 until numSchemaParts.get.toInt).map { index => - val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull - if (part == null) { - throw new AnalysisException(errorMessage + - s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).") - } - part - } - // Stick all parts back to a single schema string. - DataType.fromJson(parts.mkString).asInstanceOf[StructType] - } else { - throw new AnalysisException(errorMessage) - } - } - } - private def getColumnNamesByType( props: Map[String, String], colType: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 6a964a0ce3613..e779a80f7c323 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -57,7 +57,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} +import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -580,9 +580,7 @@ private[hive] class HiveClientImpl( val it = oldTable.getParameters.entrySet.iterator while (it.hasNext) { val entry = it.next() - val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) || - entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS - if (isSchemaProp) { + if (CatalogTable.isLargeTableProp(DATASOURCE_SCHEMA, entry.getKey)) { it.remove() } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0593dbe7f6653..ecbb104070b70 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -1338,7 +1338,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val e = intercept[AnalysisException] { sharedState.externalCatalog.getTable("default", "t") }.getMessage - assert(e.contains(s"Could not read schema from the hive metastore because it is corrupted")) + assert(e.contains("Cannot 
read table property 'spark.sql.sources.schema' as it's corrupted")) withDebugMode { val tableMeta = sharedState.externalCatalog.getTable("default", "t") @@ -1355,7 +1355,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val newSession = sparkSession.newSession() newSession.sql("CREATE TABLE abc(i int) USING json") val tableMeta = newSession.sessionState.catalog.getTableMetadata(TableIdentifier("abc")) - assert(tableMeta.properties(DATASOURCE_SCHEMA_NUMPARTS).toInt == 1) + assert(tableMeta.properties.contains(DATASOURCE_SCHEMA)) assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json") } }
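
### Sketch: how the split table properties look

The following is a minimal, self-contained sketch (not the Spark code itself) of the scheme that `CatalogTable.splitLargeTableProp` and `readLargeTableProp` implement above: a value longer than the threshold is stored as a `<key>.numParts` property plus `<key>.part.<i>` chunks, and stitched back together on read. Unlike the real methods, the threshold is a plain argument here (the patch reads it from `spark.sql.hive.tablePropertyLengthThreshold`, falling back to a per-call default such as 4000 for histograms), error handling for missing parts is omitted, and the property key used in `main` is only illustrative.

```scala
// Self-contained illustration of the split/read round trip; not the Spark implementation.
object LargePropSketch {

  // Store `value` under `key` if it fits, otherwise as `key.numParts` + `key.part.N` chunks.
  def split(key: String, value: String, threshold: Int): Map[String, String] = {
    if (value.length <= threshold) {
      Map(key -> value)
    } else {
      val parts = value.grouped(threshold).toSeq
      val chunkProps = parts.zipWithIndex.map { case (part, i) => s"$key.part.$i" -> part }
      (chunkProps :+ (s"$key.numParts" -> parts.length.toString)).toMap
    }
  }

  // Read the value back, whether it was stored whole or in parts.
  def read(props: Map[String, String], key: String): Option[String] = {
    props.get(key).orElse {
      props.get(s"$key.numParts").map { n =>
        (0 until n.toInt).map(i => props(s"$key.part.$i")).mkString
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val histogramJson = "0123456789" * 5  // stand-in for a serialized histogram string
    val key = "spark.sql.statistics.colStats.c1.histogram"  // illustrative key only
    val props = split(key, histogramJson, threshold = 10)
    props.keys.toSeq.sorted.foreach(println)  // key.numParts, key.part.0 ... key.part.4
    assert(read(props, key).contains(histogramJson))  // round trip restores the original value
  }
}
```

Sharing this helper between the schema JSON string and the histogram statistics is what lets the patch drop the old schema-specific splitting code (`DATASOURCE_SCHEMA_NUMPARTS` and `getSchemaFromTableProperties`) from `HiveExternalCatalog` and simplify the schema-property check in `HiveClientImpl` to a single `CatalogTable.isLargeTableProp` call.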
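
For context on when the splitting actually happens, a hedged usage sketch (the table and column names are hypothetical, and `spark` is assumed to be an existing `SparkSession`, e.g. in `spark-shell`): histograms are only collected when `spark.sql.statistics.histogram.enabled` is true, and the new internal conf introduced by this patch can optionally lower the split threshold.

```scala
// Hypothetical session; `t` and `c1` stand for any existing table/column.
spark.sql("SET spark.sql.statistics.histogram.enabled=true")       // opt in to equi-height histograms
spark.sql("SET spark.sql.hive.tablePropertyLengthThreshold=2000")  // optional: override the default threshold
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1")     // a large histogram is now stored split
                                                                   // across several metastore table properties
```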