Skip to content

Commit

Permalink
Add support for dropping column in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 19, 2023
1 parent 8cfa24c commit 698171a
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
Expand Down Expand Up @@ -275,6 +276,7 @@ public class DeltaLakeMetadata
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
public static final String DROP_COLUMN_OPERATION = "DROP COLUMNS";
public static final String INSERT_OPERATION = "WRITE";
public static final String MERGE_OPERATION = "MERGE";
public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
Expand Down Expand Up @@ -1225,6 +1227,76 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
}
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
    DeltaLakeTableHandle handle = (DeltaLakeTableHandle) tableHandle;
    String droppedColumn = ((DeltaLakeColumnHandle) columnHandle).getName();

    // Dropping a column is metadata-only, so it is only safe when the table maps logical
    // columns to physical data by 'name' or 'id' rather than by physical position
    ColumnMappingMode mappingMode = getColumnMappingMode(handle.getMetadataEntry());
    if (!(mappingMode == ColumnMappingMode.NAME || mappingMode == ColumnMappingMode.ID)) {
        throw new TrinoException(NOT_SUPPORTED, "Cannot drop column with the column mapping: " + mappingMode);
    }

    long commitVersion = handle.getReadVersion() + 1;
    ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle);
    List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties());
    if (partitionColumns.contains(droppedColumn)) {
        throw new TrinoException(NOT_SUPPORTED, "Cannot drop partition column: " + droppedColumn);
    }

    // Rebuild every schema-related piece of the metadata entry without the dropped column
    List<DeltaLakeColumnHandle> remainingColumns = tableMetadata.getColumns().stream()
            .filter(column -> !column.isHidden())
            .filter(column -> !column.getName().equals(droppedColumn))
            .map(column -> toColumnHandle(column, column.getName(), column.getType(), partitionColumns))
            .collect(toImmutableList());
    Map<String, String> remainingComments = getColumnComments(handle.getMetadataEntry()).entrySet().stream()
            .filter(entry -> !entry.getKey().equals(droppedColumn))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
    Map<String, Boolean> remainingNullability = getColumnsNullability(handle.getMetadataEntry()).entrySet().stream()
            .filter(entry -> !entry.getKey().equals(droppedColumn))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
    Map<String, Map<String, Object>> remainingColumnMetadata = getColumnsMetadata(handle.getMetadataEntry()).entrySet().stream()
            .filter(entry -> !entry.getKey().equals(droppedColumn))
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

    try {
        // Commit a new metadata entry that no longer lists the dropped column
        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, handle.getLocation());
        appendTableEntries(
                commitVersion,
                transactionLogWriter,
                handle.getMetadataEntry().getId(),
                remainingColumns,
                partitionColumns,
                remainingComments,
                remainingNullability,
                remainingColumnMetadata,
                handle.getMetadataEntry().getConfiguration(),
                DROP_COLUMN_OPERATION,
                session,
                nodeVersion,
                nodeId,
                Optional.ofNullable(handle.getMetadataEntry().getDescription()),
                getProtocolEntry(session, handle.getSchemaTableName()));
        transactionLogWriter.flush();

        // Prune the column from extended statistics as well, so that re-adding a column with
        // the same name later does not resurrect stale statistics
        statisticsAccess.readExtendedStatistics(session, handle.getLocation()).ifPresent(existingStatistics -> {
            ExtendedStatistics prunedStatistics = new ExtendedStatistics(
                    existingStatistics.getModelVersion(),
                    existingStatistics.getAlreadyAnalyzedModifiedTimeMax(),
                    existingStatistics.getColumnStatistics().entrySet().stream()
                            .filter(entry -> !entry.getKey().equals(droppedColumn))
                            .collect(toImmutableMap(Entry::getKey, Entry::getValue)),
                    existingStatistics.getAnalyzedColumns()
                            .map(analyzedColumns -> analyzedColumns.stream()
                                    .filter(column -> !column.equals(droppedColumn))
                                    .collect(toImmutableSet())));
            statisticsAccess.updateExtendedStatistics(session, handle.getLocation(), prunedStatistics);
        });
    }
    catch (Exception e) {
        throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to drop '%s' column from: %s.%s", droppedColumn, handle.getSchemaName(), handle.getTableName()), e);
    }
}

