Skip to content

Commit

Permalink
Add support for dropping column in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed May 30, 2023
1 parent bad3833 commit a2d17db
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
import static com.google.common.primitives.Ints.max;
Expand Down Expand Up @@ -293,6 +294,7 @@ public class DeltaLakeMetadata
// Operation name constants, one per kind of table change.
// NOTE(review): presumably these are the operation labels recorded with each transaction log commit — confirm against appendTableEntries.
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
public static final String DROP_COLUMN_OPERATION = "DROP COLUMNS"; // passed to appendTableEntries by dropColumn
public static final String INSERT_OPERATION = "WRITE";
public static final String MERGE_OPERATION = "MERGE";
public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark
Expand Down Expand Up @@ -1322,6 +1324,78 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
}
}

/**
 * Drops a column by appending a new set of table entries that no longer lists it.
 * <p>
 * The request is rejected with {@code NOT_SUPPORTED} unless the table's column mapping mode is
 * {@code NAME} or {@code ID}, and also when the column is one of the partition columns.
 * Once the transaction log writer has been flushed, any existing extended statistics are
 * rewritten without the dropped column.
 *
 * @param session the current connector session
 * @param tableHandle the table to alter; cast to {@code DeltaLakeTableHandle}
 * @param columnHandle the column to drop; cast to {@code DeltaLakeColumnHandle}
 * @throws TrinoException {@code NOT_SUPPORTED} for the rejections described above, or
 *         {@code DELTA_LAKE_BAD_WRITE} when writing the log entries or updating the statistics fails
 */
@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
    DeltaLakeTableHandle deltaTable = (DeltaLakeTableHandle) tableHandle;
    DeltaLakeColumnHandle droppedHandle = (DeltaLakeColumnHandle) columnHandle;
    String droppedName = droppedHandle.getBaseColumnName();
    MetadataEntry currentMetadata = deltaTable.getMetadataEntry();

    checkSupportedWriterVersion(session, deltaTable);
    ColumnMappingMode mappingMode = getColumnMappingMode(currentMetadata);
    boolean mappingAllowsDrop = mappingMode == ColumnMappingMode.NAME || mappingMode == ColumnMappingMode.ID;
    if (!mappingAllowsDrop) {
        throw new TrinoException(NOT_SUPPORTED, "Cannot drop column with the column mapping: " + mappingMode);
    }

    ConnectorTableMetadata tableMetadata = getTableMetadata(session, deltaTable);
    long nextVersion = deltaTable.getReadVersion() + 1;
    List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
    if (partitionedBy.contains(droppedName)) {
        throw new TrinoException(NOT_SUPPORTED, "Cannot drop partition column: " + droppedName);
    }

    // Walk the current schema once: collect the names that survive the drop and
    // the name -> physical name mapping of every column, including the dropped one
    List<DeltaLakeColumnMetadata> schemaColumns = extractSchema(currentMetadata, typeManager);
    ImmutableList.Builder<String> remainingNames = ImmutableList.builderWithExpectedSize(schemaColumns.size() - 1);
    ImmutableMap.Builder<String, String> nameToPhysicalName = ImmutableMap.builderWithExpectedSize(schemaColumns.size());
    for (DeltaLakeColumnMetadata schemaColumn : schemaColumns) {
        String logicalName = schemaColumn.getName();
        DeltaLakeColumnHandle handle = toColumnHandle(schemaColumn.getColumnMetadata(), logicalName, schemaColumn.getType(), partitionedBy);
        if (!logicalName.equals(droppedName)) {
            remainingNames.add(handle.getColumnName());
        }
        nameToPhysicalName.put(logicalName, schemaColumn.getPhysicalName());
    }
    Map<String, String> physicalColumnNameMapping = nameToPhysicalName.buildOrThrow();

    // Per-column attributes of the current metadata entry, minus the dropped column
    Map<String, Object> remainingTypes = filterKeys(getColumnTypes(currentMetadata), key -> !key.equals(droppedName));
    Map<String, String> remainingComments = filterKeys(getColumnComments(currentMetadata), key -> !key.equals(droppedName));
    Map<String, Boolean> remainingNullability = filterKeys(getColumnsNullability(currentMetadata), key -> !key.equals(droppedName));
    Map<String, Map<String, Object>> remainingColumnMetadata = filterKeys(getColumnsMetadata(currentMetadata), key -> !key.equals(droppedName));

    try {
        TransactionLogWriter logWriter = transactionLogWriterFactory.newWriter(session, deltaTable.getLocation());
        appendTableEntries(
                nextVersion,
                logWriter,
                currentMetadata.getId(),
                remainingNames.build(),
                partitionedBy,
                remainingTypes,
                remainingComments,
                remainingNullability,
                remainingColumnMetadata,
                currentMetadata.getConfiguration(),
                DROP_COLUMN_OPERATION,
                session,
                Optional.ofNullable(currentMetadata.getDescription()),
                getProtocolEntry(session, deltaTable));
        logWriter.flush();

        // Column statistics are matched against the physical name of the dropped column,
        // the analyzed-columns set against the name taken from the column handle
        statisticsAccess.readExtendedStatistics(session, deltaTable.getLocation()).ifPresent(oldStatistics -> {
            ExtendedStatistics statistics = new ExtendedStatistics(
                    oldStatistics.getAlreadyAnalyzedModifiedTimeMax(),
                    oldStatistics.getColumnStatistics().entrySet().stream()
                            .filter(entry -> !entry.getKey().equals(toPhysicalColumnName(droppedName, Optional.of(physicalColumnNameMapping))))
                            .collect(toImmutableMap(Entry::getKey, Entry::getValue)),
                    oldStatistics.getAnalyzedColumns()
                            .map(analyzed -> analyzed.stream().filter(analyzedName -> !analyzedName.equals(droppedName)).collect(toImmutableSet())));
            statisticsAccess.updateExtendedStatistics(session, deltaTable.getLocation(), statistics);
        });
    }
    catch (Exception e) {
        throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to drop '%s' column from: %s.%s", droppedName, deltaTable.getSchemaName(), deltaTable.getTableName()), e);
    }
}

