diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index b35750cfd9fb..f10868760c37 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -267,6 +267,8 @@ public class DeltaLakeMetadata private static final int WRITER_VERSION = 2; // The highest writer version Trino supports writing to private static final int MAX_WRITER_VERSION = 3; + // This constant should be used only for a new table + private static final ProtocolEntry DEFAULT_PROTOCOL = new ProtocolEntry(READER_VERSION, WRITER_VERSION); // Matches the dummy column Databricks stores in the metastore private static final List DUMMY_DATA_COLUMNS = ImmutableList.of( new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty())); @@ -718,7 +720,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe session, nodeVersion, nodeId, - tableMetadata.getComment()); + tableMetadata.getComment(), + DEFAULT_PROTOCOL); setRollback(() -> deleteRecursivelyIfExists(new HdfsContext(session), hdfsEnvironment, deltaLogDirectory)); transactionLogWriter.flush(); @@ -984,7 +987,8 @@ public Optional finishCreateTable( session, nodeVersion, nodeId, - handle.getComment()); + handle.getComment(), + DEFAULT_PROTOCOL); appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true); transactionLogWriter.flush(); PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow()); @@ -1063,7 +1067,8 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table session, nodeVersion, nodeId, - comment); + comment, + getProtocolEntry(session, handle.getSchemaTableName())); transactionLogWriter.flush(); } catch (Exception e) { @@ -1110,7 +1115,8 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl session, nodeVersion, nodeId, - Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription())); + Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()), + getProtocolEntry(session, deltaLakeTableHandle.getSchemaTableName())); transactionLogWriter.flush(); } catch (Exception e) { @@ -1168,7 +1174,8 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle session, nodeVersion, nodeId, - Optional.ofNullable(handle.getMetadataEntry().getDescription())); + Optional.ofNullable(handle.getMetadataEntry().getDescription()), + getProtocolEntry(session, handle.getSchemaTableName())); transactionLogWriter.flush(); } catch (Exception e) { @@ -1190,7 +1197,8 @@ private static void appendTableEntries( ConnectorSession session, String nodeVersion, String nodeId, - Optional comment) + Optional comment, + ProtocolEntry protocolEntry) { long createdTime = System.currentTimeMillis(); transactionLogWriter.appendCommitInfoEntry( @@ -1208,7 +1216,7 @@ private static void appendTableEntries( ISOLATION_LEVEL, true)); - transactionLogWriter.appendProtocolEntry(new ProtocolEntry(READER_VERSION, WRITER_VERSION)); + transactionLogWriter.appendProtocolEntry(protocolEntry); transactionLogWriter.appendMetadataEntry( new MetadataEntry( @@ -1848,7 +1856,7 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName) { - int requiredWriterVersion = metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)).getMinWriterVersion(); + int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion(); if (requiredWriterVersion > MAX_WRITER_VERSION) { throw new TrinoException( NOT_SUPPORTED, @@ -1856,6 +1864,11 @@ private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableNa } } + private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName schemaTableName) + { + return metastore.getProtocol(session, metastore.getSnapshot(schemaTableName, session)); + } + private List getUnmodifiedColumns(DeltaLakeTableHandle tableHandle, List updatedColumns) { Set updatedColumnHandles = updatedColumns.stream() diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index fbbe319d2d04..607da1e68e2b 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -263,4 +263,32 @@ public void testTrinoAlterTablePreservesTableMetadata() onTrino().executeQuery("DROP TABLE delta.default." + tableName); } } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testTrinoAlterTablePreservesReaderAndWriterVersions() + { + String tableName = "test_trino_alter_table_preserves_versions_" + randomTableSuffix(); + String tableDirectory = "databricks-compatibility-test-" + tableName; + + onDelta().executeQuery(format("" + + "CREATE TABLE default.%s (col int) " + + "USING DELTA LOCATION 's3://%s/%s'" + + "TBLPROPERTIES ('delta.minReaderVersion'='1', 'delta.minWriterVersion'='1')", + tableName, + bucketName, + tableDirectory)); + try { + onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'"); + onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test table comment'"); + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_column INT"); + + List minReaderVersion = getOnlyElement(onDelta().executeQuery("SHOW TBLPROPERTIES " + tableName + "(delta.minReaderVersion)").rows()); + assertEquals((String) minReaderVersion.get(1), "1"); + List minWriterVersion = getOnlyElement(onDelta().executeQuery("SHOW TBLPROPERTIES " + tableName + "(delta.minWriterVersion)").rows()); + assertEquals((String) minWriterVersion.get(1), "1"); + } + finally { + onTrino().executeQuery("DROP TABLE delta.default." + tableName); + } + } }