private static void appendTableEntries(
long commitVersion,
TransactionLogWriter transactionLogWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void testDropAndRecreateTable()
public void testDropColumnNotSupported()
{
    // Tables registered from existing resources use the default 'none' column mapping,
    // which does not allow dropping columns
    String tableName = "testdropcolumn";
    registerTableFromResources(tableName, "io/trino/plugin/deltalake/testing/resources/databricks/nation", getQueryRunner());
    assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN comment", "Cannot drop column with the column mapping: NONE");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_RENAME_SCHEMA:
return false;

case SUPPORTS_DROP_COLUMN:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_SET_COLUMN_TYPE:
return false;
Expand Down Expand Up @@ -363,6 +362,33 @@ public void testDropNonEmptySchemaWithTable()
assertUpdate("DROP SCHEMA " + schemaName);
}

@Override
public void testDropColumn()
{
    // Override because the connector doesn't support dropping columns with 'none' column mapping
    // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    assertThatThrownBy(super::testDropColumn)
            .hasMessageContaining("Cannot drop column with the column mapping: NONE");
}

@Override
public void testAddAndDropColumnName(String columnName)
{
    // Override because the connector doesn't support dropping columns with 'none' column mapping
    // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    assertThatThrownBy(() -> super.testAddAndDropColumnName(columnName))
            .hasMessageContaining("Cannot drop column with the column mapping: NONE");
}

@Override
public void testDropAndAddColumnWithSameName()
{
    // Override because the connector doesn't support dropping columns with 'none' column mapping
    // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    assertThatThrownBy(super::testDropAndAddColumnWithSameName)
            .hasMessageContaining("Cannot drop column with the column mapping: NONE");
}

@Override
public void testCharVarcharComparison()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
return ImmutableList.of(
testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks104.class)
.withGroups("configured_features", "delta-lake-databricks")
.withExcludedGroups("delta-lake-exclude-104")
.withExcludedTests(getExcludedTests())
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public final class TestGroups
public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks";
public static final String DELTA_LAKE_EXCLUDE_73 = "delta-lake-exclude-73";
public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91";
public static final String DELTA_LAKE_EXCLUDE_104 = "delta-lake-exclude-104";
public static final String DELTA_LAKE_EXCLUDE_113 = "delta-lake-exclude-113";
public static final String HUDI = "hudi";
public static final String PARQUET = "parquet";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
Expand Down Expand Up @@ -352,8 +353,6 @@ public void testUnsupportedOperationsColumnMappingModeName(String mode)
.hasMessageContaining("Delta Lake writer version 5 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN a_number TO renamed_column"))
.hasMessageContaining("This connector does not support renaming columns");
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN a_number"))
.hasMessageContaining("This connector does not support dropping columns");
}
finally {
onDelta().executeQuery("DROP TABLE default." + tableName);
Expand Down Expand Up @@ -391,6 +390,160 @@ public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTrinoDropColumnWithColumnMappingMode(String mode)
{
    // Verifies that a column dropped through Trino is no longer visible from either engine,
    // and that partition columns are rejected. Runs once per column mapping mode ('name', 'id').
    String tableName = "test_drop_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (id INT, data INT, part STRING)" +
            " USING delta " +
            " PARTITIONED BY (part) " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");

        // Dropping a regular column succeeds; dropping a partition column must fail
        onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN data");
        assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN part"))
                .hasMessageContaining("Cannot drop partition column: part");

        // Both engines should now see only the remaining columns
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, "part#1"));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, "part#1"));

        // TODO: Add a new column on Trino once the connector supports adding a column with the column mapping mode
        assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN data INTEGER"))
                .hasMessageContaining("Table default.%s requires Delta Lake writer version 5 which is not supported".formatted(tableName));
        // Re-adding a column with the dropped column's name must not expose the old data
        onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN data INTEGER");
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, "part#1", null));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, "part#1", null));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testSparkDropColumnWithColumnMappingMode(String mode)
{
    // Mirror of testTrinoDropColumnWithColumnMappingMode with the DROP COLUMN executed on the
    // Spark/Delta side, verifying that Trino reads the resulting table correctly
    String tableName = "test_spark_drop_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (id INT, data INT, part STRING)" +
            " USING delta " +
            " PARTITIONED BY (part) " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");

        // Dropping a regular column succeeds; Spark also rejects dropping a partition column
        onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN data");
        assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN part"))
                .hasMessageContaining("Dropping partition columns (part) is not allowed");

        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, "part#1"));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, "part#1"));

        // Verify adding a new column with the same name doesn't allow accessing the old data
        onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN data INTEGER");
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, "part#1", null));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, "part#1", null));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testTrinoExtendedStatisticsDropAndAddColumnWithColumnMappingMode(String mode)
{
    // Verifies that DROP COLUMN also removes the column from Trino's extended statistics,
    // so a re-added column with the same name starts with fresh statistics
    String tableName = "test_drop_and_add_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (a INT, b INT)" +
            " USING delta " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 2)");
        onTrino().executeQuery("ANALYZE delta.default." + tableName);
        assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
                .containsOnly(
                        row("a", null, 1.0, 0.0, null, "1", "1"),
                        row("b", null, 1.0, 0.0, null, "2", "2"),
                        row(null, null, null, null, 1.0, null, null));

        // Dropping 'b' should remove its statistics row immediately
        onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN b");
        assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
                .containsOnly(
                        row("a", null, 1.0, 0.0, null, "1", "1"),
                        row(null, null, null, null, 1.0, null, null));

        // TODO: Add a new column on Trino once the connector supports adding a column with the column mapping mode
        onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN b INTEGER");

        // Verify column statistics of dropped column isn't restored
        onTrino().executeQuery("ANALYZE delta.default." + tableName);
        assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName))
                .containsOnly(
                        row("a", null, 1.0, 0.0, null, "1", "1"),
                        row("b", 0.0, 0.0, 1.0, null, null, null),
                        row(null, null, null, null, 1.0, null, null));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testUnsupportedDropColumnWithColumnMappingModeNone()
{
    // With the default 'none' column mapping, neither Trino nor Spark allows dropping columns;
    // the table must remain fully intact after both failed attempts
    String tableName = "test_unsupported_drop_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (id INT, data INT, part STRING)" +
            " USING delta " +
            " PARTITIONED BY (part) " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = 'none')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");

        assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN part"))
                .hasMessageContaining("Cannot drop column with the column mapping: NONE");
        assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN part"))
                .hasMessageContaining("DROP COLUMN is not supported for your Delta table");

        // Both engines still see every column and the row that was inserted
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, 10, "part#1"));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, 10, "part#1"));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@DataProvider
public Object[][] columnMappingDataProvider()
{
Expand Down

0 comments on commit 698171a

Please sign in to comment.