From d91e861ec8fa389c5a6c73e804e12949f72892ae Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Tue, 30 May 2023 11:32:01 +0900 Subject: [PATCH] Add support for dropping column in Delta Lake --- .../plugin/deltalake/DeltaLakeMetadata.java | 82 ++++++++ .../DeltaLakeSchemaSupport.java | 15 ++ .../BaseDeltaLakeConnectorSmokeTest.java | 2 +- .../plugin/deltalake/TestDeltaLakeBasic.java | 48 +++++ .../deltalake/TestDeltaLakeConnectorTest.java | 29 ++- .../TestDeltaLakeColumnMappingMode.java | 189 +++++++++++++++++- .../deltalake/util/DeltaLakeTestUtils.java | 6 + 7 files changed, 367 insertions(+), 4 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index c63687406ba3..eb7ccd2fd052 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -83,6 +83,7 @@ import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ColumnNotFoundException; import io.trino.spi.connector.ConnectorAnalyzeMetadata; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeTableHandle; @@ -165,6 +166,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Maps.filterKeys; import static com.google.common.collect.MoreCollectors.toOptional; import static com.google.common.collect.Sets.difference; import static com.google.common.primitives.Ints.max; @@ -225,6 +227,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnTypes; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnsNullability; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getExactColumnNames; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getGeneratedColumnExpressions; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getMaxColumnId; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.isAppendOnly; @@ -301,6 +304,7 @@ public class DeltaLakeMetadata public static final String CREATE_TABLE_AS_OPERATION = "CREATE TABLE AS SELECT"; public static final String CREATE_TABLE_OPERATION = "CREATE TABLE"; public static final String ADD_COLUMN_OPERATION = "ADD COLUMNS"; + public static final String DROP_COLUMN_OPERATION = "DROP COLUMNS"; public static final String INSERT_OPERATION = "WRITE"; public static final String MERGE_OPERATION = "MERGE"; public static final String UPDATE_OPERATION = "UPDATE"; // used by old Trino versions and Spark @@ -1393,6 +1397,84 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle } } + @Override + public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle; + DeltaLakeColumnHandle deltaLakeColumn = (DeltaLakeColumnHandle) columnHandle; + verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn); + String dropColumnName = deltaLakeColumn.getBaseColumnName(); + MetadataEntry metadataEntry = table.getMetadataEntry(); + + checkSupportedWriterVersion(session, table); + ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); + if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) { + throw new TrinoException(NOT_SUPPORTED, "Cannot drop column from table using column mapping mode " + columnMappingMode); + } + + ConnectorTableMetadata tableMetadata = getTableMetadata(session, table); + long commitVersion = table.getReadVersion() + 1; + List partitionColumns = getPartitionedBy(tableMetadata.getProperties()); + if (partitionColumns.contains(dropColumnName)) { + throw new TrinoException(NOT_SUPPORTED, "Cannot drop partition column: " + dropColumnName); + } + + // Use equalsIgnoreCase because the remote column name can contain uppercase characters + // Creating a table with ambiguous names (e.g. "a" and "A") is disallowed, so this should be safe + List columns = extractSchema(metadataEntry, typeManager); + List columnNames = getExactColumnNames(metadataEntry).stream() + .filter(name -> !name.equalsIgnoreCase(dropColumnName)) + .collect(toImmutableList()); + if (columns.size() == columnNames.size()) { + throw new ColumnNotFoundException(table.schemaTableName(), dropColumnName); + } + Map physicalColumnNameMapping = columns.stream() + .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)); + + Map columnTypes = filterKeys(getColumnTypes(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); + Map columnComments = filterKeys(getColumnComments(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); + Map columnsNullability = filterKeys(getColumnsNullability(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); + Map> columnMetadata = filterKeys(getColumnsMetadata(metadataEntry), name -> !name.equalsIgnoreCase(dropColumnName)); + try { + TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, table.getLocation()); + appendTableEntries( + commitVersion, + transactionLogWriter, + metadataEntry.getId(), + columnNames, + metadataEntry.getOriginalPartitionColumns(), + columnTypes, + columnComments, + columnsNullability, + columnMetadata, + metadataEntry.getConfiguration(), + DROP_COLUMN_OPERATION, + session, + Optional.ofNullable(metadataEntry.getDescription()), + getProtocolEntry(session, table)); + transactionLogWriter.flush(); + } + catch (Exception e) { + throw new TrinoException(DELTA_LAKE_BAD_WRITE, format("Unable to drop '%s' column from: %s.%s", dropColumnName, table.getSchemaName(), table.getTableName()), e); + } + + try { + statisticsAccess.readExtendedStatistics(session, table.getSchemaTableName(), table.getLocation()).ifPresent(existingStatistics -> { + ExtendedStatistics statistics = new ExtendedStatistics( + existingStatistics.getAlreadyAnalyzedModifiedTimeMax(), + existingStatistics.getColumnStatistics().entrySet().stream() + .filter(stats -> !stats.getKey().equalsIgnoreCase(toPhysicalColumnName(dropColumnName, Optional.of(physicalColumnNameMapping)))) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)), + existingStatistics.getAnalyzedColumns() + .map(analyzedColumns -> analyzedColumns.stream().filter(column -> !column.equalsIgnoreCase(dropColumnName)).collect(toImmutableSet()))); + statisticsAccess.updateExtendedStatistics(session, table.getSchemaTableName(), table.getLocation(), statistics); + }); + } + catch (Exception e) { + LOG.warn(e, "Failed to update extended statistics when dropping %s column from %s table", dropColumnName, table.schemaTableName()); + } + } + private void appendTableEntries( long commitVersion, TransactionLogWriter transactionLogWriter, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index ef703c9dd130..518a30f6ea72 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -516,6 +516,21 @@ private static Map getColumnProperty(String json, Function getExactColumnNames(MetadataEntry metadataEntry) + { + try { + return stream(OBJECT_MAPPER.readTree(metadataEntry.getSchemaString()).get("fields").elements()) + .map(field -> field.get("name").asText()) + .collect(toImmutableList()); + } + catch (JsonProcessingException e) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, getLocation(e), "Failed to parse serialized schema: " + metadataEntry.getSchemaString(), e); + } + } + public static Set unsupportedReaderFeatures(Set features) { return Sets.difference(features, SUPPORTED_READER_FEATURES); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index e4b78efbdbed..b3dc44f763c2 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -485,7 +485,7 @@ public void testDropAndRecreateTable() public void testDropColumnNotSupported() { registerTableFromResources("testdropcolumn", "io/trino/plugin/deltalake/testing/resources/databricks/nation", getQueryRunner()); - assertQueryFails("ALTER TABLE testdropcolumn DROP COLUMN comment", ".*This connector does not support dropping columns.*"); + assertQueryFails("ALTER TABLE testdropcolumn DROP COLUMN comment", "Cannot drop column from table using column mapping mode NONE"); } @Test diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index d999b91c582b..dfa6758cf5a7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -284,6 +284,54 @@ public void testOptimizeWithColumnMappingMode(String columnMappingMode) } } + /** + * @see deltalake.column_mapping_mode_id + * @see deltalake.column_mapping_mode_name + */ + @Test(dataProvider = "columnMappingModeDataProvider") + public void testDropColumnWithColumnMappingMode(String columnMappingMode) + throws Exception + { + // The table contains 'x' column with column mapping mode + String tableName = "test_add_column_" + randomNameSuffix(); + Path tableLocation = Files.createTempFile(tableName, null); + copyDirectoryContents(new File(Resources.getResource("deltalake/column_mapping_mode_" + columnMappingMode).toURI()).toPath(), tableLocation); + + assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); + assertThat(query("DESCRIBE " + tableName)).projected("Column", "Type").skippingTypesCheck().matches("VALUES ('x', 'integer')"); + assertQueryReturnsEmptyResult("SELECT * FROM " + tableName); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN second_col row(a array(integer), b map(integer, integer), c row(field integer))"); + MetadataEntry metadata = loadMetadataEntry(1, tableLocation); + Assertions.assertThat(metadata.getConfiguration().get("delta.columnMapping.maxColumnId")) + .isEqualTo("6"); // +5 comes from second_col + second_col.a + second_col.b + second_col.c + second_col.c.field + Assertions.assertThat(metadata.getSchemaString()) + .containsPattern("(delta\\.columnMapping\\.id.*?){6}") + .containsPattern("(delta\\.columnMapping\\.physicalName.*?){6}"); + + JsonNode schema = OBJECT_MAPPER.readTree(metadata.getSchemaString()); + List fields = ImmutableList.copyOf(schema.get("fields").elements()); + Assertions.assertThat(fields).hasSize(2); + JsonNode nestedColumn = fields.get(1); + List rowFields = ImmutableList.copyOf(nestedColumn.get("type").get("fields").elements()); + Assertions.assertThat(rowFields).hasSize(3); + + // Drop 'x' column and verify that nested metadata and table configuration are preserved + assertUpdate("ALTER TABLE " + tableName + " DROP COLUMN x"); + + MetadataEntry droppedMetadata = loadMetadataEntry(2, tableLocation); + JsonNode droppedSchema = OBJECT_MAPPER.readTree(droppedMetadata.getSchemaString()); + List droppedFields = ImmutableList.copyOf(droppedSchema.get("fields").elements()); + Assertions.assertThat(droppedFields).hasSize(1); + Assertions.assertThat(droppedFields.get(0)).isEqualTo(nestedColumn); + + Assertions.assertThat(droppedMetadata.getConfiguration()) + .isEqualTo(metadata.getConfiguration()); + Assertions.assertThat(droppedMetadata.getSchemaString()) + .containsPattern("(delta\\.columnMapping\\.id.*?){5}") + .containsPattern("(delta\\.columnMapping\\.physicalName.*?){5}"); + } + @DataProvider public Object[][] columnMappingModeDataProvider() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java index 152cf2d0627e..c446d7807157 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java @@ -149,7 +149,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) case SUPPORTS_RENAME_SCHEMA: return false; - case SUPPORTS_DROP_COLUMN: + case SUPPORTS_DROP_FIELD: case SUPPORTS_RENAME_COLUMN: case SUPPORTS_SET_COLUMN_TYPE: return false; @@ -334,6 +334,33 @@ public void testDropNonEmptySchemaWithTable() assertUpdate("DROP SCHEMA " + schemaName); } + @Override + public void testDropColumn() + { + // Override because the connector doesn't support dropping columns with 'none' column mapping + // There are some tests in in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode + assertThatThrownBy(super::testDropColumn) + .hasMessageContaining("Cannot drop column from table using column mapping mode NONE"); + } + + @Override + public void testAddAndDropColumnName(String columnName) + { + // Override because the connector doesn't support dropping columns with 'none' column mapping + // There are some tests in in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode + assertThatThrownBy(() -> super.testAddAndDropColumnName(columnName)) + .hasMessageContaining("Cannot drop column from table using column mapping mode NONE"); + } + + @Override + public void testDropAndAddColumnWithSameName() + { + // Override because the connector doesn't support dropping columns with 'none' column mapping + // There are some tests in in io.trino.tests.product.deltalake.TestDeltaLakeColumnMappingMode + assertThatThrownBy(super::testDropAndAddColumnWithSameName) + .hasMessageContaining("Cannot drop column from table using column mapping mode NONE"); + } + @Override public void testCharVarcharComparison() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index 6a994a263671..7608abb65254 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -25,6 +25,7 @@ import java.math.BigDecimal; import java.time.LocalDate; import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Consumer; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -44,12 +45,14 @@ import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnCommentOnTrino; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnNamesOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTableCommentOnDelta; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTableCommentOnTrino; import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getTablePropertyOnDelta; import static io.trino.tests.product.utils.QueryExecutors.onDelta; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; public class TestDeltaLakeColumnMappingMode @@ -1007,8 +1010,6 @@ public void testUnsupportedOperationsColumnMappingMode(String mode) try { assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN a_number TO renamed_column")) .hasMessageContaining("This connector does not support renaming columns"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN a_number")) - .hasMessageContaining("This connector does not support dropping columns"); } finally { dropDeltaTableWithRetry("default." + tableName); @@ -1446,6 +1447,190 @@ public Object[][] columnMappingWithTrueAndFalseDataProvider() return cartesianProduct(supportedColumnMappingForDmlDataProvider(), trueFalse()); } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testTrinoDropColumnWithColumnMappingMode(String mode) + { + testDropColumnWithColumnMappingMode(mode, (tableName, columnName) -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN " + columnName)); + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testSparkDropColumnWithColumnMappingMode(String mode) + { + testDropColumnWithColumnMappingMode(mode, (tableName, columnName) -> onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN " + columnName)); + } + + private void testDropColumnWithColumnMappingMode(String mode, BiConsumer dropColumn) + { + String tableName = "test_drop_column_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (id INT, data INT, part STRING)" + + " USING delta " + + " PARTITIONED BY (part) " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')"); + + try { + Assertions.assertThat(getTablePropertyOnDelta("default", tableName, "delta.columnMapping.maxColumnId")) + .isEqualTo("3"); + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 10, 'part#1')"); + + dropColumn.accept(tableName, "data"); + assertThatThrownBy(() -> dropColumn.accept(tableName, "part")) + .hasMessageMatching("(?s).*(Cannot drop partition column: part|Dropping partition columns \\(part\\) is not allowed).*"); + Assertions.assertThat(getTablePropertyOnDelta("default", tableName, "delta.columnMapping.maxColumnId")) + .isEqualTo("3"); + + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, "part#1")); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, "part#1")); + + // Verify adding a new column with the same name doesn't allow accessing the old data + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN data INTEGER"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, "part#1", null)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, "part#1", null)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testTrinoExtendedStatisticsDropAndAddColumnWithColumnMappingMode(String mode) + { + String tableName = "test_drop_and_add_column_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a INT, b INT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')"); + + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 2)"); + onTrino().executeQuery("ANALYZE delta.default." + tableName); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly( + row("a", null, 1.0, 0.0, null, "1", "1"), + row("b", null, 1.0, 0.0, null, "2", "2"), + row(null, null, null, null, 1.0, null, null)); + + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN b"); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly( + row("a", null, 1.0, 0.0, null, "1", "1"), + row(null, null, null, null, 1.0, null, null)); + + // TODO: Add a new column on Trino once the connector supports adding a column with the column mapping mode + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN b INTEGER"); + + // Verify column statistics of dropped column isn't restored + onTrino().executeQuery("ANALYZE delta.default." + tableName); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly( + row("a", null, 1.0, 0.0, null, "1", "1"), + row("b", 0.0, 0.0, 1.0, null, null, null), + row(null, null, null, null, 1.0, null, null)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDropNonLowercaseColumnWithColumnMappingMode(String mode) + { + String tableName = "test_drop_non_lowercase_column_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (UPPER_ID INT, UPPER_DATA INT, UPPER_PART STRING)" + + " USING delta " + + " PARTITIONED BY (UPPER_PART) " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')"); + + try { + assertThat(getColumnNamesOnDelta("default", tableName)) + .containsExactly("UPPER_ID", "UPPER_DATA", "UPPER_PART"); + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 10, 'part#1')"); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("upper_id", null, 1.0, 0.0, null, "1", "1"), + row("upper_data", null, 1.0, 0.0, null, "10", "10"), + row("upper_part", null, 1.0, 0.0, null, null, null), + row(null, null, null, null, 1.0, null, null))); + + onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN upper_data"); + assertThat(getColumnNamesOnDelta("default", tableName)) + .containsExactly("UPPER_ID", "UPPER_PART"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, "part#1")); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("upper_id", null, 1.0, 0.0, null, "1", "1"), + row("upper_part", null, 1.0, 0.0, null, null, null), + row(null, null, null, null, 1.0, null, null))); + + assertThatThrownBy(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN upper_part")) + .hasMessageContaining("Cannot drop partition column"); + + // Verify adding a column with the same name doesn't restore the old statistics + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN UPPER_DATA INT"); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("upper_id", null, 1.0, 0.0, null, "1", "1"), + row("upper_data", null, null, null, null, null, null), + row("upper_part", null, 1.0, 0.0, null, null, null), + row(null, null, null, null, 1.0, null, null))); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testUnsupportedDropColumnWithColumnMappingModeNone() + { + String tableName = "test_unsupported_drop_column_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (id INT, data INT, part STRING)" + + " USING delta " + + " PARTITIONED BY (part) " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES ('delta.columnMapping.mode' = 'none')"); + + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10, 'part#1')"); + + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN part")) + .hasMessageContaining("Cannot drop column from table using column mapping mode NONE"); + assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN part")) + .hasMessageContaining("DROP COLUMN is not supported for your Delta table"); + + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 10, "part#1")); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 10, "part#1")); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + @DataProvider public Object[][] columnMappingDataProvider() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java index fa8e2ab8fa2e..4d84b362acf7 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/DeltaLakeTestUtils.java @@ -64,6 +64,12 @@ public static Optional getDatabricksRuntimeVersion() return Optional.of(DatabricksVersion.parse(version)); } + public static List getColumnNamesOnDelta(String schemaName, String tableName) + { + QueryResult result = onDelta().executeQuery("SHOW COLUMNS IN " + schemaName + "." + tableName); + return result.column(1); + } + public static String getColumnCommentOnTrino(String schemaName, String tableName, String columnName) { return (String) onTrino()