Skip to content

Commit

Permalink
Add support for renaming column in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 15, 2023
1 parent 0df8e32 commit 2990111
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format;
Expand Down Expand Up @@ -268,9 +269,11 @@ public class DeltaLakeMetadata
LAZY_SIMPLE_SERDE_CLASS,
SEQUENCEFILE_INPUT_FORMAT_CLASS,
HIVE_SEQUENCEFILE_OUTPUT_FORMAT_CLASS);
// Operation names in Delta Lake https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT";
public static final String CREATE_TABLE_OPERATION = "CREATE TABLE";
public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS";
public static final String RENAME_COLUMN_OPERATION = "RENAME COLUMN";
public static final String INSERT_OPERATION = "WRITE";
public static final String MERGE_OPERATION = "MERGE";
public static final String OPTIMIZE_OPERATION = "OPTIMIZE";
Expand Down Expand Up @@ -1221,6 +1224,66 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
}
}

@Override
public void renameColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle, String newColumnName)
{
    DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
    String oldColumnName = ((DeltaLakeColumnHandle) columnHandle).getName();

    // Renaming is only supported when the table tracks columns through the 'name' or 'id'
    // column mapping mode; any other mode is rejected up front.
    ColumnMappingMode mappingMode = getColumnMappingMode(table.getMetadataEntry());
    boolean renameSupported = mappingMode == ColumnMappingMode.NAME || mappingMode == ColumnMappingMode.ID;
    if (!renameSupported) {
        throw new TrinoException(NOT_SUPPORTED, "Cannot rename column with the column mapping: " + mappingMode);
    }

    ConnectorTableMetadata tableMetadata = getTableMetadata(session, table);
    // The rename is committed as the next transaction log entry after the version we read
    long commitVersion = table.getReadVersion() + 1;

    // Substitute the new name wherever the old one appears in the partition column list
    List<String> partitionColumns = getPartitionedBy(tableMetadata.getProperties()).stream()
            .map(name -> name.equals(oldColumnName) ? newColumnName : name)
            .collect(toImmutableList());

    // Rebuild the (non-hidden) column handles, applying the rename to the matching column
    List<DeltaLakeColumnHandle> columns = tableMetadata.getColumns().stream()
            .filter(column -> !column.isHidden())
            .map(column -> {
                boolean renamed = column.getName().equals(oldColumnName);
                ColumnMetadata metadata = renamed
                        ? ColumnMetadata.builderFrom(column).setName(newColumnName).build()
                        : column;
                return toColumnHandle(metadata, renamed ? newColumnName : column.getName(), column.getType(), partitionColumns);
            })
            .collect(toImmutableList());

    // Comments, nullability and extra per-column metadata are all keyed by column name,
    // so the key is swapped in each of those maps as well
    Map<String, String> columnComments = renameKey(getColumnComments(table.getMetadataEntry()), oldColumnName, newColumnName);
    Map<String, Boolean> columnsNullability = renameKey(getColumnsNullability(table.getMetadataEntry()), oldColumnName, newColumnName);
    Map<String, Map<String, Object>> columnMetadata = renameKey(getColumnsMetadata(table.getMetadataEntry()), oldColumnName, newColumnName);

    try {
        TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation());
        appendTableEntries(
                commitVersion,
                transactionLogWriter,
                table.getMetadataEntry().getId(),
                columns,
                partitionColumns,
                columnComments,
                columnsNullability,
                columnMetadata,
                table.getMetadataEntry().getConfiguration(),
                RENAME_COLUMN_OPERATION,
                session,
                nodeVersion,
                nodeId,
                Optional.ofNullable(table.getMetadataEntry().getDescription()),
                getProtocolEntry(session, table.getSchemaTableName()));
        transactionLogWriter.flush();
    }
    catch (Exception e) {
        throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to rename '%s' column for: %s.%s", oldColumnName, table.getSchemaName(), table.getTableName()), e);
    }
}

/**
 * Returns a copy of {@code map} in which the entry keyed by {@code oldKey} (if present)
 * is re-keyed to {@code newKey}; all other entries are carried over unchanged.
 */
