From 865c97371d4233256a222b75f1f6d07f78508ce8 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Fri, 9 Jun 2023 13:26:27 +0900 Subject: [PATCH] Support reading deletion vectors in Delta Lake --- plugin/trino-delta-lake/pom.xml | 5 + .../deltalake/DeltaLakeColumnHandle.java | 5 + .../plugin/deltalake/DeltaLakeMetadata.java | 3 +- .../plugin/deltalake/DeltaLakePageSource.java | 12 +- .../DeltaLakePageSourceProvider.java | 54 ++- .../plugin/deltalake/DeltaLakeSplit.java | 27 +- .../deltalake/DeltaLakeSplitManager.java | 6 + .../deltalake/delete/DeletionVectors.java | 270 +++++++++++++++ .../delete/PositionDeleteFilter.java | 52 +++ .../plugin/deltalake/delete/RowPredicate.java | 38 +++ .../TableChangesFunctionProcessor.java | 3 +- .../transactionlog/AddFileEntry.java | 14 +- .../transactionlog/DeletionVectorEntry.java | 29 ++ .../DeltaLakeSchemaSupport.java | 9 +- .../checkpoint/CheckpointEntryIterator.java | 51 ++- .../checkpoint/CheckpointSchemaManager.java | 13 + .../plugin/deltalake/TestDeltaLakeBasic.java | 11 +- .../deltalake/TestDeltaLakeSplitManager.java | 4 +- .../deltalake/delete/TestDeletionVectors.java | 112 ++++++ .../transactionlog/TestTableSnapshot.java | 12 +- .../checkpoint/TestCheckpointBuilder.java | 6 +- .../TestCheckpointEntryIterator.java | 9 +- .../checkpoint/TestCheckpointWriter.java | 12 +- .../databricks/deletion_vectors/README.md | 13 + .../_delta_log/00000000000000000000.json | 3 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 3 + ...r_a52eda8c-0a57-4636-814b-9c165388f7ca.bin | Bin 0 -> 43 bytes ...4e53-94c8-2e20a0796fee-c000.snappy.parquet | Bin 0 -> 796 bytes .../TestDeltaLakeDatabricksDelete.java | 322 +++++++++++++++++- 30 files changed, 1047 insertions(+), 53 deletions(-) create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml index 1d3a1791c6aa..83fcd353c130 100644 --- a/plugin/trino-delta-lake/pom.xml +++ b/plugin/trino-delta-lake/pom.xml @@ -144,6 +144,11 @@ <artifactId>trino-hive</artifactId> </dependency> + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-hive-formats</artifactId> + </dependency> + <dependency> <groupId>io.trino</groupId> <artifactId>trino-parquet</artifactId> diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java index 7c829626ddc9..f39d990f0d34 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java @@ -218,6 +218,11 @@ public HiveColumnHandle toHiveColumnHandle() Optional.empty()); } + public static DeltaLakeColumnHandle rowIdColumnHandle() + { + return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, BIGINT, OptionalInt.empty(), ROW_ID_COLUMN_NAME, BIGINT, SYNTHESIZED, Optional.empty()); + } + public static DeltaLakeColumnHandle pathColumnHandle() { return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty()); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 1c219139b0e8..3a7c2345f312 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -1411,7 +1411,8 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit dataChange, Optional.of(serializeStatsAsJson(info.getStatistics())), Optional.empty(), - ImmutableMap.of())); + ImmutableMap.of(), + Optional.empty())); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java index 59f504fdf287..a6c705040316 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java @@ -15,6 +15,7 @@ import io.airlift.json.JsonCodec; import io.airlift.json.JsonCodecFactory; +import io.trino.plugin.deltalake.delete.RowPredicate; import io.trino.plugin.hive.ReaderProjectionsAdapter; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -33,6 +34,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.function.Supplier; import static com.google.common.base.Throwables.throwIfInstanceOf; import static io.airlift.slice.Slices.utf8Slice; @@ -63,6 +65,7 @@ public class DeltaLakePageSource private final Block partitionsBlock; private final ConnectorPageSource delegate; private final Optional projectionsAdapter; + private final Supplier> deletePredicate; public DeltaLakePageSource( List columns, @@ -73,7 +76,8 @@ public DeltaLakePageSource( Optional projectionsAdapter, String path, long fileSize, - long fileModifiedTime) + long fileModifiedTime, + Supplier> deletePredicate) { int size = columns.size(); requireNonNull(partitionKeys, "partitionKeys is null"); @@ -131,6 +135,7 @@ else if (missingColumnNames.contains(column.getBaseColumnName())) { this.rowIdIndex = rowIdIndex; this.pathBlock = pathBlock; this.partitionsBlock = partitionsBlock; + this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null"); } @Override @@ -168,6 +173,11 @@ public Page getNextPage() if (projectionsAdapter.isPresent()) { dataPage = projectionsAdapter.get().adaptPage(dataPage); } + Optional deleteFilterPredicate = deletePredicate.get(); + if (deleteFilterPredicate.isPresent()) { + dataPage = 
deleteFilterPredicate.get().filterPage(dataPage); + } + int batchSize = dataPage.getPositionCount(); Block[] blocks = new Block[prefilledBlocks.length]; for (int i = 0; i < prefilledBlocks.length; i++) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 82a0d3bbdbfa..c85997c3281e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.deltalake; +import com.google.common.base.Suppliers; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -24,6 +25,9 @@ import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.reader.MetadataReader; +import io.trino.plugin.deltalake.delete.PositionDeleteFilter; +import io.trino.plugin.deltalake.delete.RowPredicate; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; @@ -35,6 +39,7 @@ import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.block.LongArrayBlock; import io.trino.spi.connector.ColumnHandle; @@ -56,6 +61,7 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; import org.joda.time.DateTimeZone; +import org.roaringbitmap.RoaringBitmap; import java.io.IOException; import java.io.UncheckedIOException; @@ -63,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -71,12 +78,15 @@ import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType; import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME; +import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowIdColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedNestedReaderEnabled; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedReaderEnabled; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex; +import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode; import static 
io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN; @@ -134,6 +144,11 @@ public ConnectorPageSource createPageSource( .map(DeltaLakeColumnHandle.class::cast) .collect(toImmutableList()); + List requiredColumns = ImmutableList.builderWithExpectedSize(deltaLakeColumns.size() + 1) + .addAll(deltaLakeColumns) + .add(rowIdColumnHandle()) + .build(); + List regularColumns = deltaLakeColumns.stream() .filter(column -> (column.getColumnType() == REGULAR) || column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME)) .collect(toImmutableList()); @@ -166,6 +181,7 @@ public ConnectorPageSource createPageSource( if (filteredSplitPredicate.isAll() && split.getStart() == 0 && split.getLength() == split.getFileSize() && split.getFileRowCount().isPresent() && + split.getDeletionVector().isEmpty() && (regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) { return new DeltaLakePageSource( deltaLakeColumns, @@ -176,7 +192,8 @@ public ConnectorPageSource createPageSource( Optional.empty(), split.getPath(), split.getFileSize(), - split.getFileModifiedTime()); + split.getFileModifiedTime(), + Optional::empty); } Location location = Location.of(split.getPath()); @@ -201,6 +218,9 @@ public ConnectorPageSource createPageSource( hiveColumnHandles::add, () -> missingColumnNames.add(column.getBaseColumnName())); } + if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowIdColumnHandle())) { + hiveColumnHandles.add(PARQUET_ROW_INDEX_COLUMN); + } TupleDomain parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName); @@ -224,6 +244,14 @@ public ConnectorPageSource createPageSource( column -> ((HiveColumnHandle) column).getType(), HivePageSourceProvider::getProjection)); + Supplier> deletePredicate = Suppliers.memoize(() -> { + if (split.getDeletionVector().isEmpty()) { + return Optional.empty(); + } + PositionDeleteFilter deleteFilter = readDeletes(session, Location.of(split.getTableLocation()), split.getDeletionVector().get()); + return Optional.of(deleteFilter.createPredicate(requiredColumns)); + }); + return new DeltaLakePageSource( deltaLakeColumns, missingColumnNames.build(), @@ -233,7 +261,29 @@ public ConnectorPageSource createPageSource( projectionsAdapter, split.getPath(), split.getFileSize(), - split.getFileModifiedTime()); + split.getFileModifiedTime(), + deletePredicate); + } + + private PositionDeleteFilter readDeletes( + ConnectorSession session, + Location tableLocation, + DeletionVectorEntry deletionVector) + { + try { + RoaringBitmap[] deletedRows = readDeletionVectors( + fileSystemFactory.create(session), + tableLocation, + deletionVector.storageType(), + deletionVector.pathOrInlineDv(), + deletionVector.offset(), + deletionVector.sizeInBytes(), + deletionVector.cardinality()); + return new PositionDeleteFilter(deletedRows); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e); + } } public Map loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java index d5398e2a4fa1..3b6704f4278a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java @@ -19,6 +19,7 @@ import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.slice.SizeOf; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; @@ -41,34 +42,40 @@ public class DeltaLakeSplit { private static final int INSTANCE_SIZE = instanceSize(DeltaLakeSplit.class); + private final String tableLocation; private final String path; private final long start; private final long length; private final long fileSize; private final Optional fileRowCount; private final long fileModifiedTime; + private final Optional deletionVector; private final SplitWeight splitWeight; private final TupleDomain statisticsPredicate; private final Map> partitionKeys; @JsonCreator public DeltaLakeSplit( + @JsonProperty("tableLocation") String tableLocation, @JsonProperty("path") String path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("fileSize") long fileSize, @JsonProperty("rowCount") Optional fileRowCount, @JsonProperty("fileModifiedTime") long fileModifiedTime, + @JsonProperty("deletionVector") Optional deletionVector, @JsonProperty("splitWeight") SplitWeight splitWeight, @JsonProperty("statisticsPredicate") TupleDomain statisticsPredicate, @JsonProperty("partitionKeys") Map> partitionKeys) { + this.tableLocation = requireNonNull(tableLocation, "tableLocation is null"); this.path = requireNonNull(path, "path is null"); this.start = start; this.length = length; this.fileSize = fileSize; this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null"); this.fileModifiedTime = fileModifiedTime; + this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null"); this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null"); @@ -94,6 +101,12 @@ public SplitWeight getSplitWeight() return splitWeight; } + @JsonProperty + public String getTableLocation() + { + return tableLocation; + } + @JsonProperty public String getPath() { @@ -130,6 +143,12 @@ public long getFileModifiedTime() return fileModifiedTime; } + @JsonProperty + public Optional getDeletionVector() + { + return deletionVector; + } + /** * A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information. 
*/ @@ -149,8 +168,10 @@ public Map> getPartitionKeys() public long getRetainedSizeInBytes() { return INSTANCE_SIZE + + estimatedSizeOf(tableLocation) + estimatedSizeOf(path) + sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE) + + sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes) + splitWeight.getRetainedSizeInBytes() + statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes) + estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf)); @@ -170,11 +191,13 @@ public Object getInfo() public String toString() { return toStringHelper(this) + .add("tableLocation", tableLocation) .add("path", path) .add("start", start) .add("length", length) .add("fileSize", fileSize) .add("rowCount", fileRowCount) + .add("deletionVector", deletionVector) .add("statisticsPredicate", statisticsPredicate) .add("partitionKeys", partitionKeys) .toString(); @@ -193,8 +216,10 @@ public boolean equals(Object o) return start == that.start && length == that.length && fileSize == that.fileSize && + tableLocation.equals(that.tableLocation) && path.equals(that.path) && fileRowCount.equals(that.fileRowCount) && + deletionVector.equals(that.deletionVector) && Objects.equals(statisticsPredicate, that.statisticsPredicate) && Objects.equals(partitionKeys, that.partitionKeys); } @@ -202,6 +227,6 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(path, start, length, fileSize, fileRowCount, statisticsPredicate, partitionKeys); + return Objects.hash(path, start, length, fileSize, fileRowCount, deletionVector, statisticsPredicate, partitionKeys); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index b0cff466065e..11fe06d01a48 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -227,6 +227,7 @@ private Stream getSplits( return splitsForFile( session, + tableHandle.location(), addAction, splitPath, addAction.getCanonicalPartitionValues(), @@ -273,6 +274,7 @@ private static boolean pathMatchesPredicate(Domain pathDomain, String path) private List splitsForFile( ConnectorSession session, + String tableLocation, AddFileEntry addFileEntry, String splitPath, Map> partitionKeys, @@ -285,12 +287,14 @@ private List splitsForFile( if (!splittable) { // remainingInitialSplits is not used when !splittable return ImmutableList.of(new DeltaLakeSplit( + tableLocation, splitPath, 0, fileSize, fileSize, addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords), addFileEntry.getModificationTime(), + addFileEntry.getDeletionVector(), SplitWeight.standard(), statisticsPredicate, partitionKeys)); @@ -309,12 +313,14 @@ private List splitsForFile( long splitSize = Math.min(maxSplitSize, fileSize - currentOffset); splits.add(new DeltaLakeSplit( + tableLocation, splitPath, currentOffset, splitSize, fileSize, Optional.empty(), addFileEntry.getModificationTime(), + addFileEntry.getDeletionVector(), SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)), statisticsPredicate, partitionKeys)); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java 
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java new file mode 100644 index 000000000000..197de3589b7e --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java @@ -0,0 +1,270 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import com.google.common.base.CharMatcher; +import io.airlift.log.Logger; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.hive.formats.TrinoDataInputStream; +import io.trino.spi.TrinoException; +import org.roaringbitmap.RoaringBitmap; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.OptionalInt; +import java.util.UUID; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.Math.toIntExact; +import static java.nio.ByteOrder.LITTLE_ENDIAN; +import static java.nio.charset.StandardCharsets.US_ASCII; +import static java.nio.charset.StandardCharsets.UTF_8; + +// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format +public final class DeletionVectors +{ + private static final Logger log = Logger.get(DeletionVectors.class); + private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377; + + private static final String UUID_MARKER = "u"; // relative path with random prefix on disk + private static final String PATH_MARKER = "p"; // absolute path on disk + private static final String INLINE_MARKER = "i"; // inline + + private static final CharMatcher ALPHANUMERIC = CharMatcher.inRange('A', 'Z').or(CharMatcher.inRange('a', 'z')).or(CharMatcher.inRange('0', '9')).precomputed(); + + private DeletionVectors() {} + + public static RoaringBitmap[] readDeletionVectors( + TrinoFileSystem fileSystem, + Location location, + String storageType, + String pathOrInlineDv, + OptionalInt offset, + int sizeInBytes, + long cardinality) + throws IOException + { + if (storageType.equals(UUID_MARKER)) { + TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(pathOrInlineDv))); + ByteBuffer buffer = readDeletionVector(inputFile, offset.orElseThrow(), sizeInBytes); + RoaringBitmap[] bitmaps = deserializeDeletionVectors(buffer); + long deletedRowCount = Arrays.stream(bitmaps).mapToLong(RoaringBitmap::getLongCardinality).sum(); + if (deletedRowCount != cardinality) { + // Don't throw an exception because Databricks may report the wrong cardinality when there are many deleted rows + log.debug("The number of deleted rows is expected to be %s but got %s", cardinality, deletedRowCount); + } + return bitmaps; + } + else if (storageType.equals(INLINE_MARKER) || storageType.equals(PATH_MARKER)) { + throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + storageType); + } + throw new IllegalArgumentException("Unexpected storage type: " + storageType); + } + + public static String toFileName(String
pathOrInlineDv) + { + int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH; + String randomPrefix = pathOrInlineDv.substring(0, randomPrefixLength); + checkArgument(ALPHANUMERIC.matchesAllOf(randomPrefix), "Random prefix must be alphanumeric: %s", randomPrefix); + String prefix = randomPrefix.isEmpty() ? "" : randomPrefix + "/"; + String encodedUuid = pathOrInlineDv.substring(randomPrefixLength); + UUID uuid = Base85Codec.decodeUuid(encodedUuid); + return "%sdeletion_vector_%s.bin".formatted(prefix, uuid); + } + + public static ByteBuffer readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize) + throws IOException + { + byte[] bytes = new byte[expectedSize]; + TrinoDataInputStream inputStream = new TrinoDataInputStream(inputFile.newStream()); + inputStream.seek(offset); + int actualSize = inputStream.readInt(); + if (actualSize != expectedSize) { + // TODO: Investigate why these sizes differ + log.warn("The size of deletion vector %s is expected to be %s but got %s", inputFile.location(), expectedSize, actualSize); + } + inputStream.readFully(bytes); + return ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN); + } + + public static boolean contains(RoaringBitmap[] bitmaps, long value) + { + int high = highBytes(value); + if (high >= bitmaps.length) { + return false; + } + RoaringBitmap highBitmap = bitmaps[high]; + int low = lowBytes(value); + return highBitmap.contains(low); + } + + private static int highBytes(long value) + { + return toIntExact(value >> 32); + } + + private static int lowBytes(long value) + { + // Take the low 32 bits with a plain cast; toIntExact would throw for positions of 2^31 and above, and RoaringBitmap treats the int as unsigned + return (int) value; + } + + public static RoaringBitmap[] deserializeDeletionVectors(ByteBuffer buffer) + throws IOException + { + checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order()); + int magicNumber = buffer.getInt(); + if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) { + int size = toIntExact(buffer.getLong()); + RoaringBitmap[] bitmaps = new RoaringBitmap[size]; + for (int i = 0; i < size; i++) { + int key = buffer.getInt(); + checkArgument(key >= 0); + RoaringBitmap bitmap = new RoaringBitmap(); + bitmap.deserialize(buffer); + bitmaps[i] = bitmap; + buffer.position(buffer.position() + bitmap.serializedSizeInBytes()); + } + return bitmaps; + } + throw new IllegalArgumentException("Unsupported magic number: " + magicNumber); + } + + // This implements Base85 using the 4-byte block-aligned encoding and character set from Z85 https://rfc.zeromq.org/spec/32 + // The Delta Lake implementation is https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/util/Codec.scala + static final class Base85Codec + { + private static final long BASE = 85L; + private static final long BASE_2ND_POWER = 7225L; // 85^2 + private static final long BASE_3RD_POWER = 614125L; // 85^3 + private static final long BASE_4TH_POWER = 52200625L; // 85^4 + private static final int ASCII_BITMASK = 0x7F; + + // UUIDs always encode into 20 characters + static final int ENCODED_UUID_LENGTH = 20; + + private static final String BASE85_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#"; + + private static final byte[] ENCODE_MAP = BASE85_CHARACTERS.getBytes(UTF_8); + // The bitmask is the same as the largest possible value, so the length of the array must be one greater.
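+ // DECODE_MAP is indexed by the 7-bit ASCII code of an encoded character; slots that do not correspond to a Z85 character keep the sentinel -1.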
+ private static final byte[] DECODE_MAP = new byte[ASCII_BITMASK + 1]; + + static { + Arrays.fill(DECODE_MAP, (byte) -1); + for (int i = 0; i < ENCODE_MAP.length; i++) { + DECODE_MAP[ENCODE_MAP[i]] = (byte) i; + } + } + + private Base85Codec() {} + + // This method will be used when supporting https://github.com/trinodb/trino/issues/17063 + // For now it is only used to test the codec round trip + public static String encodeBytes(byte[] input) + { + if (input.length % 4 == 0) { + return encodeBlocks(ByteBuffer.wrap(input)); + } + int alignedLength = ((input.length + 4) / 4) * 4; + ByteBuffer buffer = ByteBuffer.allocate(alignedLength); + buffer.put(input); + while (buffer.hasRemaining()) { + buffer.put((byte) 0); + } + buffer.rewind(); + return encodeBlocks(buffer); + } + + private static String encodeBlocks(ByteBuffer buffer) + { + checkArgument(buffer.remaining() % 4 == 0); + int numBlocks = buffer.remaining() / 4; + // Every 4-byte block gets encoded into 5 bytes/chars + int outputLength = numBlocks * 5; + byte[] output = new byte[outputLength]; + int outputIndex = 0; + + while (buffer.hasRemaining()) { + long sum = buffer.getInt() & 0x00000000ffffffffL; + output[outputIndex] = ENCODE_MAP[(int) (sum / BASE_4TH_POWER)]; + sum %= BASE_4TH_POWER; + output[outputIndex + 1] = ENCODE_MAP[(int) (sum / BASE_3RD_POWER)]; + sum %= BASE_3RD_POWER; + output[outputIndex + 2] = ENCODE_MAP[(int) (sum / BASE_2ND_POWER)]; + sum %= BASE_2ND_POWER; + output[outputIndex + 3] = ENCODE_MAP[(int) (sum / BASE)]; + output[outputIndex + 4] = ENCODE_MAP[(int) (sum % BASE)]; + outputIndex += 5; + } + return new String(output, US_ASCII); + } + + public static ByteBuffer decodeBlocks(String encoded) + { + char[] input = encoded.toCharArray(); + checkArgument(input.length % 5 == 0, "Input should be 5-character aligned"); + ByteBuffer buffer = ByteBuffer.allocate(input.length / 5 * 4); + + int inputIndex = 0; + while (buffer.hasRemaining()) { + long sum = 0; + sum += decodeInputChar(input[inputIndex]) * BASE_4TH_POWER; + sum += decodeInputChar(input[inputIndex + 1]) * BASE_3RD_POWER; + sum += decodeInputChar(input[inputIndex + 2]) * BASE_2ND_POWER; + sum += decodeInputChar(input[inputIndex + 3]) * BASE; + sum += decodeInputChar(input[inputIndex + 4]); + buffer.putInt((int) sum); + inputIndex += 5; + } + buffer.rewind(); + return buffer; + } + + public static UUID decodeUuid(String encoded) + { + ByteBuffer buffer = decodeBlocks(encoded); + return uuidFromByteBuffer(buffer); + } + + private static UUID uuidFromByteBuffer(ByteBuffer buffer) + { + checkArgument(buffer.remaining() >= 16); + long highBits = buffer.getLong(); + long lowBits = buffer.getLong(); + return new UUID(highBits, lowBits); + } + + // This method will be used when supporting https://github.com/trinodb/trino/issues/17063 + // For now it is only used to test the codec round trip + public static byte[] decodeBytes(String encoded, int outputLength) + { + ByteBuffer result = decodeBlocks(encoded); + if (result.remaining() > outputLength) { + // Only read the expected number of bytes + byte[] output = new byte[outputLength]; + result.get(output); + return output; + } + return result.array(); + } + + private static long decodeInputChar(char chr) + { + checkArgument(BASE85_CHARACTERS.contains(String.valueOf(chr)), "%s is not a valid Base85 character", chr); + return DECODE_MAP[chr & ASCII_BITMASK]; + } + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java
b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java new file mode 100644 index 000000000000..d2b19952b510 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import io.trino.plugin.deltalake.DeltaLakeColumnHandle; +import org.roaringbitmap.RoaringBitmap; + +import java.util.List; + +import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME; +import static io.trino.spi.type.BigintType.BIGINT; +import static java.util.Objects.requireNonNull; + +public final class PositionDeleteFilter +{ + private final RoaringBitmap[] deletedRows; + + public PositionDeleteFilter(RoaringBitmap[] deletedRows) + { + this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); + } + + public RowPredicate createPredicate(List columns) + { + int filePositionChannel = rowPositionChannel(columns); + return (page, position) -> { + long filePosition = BIGINT.getLong(page.getBlock(filePositionChannel), position); + return !DeletionVectors.contains(deletedRows, filePosition); + }; + } + + private static int rowPositionChannel(List columns) + { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getBaseColumnName().equals(ROW_ID_COLUMN_NAME)) { + return i; + } + } + throw new IllegalArgumentException("No row position column"); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java new file mode 100644 index 000000000000..22b51b3ebb28 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.deltalake.delete; + +import io.trino.spi.Page; + +public interface RowPredicate +{ + boolean test(Page page, int position); + + default Page filterPage(Page page) + { + int positionCount = page.getPositionCount(); + int[] retained = new int[positionCount]; + int retainedCount = 0; + for (int position = 0; position < positionCount; position++) { + if (test(page, position)) { + retained[retainedCount] = position; + retainedCount++; + } + } + if (retainedCount == positionCount) { + return page; + } + return page.getPositions(retained, 0, retainedCount); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java index a342ffc502cd..1934726d8fd9 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java @@ -209,6 +209,7 @@ private DeltaLakePageSource createDeltaLakePageSource(TableChangesSplit split) Optional.empty(), split.path(), split.fileSize(), - 0L); + 0L, + Optional::empty); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java index 1cb4475067fb..a4884ed7c5cb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java @@ -34,6 +34,7 @@ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson; import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues; import static java.lang.String.format; +import static java.util.Objects.requireNonNull; public class AddFileEntry { @@ -47,6 +48,7 @@ public class AddFileEntry private final long modificationTime; private final boolean dataChange; private final Map tags; + private final Optional deletionVector; private final Optional parsedStats; @JsonCreator @@ -58,7 +60,8 @@ public AddFileEntry( @JsonProperty("dataChange") boolean dataChange, @JsonProperty("stats") Optional stats, @JsonProperty("parsedStats") Optional parsedStats, - @JsonProperty("tags") @Nullable Map tags) + @JsonProperty("tags") @Nullable Map tags, + @JsonProperty("deletionVector") Optional deletionVector) { this.path = path; this.partitionValues = partitionValues; @@ -67,6 +70,7 @@ public AddFileEntry( this.modificationTime = modificationTime; this.dataChange = dataChange; this.tags = tags; + this.deletionVector = requireNonNull(deletionVector, "deletionVector is null"); Optional resultParsedStats = Optional.empty(); if (parsedStats.isPresent()) { @@ -149,6 +153,12 @@ public Map getTags() return tags; } + @JsonProperty + public Optional getDeletionVector() + { + return deletionVector; + } + @Override public String toString() { @@ -173,6 +183,7 @@ public boolean equals(Object o) Objects.equals(partitionValues, that.partitionValues) && Objects.equals(canonicalPartitionValues, that.canonicalPartitionValues) && Objects.equals(tags, that.tags) && + Objects.equals(deletionVector, that.deletionVector) && Objects.equals(parsedStats, that.parsedStats); } @@ -187,6 +198,7 
@@ public int hashCode() modificationTime, dataChange, tags, + deletionVector, parsedStats); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java new file mode 100644 index 000000000000..94719ae02c71 --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import java.util.OptionalInt; + +import static java.util.Objects.requireNonNull; + +// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema +public record DeletionVectorEntry(String storageType, String pathOrInlineDv, OptionalInt offset, int sizeInBytes, long cardinality) +{ + public DeletionVectorEntry + { + requireNonNull(storageType, "storageType is null"); + requireNonNull(pathOrInlineDv, "pathOrInlineDv is null"); + requireNonNull(offset, "offset is null"); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java index 558475900bb8..f9b127c686c8 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java @@ -87,11 +87,13 @@ private DeltaLakeSchemaSupport() {} public static final String APPEND_ONLY_CONFIGURATION_KEY = "delta.appendOnly"; public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode"; public static final String MAX_COLUMN_ID_CONFIGURATION_KEY = "delta.columnMapping.maxColumnId"; + private static final String DELETION_VECTORS_CONFIGURATION_KEY = "delta.enableDeletionVectors"; // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features - // TODO: Add support for 'deletionVectors' and 'timestampNTZ' reader features + // TODO: Add support for 'timestampNTZ' reader features private static final Set SUPPORTED_READER_FEATURES = ImmutableSet.builder() .add("columnMapping") + .add("deletionVectors") .build(); public enum ColumnMappingMode @@ -123,6 +125,11 @@ public static boolean isAppendOnly(MetadataEntry metadataEntry) return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false")); } + public static boolean deletionVectorsEnabled(Map configuration) + { + return parseBoolean(configuration.get(DELETION_VECTORS_CONFIGURATION_KEY)); + } + public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata) { String columnMappingMode = metadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MODE_CONFIGURATION_KEY, "none"); diff --git 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java index 88c6301542ad..78ed32d5794c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java @@ -22,6 +22,7 @@ import io.trino.plugin.deltalake.DeltaLakeColumnMetadata; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; +import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; @@ -72,6 +73,7 @@ import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.deletionVectorsEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats; import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.START_OF_MODERN_ERA; @@ -359,18 +361,24 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo if (block.isNull(pagePosition)) { return null; } + boolean deletionVectorsEnabled = deletionVectorsEnabled(metadataEntry.getConfiguration()); Block addEntryBlock = block.getObject(pagePosition, Block.class); log.debug("Block %s has %s fields", block, addEntryBlock.getPositionCount()); - Map partitionValues = getMap(addEntryBlock, 1); - long size = getLong(addEntryBlock, 2); - long modificationTime = getLong(addEntryBlock, 3); - boolean dataChange = getByte(addEntryBlock, 4) != 0; - Map tags = getMap(addEntryBlock, 7); + int position = 0; + String path = getString(addEntryBlock, position++); + Map partitionValues = getMap(addEntryBlock, position++); + long size = getLong(addEntryBlock, position++); + long modificationTime = getLong(addEntryBlock, position++); + boolean dataChange = getByte(addEntryBlock, position++) != 0; + Optional deletionVector = Optional.empty(); + if (deletionVectorsEnabled) { + deletionVector = Optional.of(parseDeletionVectorFromParquet(addEntryBlock.getObject(position++, Block.class))); + } + Map tags = getMap(addEntryBlock, position + 2); - String path = getString(addEntryBlock, 0); AddFileEntry result; - if (!addEntryBlock.isNull(6)) { + if (!addEntryBlock.isNull(position + 1)) { result = new AddFileEntry( path, partitionValues, @@ -378,19 +386,21 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo modificationTime, dataChange, Optional.empty(), - Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(6, Block.class))), - tags); + Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(position + 1, Block.class))), + tags, + deletionVector); } - else if (!addEntryBlock.isNull(5)) { + else if (!addEntryBlock.isNull(position)) { result = new AddFileEntry( path, partitionValues, size, modificationTime, 
dataChange, - Optional.of(getString(addEntryBlock, 5)), + Optional.of(getString(addEntryBlock, position)), Optional.empty(), - tags); + tags, + deletionVector); } else { result = new AddFileEntry( @@ -401,13 +411,28 @@ else if (!addEntryBlock.isNull(5)) { dataChange, Optional.empty(), Optional.empty(), - tags); + tags, + deletionVector); } log.debug("Result: %s", result); return DeltaLakeTransactionLogEntry.addFileEntry(result); } + private DeletionVectorEntry parseDeletionVectorFromParquet(Block block) + { + int size = block.getPositionCount(); + checkArgument(size == 4 || size == 5, "Deletion vector entry must have 4 or 5 fields"); + + int position = 0; + String storageType = getString(block, position++); + String pathOrInlineDv = getString(block, position++); + OptionalInt offset = size == 4 ? OptionalInt.empty() : OptionalInt.of(getInt(block, position++)); + int sizeInBytes = getInt(block, position++); + long cardinality = getLong(block, position++); + return new DeletionVectorEntry(storageType, pathOrInlineDv, offset, sizeInBytes, cardinality); + } + private DeltaLakeParquetFileStatistics parseStatisticsFromParquet(Block statsRowBlock) { if (metadataEntry == null) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java index 9125de8b63e9..fe4a4680bfe3 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java @@ -34,6 +34,7 @@ import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.deletionVectorsEnabled; import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema; import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats; import static java.util.Objects.requireNonNull; @@ -42,6 +43,14 @@ public class CheckpointSchemaManager { private final TypeManager typeManager; + private static final RowType DELETION_VECTORS_TYPE = RowType.from(ImmutableList.builder() + .add(RowType.field("storageType", VarcharType.VARCHAR)) + .add(RowType.field("pathOrInlineDv", VarcharType.VARCHAR)) + .add(RowType.field("offset", IntegerType.INTEGER)) + .add(RowType.field("sizeInBytes", IntegerType.INTEGER)) + .add(RowType.field("cardinality", BigintType.BIGINT)) + .build()); + private static final RowType TXN_ENTRY_TYPE = RowType.from(ImmutableList.of( RowType.field("appId", VarcharType.createUnboundedVarcharType()), RowType.field("version", BigintType.BIGINT), @@ -106,6 +115,7 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, boolean requireWrite { List allColumns = extractSchema(metadataEntry, typeManager); List minMaxColumns = columnsWithStats(metadataEntry, typeManager); + boolean deletionVectorEnabled = deletionVectorsEnabled(metadataEntry.getConfiguration()); ImmutableList.Builder minMaxFields = ImmutableList.builder(); for (DeltaLakeColumnMetadata dataColumn : minMaxColumns) { @@ -139,6 +149,9 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, boolean requireWrite addFields.add(RowType.field("size", BigintType.BIGINT)); addFields.add(RowType.field("modificationTime", BigintType.BIGINT)); 
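+ // The deletionVector field sits between dataChange and stats in the add entry, which is why CheckpointEntryIterator walks the fields with a running position counter; it is only part of the schema when delta.enableDeletionVectors is set on the table.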
addFields.add(RowType.field("dataChange", BooleanType.BOOLEAN)); + if (deletionVectorEnabled) { + addFields.add(RowType.field("deletionVector", DELETION_VECTORS_TYPE)); + } if (requireWriteStatsAsJson) { addFields.add(RowType.field("stats", VarcharType.createUnboundedVarcharType())); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 79bdba2cafa2..8c1c6a87eb70 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -64,7 +64,7 @@ public class TestDeltaLakeBasic private static final List PERSON_TABLES = ImmutableList.of( "person", "person_without_last_checkpoint", "person_without_old_jsons", "person_without_checkpoints"); - private static final List OTHER_TABLES = ImmutableList.of("no_column_stats"); + private static final List OTHER_TABLES = ImmutableList.of("no_column_stats", "deletion_vectors"); // The col-{uuid} pattern for delta.columnMapping.physicalName private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"); @@ -217,6 +217,15 @@ public Object[][] columnMappingModeDataProvider() }; } + /** + * @see databricks.deletion_vectors + */ + @Test + public void testDeletionVectors() + { + assertQuery("SELECT * FROM deletion_vectors", "VALUES (1, 11)"); + } + @Test public void testCorruptedManagedTableLocation() throws Exception diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java index 666a5e3cb986..d7a809b9cd41 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java @@ -181,13 +181,13 @@ public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorS private AddFileEntry addFileEntryOfSize(long fileSize) { - return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of()); + return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty()); } private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight) { SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0)); - return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, splitWeight, TupleDomain.all(), ImmutableMap.of()); + return new DeltaLakeSplit(TABLE_PATH, FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), splitWeight, TupleDomain.all(), ImmutableMap.of()); } private List getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java new file mode 100644 index 000000000000..ae6d1e4bb225 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java @@ 
-0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.delete; + +import com.google.common.io.Resources; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import org.roaringbitmap.RoaringBitmap; +import org.testng.annotations.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.OptionalInt; + +import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION; +import static io.trino.plugin.deltalake.delete.DeletionVectors.Base85Codec.decodeBlocks; +import static io.trino.plugin.deltalake.delete.DeletionVectors.Base85Codec.decodeBytes; +import static io.trino.plugin.deltalake.delete.DeletionVectors.Base85Codec.encodeBytes; +import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors; +import static io.trino.plugin.deltalake.delete.DeletionVectors.toFileName; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class TestDeletionVectors +{ + @Test + public void testUuidStorageType() + throws Exception + { + // The deletion vector has a deleted row at position 1 + Path path = new File(Resources.getResource("databricks/deletion_vectors").toURI()).toPath(); + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + + RoaringBitmap[] bitmaps = readDeletionVectors(fileSystem, Location.of(path.toString()), "u", "R7QFX3rGXPFLhHGq&7g<", OptionalInt.of(1), 34, 1); + assertThat(bitmaps).hasSize(1); + assertFalse(bitmaps[0].contains(0)); + assertTrue(bitmaps[0].contains(1)); + assertFalse(bitmaps[0].contains(2)); + } + + @Test + public void testUnsupportedPathStorageType() + { + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), "p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1)) + .hasMessageContaining("Unsupported storage type for deletion vector: p"); + } + + @Test + public void testUnsupportedInlineStorageType() + { + TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); + assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), "i", "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", OptionalInt.empty(), 40, 1)) + .hasMessageContaining("Unsupported storage type for deletion vector: i"); + } + + @Test + public void testToFileName() + { + assertEquals(toFileName("R7QFX3rGXPFLhHGq&7g<"), "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin"); + assertEquals(toFileName("ab^-aqEH.-t@S}K{vb[*k^"), "ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin"); + } + + @Test + public void testEncodeBytes() + { + // The test case comes from 
https://rfc.zeromq.org/spec/32 + byte[] inputBytes = new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B}; + String encoded = encodeBytes(inputBytes); + assertEquals(encoded, "HelloWorld"); + } + + @Test + public void testDecodeBytes() + { + String data = "HelloWorld"; + byte[] bytes = decodeBytes(data, 8); + assertEquals(bytes, new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B}); + } + + @Test + public void testDecodeBlocksIllegalCharacter() + { + assertThatThrownBy(() -> decodeBlocks("ab" + 0x7F + "de")).hasMessageContaining("Input should be 5-character aligned"); + + assertThatThrownBy(() -> decodeBlocks("abîde")).hasMessageContaining("î is not a valid Base85 character"); + assertThatThrownBy(() -> decodeBlocks("abπde")).hasMessageContaining("π is not a valid Base85 character"); + assertThatThrownBy(() -> decodeBlocks("ab\"de")).hasMessageContaining("\" is not a valid Base85 character"); + } + + @Test + public void testCodecRoundTrip() + { + assertEquals("HelloWorld", encodeBytes(decodeBytes("HelloWorld", 8))); + assertEquals("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", encodeBytes(decodeBytes("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", 40))); + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java index f28544ce9c8d..5303e55a365d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java @@ -129,7 +129,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( @@ -145,7 +146,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } // lets read two entry types in one call; add and protocol @@ -169,7 +171,8 @@ public void readsCheckpointFile() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(6).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty())); @@ -187,7 +190,8 @@ "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java index f19248c7fbb2..738272a7f381 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java +++
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java @@ -58,10 +58,10 @@ public void testCheckpointBuilder() builder.addLogEntry(transactionEntry(app1TransactionV1)); builder.addLogEntry(transactionEntry(app2TransactionV5)); - AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of()); + AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); RemoveFileEntry removeA1 = new RemoveFileEntry("a", 1, true); - AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of()); - AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of()); + AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); + AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty()); RemoveFileEntry removeB = new RemoveFileEntry("b", 1, true); RemoveFileEntry removeC = new RemoveFileEntry("c", 1, true); builder.addLogEntry(addFileEntry(addA1)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java index 7401f3ae308a..7facacadf840 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java @@ -129,7 +129,8 @@ public void testReadAddEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo( new AddFileEntry( @@ -145,7 +146,8 @@ public void testReadAddEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); } @Test @@ -190,7 +192,8 @@ public void testReadAllEntries() "\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" + "}"), Optional.empty(), - null)); + null, + Optional.empty())); // RemoveFileEntry assertThat(entries).element(3).extracting(DeltaLakeTransactionLogEntry::getRemove).isEqualTo( diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index 3adbc82eed38..0e7edd7d3d14 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -178,7 +178,8 @@ public void testCheckpointWriteReadJsonRoundtrip() Optional.empty(), ImmutableMap.of( "someTag", "someValue", - "otherTag", "otherValue")); + "otherTag", "otherValue"), + Optional.empty()); RemoveFileEntry 
removeFileEntry = new RemoveFileEntry( "removeFilePath", @@ -314,7 +315,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() .buildOrThrow()))), ImmutableMap.of( "someTag", "someValue", - "otherTag", "otherValue")); + "otherTag", "otherValue"), + Optional.empty()); RemoveFileEntry removeFileEntry = new RemoveFileEntry( "removeFilePath", @@ -390,7 +392,8 @@ public void testDisablingRowStatistics() "row", RowBlock.fromFieldBlocks(1, Optional.empty(), minMaxRowFieldBlocks).getSingleValueBlock(0))), Optional.of(ImmutableMap.of( "row", RowBlock.fromFieldBlocks(1, Optional.empty(), nullCountRowFieldBlocks).getSingleValueBlock(0))))), - ImmutableMap.of()); + ImmutableMap.of(), + Optional.empty()); CheckpointEntries entries = new CheckpointEntries( metadataEntry, @@ -428,7 +431,8 @@ private AddFileEntry makeComparable(AddFileEntry original) original.isDataChange(), original.getStatsString(), makeComparable(original.getStats()), - original.getTags()); + original.getTags(), + original.getDeletionVector()); } private Optional makeComparable(Optional original) diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md new file mode 100644 index 000000000000..f30f3b1279ae --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md @@ -0,0 +1,13 @@ +Data generated using Databricks 12.2: + +```sql +CREATE TABLE default.test_deletion_vectors ( + a INT, + b INT) +USING delta +LOCATION 's3://trino-ci-test/test_deletion_vectors' +TBLPROPERTIES ('delta.enableDeletionVectors' = true); + +INSERT INTO default.test_deletion_vectors VALUES (1, 11), (2, 22); +DELETE FROM default.test_deletion_vectors WHERE a = 2; +``` diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..4a5d53407173 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326581374,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.enableDeletionVectors\":\"true\"}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"2cbfa481-d2b0-4f59-83f9-1261492dfd46"}} +{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}} +{"metaData":{"id":"32f26f4b-95ba-4980-b209-0132e949b3e4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1682326580906}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json new file mode 100644 index 000000000000..7a5e8e6418b8 --- /dev/null +++ 
b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1682326587253,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"796"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"99cd5421-a1b9-40c6-8063-7298ec935fd6"}} +{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json new file mode 100644 index 000000000000..00f135f1c8d2 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1682326592314,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.test_deletion_vectors_vsipbnhjjg.a = 2)\"]"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numRemovedBytes":"0","numCopiedRows":"0","numDeletionVectorsAdded":"1","numDeletionVectorsRemoved":"0","numAddedChangeFiles":"0","executionTimeMs":"2046","numDeletedRows":"1","scanTimeMs":"1335","numAddedFiles":"0","numAddedBytes":"0","rewriteTimeMs":"709"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"219ffc4f-ff84-49d6-98a3-b0b105ce2a1e"}} +{"remove":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","deletionTimestamp":1682326592313,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":796,"tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"R7QFX3rGXPFLhHGq&7g<","offset":1,"sizeInBytes":34,"cardinality":1}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin new file mode 100644 index 0000000000000000000000000000000000000000..66b4b7369d9f146ca1841aa0fa2c4be0e6866f20 GIT binary patch literal 43 jcmZQ%U|>+Xc-bDV@IyDfVW71Tpnr1r2XGLuZYVUx6*Ojnjt z*po+3z3I{a!9T&P_<#66_|kNjLcIt{=FRuM_s#nrljEmvS{Pv)JNWC5U!M&d8?fEQ 
zCPMe0=m?>m9Sy!^bWQvD<oe9ZWigAcZ{#8jNhlSwA&?H4L|{C4;ZQG1J&$T1HtUk!%64u#|C7U4qX zMLfyqEuZ$y=Zldy36 z@8)jC(=x>YU9;j$$+64<$Yx2xMFM#!l%9L7>GtP~g?Cdc{=4?W0?~(C7OE*c=SmJ? zXyqVI){Yu!?yN`Pv{NdpZ+SLC@SrQW8prTo`~}`{vc&)Z literal 0 HcmV?d00001 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java index ba98b1fac3e2..b368eb49c874 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java @@ -13,9 +13,15 @@ */ package io.trino.tests.product.deltalake; +import com.google.common.collect.ImmutableList; +import io.trino.tempto.assertions.QueryAssert.Row; +import io.trino.testing.DataProviders; import io.trino.testng.services.Flaky; import org.testng.annotations.Test; +import java.util.List; +import java.util.function.Consumer; + import static io.trino.tempto.assertions.QueryAssert.Row.row; import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -65,7 +71,6 @@ public void testDeleteOnAppendOnlyTableFails() @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) public void testDeletionVectors() { - // TODO https://github.com/trinodb/trino/issues/16903 Add support for deletionVectors reader features String tableName = "test_deletion_vectors_" + randomNameSuffix(); onDelta().executeQuery("" + "CREATE TABLE default." + tableName + @@ -79,28 +84,315 @@ public void testDeletionVectors() assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) .containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11)); + + // Reinsert the deleted row and verify that the row appears correctly + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (2, 22)"); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + + // Execute DELETE statement which doesn't delete any rows + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = -1"); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11), row(2, 22)); + // Verify other statements assertThat(onTrino().executeQuery("SHOW TABLES FROM delta.default")) .contains(row(tableName)); - assertThat(onTrino().executeQuery("SELECT comment FROM information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) - .hasNoRows(); - assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default.\"" + tableName + "$history\"")) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("SHOW COLUMNS FROM delta.default."
+ tableName)) - .hasMessageMatching(".* Table .* does not exist"); - assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); + assertThat(onTrino().executeQuery("SELECT column_name FROM delta.information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'")) + .contains(row("a"), row("b")); + assertThat(onTrino().executeQuery("SELECT version, operation FROM delta.default.\"" + tableName + "$history\"")) + .contains(row(0, "CREATE TABLE"), row(1, "WRITE"), row(2, "DELETE")); + assertThat(onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName)) + .contains(row("a", "integer", "", ""), row("b", "integer", "", "")); + assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName)) + .contains(row("a", "integer", "", ""), row("b", "integer", "", "")); + + // TODO https://github.com/trinodb/trino/issues/17063 Use Delta Deletion Vectors for row-level deletes assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)")) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName)) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3")) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " + "ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1")) - .hasMessageMatching(".* Table .* does not exist"); + .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported"); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsWithRandomPrefix() + { + String tableName = "test_deletion_vectors_random_prefix_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.randomizeFilePrefixes' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + }
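+ + // With 'delta.checkpointInterval' = 1 every commit below should also write a Parquet checkpoint, so the deletionVector entry has to be read back through the checkpoint reader (CheckpointEntryIterator), not only from the JSON transaction log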
+ + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsWithCheckpointInterval() + { + String tableName = "test_deletion_vectors_checkpoint_interval_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.checkpointInterval' = 1)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2"); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)) + .containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) + .containsOnly(row(1, 11)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsMergeDelete() + { + String tableName = "test_deletion_vectors_merge_delete_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10))"); + onDelta().executeQuery("MERGE INTO default." + tableName + " t USING default." + tableName + " s " + + "ON (t.a = s.a) WHEN MATCHED AND t.a > 5 THEN DELETE"); + + List<Row> expected = ImmutableList.of(row(1), row(2), row(3), row(4), row(5)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsLargeNumbers() + { + String tableName = "test_deletion_vectors_large_numbers_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10000))"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a > 1");
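+ // This should produce a deletion vector marking 9,999 of the 10,000 inserted positions, a far larger bitmap than the single-row vectors in the tests above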
+ tableName + " WHERE a > 1"); + + List expected = ImmutableList.of(row(1)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}, + dataProviderClass = DataProviders.class, dataProvider = "trueFalse") + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsAcrossAddFile(boolean partitioned) + { + String tableName = "test_deletion_vectors_accross_add_file_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + (partitioned ? "PARTITIONED BY (a)" : "") + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2 OR a = 4"); + + List expected = ImmutableList.of(row(1, 11), row(3, 33)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected); + + // Verify behavior when the query doesn't read non-partition columns + assertThat(onTrino().executeQuery("SELECT count(*) FROM delta.default." + tableName)).containsOnly(row(2)); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsTruncateTable() + { + testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("TRUNCATE TABLE default." + tableName)); + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsDeleteFrom() + { + testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("DELETE FROM default." + tableName)); + } + + private void testDeletionVectorsDeleteAll(Consumer deleteRow) + { + String tableName = "test_deletion_vectors_delete_all_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." 
+ tableName + " SELECT explode(sequence(1, 1000))"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasRowsCount(1000); + + deleteRow.accept(tableName); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).hasNoRows(); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasNoRows(); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsOptimize() + { + String tableName = "test_deletion_vectors_optimize_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)"); + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3"); + + List expected = ImmutableList.of(row(2, 22), row(4, 44)); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + + onDelta().executeQuery("OPTIMIZE default." + tableName); + + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); + } + finally { + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsAbsolutePath() + { + String baseTableName = "test_deletion_vectors_base_absolute_" + randomNameSuffix(); + String tableName = "test_deletion_vectors_absolute_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + baseTableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + baseTableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + baseTableName + " VALUES (1,11), (2,22), (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + baseTableName + " WHERE a = 1 OR a = 3"); + + // The cloned table has 'p' (absolute path) storageType for deletion vector + onDelta().executeQuery("CREATE TABLE default." + tableName + " SHALLOW CLONE " + baseTableName); + + List expected = ImmutableList.of(row(2, 22), row(4, 44)); + assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); + // TODO https://github.com/trinodb/trino/issues/17205 Fix below assertion when supporting absolute path + assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." 
+ tableName)) + .hasMessageContaining("Failed to generate splits"); + } + finally { + dropDeltaTableWithRetry("default." + baseTableName); + dropDeltaTableWithRetry("default." + tableName); + } + } + + // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS}) + @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH) + public void testDeletionVectorsWithChangeDataFeed() + { + String tableName = "test_deletion_vectors_cdf_" + randomNameSuffix(); + onDelta().executeQuery("" + + "CREATE TABLE default." + tableName + + "(a INT, b INT)" + + "USING delta " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.enableChangeDataFeed' = true)"); + try { + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22), (3,33), (4,44)"); + onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3"); + + assertThat(onDelta().executeQuery( + "SELECT a, b, _change_type, _commit_version FROM table_changes('default." + tableName + "', 0)")) + .containsOnly( + row(1, 11, "insert", 1L), + row(2, 22, "insert", 1L), + row(3, 33, "insert", 1L), + row(4, 44, "insert", 1L), + row(1, 11, "delete", 2L), + row(3, 33, "delete", 2L)); + + // TODO Fix table_changes function failure + assertQueryFailure(() -> onTrino().executeQuery("SELECT a, b, _change_type, _commit_version FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))")) + .hasMessageContaining("Change Data Feed is not enabled at version 2. Version contains 'remove' entries without 'cdc' entries"); } finally { dropDeltaTableWithRetry("default." + tableName);