diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 6743b052fb3a1..31a07a9b0fe6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -471,6 +471,46 @@ object CatalogTable {
 
   val VIEW_REFERRED_TEMP_VIEW_NAMES = VIEW_PREFIX + "referredTempViewNames"
   val VIEW_REFERRED_TEMP_FUNCTION_NAMES = VIEW_PREFIX + "referredTempFunctionsNames"
+
+  def splitLargeTableProp(key: String, value: String, addProp: (String, String) => Unit): Unit = {
+    val threshold = SQLConf.get.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD)
+    if (value.length <= threshold) {
+      addProp(key, value)
+    } else {
+      val parts = value.grouped(threshold).toSeq
+      addProp(s"$key.numParts", parts.length.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        addProp(s"$key.part.$index", part)
+      }
+    }
+  }
+
+  def readLargeTableProp(props: Map[String, String], key: String): Option[String] = {
+    props.get(key).orElse {
+      if (props.filterKeys(_.startsWith(key)).isEmpty) {
+        None
+      } else {
+        val numParts = props.get(s"$key.numParts")
+        val errorMessage = s"Cannot read table property '$key' as it's corrupted."
+        if (numParts.isEmpty) {
+          throw new AnalysisException(errorMessage)
+        } else {
+          val parts = (0 until numParts.get.toInt).map { index =>
+            props.getOrElse(s"$key.part.$index", {
+              throw new AnalysisException(
+                s"$errorMessage Missing part $index, ${numParts.get} parts are expected.")
+            })
+          }
+          Some(parts.mkString)
+        }
+      }
+    }
+  }
+
+  def isLargeTableProp(originalKey: String, propKey: String): Boolean = {
+    propKey == originalKey || propKey == s"$originalKey.numParts" ||
+      propKey.startsWith(s"$originalKey.part.")
+  }
 }
 
 /**
@@ -545,7 +585,10 @@ case class CatalogColumnStat(
     min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
     max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
     histogram.foreach { h =>
-      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+      CatalogTable.splitLargeTableProp(
+        s"$colName.${CatalogColumnStat.KEY_HISTOGRAM}",
+        HistogramSerializer.serialize(h),
+        map.put)
     }
     map.toMap
   }
@@ -649,7 +692,8 @@ object CatalogColumnStat extends Logging {
         nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
         avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
         maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
-        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize),
+        histogram = CatalogTable.readLargeTableProp(map, s"$colName.$KEY_HISTOGRAM")
+          .map(HistogramSerializer.deserialize),
         version = map(s"${colName}.${KEY_VERSION}").toInt
       ))
     } catch {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fd6a30ac6a81c..94715e1dc7c83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -882,6 +882,16 @@ object SQLConf {
     .checkValues(HiveCaseSensitiveInferenceMode.values.map(_.toString))
     .createWithDefault(HiveCaseSensitiveInferenceMode.NEVER_INFER.toString)
 
+  val HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD =
+    buildConf("spark.sql.hive.tablePropertyLengthThreshold")
+      .internal()
+      .doc("The maximum length allowed in a single cell when storing Spark-specific information " +
+        "in Hive's metastore as table properties. Currently it covers 2 things: the schema's " +
+        "JSON string, the histogram of column statistics.")
+      .version("3.2.0")
+      .intConf
+      .createWithDefault(4000)
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .internal()
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
@@ -3075,7 +3085,9 @@ object SQLConf {
       RemovedConfig("spark.sql.optimizer.planChangeLog.rules", "3.1.0", "",
         s"Please use `${PLAN_CHANGE_LOG_RULES.key}` instead."),
       RemovedConfig("spark.sql.optimizer.planChangeLog.batches", "3.1.0", "",
-        s"Please use `${PLAN_CHANGE_LOG_BATCHES.key}` instead.")
+        s"Please use `${PLAN_CHANGE_LOG_BATCHES.key}` instead."),
+      RemovedConfig("spark.sql.sources.schemaStringLengthThreshold", "3.2.0", "4000",
+        s"Please use `${HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key}` instead.")
     )
 
     Map(configs.map { cfg => cfg.key -> cfg } : _*)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 02cb6f29622f5..4cf953294f1b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -54,21 +54,6 @@ object StaticSQLConf {
     .transform(_.toLowerCase(Locale.ROOT))
     .createWithDefault("global_temp")
 
-  // This is used to control when we will split a schema's JSON string to multiple pieces
-  // in order to fit the JSON string in metastore's table property (by default, the value has
-  // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
-  // value of this property). We will split the JSON string of a schema to its length exceeds the
-  // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
-  // that's why this conf has to be a static SQL conf.
-  val SCHEMA_STRING_LENGTH_THRESHOLD =
-    buildStaticConf("spark.sql.sources.schemaStringLengthThreshold")
-      .internal()
-      .doc("The maximum length allowed in a single cell when " +
-        "storing additional schema information in Hive's metastore.")
-      .version("1.3.1")
-      .intConf
-      .createWithDefault(4000)
-
   val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
     buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
       .internal()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 720d570ca8384..4052130720811 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config
 import org.apache.spark.sql.internal.SQLConf.CHECKPOINT_LOCATION
-import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD
+import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 
 class RuntimeConfigSuite extends SparkFunSuite {
 
@@ -62,7 +62,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
     val conf = newConf()
 
     // SQL configs
-    assert(!conf.isModifiable(SCHEMA_STRING_LENGTH_THRESHOLD.key))
+    assert(!conf.isModifiable(GLOBAL_TEMP_DATABASE.key))
     assert(conf.isModifiable(CHECKPOINT_LOCATION.key))
     // Core configs
     assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 3fc679f6b9fc7..4e93bcad0e99c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -174,6 +174,15 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("SPARK-33812: column stats round trip serialization with splitting histogram property") {
+    withSQLConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD.key -> "10") {
+      statsWithHgms.foreach { case (k, v) =>
+        val roundtrip = CatalogColumnStat.fromMap("t", k, v.toMap(k))
+        assert(roundtrip == Some(v))
+      }
+    }
+  }
+
   test("analyze column command - result verification") {
     // (data.head.productArity - 1) because the last column does not support stats collection.
     assert(stats.size == data.head.productArity - 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 1ea2d4fd0b32c..e699c972268a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -282,23 +282,23 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
   }
 
   test("static SQL conf comes from SparkConf") {
-    val previousValue = sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
+    val previousValue = sparkContext.conf.get(GLOBAL_TEMP_DATABASE)
     try {
-      sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, 2000)
+      sparkContext.conf.set(GLOBAL_TEMP_DATABASE, "a")
       val newSession = new SparkSession(sparkContext)
-      assert(newSession.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 2000)
+      assert(newSession.conf.get(GLOBAL_TEMP_DATABASE) == "a")
       checkAnswer(
-        newSession.sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}"),
-        Row(SCHEMA_STRING_LENGTH_THRESHOLD.key, "2000"))
+        newSession.sql(s"SET ${GLOBAL_TEMP_DATABASE.key}"),
+        Row(GLOBAL_TEMP_DATABASE.key, "a"))
     } finally {
-      sparkContext.conf.set(SCHEMA_STRING_LENGTH_THRESHOLD, previousValue)
+      sparkContext.conf.set(GLOBAL_TEMP_DATABASE, previousValue)
     }
   }
 
   test("cannot set/unset static SQL conf") {
-    val e1 = intercept[AnalysisException](sql(s"SET ${SCHEMA_STRING_LENGTH_THRESHOLD.key}=10"))
+    val e1 = intercept[AnalysisException](sql(s"SET ${GLOBAL_TEMP_DATABASE.key}=10"))
     assert(e1.message.contains("Cannot modify the value of a static config"))
-    val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
+    val e2 = intercept[AnalysisException](spark.conf.unset(GLOBAL_TEMP_DATABASE.key))
     assert(e2.message.contains("Cannot modify the value of a static config"))
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 54c237f78cb9c..98a7ed5a4ea87 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -429,18 +429,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val properties = new mutable.HashMap[String, String]
     properties.put(CREATED_SPARK_VERSION, table.createVersion)
-
-    // Serialized JSON schema string may be too long to be stored into a single metastore table
-    // property. In this case, we split the JSON string and store each part as a separate table
-    // property.
-    val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = schema.json
-    // Split the JSON string.
-    val parts = schemaJsonString.grouped(threshold).toSeq
-    properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
-    parts.zipWithIndex.foreach { case (part, index) =>
-      properties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
-    }
+    CatalogTable.splitLargeTableProp(DATASOURCE_SCHEMA, schema.json, properties.put)
 
     if (partitionColumns.nonEmpty) {
       properties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, partitionColumns.length.toString)
@@ -744,8 +733,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       case None if table.tableType == VIEW =>
         // If this is a view created by Spark 2.2 or higher versions, we should restore its schema
        // from table properties.
-        if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-          table = table.copy(schema = getSchemaFromTableProperties(table))
+        CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA).foreach { schemaJson =>
+          table = table.copy(schema = DataType.fromJson(schemaJson).asInstanceOf[StructType])
         }
 
       // No provider in table properties, which means this is a Hive serde table.
@@ -795,8 +784,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     // If this is a Hive serde table created by Spark 2.1 or higher versions, we should restore its
     // schema from table properties.
-    if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
-      val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val schemaJson = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
+    if (schemaJson.isDefined) {
+      val schemaFromTableProps = DataType.fromJson(schemaJson.get).asInstanceOf[StructType]
       val partColumnNames = getPartitionColumnsFromTableProperties(table)
       val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
@@ -836,7 +826,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       storageWithLocation.properties.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
-    val schemaFromTableProps = getSchemaFromTableProperties(table)
+    val schemaFromTableProps = CatalogTable.readLargeTableProp(table.properties, DATASOURCE_SCHEMA)
+      .map(json => DataType.fromJson(json).asInstanceOf[StructType]).getOrElse(new StructType())
     val partColumnNames = getPartitionColumnsFromTableProperties(table)
     val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
 
@@ -1340,7 +1331,6 @@ object HiveExternalCatalog {
   val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
   val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
   val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
-  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
   val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
   val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
   val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + "numBuckets"
@@ -1373,40 +1363,6 @@ object HiveExternalCatalog {
   val EMPTY_DATA_SCHEMA = new StructType()
     .add("col", "array<string>", nullable = true, comment = "from deserializer")
 
-  // A persisted data source table always store its schema in the catalog.
-  private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
-    val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
-    val props = metadata.properties
-    val schema = props.get(DATASOURCE_SCHEMA)
-    if (schema.isDefined) {
-      // Originally, we used `spark.sql.sources.schema` to store the schema of a data source table.
-      // After SPARK-6024, we removed this flag.
-      // Although we are not using `spark.sql.sources.schema` any more, we need to still support.
-      DataType.fromJson(schema.get).asInstanceOf[StructType]
-    } else if (props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) {
-      // If there is no schema information in table properties, it means the schema of this table
-      // was empty when saving into metastore, which is possible in older version(prior to 2.1) of
-      // Spark. We should respect it.
-      new StructType()
-    } else {
-      val numSchemaParts = props.get(DATASOURCE_SCHEMA_NUMPARTS)
-      if (numSchemaParts.isDefined) {
-        val parts = (0 until numSchemaParts.get.toInt).map { index =>
-          val part = metadata.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
-          if (part == null) {
-            throw new AnalysisException(errorMessage +
-              s" (missing part $index of the schema, ${numSchemaParts.get} parts are expected).")
-          }
-          part
-        }
-        // Stick all parts back to a single schema string.
-        DataType.fromJson(parts.mkString).asInstanceOf[StructType]
-      } else {
-        throw new AnalysisException(errorMessage)
-      }
-    }
-  }
-
   private def getColumnNamesByType(
       props: Map[String, String],
       colType: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6a964a0ce3613..e779a80f7c323 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -57,7 +57,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
+import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_SCHEMA
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -580,9 +580,7 @@ private[hive] class HiveClientImpl(
     val it = oldTable.getParameters.entrySet.iterator
     while (it.hasNext) {
       val entry = it.next()
-      val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) ||
-        entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS
-      if (isSchemaProp) {
+      if (CatalogTable.isLargeTableProp(DATASOURCE_SCHEMA, entry.getKey)) {
         it.remove()
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0593dbe7f6653..6e14034d8d548 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -701,7 +701,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }
 
   test("SPARK-6024 wide schema support") {
-    assert(spark.sparkContext.conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) == 4000)
+    assert(conf.getConf(SQLConf.HIVE_TABLE_PROPERTY_LENGTH_THRESHOLD) == 4000)
     withTable("wide_schema") {
       withTempDir { tempDir =>
         // We will need 80 splits for this schema if the threshold is 4000.
@@ -1355,7 +1355,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val newSession = sparkSession.newSession()
       newSession.sql("CREATE TABLE abc(i int) USING json")
       val tableMeta = newSession.sessionState.catalog.getTableMetadata(TableIdentifier("abc"))
-      assert(tableMeta.properties(DATASOURCE_SCHEMA_NUMPARTS).toInt == 1)
+      assert(tableMeta.properties.contains(DATASOURCE_SCHEMA))
      assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
     }
   }
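
Illustrative sketch (not part of the patch): the snippet below mirrors the round-trip behavior of the new CatalogTable.splitLargeTableProp / readLargeTableProp helpers, but takes the length threshold as an explicit parameter instead of reading spark.sql.hive.tablePropertyLengthThreshold from SQLConf, so it runs without a Spark session. The object and method names (LargePropSketch, splitProp, readProp) are hypothetical and exist only for this example; error handling for corrupted properties is omitted.

import scala.collection.mutable

object LargePropSketch {
  // Split `value` into `key.numParts` / `key.part.N` entries once it exceeds `threshold`,
  // mirroring CatalogTable.splitLargeTableProp (which reads the threshold from SQLConf).
  def splitProp(key: String, value: String, threshold: Int, addProp: (String, String) => Unit): Unit = {
    if (value.length <= threshold) {
      addProp(key, value)
    } else {
      val parts = value.grouped(threshold).toSeq
      addProp(s"$key.numParts", parts.length.toString)
      parts.zipWithIndex.foreach { case (part, index) => addProp(s"$key.part.$index", part) }
    }
  }

  // Reassemble the value, preferring the plain key that is written when the value fits in one cell.
  def readProp(props: Map[String, String], key: String): Option[String] = {
    props.get(key).orElse {
      props.get(s"$key.numParts").map { numParts =>
        (0 until numParts.toInt).map(i => props(s"$key.part.$i")).mkString
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A 95-character stand-in for a schema or histogram JSON string, split at a threshold of 10.
    val longJson = "x" * 95
    val props = mutable.HashMap.empty[String, String]
    splitProp("spark.sql.sources.schema", longJson, 10, props.put)
    assert(props("spark.sql.sources.schema.numParts") == "10")
    assert(readProp(props.toMap, "spark.sql.sources.schema").contains(longJson))

    // A short value is stored under the plain key, with no numParts/part entries.
    val small = mutable.HashMap.empty[String, String]
    splitProp("spark.sql.sources.schema", "short", 10, small.put)
    assert(small.toMap == Map("spark.sql.sources.schema" -> "short"))
  }
}

This is the same round trip the new StatisticsCollectionSuite test exercises for histogram statistics with the threshold lowered to "10", and it is why CatalogTable.isLargeTableProp matches the plain key, the numParts key, and every part key when HiveClientImpl cleans up old schema properties.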