Skip to content

Commit

Permalink
Add support for dropping column in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 6, 2023
1 parent a925020 commit d91e861
Show file tree
Hide file tree
Showing 7 changed files with 367 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnNotFoundException;
import io.trino.spi.connector.ConnectorAnalyzeMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
Expand Down Expand Up @@ -165,6 +166,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.filterKeys;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
import static com.google.common.primitives.Ints.max;
Expand Down Expand Up @@ -225,6 +227,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnTypes;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getExactColumnNames;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getGeneratedColumnExpressions;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getMaxColumnId;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly;
Expand Down Expand Up @@ -301,6 +304,7 @@ public class DeltaLakeMetadata
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
public static final String DROP_COLUMN_OPERATION = "DROP COLUMNS";
public static final String INSERT_OPERATION = "WRITE";
public static final String MERGE_OPERATION = "MERGE";
public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark
Expand Down Expand Up @@ -1393,6 +1397,84 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
}
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle;
verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn);
String dropColumnName = deltaLakeColumn.getBaseColumnName();
MetadataEntry metadataEntry = table.getMetadataEntry();

checkSupportedWriterVersion(session, table);
ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) {
throw new TrinoException(NOT_SUPPORTED, "Cannot drop column from table using column mapping mode " + columnMappingMode);
}

ConnectorTableMetadata tableMetadata = getTableMetadata(session, table);
long commitVersion = table.getReadVersion() + 1;
List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties());
if (partitionColumns.contains(dropColumnName)) {
throw new TrinoException(NOT_SUPPORTED, "Cannot drop partition column: " + dropColumnName);
}

// Use equalsIgnoreCase because the remote column name can contain uppercase characters
// Creating a table with ambiguous names (e.g. "a" and "A") is disallowed, so this should be safe
List<DeltaLakeColumnMetadata> columns = extractSchema(metadataEntry, typeManager);
List<String> columnNames = getExactColumnNames(metadataEntry).stream()
.filter(name -> !name.equalsIgnoreCase(dropColumnName))
.collect(toImmutableList());
if (columns.size() == columnNames.size()) {
throw new ColumnNotFoundException(table.schemaTableName(), dropColumnName);
}
Map<String, String> physicalColumnNameMapping = columns.stream()
.collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName));

Map<String, Object> columnTypes = filterKeys(getColumnTypes(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName));
Map<String, String> columnComments = filterKeys(getColumnComments(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName));
Map<String, Boolean> columnsNullability = filterKeys(getColumnsNullability(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName));
Map<String, Map<String, Object>> columnMetadata = filterKeys(getColumnsMetadata(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName));
try {
TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation());
appendTableEntries(
commitVersion,
transactionLogWriter,
metadataEntry.getId(),
columnNames,
metadataEntry.getOriginalPartitionColumns(),
columnTypes,
columnComments,
columnsNullability,
columnMetadata,
metadataEntry.getConfiguration(),
DROP_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
getProtocolEntry(session, table));
transactionLogWriter.flush();
}
catch (Exception e) {
throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to drop '%s' column from: %s.%s", dropColumnName, table.getSchemaName(), table.getTableName()), e);
}

try {
statisticsAccess.readExtendedStatistics(session, table.getSchemaTableName(), table.getLocation()).ifPresent(existingStatistics -> {
ExtendedStatistics statistics = new ExtendedStatistics(
existingStatistics.getAlreadyAnalyzedModifiedTimeMax(),
existingStatistics.getColumnStatistics().entrySet().stream()
.filter(stats -> !stats.getKey().equalsIgnoreCase(toPhysicalColumnName(dropColumnName, Optional.of(physicalColumnNameMapping))))
.collect(toImmutableMap(Entry::getKey, Entry::getValue)),
existingStatistics.getAnalyzedColumns()
.map(analyzedColumns -> analyzedColumns.stream().filter(column -> !column.equalsIgnoreCase(dropColumnName)).collect(toImmutableSet())));
statisticsAccess.updateExtendedStatistics(session, table.getSchemaTableName(), table.getLocation(), statistics);
});
}
catch (Exception e) {
LOG.warn(e, "Failed to update extended statistics when dropping %s column from %s table", dropColumnName, table.schemaTableName());
}
}

private void appendTableEntries(
long commitVersion,
TransactionLogWriter transactionLogWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,21 @@ private static <T> Map<String, T> getColumnProperty(String json, Function<JsonNo
}
}

/**
* @return the case-sensitive column names
*/
public static List<String> getExactColumnNames(MetadataEntry metadataEntry)
{
try {
return stream(OBJECT_MAPPER.readTree(metadataEntry.getSchemaString()).get("fields").elements())
.map(field -> field.get("name").asText())
.collect(toImmutableList());
}
catch (JsonProcessingException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to parse serialized schema: " + metadataEntry.getSchemaString(), e);
}
}

