
Commit 5f8a0d1

fixup! Support DML operations on Delta tables with name column mapping
ebyhr committed Apr 13, 2023
1 parent 65d7d5f commit 5f8a0d1
Showing 3 changed files with 174 additions and 34 deletions.
DeltaLakeMetadata.java
@@ -194,6 +194,7 @@
import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE;
import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
@@ -285,9 +286,8 @@ public class DeltaLakeMetadata
public static final int DEFAULT_READER_VERSION = 1;
public static final int DEFAULT_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
-public static final int MAX_WRITER_VERSION = 4;
+public static final int MAX_WRITER_VERSION = 5;
private static final int CDF_SUPPORTED_WRITER_VERSION = 4;
-private static final int MAX_DML_WRITER_VERSION = 5;

// Matches the dummy column Databricks stores in the metastore
private static final List<Column> DUMMY_DATA_COLUMNS = ImmutableList.of(
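The version-ceiling change above is what unlocks the rest of this diff: checkSupportedWriterVersion rejects any table whose protocol demands a writer version greater than MAX_WRITER_VERSION, so raising the ceiling from 4 to 5 admits writer-version-5 (column mapping) tables, and the separate MAX_DML_WRITER_VERSION constant becomes redundant. A minimal, self-contained sketch of that gate, modeled on the removed checkSupportedDmlWriterVersion further down (the class name and exception type here are placeholders, not the connector's actual code); it also shows why the compatibility tests below move their fixtures from minWriterVersion 5 to 6:

import static java.lang.String.format;

public class WriterVersionGate
{
    public static final int MAX_WRITER_VERSION = 5;

    // Reject tables whose protocol requires a newer writer than we support
    public static void checkSupportedWriterVersion(String table, int minWriterVersion)
    {
        if (minWriterVersion > MAX_WRITER_VERSION) {
            throw new UnsupportedOperationException(
                    format("Table %s requires Delta Lake writer version %d which is not supported", table, minWriterVersion));
        }
    }

    public static void main(String[] args)
    {
        checkSupportedWriterVersion("default.t", 5); // passes with the raised ceiling
        checkSupportedWriterVersion("default.t", 6); // throws, as the updated tests assert
    }
}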
@@ -1048,6 +1048,11 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());
+ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
+if (columnMappingMode != NONE) {
+// TODO https://github.com/trinodb/trino/issues/12638 Support setting a table comment for id and name column mapping mode
+throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+}

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);

@@ -1088,6 +1093,11 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeColumnHandle deltaLakeColumnHandle = (DeltaLakeColumnHandle) column;
checkSupportedWriterVersion(session, deltaLakeTableHandle.getSchemaTableName());
+ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry());
+if (columnMappingMode != NONE) {
+// TODO https://github.com/trinodb/trino/issues/12638 Support setting a column comment for id and name column mapping mode
+throw new TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+}

ConnectorTableMetadata tableMetadata = getTableMetadata(session, deltaLakeTableHandle);

@@ -1133,6 +1143,11 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
checkSupportedWriterVersion(session, handle.getSchemaTableName());
+ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
+if (columnMappingMode != NONE) {
+// TODO https://github.com/trinodb/trino/issues/12638 Support adding a column for id and name column mapping mode
+throw new TrinoException(NOT_SUPPORTED, "Adding a column with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+}
if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName()));
}
@@ -1255,9 +1270,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
format("Inserts are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. " +
"Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
}
-checkUnsupportedGeneratedColumns(table.getMetadataEntry());
-checkUnsupportedDmlColumnMapping(table.getMetadataEntry());
-checkSupportedDmlWriterVersion(session, table.getSchemaTableName());
+checkUnsupportedWriter(session, table.getSchemaTableName(), table.getMetadataEntry());

List<DeltaLakeColumnHandle> inputColumns = columns.stream()
.map(handle -> (DeltaLakeColumnHandle) handle)
@@ -1429,9 +1442,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
-checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
-checkUnsupportedDmlColumnMapping(handle.getMetadataEntry());
-checkSupportedDmlWriterVersion(session, handle.getSchemaTableName());
+checkUnsupportedWriter(session, handle.getSchemaTableName(), handle.getMetadataEntry());

ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);

@@ -1648,6 +1659,11 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
"Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY));
}
checkSupportedWriterVersion(session, table.getSchemaTableName());
+ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
+if (columnMappingMode != NONE) {
+// TODO https://github.com/trinodb/trino/issues/12638 Support 'optimize' table procedure for id and name column mapping mode
+throw new TrinoException(NOT_SUPPORTED, "Executing 'optimize' procedure with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
+}

return new BeginTableExecuteResult<>(
executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
@@ -1738,29 +1754,22 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH
}
}

-private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
-{
-Map<String, String> columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry);
-if (!columnGeneratedExpressions.isEmpty()) {
-throw new TrinoException(NOT_SUPPORTED, "Writing to tables with generated columns is not supported");
-}
-}
-
-private void checkUnsupportedDmlColumnMapping(MetadataEntry metadataEntry)
+private void checkUnsupportedWriter(ConnectorSession session, SchemaTableName schemaTableName, MetadataEntry metadataEntry)
{
+checkSupportedWriterVersion(session, schemaTableName);
+checkUnsupportedGeneratedColumns(metadataEntry);
ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
-if (!(columnMappingMode == ColumnMappingMode.NONE || columnMappingMode == ColumnMappingMode.NAME)) {
+if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME)) {
throw new TrinoException(NOT_SUPPORTED, "Writing with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH)));
}
+// TODO: Check writer-features
}

-private void checkSupportedDmlWriterVersion(ConnectorSession session, SchemaTableName schemaTableName)
+private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
{
-int requiredWriterVersion = getProtocolEntry(session, schemaTableName).getMinWriterVersion();
-if (requiredWriterVersion > MAX_DML_WRITER_VERSION) {
-throw new TrinoException(
-NOT_SUPPORTED,
-format("Table %s requires Delta Lake writer version %d which is not supported", schemaTableName, requiredWriterVersion));
+Map<String, String> columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry);
+if (!columnGeneratedExpressions.isEmpty()) {
+throw new TrinoException(NOT_SUPPORTED, "Writing to tables with generated columns is not supported");
}
}
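The hunk above folds three separate pre-write checks (writer version, generated columns, column mapping) into a single checkUnsupportedWriter entry point that beginInsert and beginMerge now share. Note that the column-mapping condition is an allow-list of none and name. A short, self-contained demo of the user-facing message the guard produces for the still-unsupported id mode (the enum and plain println are stand-ins for the connector's ColumnMappingMode and TrinoException):

import java.util.Locale;

public class ColumnMappingGateDemo
{
    enum ColumnMappingMode { NONE, ID, NAME, UNKNOWN }

    public static void main(String[] args)
    {
        ColumnMappingMode mode = ColumnMappingMode.ID;
        // Allow-list: only 'none' and 'name' are writable; everything else errors out
        if (!(mode == ColumnMappingMode.NONE || mode == ColumnMappingMode.NAME)) {
            System.out.println("Writing with column mapping %s is not supported"
                    .formatted(mode.name().toLowerCase(Locale.ENGLISH)));
            // prints: Writing with column mapping id is not supported
        }
    }
}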

@@ -2383,7 +2392,12 @@ private void updateTableStatistics(
Map<String, DeltaLakeColumnStatistics> mergedColumnStatistics = newColumnStatistics.entrySet().stream()
.map(entry -> {
String columnName = entry.getKey();
-return Map.entry(physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName), entry.getValue());
+String physicalColumnName = columnName;
+if (physicalColumnNameMapping.isPresent()) {
+physicalColumnName = physicalColumnNameMapping.get().get(columnName);
+requireNonNull(physicalColumnName, () -> "%s doesn't exist in %s".formatted(columnName, physicalColumnNameMapping));
+}
+return Map.entry(physicalColumnName, entry.getValue());
})
.collect(toImmutableMap(
Entry::getKey,
@@ -2415,7 +2429,13 @@

analyzedColumns.ifPresent(analyzeColumns -> {
Set<String> analyzePhysicalColumns = analyzeColumns.stream()
-.map(columnName -> physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName))
+.map(columnName -> {
+if (physicalColumnNameMapping.isPresent()) {
+String physicalColumnName = physicalColumnNameMapping.get().get(columnName);
+return requireNonNull(physicalColumnName, () -> "%s doesn't exist in %s".formatted(columnName, physicalColumnNameMapping));
+}
+return columnName;
+})
.collect(toImmutableSet());
if (!mergedColumnStatistics.keySet().equals(analyzePhysicalColumns)) {
// sanity validation
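The two statistics hunks above replace physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName) with an explicit, fail-fast lookup. The difference is subtle: Optional.map wraps a null Map.get result in an empty Optional, so the old expression silently fell back to the logical column name whenever the mapping was present but lacked an entry, masking inconsistencies; the new code surfaces them through requireNonNull. A self-contained illustration (the map contents and class name are invented for the demo):

import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class PhysicalNameLookupDemo
{
    public static void main(String[] args)
    {
        // A mapping that exists but has no entry for column "b"
        Optional<Map<String, String>> physicalColumnNameMapping = Optional.of(Map.of("a", "col-a0"));
        String columnName = "b";

        // Old behavior: Map.get returns null, Optional.map turns it into an empty
        // Optional, and orElse quietly substitutes the logical name
        String old = physicalColumnNameMapping.map(mapping -> mapping.get(columnName)).orElse(columnName);
        System.out.println(old); // prints "b" -- a silent fallback

        // New behavior: the missing entry is reported instead of papered over
        String physicalColumnName = physicalColumnNameMapping.get().get(columnName);
        requireNonNull(physicalColumnName, () -> "%s doesn't exist in %s".formatted(columnName, physicalColumnNameMapping));
    }
}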
@@ -80,14 +80,14 @@ public void testAddColumnUnsupportedWriterVersion()
onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
-"TBLPROPERTIES ('delta.minWriterVersion'='5')",
+"TBLPROPERTIES ('delta.minWriterVersion'='6')",
tableName,
bucketName,
tableDirectory));

try {
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int"))
-.hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported");
+.hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
}
finally {
dropDeltaTableWithRetry("default." + tableName);
@@ -195,14 +195,14 @@ public void testCommentOnTableUnsupportedWriterVersion()
onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
-"TBLPROPERTIES ('delta.minWriterVersion'='5')",
+"TBLPROPERTIES ('delta.minWriterVersion'='6')",
tableName,
bucketName,
tableDirectory));

try {
assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'"))
-.hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported");
+.hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
@@ -241,20 +241,44 @@ public void testCommentOnColumnUnsupportedWriterVersion()
onDelta().executeQuery(format("" +
"CREATE TABLE default.%s (col int) " +
"USING DELTA LOCATION 's3://%s/%s'" +
-"TBLPROPERTIES ('delta.minWriterVersion'='5')",
+"TBLPROPERTIES ('delta.minWriterVersion'='6')",
tableName,
bucketName,
tableDirectory));

try {
assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'"))
-.hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported");
+.hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
}
finally {
onTrino().executeQuery("DROP TABLE delta.default." + tableName);
}
}

+@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS})
+@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+public void testOptimizeUnsupportedWriterVersion()
+{
+String tableName = "test_dl_optimize_unsupported_writer_" + randomNameSuffix();
+String tableDirectory = "databricks-compatibility-test-" + tableName;
+
+onDelta().executeQuery(format("" +
+"CREATE TABLE default.%s (col int) " +
+"USING DELTA LOCATION 's3://%s/%s'" +
+"TBLPROPERTIES ('delta.minWriterVersion'='6')",
+tableName,
+bucketName,
+tableDirectory));
+
+try {
+assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE"))
+.hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported");
+}
+finally {
+dropDeltaTableWithRetry(tableName);
+}
+}
+
@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTrinoAlterTablePreservesTableMetadata()