diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index 079ed6ebb9e..62527ec3ee1 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -407,6 +407,9 @@ case class CreateDeltaTableCommand(
         ignoreIfExists = false,
         validateLocation = false)
     }
+    if (conf.getConf(DeltaSQLConf.DELTA_SAVE_SCHEMA_GLUE_CATALOG_ENABLED)) {
+      spark.sessionState.catalog.alterTableDataSchema(cleaned.identifier, cleaned.schema)
+    }
   }
 
   /** Clean up the information we pass on to store in the catalog. */
@@ -421,8 +424,14 @@ case class CreateDeltaTableCommand(
       table.storage.copy(properties = Map.empty)
     }
 
+    val newSchema = if (conf.getConf(DeltaSQLConf.DELTA_SAVE_SCHEMA_GLUE_CATALOG_ENABLED)) {
+      table.schema.copy()
+    } else {
+      new StructType()
+    }
+
     table.copy(
-      schema = new StructType(),
+      schema = newSchema,
       properties = Map.empty,
       partitionColumnNames = Nil,
       // Remove write specific options when updating the catalog
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 21ce4dd9a70..e8afcf638fa 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1151,6 +1151,18 @@ trait DeltaSQLConfBase {
         |Only change this for testing!""".stripMargin)
       .booleanConf
       .createWithDefault(true)
+
+  val DELTA_SAVE_SCHEMA_GLUE_CATALOG_ENABLED =
+    buildConf("fixSchema.GlueCatalog")
+      .internal()
+      .doc(
+        """
+          |When enabled, the table schema is kept in the catalog table object and an alter
+          |table data schema call is forced after the table is created. Spark normally strips
+          |the schema because Delta is not a valid SerDe configuration.
+          |""".stripMargin)
+      .booleanConf
+      .createWithDefault(false)
 }
 
 object DeltaSQLConf extends DeltaSQLConfBase
diff --git a/core/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala b/core/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala
index d7cc09fc828..d5bd40fa37a 100644
--- a/core/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala
+++ b/core/src/test/scala/io/delta/tables/DeltaTableBuilderSuite.scala
@@ -456,4 +456,38 @@ class DeltaTableBuilderSuite extends QueryTest with SharedSparkSession with Delt
     }
   }
 
+  test("Test schema external table delta glue catalog conf activated") {
+    withSQLConf(DeltaSQLConf.DELTA_SAVE_SCHEMA_GLUE_CATALOG_ENABLED.key -> "true") {
+      withTempDir { dir =>
+        spark.range(10).toDF("key").write.format("delta")
+          .option("mergeSchema", true)
+          .mode("overwrite")
+          .save(dir.getAbsolutePath)
+        verifyTestTableMetadata(s"delta.`${dir.getAbsolutePath}`",
+          "key bigint", colNullables = Set("key"))
+      }
+    }
+  }
+
+  test("Test schema delta glue catalog conf activated") {
+    withSQLConf(DeltaSQLConf.DELTA_SAVE_SCHEMA_GLUE_CATALOG_ENABLED.key -> "true") {
+      withTable("tableA", "tableB") {
+        withTempDir { dir =>
+          spark.range(10).toDF("key").write.format("delta")
+            .mode("overwrite")
+            .saveAsTable("tableA")
+          val existingSchema = spark.read.format("delta")
+            .table("tableA").schema
+          io.delta.tables.DeltaTable.create()
+            .tableName("tableB")
+            .location(dir.getAbsolutePath)
+            .addColumns(existingSchema)
+            .addColumn("value", "string", false)
+            .execute()
+          verifyTestTableMetadata(s"delta.`${dir.getAbsolutePath}`",
+            "key bigint, value string", colNullables = Set("key"))
+        }
+      }
+    }
+  }
 }
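For reference, a minimal usage sketch (not part of the patch) of how the new flag would be toggled in a session, using the conf constant added above; the table name `glue_visible_table` is illustrative:

    // Enable the flag for the current session via the key defined above.
    spark.conf.set(DeltaSQLConf.DELTA_SAVE_SCHEMA_GLUE_CATALOG_ENABLED.key, "true")

    // Subsequent catalog-backed Delta table creations should now retain the column
    // schema in the metastore entry (e.g. AWS Glue) instead of an empty schema.
    spark.range(10).toDF("key").write.format("delta").saveAsTable("glue_visible_table")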