private void appendTableEntries(
long commitVersion,
TransactionLogWriter transactionLogWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void testDropAndRecreateTable()
public void testDropColumnNotSupported()
{
    registerTableFromResources("testdropcolumn", "io/trino/plugin/deltalake/testing/resources/databricks/nation", getQueryRunner());
    // The connector implements dropColumn, so the generic "This connector does not support dropping columns"
    // failure is no longer produced. The drop is rejected by the connector's own column mapping check instead;
    // the expected message shows this table reports column mapping mode NONE.
    assertQueryFails("ALTER TABLE testdropcolumn DROP COLUMN comment", "Cannot drop column with the column mapping: NONE");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,54 @@ public void testAddNestedColumnWithColumnMappingMode(String columnMappingMode)
.containsPattern("(delta\\.columnMapping\\.physicalName.*?){11}");
}

/**
 * Checks that dropping the top-level 'x' column from a table with column mapping leaves the
 * schema entry of the remaining nested column and the table configuration unchanged.
 *
 * @see deltalake.column_mapping_mode_id
 * @see deltalake.column_mapping_mode_name
 */
@Test(dataProvider = "columnMappingModeDataProvider")
public void testDropColumnWithColumnMappingMode(String columnMappingMode)
        throws Exception
{
    // The table contains 'x' column with column mapping mode
    String tableName = "test_add_column_" + randomNameSuffix();
    Path tableLocation = Files.createTempFile(tableName, null);
    File resourceDirectory = new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI());
    copyDirectoryContents(resourceDirectory.toPath(), tableLocation);

    String schemaName = getSession().getSchema().orElseThrow();
    assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(schemaName, tableName, tableLocation.toUri()));
    assertThat(query("DESCRIBE " + tableName))
            .projected("Column", "Type")
            .skippingTypesCheck()
            .matches("VALUES ('x', 'integer')");
    assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

    // Add a nested column so there is something left to compare once 'x' is gone
    assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN second_col row(a array(integer), b map(integer, integer), c row(field integer))");
    MetadataEntry metadataBeforeDrop = loadMetadataEntry(1, tableLocation);
    String schemaBeforeDrop = metadataBeforeDrop.getSchemaString();
    // +5 comes from second_col + second_col.a + second_col.b + second_col.c + second_col.c.field
    Assertions.assertThat(metadataBeforeDrop.getConfiguration().get("delta.columnMapping.maxColumnId"))
            .isEqualTo("6");
    Assertions.assertThat(schemaBeforeDrop)
            .containsPattern("(delta\\.columnMapping\\.id.*?){6}")
            .containsPattern("(delta\\.columnMapping\\.physicalName.*?){6}");

    List<JsonNode> fieldsBeforeDrop = ImmutableList.copyOf(OBJECT_MAPPER.readTree(schemaBeforeDrop).get("fields").elements());
    Assertions.assertThat(fieldsBeforeDrop).hasSize(2);
    JsonNode nestedColumn = fieldsBeforeDrop.get(1);
    List<JsonNode> nestedRowFields = ImmutableList.copyOf(nestedColumn.get("type").get("fields").elements());
    Assertions.assertThat(nestedRowFields).hasSize(3);

    // Drop 'x' column and verify that nested metadata and table configuration are preserved
    assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN x");

    MetadataEntry metadataAfterDrop = loadMetadataEntry(2, tableLocation);
    String schemaAfterDrop = metadataAfterDrop.getSchemaString();
    List<JsonNode> fieldsAfterDrop = ImmutableList.copyOf(OBJECT_MAPPER.readTree(schemaAfterDrop).get("fields").elements());
    Assertions.assertThat(fieldsAfterDrop).hasSize(1);
    Assertions.assertThat(fieldsAfterDrop.get(0)).isEqualTo(nestedColumn);

    Assertions.assertThat(metadataAfterDrop.getConfiguration())
            .isEqualTo(metadataBeforeDrop.getConfiguration());
    Assertions.assertThat(schemaAfterDrop)
            .containsPattern("(delta\\.columnMapping\\.id.*?){5}")
            .containsPattern("(delta\\.columnMapping\\.physicalName.*?){5}");
}