public static Set<String> unsupportedReaderFeatures(Set<String> features)
{
return Sets.difference(features, SUPPORTED_READER_FEATURES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void testDropAndRecreateTable()
public void testDropColumnNotSupported()
{
registerTableFromResources("testdropcolumn", "io/trino/plugin/deltalake/testing/resources/databricks/nation", getQueryRunner());
assertQueryFails("ALTER TABLE testdropcolumn DROP COLUMN comment", ".*This connector does not support dropping columns.*");
assertQueryFails("ALTER TABLE testdropcolumn DROP COLUMN comment", "Cannot drop column from table using column mapping mode NONE");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,54 @@ public void testOptimizeWithColumnMappingMode(String columnMappingMode)
}
}

/**
* @see deltalake.column_mapping_mode_id
* @see deltalake.column_mapping_mode_name
*/
@Test(dataProvider = "columnMappingModeDataProvider")
public void testDropColumnWithColumnMappingMode(String columnMappingMode)
throws Exception
{
// The table contains 'x' column with column mapping mode
String tableName = "test_add_column_" + randomNameSuffix();
Path tableLocation = Files.createTempFile(tableName, null);
copyDirectoryContents(new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath(), tableLocation);

assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));
assertThat(query("DESCRIBE " + tableName)).projected("Column", "Type").skippingTypesCheck().matches("VALUES ('x', 'integer')");
assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN second_col row(a array(integer), b map(integer, integer), c row(field integer))");
MetadataEntry metadata = loadMetadataEntry(1, tableLocation);
Assertions.assertThat(metadata.getConfiguration().get("delta.columnMapping.maxColumnId"))
.isEqualTo("6"); // +5 comes from second_col + second_col.a + second_col.b + second_col.c + second_col.c.field
Assertions.assertThat(metadata.getSchemaString())
.containsPattern("(delta\\.columnMapping\\.id.*?){6}")
.containsPattern("(delta\\.columnMapping\\.physicalName.*?){6}");

JsonNode schema = OBJECT_MAPPER.readTree(metadata.getSchemaString());
List<JsonNode> fields = ImmutableList.copyOf(schema.get("fields").elements());
Assertions.assertThat(fields).hasSize(2);
JsonNode nestedColumn = fields.get(1);
List<JsonNode> rowFields = ImmutableList.copyOf(nestedColumn.get("type").get("fields").elements());
Assertions.assertThat(rowFields).hasSize(3);

// Drop 'x' column and verify that nested metadata and table configuration are preserved
assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN x");

MetadataEntry droppedMetadata = loadMetadataEntry(2, tableLocation);
JsonNode droppedSchema = OBJECT_MAPPER.readTree(droppedMetadata.getSchemaString());
List<JsonNode> droppedFields = ImmutableList.copyOf(droppedSchema.get("fields").elements());
Assertions.assertThat(droppedFields).hasSize(1);
Assertions.assertThat(droppedFields.get(0)).isEqualTo(nestedColumn);

Assertions.assertThat(droppedMetadata.getConfiguration())
.isEqualTo(metadata.getConfiguration());
Assertions.assertThat(droppedMetadata.getSchemaString())
.containsPattern("(delta\\.columnMapping\\.id.*?){5}")
.containsPattern("(delta\\.columnMapping\\.physicalName.*?){5}");
}

@DataProvider
public Object[][] columnMappingModeDataProvider()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
case SUPPORTS_RENAME_SCHEMA:
return false;

case SUPPORTS_DROP_COLUMN:
case SUPPORTS_DROP_FIELD:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_SET_COLUMN_TYPE:
return false;
Expand Down Expand Up @@ -334,6 +334,33 @@ public void testDropNonEmptySchemaWithTable()
assertUpdate("DROP SCHEMA " + schemaName);
}

@Override
public void testDropColumn()
{
// Override because the connector doesn't support dropping columns with 'none' column mapping
// There are some tests in in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
assertThatThrownBy(super::testDropColumn)
.hasMessageContaining("Cannot drop column from table using column mapping mode NONE");
}

@Override
public void testAddAndDropColumnName(String columnName)
{
// Override because the connector doesn't support dropping columns with 'none' column mapping
// There are some tests in in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
assertThatThrownBy(() -> super.testAddAndDropColumnName(columnName))
.hasMessageContaining("Cannot drop column from table using column mapping mode NONE");
}

@Override
public void testDropAndAddColumnWithSameName()
{
// Override because the connector doesn't support dropping columns with 'none' column mapping
// There are some tests in in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
assertThatThrownBy(super::testDropAndAddColumnWithSameName)
.hasMessageContaining("Cannot drop column from table using column mapping mode NONE");
}

@Override
public void testCharVarcharComparison()
{
Expand Down
Loading

0 comments on commit d91e861

Please sign in to comment.