private static <V> Map<String, V> renameKey(Map<String, V> map, String oldKey, String newKey)
{
    return map.entrySet().stream()
            .map(entry -> entry.getKey().equals(oldKey) ? Map.entry(newKey, entry.getValue()) : entry)
            .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static void appendTableEntries(
long commitVersion,
TransactionLogWriter transactionLogWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
return false;

case SUPPORTS_DROP_COLUMN:
case SUPPORTS_RENAME_COLUMN:
case SUPPORTS_SET_COLUMN_TYPE:
return false;

Expand Down Expand Up @@ -363,6 +362,33 @@ public void testDropNonEmptySchemaWithTable()
assertUpdate("DROP SCHEMA " + schemaName);
}

@Override
public void testRenameColumn()
{
    // Override because the connector doesn't support renaming columns with 'none' column mapping.
    // Rename with 'name'/'id' mapping is covered in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    assertThatThrownBy(super::testRenameColumn)
            .hasMessageContaining("Cannot rename column with the column mapping: NONE");
}

@Override
public void testAlterTableRenameColumnToLongName()
{
    // Override because the connector doesn't support renaming columns with 'none' column mapping.
    // Rename with 'name'/'id' mapping is covered in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    assertThatThrownBy(super::testAlterTableRenameColumnToLongName)
            .hasMessageContaining("Cannot rename column with the column mapping: NONE");
}

@Override
public void testRenameColumnName(String columnName)
{
    // Override because the connector doesn't support renaming columns with 'none' column mapping.
    // Rename with 'name'/'id' mapping is covered in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode
    assertThatThrownBy(() -> super.testRenameColumnName(columnName))
            .hasMessageContaining("Cannot rename column with the column mapping: NONE");
}

@Override
public void testCharVarcharComparison()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
return ImmutableList.of(
testOnEnvironment(EnvSinglenodeDeltaLakeDatabricks104.class)
.withGroups("configured_features", "delta-lake-databricks")
.withExcludedGroups("delta-lake-exclude-104")
.withExcludedTests(getExcludedTests())
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public final class TestGroups
public static final String DELTA_LAKE_DATABRICKS = "delta-lake-databricks";
public static final String DELTA_LAKE_EXCLUDE_73 = "delta-lake-exclude-73";
public static final String DELTA_LAKE_EXCLUDE_91 = "delta-lake-exclude-91";
public static final String DELTA_LAKE_EXCLUDE_104 = "delta-lake-exclude-104";
public static final String DELTA_LAKE_EXCLUDE_113 = "delta-lake-exclude-113";
public static final String HUDI = "hudi";
public static final String PARQUET = "parquet";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static io.trino.tempto.assertions.QueryAssert.assertThat;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91;
import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS;
Expand Down Expand Up @@ -350,8 +351,6 @@ public void testUnsupportedOperationsColumnMappingModeName(String mode)
.hasMessageContaining("Delta Lake writer version 5 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN new_col varchar"))
.hasMessageContaining("Delta Lake writer version 5 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN a_number TO renamed_column"))
.hasMessageContaining("This connector does not support renaming columns");
assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN a_number"))
.hasMessageContaining("This connector does not support dropping columns");
}
Expand Down Expand Up @@ -391,6 +390,135 @@ public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode)
}
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
// Verifies that RENAME COLUMN executed from Trino on a Spark-created Delta table
// (column mapping mode supplied by columnMappingDataProvider) is visible and readable
// from both engines, for a regular column and for a partition column.
// Excluded from the Databricks 10.4 run via delta-lake-exclude-104 — presumably that
// runtime handles rename differently; confirm against the suite definition.
public void testTrinoRenameColumnWithColumnMappingMode(String mode)
{
    String tableName = "test_rename_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (id INT, data INT, part STRING)" +
            " USING delta " +
            " PARTITIONED BY (part) " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");

        // Rename both a data column and a partition column from the Trino side
        onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN data TO new_data");
        onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN part TO new_part");

        assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
                .containsOnly(
                        row("id", "integer", "", ""),
                        row("new_data", "integer", "", ""),
                        row("new_part", "varchar", "", ""));

        // Existing data must still be readable from both engines after the rename
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, 10, "part#1"));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, 10, "part#1"));

        // Ensure renaming to the dropped column doesn't restore the old data
        // TODO: Drop a column in Trino once the connector supports the syntax
        assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN id"))
                .hasMessageContaining("This connector does not support dropping columns");
        onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN id");
        onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN new_data TO id");

        // 'id' now refers to the former 'new_data' column, so its value is 10, not 1
        assertThat(onTrino().executeQuery("SELECT id, new_part FROM delta.default." + tableName))
                .containsOnly(row(10, "part#1"));
        assertThat(onDelta().executeQuery("SELECT id, new_part FROM default." + tableName))
                .containsOnly(row(10, "part#1"));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider")
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
// Counterpart of testTrinoRenameColumnWithColumnMappingMode: the renames are executed on
// the Spark/Databricks side and the results are verified from both engines.
public void testSparkRenameColumnWithColumnMappingMode(String mode)
{
    String tableName = "test_spark_rename_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (id INT, data INT, part STRING)" +
            " USING delta " +
            " PARTITIONED BY (part) " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')");

        // Rename both a data column and a partition column from the Spark side
        onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN data TO new_data");
        onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN part TO new_part");

        assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
                .containsOnly(
                        row("id", "integer", "", ""),
                        row("new_data", "integer", "", ""),
                        row("new_part", "varchar", "", ""));

        // Existing data must still be readable from both engines after the rename
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, 10, "part#1"));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, 10, "part#1"));

        // Ensure renaming to the dropped column doesn't restore the old data
        onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN id");
        onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN new_data TO id");

        // 'id' now refers to the former 'new_data' column, so its value is 10, not 1
        assertThat(onTrino().executeQuery("SELECT id, new_part FROM delta.default." + tableName))
                .containsOnly(row(10, "part#1"));
        assertThat(onDelta().executeQuery("SELECT id, new_part FROM default." + tableName))
                .containsOnly(row(10, "part#1"));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS})
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
// Verifies that with 'none' column mapping BOTH engines refuse to rename a column,
// and that the failed attempts leave the table schema and data untouched.
public void testUnsupportedRenameColumnWithColumnMappingModeNone()
{
    String tableName = "test_unsupported_rename_column_" + randomNameSuffix();

    onDelta().executeQuery("" +
            "CREATE TABLE default." + tableName +
            " (id INT, data INT)" +
            " USING delta " +
            " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" +
            " TBLPROPERTIES ('delta.columnMapping.mode' = 'none')");

    try {
        onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10)");

        // Trino rejects the rename with the connector's own error message,
        // Spark with its Delta-specific one
        assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN data TO new_data"))
                .hasMessageContaining("Cannot rename column with the column mapping: NONE");
        assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN data TO new_data"))
                .hasMessageContaining(" Column rename is not supported for your Delta table");

        // Schema is unchanged after the failed renames
        assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
                .containsOnly(
                        row("id", "integer", "", ""),
                        row("data", "integer", "", ""));

        // Data is unchanged as well
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
                .containsOnly(row(1, 10));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
                .containsOnly(row(1, 10));
    }
    finally {
        onDelta().executeQuery("DROP TABLE default." + tableName);
    }
}

@DataProvider
public Object[][] columnMappingDataProvider()
{
Expand Down

0 comments on commit 2990111

Please sign in to comment.