@DataProvider
public Object[][] columnMappingModeDataProvider()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_RENAME_SCHEMA:
return false;

case SUPPORTS_DROP_COLUMN:
case SUPPORTS_DROP_FIELD:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_SET_COLUMN_TYPE:
return false;
Expand Down Expand Up @@ -339,6 +339,33 @@ public void testDropNonEmptySchemaWithTable()
assertUpdate("DROP SCHEMA " + schemaName);
}

@Override
public void testDropColumn()
{
    // Overridden because the connector doesn't support dropping columns with 'none' column mapping,
    // so the inherited test is expected to fail with the message below.
    // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    String expectedMessage = "Cannot drop column with the column mapping: NONE";
    assertThatThrownBy(super::testDropColumn)
            .hasMessageContaining(expectedMessage);
}

@Override
public void testAddAndDropColumnName(String columnName)
{
    // Overridden because the connector doesn't support dropping columns with 'none' column mapping,
    // so the inherited test is expected to fail with the message below.
    // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    String expectedMessage = "Cannot drop column with the column mapping: NONE";
    assertThatThrownBy(() -> super.testAddAndDropColumnName(columnName))
            .hasMessageContaining(expectedMessage);
}

@Override
public void testDropAndAddColumnWithSameName()
{
    // Overridden because the connector doesn't support dropping columns with 'none' column mapping,
    // so the inherited test is expected to fail with the message below.
    // There are some tests in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    String expectedMessage = "Cannot drop column with the column mapping: NONE";
    assertThatThrownBy(super::testDropAndAddColumnWithSameName)
            .hasMessageContaining(expectedMessage);
}

@Override
public void testCharVarcharComparison()
{
Expand Down
Loading

0 comments on commit a2d17db

Please sign in to comment.