From 789c35fc2849d81fa0db15bf659e610217c7b040 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 6 Apr 2023 10:24:29 +0900 Subject: [PATCH 1/2] Static import Locale.ENGLISH in DeltaLakeMetadata --- .../java/io/trino/plugin/deltalake/DeltaLakeMetadata.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 29af8f54bb43..57736e34f422 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -138,7 +138,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -256,6 +255,7 @@ import static java.time.Instant.EPOCH; import static java.util.Collections.singletonList; import static java.util.Collections.unmodifiableMap; +import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static java.util.function.Function.identity; @@ -1598,7 +1598,7 @@ private Optional getLayoutForOptimize(DeltaLakeTableExecut return Optional.empty(); } Map columnsByName = optimizeHandle.getTableColumns().stream() - .collect(toImmutableMap(columnHandle -> columnHandle.getName().toLowerCase(Locale.ENGLISH), identity())); + .collect(toImmutableMap(columnHandle -> columnHandle.getName().toLowerCase(ENGLISH), identity())); ImmutableList.Builder partitioningColumns = ImmutableList.builder(); for (String columnName : partitionColumnNames) { partitioningColumns.add(columnsByName.get(columnName)); From 47a0080eb9f94d398a4eedb1b14dc387a1c4d989 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 16 Feb 2023 14:50:48 +0900 Subject: [PATCH 2/2] Support DML operations on Delta tables with `name` column mapping Co-Authored-By: Maxim Lukyanenko --- .../deltalake/AbstractDeltaLakePageSink.java | 4 +- .../plugin/deltalake/DeltaLakeMergeSink.java | 5 +- .../plugin/deltalake/DeltaLakeMetadata.java | 122 +++- .../plugin/deltalake/DeltaLakeWriter.java | 4 +- ...HiveMetastoreBackedDeltaLakeMetastore.java | 2 +- .../checkpoint/CheckpointWriter.java | 2 +- .../TestDeltaLakeAlterTableCompatibility.java | 39 +- .../TestDeltaLakeColumnMappingMode.java | 534 +++++++++++++++++- 8 files changed, 662 insertions(+), 50 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java index 7fbe150249f5..da4def26a654 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java @@ -162,8 +162,8 @@ public AbstractDeltaLakePageSink( case REGULAR: dataColumnHandles.add(column); dataColumnsInputIndex.add(inputIndex); - dataColumnNames.add(column.getName()); - dataColumnTypes.add(column.getType()); + dataColumnNames.add(column.getPhysicalName()); + dataColumnTypes.add(column.getPhysicalType()); break; case SYNTHESIZED: processSynthesizedColumn(column); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 006c513d8631..97143e623df3 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -364,11 +364,10 @@ private FileWriter createParquetFileWriter(String path, List fileSystem.deleteFile(path); List parquetTypes = dataColumns.stream() - .map(column -> toParquetType(typeOperators, column.getType())) + .map(column -> toParquetType(typeOperators, column.getPhysicalType())) .collect(toImmutableList()); - List dataColumnNames = dataColumns.stream() - .map(DeltaLakeColumnHandle::getName) + .map(DeltaLakeColumnHandle::getPhysicalName) .collect(toImmutableList()); ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter( parquetTypes, diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 57736e34f422..8187e86f4e0c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -45,6 +45,7 @@ import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CdfFileEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; +import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format; @@ -146,6 +147,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -192,8 +194,8 @@ import static io.trino.plugin.deltalake.metastore.HiveMetastoreBackedDeltaLakeMetastore.TABLE_PROVIDER_VALUE; import static io.trino.plugin.deltalake.procedure.DeltaLakeTableProcedureId.OPTIMIZE; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.APPEND_ONLY_CONFIGURATION_KEY; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled; -import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractColumnMetadata; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getCheckConstraints; @@ -286,7 +288,7 @@ public class DeltaLakeMetadata public static final int DEFAULT_WRITER_VERSION = 2; // The highest reader and writer versions Trino supports private static final int MAX_READER_VERSION = 3; - private static final int MAX_WRITER_VERSION = 4; + private static final int MAX_WRITER_VERSION = 5; private static final int CDF_SUPPORTED_WRITER_VERSION = 4; // Matches the dummy column Databricks stores in the metastore @@ -1009,7 +1011,8 @@ public Optional finishCreateTable( Optional.empty(), location, maxFileModificationTime, - computedStatistics); + computedStatistics, + 
Optional.empty()); // TODO: Pass physical column names mapping when supporting table creation with column mapping mode } PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow()); @@ -1061,6 +1064,11 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); checkSupportedWriterVersion(session, handle.getSchemaTableName()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + if (columnMappingMode != NONE) { + // TODO https://github.com/trinodb/trino/issues/12638 Support setting a table comment for id and name column mapping mode + throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH))); + } ConnectorTableMetadata tableMetadata = getTableMetadata(session, handle); @@ -1101,6 +1109,11 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) tableHandle; DeltaLakeColumnHandle deltaLakeColumnHandle = (DeltaLakeColumnHandle) column; checkSupportedWriterVersion(session, deltaLakeTableHandle.getSchemaTableName()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry()); + if (columnMappingMode != NONE) { + // TODO https://github.com/trinodb/trino/issues/12638 Support setting a column comment for id and name column mapping mode + throw new TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH))); + } ConnectorTableMetadata tableMetadata = getTableMetadata(session, deltaLakeTableHandle); @@ -1146,6 +1159,11 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle { DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle); checkSupportedWriterVersion(session, handle.getSchemaTableName()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + if (columnMappingMode != NONE) { + // TODO https://github.com/trinodb/trino/issues/12638 Support adding a column for id and name column mapping mode + throw new TrinoException(NOT_SUPPORTED, "Adding a column with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH))); + } if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) { throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName())); } @@ -1268,8 +1286,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto format("Inserts are not enabled on the %1$s filesystem in order to avoid eventual data corruption which may be caused by concurrent data modifications on the table. 
" + "Writes to the %1$s filesystem can be however enabled with the '%2$s' configuration property.", fileSystem, ENABLE_NON_CONCURRENT_WRITES_CONFIGURATION_KEY)); } - checkUnsupportedGeneratedColumns(table.getMetadataEntry()); - checkSupportedWriterVersion(session, table.getSchemaTableName()); + checkWriteSupported(session, table.getSchemaTableName(), table.getMetadataEntry()); List inputColumns = columns.stream() .map(handle -> (DeltaLakeColumnHandle) handle) @@ -1351,8 +1368,9 @@ public Optional finishInsert( // it is not obvious why we need to persist this readVersion transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, commitVersion, createdTime, INSERT_OPERATION, handle.getReadVersion())); - // Note: during writes we want to preserve original case of partition columns - List partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns(); + List partitionColumns = getColumnMappingMode(handle.getMetadataEntry()) == ColumnMappingMode.NAME + ? getPartitionColumnsForNameMapping(handle.getMetadataEntry().getOriginalPartitionColumns(), handle.getInputColumns()) + : handle.getMetadataEntry().getOriginalPartitionColumns(); appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, true); transactionLogWriter.flush(); @@ -1370,7 +1388,9 @@ public Optional finishInsert( Optional.empty(), handle.getLocation(), maxFileModificationTime, - computedStatistics); + computedStatistics, + Optional.of(extractSchema(handle.getMetadataEntry(), typeManager).stream() + .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)))); } } catch (Exception e) { @@ -1384,6 +1404,22 @@ public Optional finishInsert( return Optional.empty(); } + private static List getPartitionColumnsForNameMapping(List originalPartitionColumns, List dataColumns) + { + Map nameToDataColumns = dataColumns.stream() + .collect(toImmutableMap(DeltaLakeColumnHandle::getName, Function.identity())); + return originalPartitionColumns.stream() + .map(columnName -> { + DeltaLakeColumnHandle dataColumn = nameToDataColumns.get(columnName.toLowerCase(ENGLISH)); + // During writes we want to preserve original case of partition columns, if the name is not different from the physical name + if (dataColumn.getPhysicalName().equalsIgnoreCase(columnName)) { + return columnName; + } + return dataColumn.getPhysicalName(); + }) + .collect(toImmutableList()); + } + @Override public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle) { @@ -1422,8 +1458,12 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT if (!getCheckConstraints(handle.getMetadataEntry()).isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported"); } - checkUnsupportedGeneratedColumns(handle.getMetadataEntry()); - checkSupportedWriterVersion(session, handle.getSchemaTableName()); + ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry()); + if (changeDataFeedEnabled(handle.getMetadataEntry()) && columnMappingMode != NONE) { + // TODO https://github.com/trinodb/trino/issues/16967 Support CDF for tables with 'id' and 'name' column mapping + throw new TrinoException(NOT_SUPPORTED, "Unsupported column mapping mode for tables with change data feed enabled: " + columnMappingMode.name().toLowerCase(ENGLISH)); + } + checkWriteSupported(session, handle.getSchemaTableName(), handle.getMetadataEntry()); ConnectorTableMetadata tableMetadata = 
getTableMetadata(session, handle); @@ -1439,7 +1479,8 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT @Override public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tableHandle, Collection fragments, Collection computedStatistics) { - DeltaLakeTableHandle handle = ((DeltaLakeMergeTableHandle) tableHandle).getTableHandle(); + DeltaLakeMergeTableHandle mergeTableHandle = (DeltaLakeMergeTableHandle) tableHandle; + DeltaLakeTableHandle handle = mergeTableHandle.getTableHandle(); List mergeResults = fragments.stream() .map(Slice::getBytes) @@ -1497,7 +1538,9 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle tabl transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(file, writeTimestamp, true)); } - List partitionColumns = handle.getMetadataEntry().getOriginalPartitionColumns(); + List partitionColumns = getColumnMappingMode(handle.getMetadataEntry()) == ColumnMappingMode.NAME + ? getPartitionColumnsForNameMapping(handle.getMetadataEntry().getOriginalPartitionColumns(), mergeTableHandle.getInsertTableHandle().getInputColumns()) + : handle.getMetadataEntry().getOriginalPartitionColumns(); appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, true); transactionLogWriter.flush(); @@ -1637,6 +1680,11 @@ private BeginTableExecuteResult( + ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry()); + if (columnMappingMode != NONE) { + throw new TrinoException(NOT_SUPPORTED, "Executing 'optimize' procedure with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH))); + } executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())), @@ -1727,6 +1775,17 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH } } + private void checkWriteSupported(ConnectorSession session, SchemaTableName schemaTableName, MetadataEntry metadataEntry) + { + checkSupportedWriterVersion(session, schemaTableName); + checkUnsupportedGeneratedColumns(metadataEntry); + ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry); + if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME)) { + throw new TrinoException(NOT_SUPPORTED, "Writing with column mapping %s is not supported".formatted(columnMappingMode.name().toLowerCase(ENGLISH))); + } + // TODO: Check writer-features + } + private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) { Map columnGeneratedExpressions = getGeneratedColumnExpressions(metadataEntry); @@ -2188,10 +2247,10 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession alreadyAnalyzedModifiedTimeMax.orElse(EPOCH))); } - List columnsMetadata = extractColumnMetadata(metadata, typeManager); - Set allColumnNames = columnsMetadata.stream() - .map(ColumnMetadata::getName) - .collect(toImmutableSet()); + List columnsMetadata = extractSchema(metadata, typeManager); + Map physicalColumnNameMapping = columnsMetadata.stream() + .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)); + Set allColumnNames = physicalColumnNameMapping.keySet(); Optional> analyzeColumnNames = getColumnNames(analyzeProperties); if (analyzeColumnNames.isPresent()) { Set columnNames = analyzeColumnNames.get(); @@ -2234,7 +2293,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession false); TableStatisticsMetadata statisticsMetadata = getStatisticsCollectionMetadata( - columnsMetadata, + columnsMetadata.stream().map(DeltaLakeColumnMetadata::getColumnMetadata).collect(toImmutableList()), analyzeColumnNames.orElse(allColumnNames), true); @@ -2319,12 +2378,15 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH 
AnalyzeHandle analyzeHandle = tableHandle.getAnalyzeHandle().orElseThrow(() -> new IllegalArgumentException("analyzeHandle not set")); String location = metastore.getTableLocation(tableHandle.getSchemaTableName()); Optional maxFileModificationTime = getMaxFileModificationTime(computedStatistics); + Map physicalColumnNameMapping = extractSchema(tableHandle.getMetadataEntry(), typeManager).stream() + .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getPhysicalName)); updateTableStatistics( session, Optional.of(analyzeHandle), location, maxFileModificationTime, - computedStatistics); + computedStatistics, + Optional.of(physicalColumnNameMapping)); } private void updateTableStatistics( @@ -2332,7 +2394,8 @@ private void updateTableStatistics( Optional analyzeHandle, String location, Optional maxFileModificationTime, - Collection computedStatistics) + Collection computedStatistics, + Optional> physicalColumnNameMapping) { Optional oldStatistics = statisticsAccess.readExtendedStatistics(session, location); @@ -2348,6 +2411,11 @@ private void updateTableStatistics( Map newColumnStatistics = toDeltaLakeColumnStatistics(computedStatistics); Map mergedColumnStatistics = newColumnStatistics.entrySet().stream() + .map(entry -> { + String columnName = entry.getKey(); + String physicalColumnName = toPhysicalColumnName(columnName, physicalColumnNameMapping); + return Map.entry(physicalColumnName, entry.getValue()); + }) .collect(toImmutableMap( Entry::getKey, entry -> { @@ -2377,9 +2445,12 @@ private void updateTableStatistics( } analyzedColumns.ifPresent(analyzeColumns -> { - if (!mergedColumnStatistics.keySet().equals(analyzeColumns)) { + Set analyzePhysicalColumns = analyzeColumns.stream() + .map(columnName -> toPhysicalColumnName(columnName, physicalColumnNameMapping)) + .collect(toImmutableSet()); + if (!mergedColumnStatistics.keySet().equals(analyzePhysicalColumns)) { // sanity validation - throw new IllegalStateException(format("Unexpected columns in in mergedColumnStatistics %s; expected %s", mergedColumnStatistics.keySet(), analyzeColumns)); + throw new IllegalStateException(format("Unexpected columns in mergedColumnStatistics %s; expected %s", mergedColumnStatistics.keySet(), analyzePhysicalColumns)); } }); @@ -2391,6 +2462,15 @@ private void updateTableStatistics( statisticsAccess.updateExtendedStatistics(session, location, mergedExtendedStatistics); } + private static String toPhysicalColumnName(String columnName, Optional> physicalColumnNameMapping) + { + if (physicalColumnNameMapping.isPresent()) { + String physicalColumnName = physicalColumnNameMapping.get().get(columnName); + return requireNonNull(physicalColumnName, () -> "%s doesn't exist in %s".formatted(columnName, physicalColumnNameMapping)); + } + return columnName; + } + @Override public boolean supportsReportingWrittenBytes(ConnectorSession session, ConnectorTableHandle connectorTableHandle) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index dcc2569d9db9..6dab9ef9e8f9 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -185,8 +185,8 @@ public long getRowCount() public DataFileInfo getDataFileInfo() throws IOException { - List dataColumnNames = 
columnHandles.stream().map(DeltaLakeColumnHandle::getName).collect(toImmutableList()); - List dataColumnTypes = columnHandles.stream().map(DeltaLakeColumnHandle::getType).collect(toImmutableList()); + List dataColumnNames = columnHandles.stream().map(DeltaLakeColumnHandle::getPhysicalName).collect(toImmutableList()); + List dataColumnTypes = columnHandles.stream().map(DeltaLakeColumnHandle::getPhysicalType).collect(toImmutableList()); return new DataFileInfo( relativeFilePath, getWrittenBytes(), diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java index 0bde3bb82b7e..da8d2e043b21 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/HiveMetastoreBackedDeltaLakeMetastore.java @@ -375,7 +375,7 @@ else if (isValidInRange(minValue)) { columnStatsBuilder.setDistinctValuesCount(Estimate.of(partitioningColumnsDistinctValues.get(column).size())); } if (statistics.isPresent()) { - DeltaLakeColumnStatistics deltaLakeColumnStatistics = statistics.get().getColumnStatistics().get(column.getName()); + DeltaLakeColumnStatistics deltaLakeColumnStatistics = statistics.get().getColumnStatistics().get(column.getPhysicalName()); if (deltaLakeColumnStatistics != null && column.getColumnType() != PARTITION_KEY) { deltaLakeColumnStatistics.getTotalSizeInBytes().ifPresent(size -> columnStatsBuilder.setDataSize(Estimate.of(size))); columnStatsBuilder.setDistinctValuesCount(Estimate.of(deltaLakeColumnStatistics.getNdvSummary().cardinality())); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index 376b6f076313..2548462128f4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -257,7 +257,7 @@ private void writeJsonStats(BlockBuilder entryBlockBuilder, RowType entryType, A private Map getColumnTypeMapping(MetadataEntry deltaMetadata) { return extractSchema(deltaMetadata, typeManager).stream() - .collect(toImmutableMap(DeltaLakeColumnMetadata::getName, DeltaLakeColumnMetadata::getType)); + .collect(toImmutableMap(DeltaLakeColumnMetadata::getPhysicalName, DeltaLakeColumnMetadata::getPhysicalColumnType)); } private Optional getStatsString(DeltaLakeJsonFileStatistics parsedStats) diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java index a39b6b77e273..3ea113af405b 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlterTableCompatibility.java @@ -27,6 +27,7 @@ import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.testing.TestingNames.randomNameSuffix; import static 
io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; @@ -71,7 +72,7 @@ public void testAddColumnWithCommentOnTrino() } } - @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testAddColumnUnsupportedWriterVersion() { @@ -81,14 +82,14 @@ public void testAddColumnUnsupportedWriterVersion() onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='5')", + "TBLPROPERTIES ('delta.minWriterVersion'='6')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col int")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported"); } finally { dropDeltaTableWithRetry("default." + tableName); @@ -196,14 +197,14 @@ public void testCommentOnTableUnsupportedWriterVersion() onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='5')", + "TBLPROPERTIES ('delta.minWriterVersion'='6')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported"); } finally { onTrino().executeQuery("DROP TABLE delta.default." + tableName); @@ -242,20 +243,44 @@ public void testCommentOnColumnUnsupportedWriterVersion() onDelta().executeQuery(format("" + "CREATE TABLE default.%s (col int) " + "USING DELTA LOCATION 's3://%s/%s'" + - "TBLPROPERTIES ('delta.minWriterVersion'='5')", + "TBLPROPERTIES ('delta.minWriterVersion'='6')", tableName, bucketName, tableDirectory)); try { assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".col IS 'test column comment'")) - .hasMessageMatching(".* Table .* requires Delta Lake writer version 5 which is not supported"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported"); } finally { onTrino().executeQuery("DROP TABLE delta.default." 
+ tableName); } } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testOptimizeUnsupportedWriterVersion() + { + String tableName = "test_dl_optimize_unsupported_writer_" + randomNameSuffix(); + String tableDirectory = "databricks-compatibility-test-" + tableName; + + onDelta().executeQuery(format("" + + "CREATE TABLE default.%s (col int) " + + "USING DELTA LOCATION 's3://%s/%s'" + + "TBLPROPERTIES ('delta.minWriterVersion'='6')", + tableName, + bucketName, + tableDirectory)); + + try { + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE")) + .hasMessageMatching(".* Table .* requires Delta Lake writer version 6 which is not supported"); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testTrinoAlterTablePreservesTableMetadata() diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java index 8246f1da4b02..ccad0c3eae25 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeColumnMappingMode.java @@ -24,8 +24,11 @@ import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.testing.DataProviders.cartesianProduct; +import static io.trino.testing.DataProviders.trueFalse; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_104; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_73; import static io.trino.tests.product.TestGroups.DELTA_LAKE_EXCLUDE_91; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; @@ -328,9 +331,146 @@ public void testShowStatsOnPartitionedForColumnMappingMode(String mode) } } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDropAndAddColumnShowStatsForColumnMappingMode(String mode) + { + String tableName = "test_dl_drop_add_column_show_stats_for_column_mapping_mode_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a_number INT, b_number INT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode' = '" + mode + "'" + + ")"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10), (2, 20), (null, null)"); + onTrino().executeQuery("ANALYZE delta.default." 
+ tableName); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 2.0, 0.33333333333, null, "1", "2"), + row("b_number", null, 2.0, 0.33333333333, null, "10", "20"), + row(null, null, null, null, 3.0, null, null))); + + // Ensure SHOW STATS doesn't return stats for the restored column + onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN b_number"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN b_number INT"); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 2.0, 0.33333333333, null, "1", "2"), + row("b_number", null, null, null, null, null, null), + row(null, null, null, null, 3.0, null, null))); + + // SHOW STATS returns the expected stats after executing ANALYZE + onTrino().executeQuery("ANALYZE delta.default." + tableName); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 2.0, 0.33333333333, null, "1", "2"), + row("b_number", 0.0, 0.0, 1.0, null, null, null), + row(null, null, null, null, 3.0, null, null))); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testChangeColumnMappingAndShowStatsForColumnMappingMode() + { + String tableName = "test_dl_change_column_mapping_and_show_stats_for_column_mapping_mode_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a_number INT, b_number INT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode'='none'," + + " 'delta.minReaderVersion'='2'," + + " 'delta.minWriterVersion'='5')"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 10), (2, 20), (null, null)"); + onTrino().executeQuery("ANALYZE delta.default." + tableName); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly( + row("a_number", null, 2.0, 0.33333333333, null, "1", "2"), + row("b_number", null, 2.0, 0.33333333333, null, "10", "20"), + row(null, null, null, null, 3.0, null, null)); + + onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES('delta.columnMapping.mode'='name')"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN b_number"); + onDelta().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN b_number INT"); + + // Ensure SHOW STATS doesn't return stats for the restored column + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly( + row("a_number", null, 2.0, 0.33333333333, null, "1", "2"), + row("b_number", null, null, null, null, null, null), + row(null, null, null, null, 3.0, null, null)); + + // SHOW STATS returns the expected stats after executing ANALYZE + onTrino().executeQuery("ANALYZE delta.default." + tableName); + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." 
+ tableName)) + .containsOnly( + row("a_number", null, 2.0, 0.33333333333, null, "1", "2"), + row("b_number", 0.0, 0.0, 1.0, null, null, null), + row(null, null, null, null, 3.0, null, null)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "changeColumnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testChangeColumnMappingMode(String sourceMappingMode, String targetMappingMode, boolean supported) + { + String tableName = "test_dl_change_column_mapping_mode_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a_number INT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode'='" + sourceMappingMode + "'," + + " 'delta.minReaderVersion'='2'," + + " 'delta.minWriterVersion'='5')"); + try { + if (supported) { + onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES('delta.columnMapping.mode'='" + targetMappingMode + "')"); + } + else { + assertQueryFailure(() -> onDelta().executeQuery("ALTER TABLE default." + tableName + " SET TBLPROPERTIES('delta.columnMapping.mode'='" + targetMappingMode + "')")) + .hasMessageContaining("Changing column mapping mode from '%s' to '%s' is not supported".formatted(sourceMappingMode, targetMappingMode)); + } + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @DataProvider + public Object[][] changeColumnMappingDataProvider() + { + // Update testChangeColumnMappingAndShowStatsForColumnMappingMode if Delta Lake changes their behavior + return new Object[][] { + // sourceMappingMode targetMappingMode supported + {"none", "id", false}, + {"none", "name", true}, + {"id", "none", false}, + {"id", "name", false}, + {"name", "none", false}, + {"name", "id", false}, + }; + } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) - public void testUnsupportedOperationsColumnMappingModeName(String mode) + public void testUnsupportedOperationsColumnMappingMode(String mode) { String tableName = "test_dl_unsupported_column_mapping_mode_" + randomNameSuffix(); @@ -345,19 +485,33 @@ public void testUnsupportedOperationsColumnMappingModeName(String mode) " 'delta.minWriterVersion'='5')"); try { - assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 'one'), (2, 'two')")) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM default." + tableName)) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("UPDATE default." + tableName + " SET a_string = 'test'")) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." 
+ tableName + " EXECUTE OPTIMIZE")) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " ADD COLUMN new_col varchar")) - .hasMessageContaining("Delta Lake writer version 5 which is not supported"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " RENAME COLUMN a_number TO renamed_column")) + if (!mode.equals("id")) { + assertThat(onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 'one'), (2, 'two')")) + .updatedRowsCountIsEqualTo(2); + assertThat(onTrino().executeQuery("DELETE FROM delta.default." + tableName)) + .updatedRowsCountIsEqualTo(2); + assertThat(onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_string = 'test'")) + .updatedRowsCountIsEqualTo(0); + } + else { + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (1, 'one'), (2, 'two')")) + .hasMessageContaining("Writing with column mapping id is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) + .hasMessageContaining("Writing with column mapping id is not supported"); + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_string = 'test'")) + .hasMessageContaining("Writing with column mapping id is not supported"); + } + assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE delta.default." + tableName + " IS 'test comment'")) + .hasMessageContaining("Setting a table comment with column mapping %s is not supported".formatted(mode)); + assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON COLUMN delta.default." + tableName + ".a_number IS 'test comment'")) + .hasMessageContaining("Setting a column comment with column mapping %s is not supported".formatted(mode)); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " EXECUTE OPTIMIZE")) + .hasMessageContaining("Executing 'optimize' procedure with column mapping %s is not supported".formatted(mode)); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " ADD COLUMN new_col varchar")) + .hasMessageContaining("Adding a column with column mapping %s is not supported".formatted(mode)); + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " RENAME COLUMN a_number TO renamed_column")) .hasMessageContaining("This connector does not support renaming columns"); - assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE default." + tableName + " DROP COLUMN a_number")) + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE delta.default." + tableName + " DROP COLUMN a_number")) .hasMessageContaining("This connector does not support dropping columns"); } finally { @@ -396,6 +550,350 @@ public void testSpecialCharacterColumnNamesWithColumnMappingMode(String mode) } } + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingWithTrueAndFalseDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testSupportedNonPartitionedColumnMappingWrites(String mode, boolean statsAsJsonEnabled) + { + String tableName = "test_dl_dml_column_mapping_mode_" + mode + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." 
+ tableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " 'delta.checkpointInterval' = 1, " + + " 'delta.checkpoint.writeStatsAsJson' = " + statsAsJsonEnabled + ", " + + " 'delta.checkpoint.writeStatsAsStruct' = " + !statsAsJsonEnabled + ", " + + " 'delta.columnMapping.mode' = '" + mode + "'" + + ")"); + + try { + String trinoColumns = "a_number, a_string, array_col[1].array_struct_element, nested.field1"; + String deltaColumns = "a_number, a_string, array_col[0].array_struct_element, nested.field1"; + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + + " VALUES (1, 'first value', ARRAY[ROW('nested 1')], ROW('databricks 1'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))," + + " (3, 'third value', ARRAY[ROW('nested 3')], ROW('databricks 3'))," + + " (4, 'four', ARRAY[ROW('nested 4')], ROW('databricks 4'))"); + assertDeltaTrinoTableEquals(tableName, trinoColumns, deltaColumns, ImmutableList.of( + row(1, "first value", "nested 1", "databricks 1"), + row(2, "two", "nested 2", "databricks 2"), + row(3, "third value", "nested 3", "databricks 3"), + row(4, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 4.0, 0.0, null, "1", "4"), + row("a_string", 29.0, 4.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_number = a_number + 10 WHERE a_number in (3, 4)"); + onDelta().executeQuery("UPDATE default." + tableName + " SET a_number = a_number + 20 WHERE a_number in (1, 2)"); + assertDeltaTrinoTableEquals(tableName, trinoColumns, deltaColumns, ImmutableList.of( + row(21, "first value", "nested 1", "databricks 1"), + row(22, "two", "nested 2", "databricks 2"), + row(13, "third value", "nested 3", "databricks 3"), + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 4.0, 0.0, null, "13", "22"), + row("a_string", 29.0, 4.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 22"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 13"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a_number = 21"); + assertDeltaTrinoTableEquals(tableName, trinoColumns, deltaColumns, ImmutableList.of( + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 1.0, 0.0, null, "14", "14"), + row("a_string", 29.0, 1.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 1.0, null, null))); + } + finally { + dropDeltaTableWithRetry("default." 
+ tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingWithTrueAndFalseDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testSupportedPartitionedColumnMappingWrites(String mode, boolean statsAsJsonEnabled) + { + String tableName = "test_dl_dml_column_mapping_mode_" + mode + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " PARTITIONED BY (a_string)" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "'" + + " TBLPROPERTIES (" + + " 'delta.checkpointInterval' = 1, " + + " 'delta.checkpoint.writeStatsAsJson' = " + statsAsJsonEnabled + ", " + + " 'delta.checkpoint.writeStatsAsStruct' = " + !statsAsJsonEnabled + ", " + + " 'delta.columnMapping.mode' = '" + mode + "'" + + ")"); + + try { + String trinoColumns = "a_number, a_string, array_col[1].array_struct_element, nested.field1"; + String deltaColumns = "a_number, a_string, array_col[0].array_struct_element, nested.field1"; + + onTrino().executeQuery("INSERT INTO delta.default." + tableName + + " VALUES (1, 'first value', ARRAY[ROW('nested 1')], ROW('databricks 1'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))," + + " (3, 'third value', ARRAY[ROW('nested 3')], ROW('databricks 3'))," + + " (4, 'four', ARRAY[ROW('nested 4')], ROW('databricks 4'))"); + + assertDeltaTrinoTableEquals(tableName, trinoColumns, deltaColumns, ImmutableList.of( + row(1, "first value", "nested 1", "databricks 1"), + row(2, "two", "nested 2", "databricks 2"), + row(3, "third value", "nested 3", "databricks 3"), + row(4, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 4.0, 0.0, null, "1", "4"), + row("a_string", null, 4.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a_number = a_number + 10 WHERE a_number in (3, 4)"); + onDelta().executeQuery("UPDATE default." + tableName + " SET a_number = a_number + 20 WHERE a_number in (1, 2)"); + assertDeltaTrinoTableEquals(tableName, trinoColumns, deltaColumns, ImmutableList.of( + row(21, "first value", "nested 1", "databricks 1"), + row(22, "two", "nested 2", "databricks 2"), + row(13, "third value", "nested 3", "databricks 3"), + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 4.0, 0.0, null, "13", "22"), + row("a_string", null, 4.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 4.0, null, null))); + + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 22"); + onTrino().executeQuery("DELETE FROM delta.default." + tableName + " WHERE a_number = 13"); + onDelta().executeQuery("DELETE FROM default." 
+ tableName + " WHERE a_number = 21"); + assertDeltaTrinoTableEquals(tableName, trinoColumns, deltaColumns, ImmutableList.of( + row(14, "four", "nested 4", "databricks 4"))); + + assertThat(onTrino().executeQuery("SHOW STATS FOR delta.default." + tableName)) + .containsOnly(ImmutableList.of( + row("a_number", null, 1.0, 0.0, null, "14", "14"), + row("a_string", null, 1.0, 0.0, null, null, null), + row("array_col", null, null, null, null, null, null), + row("nested", null, null, null, null, null, null), + row(null, null, null, null, 1.0, null, null))); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "supportedColumnMappingForDmlDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testMergeUpdateWithColumnMapping(String mode) + { + String sourceTableName = "test_merge_update_source_column_mapping_mode_" + randomNameSuffix(); + String targetTableName = "test_merge_update_target_column_mapping_mode_" + randomNameSuffix(); + + onDelta().executeQuery("CREATE TABLE default." + targetTableName + " (nationkey INT, name STRING, regionkey INT) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" + + "TBLPROPERTIES ('delta.columnMapping.mode' = '" + mode + "')"); + onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (nationkey INT, name STRING, regionkey INT) " + + "USING DELTA " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'"); + try { + onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (1, 'nation1', 100), (2, 'nation2', 200), (3, 'nation3', 300)"); + onDelta().executeQuery("INSERT INTO default." + sourceTableName + " VALUES (1000, 'nation1000', 1000), (2, 'nation2', 20000), (3000, 'nation3000', 3000)"); + + onTrino().executeQuery("MERGE INTO delta.default." + targetTableName + " target USING delta.default." + sourceTableName + " source " + + "ON (target.nationkey = source.nationkey) " + + "WHEN MATCHED " + + "THEN UPDATE SET nationkey = (target.nationkey + source.nationkey + source.regionkey) " + + "WHEN NOT MATCHED " + + "THEN INSERT (nationkey, name, regionkey) VALUES (source.nationkey, source.name, source.regionkey)"); + + assertThat(onDelta().executeQuery("SELECT * FROM " + targetTableName)) + .containsOnly( + row(1000, "nation1000", 1000), + row(3000, "nation3000", 3000), + row(1, "nation1", 100), + row(3, "nation3", 300), + row(20004, "nation2", 200)); + } + finally { + dropDeltaTableWithRetry("default." + targetTableName); + dropDeltaTableWithRetry("default." + sourceTableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "supportedColumnMappingForDmlDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testMergeDeleteWithColumnMapping(String mode) + { + String sourceTableName = "test_dl_merge_delete_source_column_mapping_mode_" + mode + randomNameSuffix(); + String targetTableName = "test_dl_merge_delete_target_column_mapping_mode_" + mode + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." 
+ sourceTableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " PARTITIONED BY (a_string)" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode' ='" + mode + "')"); + + onDelta().executeQuery("" + + "CREATE TABLE default." + targetTableName + + " (a_number INT, a_string STRING, array_col ARRAY>, nested STRUCT)" + + " USING delta " + + " PARTITIONED BY (a_string)" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode' ='" + mode + "')"); + try { + onTrino().executeQuery("INSERT INTO delta.default." + sourceTableName + + " VALUES (1, 'first value', ARRAY[ROW('nested 1')], ROW('databricks 1'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))," + + " (3, 'third value', ARRAY[ROW('nested 3')], ROW('databricks 3'))," + + " (4, 'four', ARRAY[ROW('nested 4')], ROW('databricks 4'))"); + + String trinoColumns = "a_number, a_string, array_col[1].array_struct_element, nested.field1"; + String deltaColumns = "a_number, a_string, array_col[0].array_struct_element, nested.field1"; + assertDeltaTrinoTableEquals(sourceTableName, trinoColumns, deltaColumns, ImmutableList.of( + row(1, "first value", "nested 1", "databricks 1"), + row(2, "two", "nested 2", "databricks 2"), + row(3, "third value", "nested 3", "databricks 3"), + row(4, "four", "nested 4", "databricks 4"))); + + onTrino().executeQuery("INSERT INTO delta.default." + targetTableName + + " VALUES (1000, '1000 value', ARRAY[ROW('nested 1000')], ROW('databricks 1000'))," + + " (2, 'two', ARRAY[ROW('nested 2')], ROW('databricks 2'))"); + onDelta().executeQuery("INSERT INTO default." + targetTableName + + " VALUES (3000, '3000 value', array(struct('nested 3000')), struct('databricks 3000'))," + + " (4, 'four', array(struct('nested 4')), struct('databricks 4'))"); + + assertDeltaTrinoTableEquals(targetTableName, trinoColumns, deltaColumns, ImmutableList.of( + row(1000, "1000 value", "nested 1000", "databricks 1000"), + row(2, "two", "nested 2", "databricks 2"), + row(3000, "3000 value", "nested 3000", "databricks 3000"), + row(4, "four", "nested 4", "databricks 4"))); + + onTrino().executeQuery("MERGE INTO delta.default." + targetTableName + " t USING delta.default." + sourceTableName + " s " + + "ON (t.a_number = s.a_number) " + + "WHEN MATCHED " + + "THEN DELETE " + + "WHEN NOT MATCHED " + + "THEN INSERT (a_number, a_string, array_col, nested) VALUES (s.a_number, s.a_string, s.array_col, s.nested)"); + + assertDeltaTrinoTableEquals(targetTableName, trinoColumns, deltaColumns, ImmutableList.of( + row(1000, "1000 value", "nested 1000", "databricks 1000"), + row(3000, "3000 value", "nested 3000", "databricks 3000"), + row(1, "first value", "nested 1", "databricks 1"), + row(3, "third value", "nested 3", "databricks 3"))); + } + finally { + dropDeltaTableWithRetry("default." + sourceTableName); + dropDeltaTableWithRetry("default." 
+ targetTableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, PROFILE_SPECIFIC_TESTS}, dataProvider = "columnMappingDataProvider") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testUnsupportedColumnMappingModeChangeDataFeed(String mode) + { + String sourceTableName = "test_dl_cdf_target_column_mapping_mode_" + randomNameSuffix(); + String targetTableName = "test_dl_cdf_source_column_mapping_mode_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." + targetTableName + + " (nationkey INT, name STRING, regionkey INT)" + + " USING delta " + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + targetTableName + "'" + + " TBLPROPERTIES (" + + " 'delta.columnMapping.mode'='" + mode + "'," + + " 'delta.enableChangeDataFeed' = true" + + ")"); + onDelta().executeQuery("CREATE TABLE default." + sourceTableName + " (nationkey INT, name STRING, regionkey INT) " + + " USING DELTA" + + " LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + sourceTableName + "'"); + + try { + onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (1, 'nation1', 100)"); + onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (2, 'nation2', 200)"); + onDelta().executeQuery("INSERT INTO default." + targetTableName + " VALUES (3, 'nation3', 300)"); + + // Column mapping mode 'none' is tested in TestDeltaLakeDatabricksChangeDataFeedCompatibility + // TODO: Remove these failure check and update TestDeltaLakeDatabricksChangeDataFeedCompatibility when adding support the column mapping mode + if (mode.equals("id")) { + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + targetTableName + " SET regionkey = 10")) + .hasMessageContaining("Unsupported column mapping mode for tables with change data feed enabled: id"); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + targetTableName)) + .hasMessageContaining("Unsupported column mapping mode for tables with change data feed enabled: id"); + assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + targetTableName + " cdf USING delta.default." + sourceTableName + " n " + + "ON (cdf.nationkey = n.nationkey) " + + "WHEN MATCHED " + + "THEN UPDATE SET nationkey = (cdf.nationkey + n.nationkey + n.regionkey) " + + "WHEN NOT MATCHED " + + "THEN INSERT (nationkey, name, regionkey) VALUES (n.nationkey, n.name, n.regionkey)")) + .hasMessageContaining("Unsupported column mapping mode for tables with change data feed enabled: id"); + } + else if (mode.equals("name")) { + assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + targetTableName + " SET regionkey = 10")) + .hasMessageContaining("Unsupported column mapping mode for tables with change data feed enabled: " + mode); + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + targetTableName)) + .hasMessageContaining("Unsupported column mapping mode for tables with change data feed enabled: " + mode); + assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + targetTableName + " cdf USING delta.default." 
+ sourceTableName + " n " + + "ON (cdf.nationkey = n.nationkey) " + + "WHEN MATCHED " + + "THEN UPDATE SET nationkey = (cdf.nationkey + n.nationkey + n.regionkey) " + + "WHEN NOT MATCHED " + + "THEN INSERT (nationkey, name, regionkey) VALUES (n.nationkey, n.name, n.regionkey)")) + .hasMessageContaining("Unsupported column mapping mode for tables with change data feed enabled: " + mode); + } + + assertThat(onDelta().executeQuery("SELECT nationkey, name, regionkey, _change_type, _commit_version " + + "FROM table_changes('default." + targetTableName + "', 0)")) + .containsOnly( + row(1, "nation1", 100, "insert", 1), + row(2, "nation2", 200, "insert", 2), + row(3, "nation3", 300, "insert", 3)); + } + finally { + dropDeltaTableWithRetry("default." + targetTableName); + } + } + + private void assertDeltaTrinoTableEquals(String tableName, String trinoQuery, String deltaQuery, List expectedRows) + { + assertThat(onDelta().executeQuery("SELECT " + deltaQuery + " FROM default." + tableName)) + .containsOnly(expectedRows); + assertThat(onTrino().executeQuery("SELECT " + trinoQuery + " FROM delta.default." + tableName)) + .containsOnly(expectedRows); + } + + @DataProvider + public Object[][] columnMappingWithTrueAndFalseDataProvider() + { + return cartesianProduct(supportedColumnMappingForDmlDataProvider(), trueFalse()); + } + @DataProvider public Object[][] columnMappingDataProvider() { @@ -404,4 +902,14 @@ public Object[][] columnMappingDataProvider() {"name"}, }; } + + @DataProvider + public Object[][] supportedColumnMappingForDmlDataProvider() + { + // TODO: Support 'id' column mapping + return new Object[][] { + {"none"}, + {"name"}, + }; + } }