Skip to content

Commit

Permalink
Don't overwrite reader and writer versions during metadata change in …
Browse files Browse the repository at this point in the history
…Delta
  • Loading branch information
ebyhr committed Oct 13, 2022
1 parent cfaa3ec commit 72524e2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ public class DeltaLakeMetadata
private static final int WRITER_VERSION = 2;
// The highest writer version Trino supports writing to
private static final int MAX_WRITER_VERSION = 3;
// This constant should be used only for a new table
private static final ProtocolEntry DEFAULT_PROTOCOL = new ProtocolEntry(READER_VERSION, WRITER_VERSION);
// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
new Column("col", HiveType.toHiveType(new ArrayType(VarcharType.createUnboundedVarcharType())), Optional.empty()));
Expand Down Expand Up @@ -718,7 +720,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
session,
nodeVersion,
nodeId,
tableMetadata.getComment());
tableMetadata.getComment(),
DEFAULT_PROTOCOL);

setRollback(() -> deleteRecursivelyIfExists(new HdfsContext(session), hdfsEnvironment, deltaLogDirectory));
transactionLogWriter.flush();
Expand Down Expand Up @@ -984,7 +987,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
session,
nodeVersion,
nodeId,
handle.getComment());
handle.getComment(),
DEFAULT_PROTOCOL);
appendAddFileEntries(transactionLogWriter, dataFileInfos, handle.getPartitionedBy(), true);
transactionLogWriter.flush();
PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow());
Expand Down Expand Up @@ -1063,7 +1067,8 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
session,
nodeVersion,
nodeId,
comment);
comment,
getProtocolEntry(session, handle.getSchemaTableName()));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1110,7 +1115,8 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
session,
nodeVersion,
nodeId,
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()));
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()),
getProtocolEntry(session, deltaLakeTableHandle.getSchemaTableName()));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand Down Expand Up @@ -1168,7 +1174,8 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
session,
nodeVersion,
nodeId,
Optional.ofNullable(handle.getMetadataEntry().getDescription()));
Optional.ofNullable(handle.getMetadataEntry().getDescription()),
getProtocolEntry(session, handle.getSchemaTableName()));
transactionLogWriter.flush();
}
catch (Exception e) {
Expand All @@ -1190,7 +1197,8 @@ private static void appendTableEntries(
ConnectorSession session,
String nodeVersion,
String nodeId,
Optional<String> comment)
Optional<String> comment,
ProtocolEntry protocolEntry)
{
long createdTime = System.currentTimeMillis();
transactionLogWriter.appendCommitInfoEntry(
Expand All @@ -1208,7 +1216,7 @@ private static void appendTableEntries(
ISOLATION_LEVEL,
true));

transactionLogWriter.appendProtocolEntry(new ProtocolEntry(READER_VERSION, WRITER_VERSION));
transactionLogWriter.appendProtocolEntry(protocolEntry);

transactionLogWriter.appendMetadataEntry(
new MetadataEntry(
Expand Down Expand Up @@ -1848,14 +1856,19 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)

/**
 * Verifies that Trino is able to write to the given table by comparing the
 * writer version required by the table's protocol entry against the highest
 * writer version Trino supports ({@code MAX_WRITER_VERSION}).
 *
 * @throws TrinoException with {@code NOT_SUPPORTED} when the table demands a newer writer version
 */
private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
{
    int minWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
    if (minWriterVersion <= MAX_WRITER_VERSION) {
        return;
    }
    throw new TrinoException(
            NOT_SUPPORTED,
            format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, minWriterVersion));
}

/**
 * Loads the protocol entry (minimum reader/writer version requirements) from the
 * table's current transaction log snapshot.
 */
private ProtocolEntry getProtocolEntry(ConnectorSession session, SchemaTableName schemaTableName)
{
    var snapshot = metastore.getSnapshot(schemaTableName, session);
    return metastore.getProtocol(session, snapshot);
}

private List<DeltaLakeColumnHandle> getUnmodifiedColumns(DeltaLakeTableHandle tableHandle, List<ColumnHandle> updatedColumns)
{
Set<DeltaLakeColumnHandle> updatedColumnHandles = updatedColumns.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,32 @@ public void testTrinoAlterTablePreservesTableMetadata()
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
public void testTrinoAlterTablePreservesReaderAndWriterVersions()
{
    // The table is created by Delta with protocol versions explicitly lowered to 1/1;
    // the Trino metadata operations below must not bump them back to Trino's defaults.
    String tableName = "test_trino_alter_table_preserves_versions_" + randomTableSuffix();
    String tableDirectory = "databricks-compatibility-test-" + tableName;

    onDelta().executeQuery(format("" +
            "CREATE TABLE default.%s (col int) " +
            // Fixed: a space is required before TBLPROPERTIES — the original concatenation
            // produced "...'s3://bucket/dir'TBLPROPERTIES (...)" with no token separator.
            "USING DELTA LOCATION 's3://%s/%s' " +
            "TBLPROPERTIES ('delta.minReaderVersion'='1', 'delta.minWriterVersion'='1')",
            tableName,
            bucketName,
            tableDirectory));
    try {
        // Each of these statements rewrites the metadata/protocol entries in the transaction log
        onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'");
        onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test table comment'");
        onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_column INT");

        // SHOW TBLPROPERTIES rows are (key, value) pairs; the value sits at index 1
        List<?> minReaderVersion = getOnlyElement(onDelta().executeQuery("SHOW TBLPROPERTIES " + tableName + "(delta.minReaderVersion)").rows());
        assertEquals((String) minReaderVersion.get(1), "1");
        List<?> minWriterVersion = getOnlyElement(onDelta().executeQuery("SHOW TBLPROPERTIES " + tableName + "(delta.minWriterVersion)").rows());
        assertEquals((String) minWriterVersion.get(1), "1");
    }
    finally {
        onTrino().executeQuery("DROP TABLE delta.default." + tableName);
    }
}
}

0 comments on commit 72524e2

Please sign in to comment.