fixup! Support DML operations on Delta tables with name column mapping
ebyhr committed Apr 17, 2023
1 parent a6a6bbe commit 76fb5eb
Showing 3 changed files with 178 additions and 36 deletions.
@@ -194,6 +194,7 @@
 import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
 import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
 import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
@@ -287,9 +288,8 @@ public class DeltaLakeMetadata
     public static final int DEFAULT_WRITER_VERSION = 2;
     // The highest reader and writer versions Trino supports
     private static final int MAX_READER_VERSION = 3;
-    private static final int MAX_WRITER_VERSION = 4;
+    private static final int MAX_WRITER_VERSION = 5;
     private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
-    private static final int MAX_DML_WRITER_VERSION = 5;
 
     // Matches the dummy column Databricks stores in the metastore
     private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
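
Raising MAX_WRITER_VERSION to 5 lets the general writer-version gate accept column-mapping tables, so the separate MAX_DML_WRITER_VERSION constant (and its dedicated check, removed further down) becomes redundant. For orientation, the Delta protocol ties features to minimum writer versions roughly as follows: 2 = append-only/invariants, 3 = CHECK constraints, 4 = change data feed and generated columns, 5 = column mapping, 6 = identity columns; that is why the compatibility tests below now create tables with delta.minWriterVersion=6 to exercise the unsupported path. A minimal sketch of the shared gate, assuming it mirrors the removed checkSupportedDmlWriterVersion (shown further down) with the new constant; the actual checkSupportedWriterVersion body is outside this diff:

    // Sketch, not part of this diff: compare the table protocol's declared
    // minimum writer version against the highest version the connector writes.
    private void checkSupportedWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
    {
        int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
        if (requiredWriterVersion > MAX_WRITER_VERSION) {
            throw new TrinoException(
                    NOT_SUPPORTED,
                    format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, requiredWriterVersion));
        }
    }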
@@ -1064,6 +1064,11 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
     {
         DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
         checkSupportedWriterVersion(session, handle.getSchemaTableName());
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
+        if (columnMappingMode != NONE) {
+            // TODO https://github.com/trinodb/trino/issues/12638 Support setting a table comment for id and name column mapping mode
+            throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+        }
 
         ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
 
@@ -1104,6 +1109,11 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
         DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) tableHandle;
         DeltaLakeColumnHandle deltaLakeColumnHandle = (DeltaLakeColumnHandle) column;
         checkSupportedWriterVersion(session, deltaLakeTableHandle.getSchemaTableName());
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry());
+        if (columnMappingMode != NONE) {
+            // TODO https://github.com/trinodb/trino/issues/12638 Support setting a column comment for id and name column mapping mode
+            throw new TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+        }
 
         ConnectorTableMetadata tableMetadata = getTableMetadata(session, deltaLakeTableHandle);
 
@@ -1149,6 +1159,11 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
     {
         DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
         checkSupportedWriterVersion(session, handle.getSchemaTableName());
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
+        if (columnMappingMode != NONE) {
+            // TODO https://github.com/trinodb/trino/issues/12638 Support adding a column for id and name column mapping mode
+            throw new TrinoException(NOT_SUPPORTED, "Adding a column with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+        }
         if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
            throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName()));
         }
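
All three DDL paths above (table comment, column comment, add column) consult the column mapping mode before touching the schema and reject anything other than none. As a rough sketch of where that mode comes from, assuming the standard Delta table property delta.columnMapping.mode with values none, id, and name; the real helper lives in DeltaLakeSchemaSupport and its exact shape is not part of this diff:

    // Illustrative only; accessor names are assumptions. An absent property
    // means column mapping is disabled ('none').
    public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata)
    {
        String mode = metadata.getConfiguration().getOrDefault("delta.columnMapping.mode", "none");
        return ColumnMappingMode.valueOf(mode.toUpperCase(ENGLISH));
    }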
@@ -1271,9 +1286,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
                     format("Inserts are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
                             "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
         }
-        checkUnsupportedGeneratedColumns(table.getMetadataEntry());
-        checkUnsupportedDmlColumnMapping(table.getMetadataEntry());
-        checkSupportedDmlWriterVersion(session, table.getSchemaTableName());
+        checkUnsupportedWriter(session, table.getSchemaTableName(), table.getMetadataEntry());
 
         List<DeltaLakeColumnHandle> inputColumns = columns.stream()
                 .map(handle -> (DeltaLakeColumnHandle) handle)
@@ -1445,9 +1458,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
         if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
             throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
         }
-        checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
-        checkUnsupportedDmlColumnMapping(handle.getMetadataEntry());
-        checkSupportedDmlWriterVersion(session, handle.getSchemaTableName());
+        checkUnsupportedWriter(session, handle.getSchemaTableName(), handle.getMetadataEntry());
 
         ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
 
@@ -1664,6 +1675,11 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
                     "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
         }
         checkSupportedWriterVersion(session, table.getSchemaTableName());
+        ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
+        if (columnMappingMode != NONE) {
+            // TODO https://github.com/trinodb/trino/issues/12638 Support 'optimize' table procedure for id and name column mapping mode
+            throw new TrinoException(NOT_SUPPORTED, "Executing 'optimize' procedure with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+        }
 
         return new BeginTableExecuteResult<>(
                 executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
@@ -1754,29 +1770,22 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH
         }
     }
 
-    private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
-    {
-        Map<String, String> columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry);
-        if (!columnGeneratedExpressions.isEmpty()) {
-            throw new TrinoException(NOT_SUPPORTED, "Writing to tables with generated columns is not supported");
-        }
-    }
-
-    private void checkUnsupportedDmlColumnMapping(MetadataEntry metadataEntry)
+    private void checkUnsupportedWriter(ConnectorSession session, SchemaTableName schemaTableName, MetadataEntry metadataEntry)
     {
+        checkSupportedWriterVersion(session, schemaTableName);
+        checkUnsupportedGeneratedColumns(metadataEntry);
         ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
-        if (!(columnMappingMode == ColumnMappingMode.NONE || columnMappingMode == ColumnMappingMode.NAME)) {
+        if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME)) {
             throw new TrinoException(NOT_SUPPORTED, "Writing with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
         }
+        // TODO: Check writer-features
     }
 
-    private void checkSupportedDmlWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
+    private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
     {
-        int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
-        if (requiredWriterVersion > MAX_DML_WRITER_VERSION) {
-            throw new TrinoException(
-                    NOT_SUPPORTED,
-                    format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, requiredWriterVersion));
+        Map<String, String> columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry);
+        if (!columnGeneratedExpressions.isEmpty()) {
+            throw new TrinoException(NOT_SUPPORTED, "Writing to tables with generated columns is not supported");
         }
     }
 
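The consolidation funnels beginInsert and beginMerge through a single checkUnsupportedWriter guard: writer-version gate first, then generated columns, then column mapping mode. The new TODO points at Delta protocol writer version 7, where writer capabilities become an explicit feature list rather than a single version number. A hypothetical shape for that future check, with all names assumed rather than taken from Trino's API:

    // Hypothetical sketch of the "Check writer-features" TODO; 'writerFeatures'
    // would come from the protocol entry once the connector models writer version 7.
    private static void checkSupportedWriterFeatures(Set<String> writerFeatures, Set<String> supportedFeatures)
    {
        Set<String> unsupported = new HashSet<>(writerFeatures); // java.util.HashSet
        unsupported.removeAll(supportedFeatures);
        if (!unsupported.isEmpty()) {
            throw new TrinoException(NOT_SUPPORTED, "Unsupported writer features: " + unsupported);
        }
    }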
@@ -2399,7 +2408,12 @@ private void updateTableStatistics(
         Map<String, DeltaLakeColumnStatistics> mergedColumnStatistics = newColumnStatistics.entrySet().stream()
                 .map(entry -> {
                     String columnName = entry.getKey();
-                    return Map.entry(physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName), entry.getValue());
+                    String physicalColumnName = columnName;
+                    if (physicalColumnNameMapping.isPresent()) {
+                        physicalColumnName = physicalColumnNameMapping.get().get(columnName);
+                        requireNonNull(physicalColumnName, () -> "%s doesn't exist in %s".formatted(columnName, physicalColumnNameMapping));
+                    }
+                    return Map.entry(physicalColumnName, entry.getValue());
                 })
                 .collect(toImmutableMap(
                         Entry::getKey,
@@ -2431,7 +2445,13 @@ private void updateTableStatistics(
 
         analyzedColumns.ifPresent(analyzeColumns -> {
             Set<String> analyzePhysicalColumns = analyzeColumns.stream()
-                    .map(columnName -> physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName))
+                    .map(columnName -> {
+                        if (physicalColumnNameMapping.isPresent()) {
+                            String physicalColumnName = physicalColumnNameMapping.get().get(columnName);
+                            return requireNonNull(physicalColumnName, () -> "%s doesn't exist in %s".formatted(columnName, physicalColumnNameMapping));
+                        }
+                        return columnName;
+                    })
                     .collect(toImmutableSet());
             if (!mergedColumnStatistics.keySet().equals(analyzePhysicalColumns)) {
                 // sanity validation
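
Both statistics paths above drop the one-liner physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName) in favor of an explicit presence check plus requireNonNull. The reason: when the mapping is present but lacks a column, Map.get returns null, Optional.map collapses that to an empty Optional, and orElse silently substitutes the logical column name, filing statistics under the wrong key. The rewrite fails fast instead. A self-contained illustration with made-up values:

    import java.util.Map;
    import java.util.Objects;
    import java.util.Optional;

    public class OptionalFallbackPitfall
    {
        public static void main(String[] args)
        {
            // Present-but-incomplete physical name mapping: no entry for "name"
            Optional<Map<String, String>> mapping = Optional.of(Map.of("id", "col-a1b2"));

            // Old pattern: quietly falls back to the logical name
            String physical = mapping.map(m -> m.get("name")).orElse("name");
            System.out.println(physical); // prints "name"; the gap goes unnoticed

            // New pattern: fails fast with a descriptive message
            Objects.requireNonNull(
                    mapping.get().get("name"),
                    () -> "%s doesn't exist in %s".formatted("name", mapping)); // throws NullPointerException
        }
    }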
@@ -27,6 +27,7 @@
 import static io.trino.tempto.assertions.QueryAssert.assertThat;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
+import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104;
 import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
 import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91;
 import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
@@ -71,7 +72,7 @@ public void testAddColumnWithCommentOnTrino()
         }
     }
 
-    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS})
     @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
     public void testAddColumnUnsupportedWriterVersion()
     {
@@ -81,14 +82,14 @@ public void testAddColumnUnsupportedWriterVersion()
         onDelta().executeQuery(format("" +
                 "CREATE TABLE default.%s (col int) " +
                 "USING DELTA LOCATION 's3://%s/%s'" +
-                "TBLPROPERTIES ('delta.minWriterVersion'='5')",
+                "TBLPROPERTIES ('delta.minWriterVersion'='6')",
                 tableName,
                 bucketName,
                 tableDirectory));
 
         try {
             assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int"))
-                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
         }
         finally {
             dropDeltaTableWithRetry("default." + tableName);
@@ -196,14 +197,14 @@ public void testCommentOnTableUnsupportedWriterVersion()
         onDelta().executeQuery(format("" +
                 "CREATE TABLE default.%s (col int) " +
                 "USING DELTA LOCATION 's3://%s/%s'" +
-                "TBLPROPERTIES ('delta.minWriterVersion'='5')",
+                "TBLPROPERTIES ('delta.minWriterVersion'='6')",
                 tableName,
                 bucketName,
                 tableDirectory));
 
         try {
             assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'"))
-                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
         }
         finally {
             onTrino().executeQuery("DROP TABLE delta.default." + tableName);
@@ -242,20 +243,44 @@ public void testCommentOnColumnUnsupportedWriterVersion()
         onDelta().executeQuery(format("" +
                 "CREATE TABLE default.%s (col int) " +
                 "USING DELTA LOCATION 's3://%s/%s'" +
-                "TBLPROPERTIES ('delta.minWriterVersion'='5')",
+                "TBLPROPERTIES ('delta.minWriterVersion'='6')",
                 tableName,
                 bucketName,
                 tableDirectory));
 
         try {
             assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'"))
-                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported");
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
         }
         finally {
             onTrino().executeQuery("DROP TABLE delta.default." + tableName);
         }
     }
 
+    @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS})
+    @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+    public void testOptimizeUnsupportedWriterVersion()
+    {
+        String tableName = "test_dl_optimize_unsupported_writer_" + randomNameSuffix();
+        String tableDirectory = "databricks-compatibility-test-" + tableName;
+
+        onDelta().executeQuery(format("" +
+                "CREATE TABLE default.%s (col int) " +
+                "USING DELTA LOCATION 's3://%s/%s'" +
+                "TBLPROPERTIES ('delta.minWriterVersion'='6')",
+                tableName,
+                bucketName,
+                tableDirectory));
+
+        try {
+            assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE"))
+                    .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
+        }
+        finally {
+            dropDeltaTableWithRetry("default." + tableName);
+        }
+    }
+
     @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
     @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
     public void testTrinoAlterTablePreservesTableMetadata()