diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index 8d3f7bea7e5d..08a97298c52e 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -52,6 +52,11 @@
<artifactId>trino-hive</artifactId>
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-hive-formats</artifactId>
+ </dependency>
+
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
index 7c829626ddc9..f39d990f0d34 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
@@ -218,6 +218,11 @@ public HiveColumnHandle toHiveColumnHandle()
Optional.empty());
}
+ public static DeltaLakeColumnHandle rowIdColumnHandle()
+ {
+ return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, BIGINT, OptionalInt.empty(), ROW_ID_COLUMN_NAME, BIGINT, SYNTHESIZED, Optional.empty());
+ }
+
public static DeltaLakeColumnHandle pathColumnHandle()
{
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index f70d31246e83..d1c242cdb106 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -1400,7 +1400,8 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit
dataChange,
Optional.of(serializeStatsAsJson(info.getStatistics())),
Optional.empty(),
- ImmutableMap.of()));
+ ImmutableMap.of(),
+ Optional.empty()));
}
}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
index 59f504fdf287..a6c705040316 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSource.java
@@ -15,6 +15,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
+import io.trino.plugin.deltalake.delete.RowPredicate;
import io.trino.plugin.hive.ReaderProjectionsAdapter;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
@@ -33,6 +34,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.function.Supplier;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static io.airlift.slice.Slices.utf8Slice;
@@ -63,6 +65,7 @@ public class DeltaLakePageSource
private final Block partitionsBlock;
private final ConnectorPageSource delegate;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
+ private final Supplier<Optional<RowPredicate>> deletePredicate;
public DeltaLakePageSource(
List<DeltaLakeColumnHandle> columns,
@@ -73,7 +76,8 @@ public DeltaLakePageSource(
Optional<ReaderProjectionsAdapter> projectionsAdapter,
String path,
long fileSize,
- long fileModifiedTime)
+ long fileModifiedTime,
+ Supplier<Optional<RowPredicate>> deletePredicate)
{
int size = columns.size();
requireNonNull(partitionKeys, "partitionKeys is null");
@@ -131,6 +135,7 @@ else if (missingColumnNames.contains(column.getBaseColumnName())) {
this.rowIdIndex = rowIdIndex;
this.pathBlock = pathBlock;
this.partitionsBlock = partitionsBlock;
+ this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null");
}
@Override
@@ -168,6 +173,11 @@ public Page getNextPage()
if (projectionsAdapter.isPresent()) {
dataPage = projectionsAdapter.get().adaptPage(dataPage);
}
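+ // Drop rows marked deleted by the split's deletion vector, if any, before the prefilled columns are appended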
+ Optional<RowPredicate> deleteFilterPredicate = deletePredicate.get();
+ if (deleteFilterPredicate.isPresent()) {
+ dataPage = deleteFilterPredicate.get().filterPage(dataPage);
+ }
+
int batchSize = dataPage.getPositionCount();
Block[] blocks = new Block[prefilledBlocks.length];
for (int i = 0; i < prefilledBlocks.length; i++) {
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
index 82a0d3bbdbfa..c85997c3281e 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.deltalake;
+import com.google.common.base.Suppliers;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -24,6 +25,9 @@
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.reader.MetadataReader;
+import io.trino.plugin.deltalake.delete.PositionDeleteFilter;
+import io.trino.plugin.deltalake.delete.RowPredicate;
+import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HiveColumnHandle;
@@ -35,6 +39,7 @@
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.connector.ColumnHandle;
@@ -56,6 +61,7 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.joda.time.DateTimeZone;
+import org.roaringbitmap.RoaringBitmap;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -63,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -71,12 +78,15 @@
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME;
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowIdColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
+import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedNestedReaderEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedReaderEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
+import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN;
@@ -134,6 +144,11 @@ public ConnectorPageSource createPageSource(
.map(DeltaLakeColumnHandle.class::cast)
.collect(toImmutableList());
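+ // Column list handed to the deletion-vector predicate; the appended row ID column identifies the channel holding the Parquet row position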
+ List<DeltaLakeColumnHandle> requiredColumns = ImmutableList.<DeltaLakeColumnHandle>builderWithExpectedSize(deltaLakeColumns.size() + 1)
+ .addAll(deltaLakeColumns)
+ .add(rowIdColumnHandle())
+ .build();
+
List<DeltaLakeColumnHandle> regularColumns = deltaLakeColumns.stream()
.filter(column -> (column.getColumnType() == REGULAR) || column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))
.collect(toImmutableList());
@@ -166,6 +181,7 @@ public ConnectorPageSource createPageSource(
if (filteredSplitPredicate.isAll() &&
split.getStart() == 0 && split.getLength() == split.getFileSize() &&
split.getFileRowCount().isPresent() &&
+ split.getDeletionVector().isEmpty() &&
(regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) {
return new DeltaLakePageSource(
deltaLakeColumns,
@@ -176,7 +192,8 @@ public ConnectorPageSource createPageSource(
Optional.empty(),
split.getPath(),
split.getFileSize(),
- split.getFileModifiedTime());
+ split.getFileModifiedTime(),
+ Optional::empty);
}
Location location = Location.of(split.getPath());
@@ -201,6 +218,9 @@ public ConnectorPageSource createPageSource(
hiveColumnHandles::add,
() -> missingColumnNames.add(column.getBaseColumnName()));
}
+ if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowIdColumnHandle())) {
+ hiveColumnHandles.add(PARQUET_ROW_INDEX_COLUMN);
+ }
TupleDomain<HiveColumnHandle> parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName);
@@ -224,6 +244,14 @@ public ConnectorPageSource createPageSource(
column -> ((HiveColumnHandle) column).getType(),
HivePageSourceProvider::getProjection));
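+ // Memoized so the deletion vector file is opened at most once per split, and only when the first page needs filtering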
+ Supplier<Optional<RowPredicate>> deletePredicate = Suppliers.memoize(() -> {
+ if (split.getDeletionVector().isEmpty()) {
+ return Optional.empty();
+ }
+ PositionDeleteFilter deleteFilter = readDeletes(session, Location.of(split.getTableLocation()), split.getDeletionVector().get());
+ return Optional.of(deleteFilter.createPredicate(requiredColumns));
+ });
+
return new DeltaLakePageSource(
deltaLakeColumns,
missingColumnNames.build(),
@@ -233,7 +261,29 @@ public ConnectorPageSource createPageSource(
projectionsAdapter,
split.getPath(),
split.getFileSize(),
- split.getFileModifiedTime());
+ split.getFileModifiedTime(),
+ deletePredicate);
+ }
+
+ private PositionDeleteFilter readDeletes(
+ ConnectorSession session,
+ Location tableLocation,
+ DeletionVectorEntry deletionVector)
+ {
+ try {
+ RoaringBitmap[] deletedRows = readDeletionVectors(
+ fileSystemFactory.create(session),
+ tableLocation,
+ deletionVector.storageType(),
+ deletionVector.pathOrInlineDv(),
+ deletionVector.offset(),
+ deletionVector.sizeInBytes(),
+ deletionVector.cardinality());
+ return new PositionDeleteFilter(deletedRows);
+ }
+ catch (IOException e) {
+ throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e);
+ }
}
public Map<Integer, String> loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options)
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
index 4161da6f360c..ed3b3948f7bf 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplit.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.SizeOf;
+import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSplit;
@@ -40,12 +41,14 @@ public class DeltaLakeSplit
{
private static final int INSTANCE_SIZE = instanceSize(DeltaLakeSplit.class);
+ private final String tableLocation;
private final String path;
private final long start;
private final long length;
private final long fileSize;
private final Optional<Long> fileRowCount;
private final long fileModifiedTime;
+ private final Optional<DeletionVectorEntry> deletionVector;
private final List<HostAddress> addresses;
private final SplitWeight splitWeight;
private final TupleDomain<DeltaLakeColumnHandle> statisticsPredicate;
@@ -53,23 +56,27 @@ public class DeltaLakeSplit
@JsonCreator
public DeltaLakeSplit(
+ @JsonProperty("tableLocation") String tableLocation,
@JsonProperty("path") String path,
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("rowCount") Optional fileRowCount,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
+ @JsonProperty("deletionVector") Optional deletionVector,
@JsonProperty("addresses") List addresses,
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("statisticsPredicate") TupleDomain statisticsPredicate,
@JsonProperty("partitionKeys") Map> partitionKeys)
{
+ this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.path = requireNonNull(path, "path is null");
this.start = start;
this.length = length;
this.fileSize = fileSize;
this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null");
this.fileModifiedTime = fileModifiedTime;
+ this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null");
@@ -96,6 +103,12 @@ public SplitWeight getSplitWeight()
return splitWeight;
}
+ @JsonProperty
+ public String getTableLocation()
+ {
+ return tableLocation;
+ }
+
@JsonProperty
public String getPath()
{
@@ -132,6 +145,12 @@ public long getFileModifiedTime()
return fileModifiedTime;
}
+ @JsonProperty
+ public Optional<DeletionVectorEntry> getDeletionVector()
+ {
+ return deletionVector;
+ }
+
/**
* A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information.
*/
@@ -151,8 +170,10 @@ public Map> getPartitionKeys()
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ + estimatedSizeOf(tableLocation)
+ estimatedSizeOf(path)
+ sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE)
+ + sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes)
+ estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes)
+ splitWeight.getRetainedSizeInBytes()
+ statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes)
@@ -173,11 +194,13 @@ public Object getInfo()
public String toString()
{
return toStringHelper(this)
+ .add("tableLocation", tableLocation)
.add("path", path)
.add("start", start)
.add("length", length)
.add("fileSize", fileSize)
.add("rowCount", fileRowCount)
+ .add("deletionVector", deletionVector)
.add("addresses", addresses)
.add("statisticsPredicate", statisticsPredicate)
.add("partitionKeys", partitionKeys)
@@ -197,8 +220,10 @@ public boolean equals(Object o)
return start == that.start &&
length == that.length &&
fileSize == that.fileSize &&
+ tableLocation.equals(that.tableLocation) &&
path.equals(that.path) &&
fileRowCount.equals(that.fileRowCount) &&
+ deletionVector.equals(that.deletionVector) &&
addresses.equals(that.addresses) &&
Objects.equals(statisticsPredicate, that.statisticsPredicate) &&
Objects.equals(partitionKeys, that.partitionKeys);
@@ -207,6 +232,6 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
- return Objects.hash(path, start, length, fileSize, fileRowCount, addresses, statisticsPredicate, partitionKeys);
+ return Objects.hash(tableLocation, path, start, length, fileSize, fileRowCount, deletionVector, addresses, statisticsPredicate, partitionKeys);
}
}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
index 41e400b4a865..a8e59bc70faa 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java
@@ -226,6 +226,7 @@ private Stream getSplits(
return splitsForFile(
session,
+ tableHandle.location(),
addAction,
splitPath,
addAction.getCanonicalPartitionValues(),
@@ -272,6 +273,7 @@ private static boolean pathMatchesPredicate(Domain pathDomain, String path)
private List<DeltaLakeSplit> splitsForFile(
ConnectorSession session,
+ String tableLocation,
AddFileEntry addFileEntry,
String splitPath,
Map<String, Optional<String>> partitionKeys,
@@ -284,12 +286,14 @@ private List splitsForFile(
if (!splittable) {
// remainingInitialSplits is not used when !splittable
return ImmutableList.of(new DeltaLakeSplit(
+ tableLocation,
splitPath,
0,
fileSize,
fileSize,
addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords),
addFileEntry.getModificationTime(),
+ addFileEntry.getDeletionVector(),
ImmutableList.of(),
SplitWeight.standard(),
statisticsPredicate,
@@ -309,12 +313,14 @@ private List splitsForFile(
long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);
splits.add(new DeltaLakeSplit(
+ tableLocation,
splitPath,
currentOffset,
splitSize,
fileSize,
Optional.empty(),
addFileEntry.getModificationTime(),
+ addFileEntry.getDeletionVector(),
ImmutableList.of(),
SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)),
statisticsPredicate,
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java
new file mode 100644
index 000000000000..197de3589b7e
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.delete;
+
+import com.google.common.base.CharMatcher;
+import io.airlift.log.Logger;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.hive.formats.TrinoDataInputStream;
+import io.trino.spi.TrinoException;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.OptionalInt;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.lang.Math.toIntExact;
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-format
+public final class DeletionVectors
+{
+ private static final Logger log = Logger.get(DeletionVectors.class);
+ private static final int PORTABLE_ROARING_BITMAP_MAGIC_NUMBER = 1681511377;
+
+ private static final String UUID_MARKER = "u"; // relative path with random prefix on disk
+ private static final String PATH_MARKER = "p"; // absolute path on disk
+ private static final String INLINE_MARKER = "i"; // inline
+
+ private static final CharMatcher ALPHANUMERIC = CharMatcher.inRange('A', 'Z').or(CharMatcher.inRange('a', 'z')).or(CharMatcher.inRange('0', '9')).precomputed();
+
+ private DeletionVectors() {}
+
+ public static RoaringBitmap[] readDeletionVectors(
+ TrinoFileSystem fileSystem,
+ Location location,
+ String storageType,
+ String pathOrInlineDv,
+ OptionalInt offset,
+ int sizeInBytes,
+ long cardinality)
+ throws IOException
+ {
+ if (storageType.equals(UUID_MARKER)) {
+ TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(pathOrInlineDv)));
+ ByteBuffer buffer = readDeletionVector(inputFile, offset.orElseThrow(), sizeInBytes);
+ RoaringBitmap[] bitmaps = deserializeDeletionVectors(buffer);
+ if (bitmaps.length != cardinality) {
+ // Don't throw an exception because Databricks may report the wrong cardinality when there are many deleted rows
+ log.debug("The number of deleted rows expects %s but got %s", cardinality, bitmaps.length);
+ }
+ return bitmaps;
+ }
+ else if (storageType.equals(INLINE_MARKER) || storageType.equals(PATH_MARKER)) {
+ throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + storageType);
+ }
+ throw new IllegalArgumentException("Unexpected storage type: " + storageType);
+ }
+
+ public static String toFileName(String pathOrInlineDv)
+ {
+ int randomPrefixLength = pathOrInlineDv.length() - Base85Codec.ENCODED_UUID_LENGTH;
+ String randomPrefix = pathOrInlineDv.substring(0, randomPrefixLength);
+ checkArgument(ALPHANUMERIC.matchesAllOf(randomPrefix), "Random prefix must be alphanumeric: %s", randomPrefix);
+ String prefix = randomPrefix.isEmpty() ? "" : randomPrefix + "/";
+ String encodedUuid = pathOrInlineDv.substring(randomPrefixLength);
+ UUID uuid = Base85Codec.decodeUuid(encodedUuid);
+ return "%sdeletion_vector_%s.bin".formatted(prefix, uuid);
+ }
+
+ public static ByteBuffer readDeletionVector(TrinoInputFile inputFile, int offset, int expectedSize)
+ throws IOException
+ {
+ byte[] bytes = new byte[expectedSize];
+ TrinoDataInputStream inputStream = new TrinoDataInputStream(inputFile.newStream());
+ inputStream.seek(offset);
+ int actualSize = inputStream.readInt();
+ if (actualSize != expectedSize) {
+ // TODO: Investigate why these sizes differ
+ log.warn("The size of deletion vector %s expects %s but got %s", inputFile.location(), expectedSize, actualSize);
+ }
+ inputStream.readFully(bytes);
+ return ByteBuffer.wrap(bytes).order(LITTLE_ENDIAN);
+ }
+
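+ // A 64-bit row position is split into a high 32-bit key selecting the bitmap and a low 32-bit value looked up within it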
+ public static boolean contains(RoaringBitmap[] bitmaps, long value)
+ {
+ int high = highBytes(value);
+ if (high >= bitmaps.length) {
+ return false;
+ }
+ RoaringBitmap highBitmap = bitmaps[high];
+ int low = lowBytes(value);
+ return highBitmap.contains(low);
+ }
+
+ private static int highBytes(long value)
+ {
+ return toIntExact(value >> 32);
+ }
+
+ private static int lowBytes(long value)
+ {
+ return toIntExact(value);
+ }
+
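+ // Portable serialization format: a 4-byte magic number, an 8-byte bitmap count, then for each bitmap a 4-byte key followed by a serialized 32-bit RoaringBitmap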
+ public static RoaringBitmap[] deserializeDeletionVectors(ByteBuffer buffer)
+ throws IOException
+ {
+ checkArgument(buffer.order() == LITTLE_ENDIAN, "Byte order must be little endian: %s", buffer.order());
+ int magicNumber = buffer.getInt();
+ if (magicNumber == PORTABLE_ROARING_BITMAP_MAGIC_NUMBER) {
+ int size = toIntExact(buffer.getLong());
+ RoaringBitmap[] bitmaps = new RoaringBitmap[size];
+ for (int i = 0; i < size; i++) {
+ int key = buffer.getInt();
+ checkArgument(key >= 0);
+ RoaringBitmap bitmap = new RoaringBitmap();
+ bitmap.deserialize(buffer);
+ bitmaps[i] = bitmap;
+ buffer.position(buffer.position() + bitmap.serializedSizeInBytes());
+ }
+ return bitmaps;
+ }
+ throw new IllegalArgumentException("Unsupported magic number: " + magicNumber);
+ }
+
+ // This implements Base85 using the 4 byte block aligned encoding and character set from Z85 https://rfc.zeromq.org/spec/32
+ // Delta Lake implementation is https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/util/Codec.scala
+ static final class Base85Codec
+ {
+ private static final long BASE = 85L;
+ private static final long BASE_2ND_POWER = 7225L; // 85^2
+ private static final long BASE_3RD_POWER = 614125L; // 85^3
+ private static final long BASE_4TH_POWER = 52200625L; // 85^4
+ private static final int ASCII_BITMASK = 0x7F;
+
+ // UUIDs always encode into 20 characters
+ static final int ENCODED_UUID_LENGTH = 20;
+
+ private static final String BASE85_CHARACTERS = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#";
+
+ private static final byte[] ENCODE_MAP = BASE85_CHARACTERS.getBytes(UTF_8);
+ // The bitmask is the same as largest possible value, so the length of the array must be one greater.
+ private static final byte[] DECODE_MAP = new byte[ASCII_BITMASK + 1];
+
+ static {
+ Arrays.fill(DECODE_MAP, (byte) -1);
+ for (int i = 0; i < ENCODE_MAP.length; i++) {
+ DECODE_MAP[ENCODE_MAP[i]] = (byte) i;
+ }
+ }
+
+ private Base85Codec() {}
+
+ // This method will be used when supporting https://github.com/trinodb/trino/issues/17063
+ // This is used for testing codec round trip for now
+ public static String encodeBytes(byte[] input)
+ {
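+ // Inputs that are not 4-byte aligned are zero-padded before encoding; every 4-byte block becomes 5 Base85 characters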
+ if (input.length % 4 == 0) {
+ return encodeBlocks(ByteBuffer.wrap(input));
+ }
+ int alignedLength = ((input.length + 4) / 4) * 4;
+ ByteBuffer buffer = ByteBuffer.allocate(alignedLength);
+ buffer.put(input);
+ while (buffer.hasRemaining()) {
+ buffer.put((byte) 0);
+ }
+ buffer.rewind();
+ return encodeBlocks(buffer);
+ }
+
+ private static String encodeBlocks(ByteBuffer buffer)
+ {
+ checkArgument(buffer.remaining() % 4 == 0);
+ int numBlocks = buffer.remaining() / 4;
+ // Every 4 byte block gets encoded into 5 bytes/chars
+ int outputLength = numBlocks * 5;
+ byte[] output = new byte[outputLength];
+ int outputIndex = 0;
+
+ while (buffer.hasRemaining()) {
+ long sum = buffer.getInt() & 0x00000000ffffffffL;
+ output[outputIndex] = ENCODE_MAP[(int) (sum / BASE_4TH_POWER)];
+ sum %= BASE_4TH_POWER;
+ output[outputIndex + 1] = ENCODE_MAP[(int) (sum / BASE_3RD_POWER)];
+ sum %= BASE_3RD_POWER;
+ output[outputIndex + 2] = ENCODE_MAP[(int) (sum / BASE_2ND_POWER)];
+ sum %= BASE_2ND_POWER;
+ output[outputIndex + 3] = ENCODE_MAP[(int) (sum / BASE)];
+ output[outputIndex + 4] = ENCODE_MAP[(int) (sum % BASE)];
+ outputIndex += 5;
+ }
+ return new String(output, US_ASCII);
+ }
+
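+ // Reverses encodeBlocks: every 5-character block decodes back into one 4-byte value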
+ public static ByteBuffer decodeBlocks(String encoded)
+ {
+ char[] input = encoded.toCharArray();
+ checkArgument(input.length % 5 == 0, "Input should be 5 character aligned");
+ ByteBuffer buffer = ByteBuffer.allocate(input.length / 5 * 4);
+
+ int inputIndex = 0;
+ while (buffer.hasRemaining()) {
+ long sum = 0;
+ sum += decodeInputChar(input[inputIndex]) * BASE_4TH_POWER;
+ sum += decodeInputChar(input[inputIndex + 1]) * BASE_3RD_POWER;
+ sum += decodeInputChar(input[inputIndex + 2]) * BASE_2ND_POWER;
+ sum += decodeInputChar(input[inputIndex + 3]) * BASE;
+ sum += decodeInputChar(input[inputIndex + 4]);
+ buffer.putInt((int) sum);
+ inputIndex += 5;
+ }
+ buffer.rewind();
+ return buffer;
+ }
+
+ public static UUID decodeUuid(String encoded)
+ {
+ ByteBuffer buffer = decodeBlocks(encoded);
+ return uuidFromByteBuffer(buffer);
+ }
+
+ private static UUID uuidFromByteBuffer(ByteBuffer buffer)
+ {
+ checkArgument(buffer.remaining() >= 16);
+ long highBits = buffer.getLong();
+ long lowBits = buffer.getLong();
+ return new UUID(highBits, lowBits);
+ }
+
+ // This method will be used when supporting https://github.com/trinodb/trino/issues/17063
+ // This is used for testing codec round trip for now
+ public static byte[] decodeBytes(String encoded, int outputLength)
+ {
+ ByteBuffer result = decodeBlocks(encoded);
+ if (result.remaining() > outputLength) {
+ // Only read the expected number of bytes
+ byte[] output = new byte[outputLength];
+ result.get(output);
+ return output;
+ }
+ return result.array();
+ }
+
+ private static long decodeInputChar(char chr)
+ {
+ checkArgument(BASE85_CHARACTERS.contains(String.valueOf(chr)), "%s is not valid Base85 character", chr);
+ return DECODE_MAP[chr & ASCII_BITMASK];
+ }
+ }
+}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java
new file mode 100644
index 000000000000..d2b19952b510
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/PositionDeleteFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.delete;
+
+import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
+import org.roaringbitmap.RoaringBitmap;
+
+import java.util.List;
+
+import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static java.util.Objects.requireNonNull;
+
+public final class PositionDeleteFilter
+{
+ private final RoaringBitmap[] deletedRows;
+
+ public PositionDeleteFilter(RoaringBitmap[] deletedRows)
+ {
+ this.deletedRows = requireNonNull(deletedRows, "deletedRows is null");
+ }
+
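+ // Builds a predicate that keeps only rows whose position in the data file is not present in the deletion vector bitmaps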
+ public RowPredicate createPredicate(List<DeltaLakeColumnHandle> columns)
+ {
+ int filePositionChannel = rowPositionChannel(columns);
+ return (page, position) -> {
+ long filePosition = BIGINT.getLong(page.getBlock(filePositionChannel), position);
+ return !DeletionVectors.contains(deletedRows, filePosition);
+ };
+ }
+
+ private static int rowPositionChannel(List<DeltaLakeColumnHandle> columns)
+ {
+ for (int i = 0; i < columns.size(); i++) {
+ if (columns.get(i).getBaseColumnName().equals(ROW_ID_COLUMN_NAME)) {
+ return i;
+ }
+ }
+ throw new IllegalArgumentException("No row position column");
+ }
+}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java
new file mode 100644
index 000000000000..22b51b3ebb28
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/RowPredicate.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.delete;
+
+import io.trino.spi.Page;
+
+public interface RowPredicate
+{
+ boolean test(Page page, int position);
+
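+ // Keeps the positions that satisfy the predicate; the original page is returned unchanged when no rows are filtered out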
+ default Page filterPage(Page page)
+ {
+ int positionCount = page.getPositionCount();
+ int[] retained = new int[positionCount];
+ int retainedCount = 0;
+ for (int position = 0; position < positionCount; position++) {
+ if (test(page, position)) {
+ retained[retainedCount] = position;
+ retainedCount++;
+ }
+ }
+ if (retainedCount == positionCount) {
+ return page;
+ }
+ return page.getPositions(retained, 0, retainedCount);
+ }
+}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java
index fddbf4aa9d7e..a52c1ae8b749 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunctionProcessor.java
@@ -201,6 +201,7 @@ private DeltaLakePageSource createDeltaLakePageSource(TableChangesSplit split)
Optional.empty(),
split.path(),
split.fileSize(),
- 0L);
+ 0L,
+ Optional::empty);
}
}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java
index 1cb4475067fb..a4884ed7c5cb 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/AddFileEntry.java
@@ -34,6 +34,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.serializeStatsAsJson;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues;
import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
public class AddFileEntry
{
@@ -47,6 +48,7 @@ public class AddFileEntry
private final long modificationTime;
private final boolean dataChange;
private final Map<String, String> tags;
+ private final Optional<DeletionVectorEntry> deletionVector;
private final Optional<? extends DeltaLakeFileStatistics> parsedStats;
@JsonCreator
@@ -58,7 +60,8 @@ public AddFileEntry(
@JsonProperty("dataChange") boolean dataChange,
@JsonProperty("stats") Optional stats,
@JsonProperty("parsedStats") Optional parsedStats,
- @JsonProperty("tags") @Nullable Map tags)
+ @JsonProperty("tags") @Nullable Map tags,
+ @JsonProperty("deletionVector") Optional deletionVector)
{
this.path = path;
this.partitionValues = partitionValues;
@@ -67,6 +70,7 @@ public AddFileEntry(
this.modificationTime = modificationTime;
this.dataChange = dataChange;
this.tags = tags;
+ this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");
Optional<? extends DeltaLakeFileStatistics> resultParsedStats = Optional.empty();
if (parsedStats.isPresent()) {
@@ -149,6 +153,12 @@ public Map getTags()
return tags;
}
+ @JsonProperty
+ public Optional<DeletionVectorEntry> getDeletionVector()
+ {
+ return deletionVector;
+ }
+
@Override
public String toString()
{
@@ -173,6 +183,7 @@ public boolean equals(Object o)
Objects.equals(partitionValues, that.partitionValues) &&
Objects.equals(canonicalPartitionValues, that.canonicalPartitionValues) &&
Objects.equals(tags, that.tags) &&
+ Objects.equals(deletionVector, that.deletionVector) &&
Objects.equals(parsedStats, that.parsedStats);
}
@@ -187,6 +198,7 @@ public int hashCode()
modificationTime,
dataChange,
tags,
+ deletionVector,
parsedStats);
}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java
new file mode 100644
index 000000000000..94719ae02c71
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeletionVectorEntry.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.transactionlog;
+
+import java.util.OptionalInt;
+
+import static java.util.Objects.requireNonNull;
+
+// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-descriptor-schema
+public record DeletionVectorEntry(String storageType, String pathOrInlineDv, OptionalInt offset, int sizeInBytes, long cardinality)
+{
+ public DeletionVectorEntry
+ {
+ requireNonNull(storageType, "storageType is null");
+ requireNonNull(pathOrInlineDv, "pathOrInlineDv is null");
+ requireNonNull(offset, "offset is null");
+ }
+}
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
index 558475900bb8..f9b127c686c8 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeSchemaSupport.java
@@ -87,11 +87,13 @@ private DeltaLakeSchemaSupport() {}
public static final String APPEND_ONLY_CONFIGURATION_KEY = "delta.appendOnly";
public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode";
public static final String MAX_COLUMN_ID_CONFIGURATION_KEY = "delta.columnMapping.maxColumnId";
+ private static final String DELETION_VECTORS_CONFIGURATION_KEY = "delta.enableDeletionVectors";
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features
- // TODO: Add support for 'deletionVectors' and 'timestampNTZ' reader features
+ // TODO: Add support for 'timestampNTZ' reader features
private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add("columnMapping")
+ .add("deletionVectors")
.build();
public enum ColumnMappingMode
@@ -123,6 +125,11 @@ public static boolean isAppendOnly(MetadataEntry metadataEntry)
return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false"));
}
+ public static boolean deletionVectorsEnabled(Map<String, String> configuration)
+ {
+ return parseBoolean(configuration.get(DELETION_VECTORS_CONFIGURATION_KEY));
+ }
+
public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata)
{
String columnMappingMode = metadata.getConfiguration().getOrDefault(COLUMN_MAPPING_MODE_CONFIGURATION_KEY, "none");
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
index 88c6301542ad..78ed32d5794c 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java
@@ -22,6 +22,7 @@
import io.trino.plugin.deltalake.DeltaLakeColumnMetadata;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
+import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
@@ -72,6 +73,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_DATA;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.deletionVectorsEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.START_OF_MODERN_ERA;
@@ -359,18 +361,24 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo
if (block.isNull(pagePosition)) {
return null;
}
+ boolean deletionVectorsEnabled = deletionVectorsEnabled(metadataEntry.getConfiguration());
Block addEntryBlock = block.getObject(pagePosition, Block.class);
log.debug("Block %s has %s fields", block, addEntryBlock.getPositionCount());
- Map<String, String> partitionValues = getMap(addEntryBlock, 1);
- long size = getLong(addEntryBlock, 2);
- long modificationTime = getLong(addEntryBlock, 3);
- boolean dataChange = getByte(addEntryBlock, 4) != 0;
- Map<String, String> tags = getMap(addEntryBlock, 7);
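+ // Field positions shift when the optional deletionVector struct is present in the checkpoint schema, so read them with a running index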
+ int position = 0;
+ String path = getString(addEntryBlock, position++);
+ Map<String, String> partitionValues = getMap(addEntryBlock, position++);
+ long size = getLong(addEntryBlock, position++);
+ long modificationTime = getLong(addEntryBlock, position++);
+ boolean dataChange = getByte(addEntryBlock, position++) != 0;
+ Optional<DeletionVectorEntry> deletionVector = Optional.empty();
+ if (deletionVectorsEnabled) {
+ deletionVector = Optional.of(parseDeletionVectorFromParquet(addEntryBlock.getObject(position++, Block.class)));
+ }
+ Map<String, String> tags = getMap(addEntryBlock, position + 2);
- String path = getString(addEntryBlock, 0);
AddFileEntry result;
- if (!addEntryBlock.isNull(6)) {
+ if (!addEntryBlock.isNull(position + 1)) {
result = new AddFileEntry(
path,
partitionValues,
@@ -378,19 +386,21 @@ private DeltaLakeTransactionLogEntry buildAddEntry(ConnectorSession session, Blo
modificationTime,
dataChange,
Optional.empty(),
- Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(6, Block.class))),
- tags);
+ Optional.of(parseStatisticsFromParquet(addEntryBlock.getObject(position + 1, Block.class))),
+ tags,
+ deletionVector);
}
- else if (!addEntryBlock.isNull(5)) {
+ else if (!addEntryBlock.isNull(position)) {
result = new AddFileEntry(
path,
partitionValues,
size,
modificationTime,
dataChange,
- Optional.of(getString(addEntryBlock, 5)),
+ Optional.of(getString(addEntryBlock, position)),
Optional.empty(),
- tags);
+ tags,
+ deletionVector);
}
else {
result = new AddFileEntry(
@@ -401,13 +411,28 @@ else if (!addEntryBlock.isNull(5)) {
dataChange,
Optional.empty(),
Optional.empty(),
- tags);
+ tags,
+ deletionVector);
}
log.debug("Result: %s", result);
return DeltaLakeTransactionLogEntry.addFileEntry(result);
}
+ private DeletionVectorEntry parseDeletionVectorFromParquet(Block block)
+ {
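+ // Per the Delta protocol, the 'offset' field is only present for deletion vectors stored in files, so the struct has either 4 or 5 fields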
+ int size = block.getPositionCount();
+ checkArgument(size == 4 || size == 5, "Deletion vector entry must have 4 or 5 fields");
+
+ int position = 0;
+ String storageType = getString(block, position++);
+ String pathOrInlineDv = getString(block, position++);
+ OptionalInt offset = size == 4 ? OptionalInt.empty() : OptionalInt.of(getInt(block, position++));
+ int sizeInBytes = getInt(block, position++);
+ long cardinality = getLong(block, position++);
+ return new DeletionVectorEntry(storageType, pathOrInlineDv, offset, sizeInBytes, cardinality);
+ }
+
private DeltaLakeParquetFileStatistics parseStatisticsFromParquet(Block statsRowBlock)
{
if (metadataEntry == null) {
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java
index 9125de8b63e9..fe4a4680bfe3 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointSchemaManager.java
@@ -34,6 +34,7 @@
import java.util.Optional;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.deletionVectorsEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogAccess.columnsWithStats;
import static java.util.Objects.requireNonNull;
@@ -42,6 +43,14 @@ public class CheckpointSchemaManager
{
private final TypeManager typeManager;
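+ // Row type of the optional 'deletionVector' struct in checkpoint add entries; the field order matches the positional parsing in CheckpointEntryIterator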
+ private static final RowType DELETION_VECTORS_TYPE = RowType.from(ImmutableList.<RowType.Field>builder()
+ .add(RowType.field("storageType", VarcharType.VARCHAR))
+ .add(RowType.field("pathOrInlineDv", VarcharType.VARCHAR))
+ .add(RowType.field("offset", IntegerType.INTEGER))
+ .add(RowType.field("sizeInBytes", IntegerType.INTEGER))
+ .add(RowType.field("cardinality", BigintType.BIGINT))
+ .build());
+
private static final RowType TXN_ENTRY_TYPE = RowType.from(ImmutableList.of(
RowType.field("appId", VarcharType.createUnboundedVarcharType()),
RowType.field("version", BigintType.BIGINT),
@@ -106,6 +115,7 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, boolean requireWrite
{
List<DeltaLakeColumnMetadata> allColumns = extractSchema(metadataEntry, typeManager);
List<DeltaLakeColumnMetadata> minMaxColumns = columnsWithStats(metadataEntry, typeManager);
+ boolean deletionVectorEnabled = deletionVectorsEnabled(metadataEntry.getConfiguration());
ImmutableList.Builder<RowType.Field> minMaxFields = ImmutableList.builder();
for (DeltaLakeColumnMetadata dataColumn : minMaxColumns) {
@@ -139,6 +149,9 @@ public RowType getAddEntryType(MetadataEntry metadataEntry, boolean requireWrite
addFields.add(RowType.field("size", BigintType.BIGINT));
addFields.add(RowType.field("modificationTime", BigintType.BIGINT));
addFields.add(RowType.field("dataChange", BooleanType.BOOLEAN));
+ if (deletionVectorEnabled) {
+ addFields.add(RowType.field("deletionVector", DELETION_VECTORS_TYPE));
+ }
if (requireWriteStatsAsJson) {
addFields.add(RowType.field("stats", VarcharType.createUnboundedVarcharType()));
}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
index 79bdba2cafa2..8c1c6a87eb70 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java
@@ -64,7 +64,7 @@ public class TestDeltaLakeBasic
private static final List<String> PERSON_TABLES = ImmutableList.of(
"person", "person_without_last_checkpoint", "person_without_old_jsons", "person_without_checkpoints");
- private static final List<String> OTHER_TABLES = ImmutableList.of("no_column_stats");
+ private static final List<String> OTHER_TABLES = ImmutableList.of("no_column_stats", "deletion_vectors");
// The col-{uuid} pattern for delta.columnMapping.physicalName
private static final Pattern PHYSICAL_COLUMN_NAME_PATTERN = Pattern.compile("^col-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");
@@ -217,6 +217,15 @@ public Object[][] columnMappingModeDataProvider()
};
}
+ /**
+ * @see databricks.deletion_vectors
+ */
+ @Test
+ public void testDeletionVectors()
+ {
+ assertQuery("SELECT * FROM deletion_vectors", "VALUES (1, 11)");
+ }
+
@Test
public void testCorruptedManagedTableLocation()
throws Exception
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
index e0fd0d51c6b8..a102a5c73b50 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSplitManager.java
@@ -181,13 +181,13 @@ public List getActiveFiles(TableSnapshot tableSnapshot, ConnectorS
private AddFileEntry addFileEntryOfSize(long fileSize)
{
- return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of());
+ return new AddFileEntry(FILE_PATH, ImmutableMap.of(), fileSize, 0, false, Optional.empty(), Optional.empty(), ImmutableMap.of(), Optional.empty());
}
private DeltaLakeSplit makeSplit(long start, long splitSize, long fileSize, double minimumAssignedSplitWeight)
{
SplitWeight splitWeight = SplitWeight.fromProportion(Math.min(Math.max((double) fileSize / splitSize, minimumAssignedSplitWeight), 1.0));
- return new DeltaLakeSplit(FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
+ return new DeltaLakeSplit(TABLE_PATH, FULL_PATH, start, splitSize, fileSize, Optional.empty(), 0, Optional.empty(), ImmutableList.of(), splitWeight, TupleDomain.all(), ImmutableMap.of());
}
private List<DeltaLakeSplit> getSplits(DeltaLakeSplitManager splitManager, DeltaLakeConfig deltaLakeConfig)
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java
new file mode 100644
index 000000000000..ae6d1e4bb225
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.delete;
+
+import com.google.common.io.Resources;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.OptionalInt;
+
+import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
+import static io.trino.plugin.deltalake.delete.DeletionVectors.Base85Codec.decodeBlocks;
+import static io.trino.plugin.deltalake.delete.DeletionVectors.Base85Codec.decodeBytes;
+import static io.trino.plugin.deltalake.delete.DeletionVectors.Base85Codec.encodeBytes;
+import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors;
+import static io.trino.plugin.deltalake.delete.DeletionVectors.toFileName;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+public class TestDeletionVectors
+{
+ @Test
+ public void testUuidStorageType()
+ throws Exception
+ {
+ // The deletion vector has a deleted row at position 1
+ Path path = new File(Resources.getResource("databricks/deletion_vectors").toURI()).toPath();
+ TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
+
+ RoaringBitmap[] bitmaps = readDeletionVectors(fileSystem, Location.of(path.toString()), "u", "R7QFX3rGXPFLhHGq&7g<", OptionalInt.of(1), 34, 1);
+ assertThat(bitmaps).hasSize(1);
+ assertFalse(bitmaps[0].contains(0));
+ assertTrue(bitmaps[0].contains(1));
+ assertFalse(bitmaps[0].contains(2));
+ }
+
+ @Test
+ public void testUnsupportedPathStorageType()
+ {
+ TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
+ assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), "p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1))
+ .hasMessageContaining("Unsupported storage type for deletion vector: p");
+ }
+
+ @Test
+ public void testUnsupportedInlineStorageType()
+ {
+ TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
+ assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), "i", "wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", OptionalInt.empty(), 40, 1))
+ .hasMessageContaining("Unsupported storage type for deletion vector: i");
+ }
+
+ @Test
+ public void testToFileName()
+ {
+ assertEquals(toFileName("R7QFX3rGXPFLhHGq&7g<"), "deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin");
+ assertEquals(toFileName("ab^-aqEH.-t@S}K{vb[*k^"), "ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin");
+ }
+
+ @Test
+ public void testEncodeBytes()
+ {
+ // The test case comes from https://rfc.zeromq.org/spec/32
+ byte[] inputBytes = new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B};
+ String encoded = encodeBytes(inputBytes);
+ assertEquals(encoded, "HelloWorld");
+ }
+
+ @Test
+ public void testDecodeBytes()
+ {
+ String data = "HelloWorld";
+ byte[] bytes = decodeBytes(data, 8);
+ assertEquals(bytes, new byte[] {(byte) 0x86, 0x4F, (byte) 0xD2, 0x6F, (byte) 0xB5, 0x59, (byte) 0xF7, 0x5B});
+ }
+
+ @Test
+ public void testDecodeBlocksIllegalCharacter()
+ {
+ assertThatThrownBy(() -> decodeBlocks("ab" + 0x7F + "de")).hasMessageContaining("Input should be 5 character aligned");
+
+ assertThatThrownBy(() -> decodeBlocks("abîde")).hasMessageContaining("î is not valid Base85 character");
+ assertThatThrownBy(() -> decodeBlocks("abπde")).hasMessageContaining("π is not valid Base85 character");
+ assertThatThrownBy(() -> decodeBlocks("ab\"de")).hasMessageContaining("\" is not valid Base85 character");
+ }
+
+ @Test
+ public void testCodecRoundTrip()
+ {
+ assertEquals("HelloWorld", encodeBytes(decodeBytes("HelloWorld", 8)));
+ assertEquals("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", encodeBytes(decodeBytes("wi5b=000010000siXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L", 40)));
+ }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java
index f28544ce9c8d..5303e55a365d 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestTableSnapshot.java
@@ -129,7 +129,8 @@ public void readsCheckpointFile()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo(
new AddFileEntry(
@@ -145,7 +146,8 @@ public void readsCheckpointFile()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
}
// lets read two entry types in one call; add and protocol
@@ -169,7 +171,8 @@ public void readsCheckpointFile()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
assertThat(entries).element(6).extracting(DeltaLakeTransactionLogEntry::getProtocol).isEqualTo(new ProtocolEntry(1, 2, Optional.empty(), Optional.empty()));
@@ -187,7 +190,8 @@ public void readsCheckpointFile()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
}
}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java
index f19248c7fbb2..738272a7f381 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointBuilder.java
@@ -58,10 +58,10 @@ public void testCheckpointBuilder()
builder.addLogEntry(transactionEntry(app1TransactionV1));
builder.addLogEntry(transactionEntry(app2TransactionV5));
- AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of());
+ AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
RemoveFileEntry removeA1 = new RemoveFileEntry("a", 1, true);
- AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of());
- AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of());
+ AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
+ AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
RemoveFileEntry removeB = new RemoveFileEntry("b", 1, true);
RemoveFileEntry removeC = new RemoveFileEntry("c", 1, true);
builder.addLogEntry(addFileEntry(addA1));
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
index 7401f3ae308a..7facacadf840 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointEntryIterator.java
@@ -129,7 +129,8 @@ public void testReadAddEntries()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
assertThat(entries).element(7).extracting(DeltaLakeTransactionLogEntry::getAdd).isEqualTo(
new AddFileEntry(
@@ -145,7 +146,8 @@ public void testReadAddEntries()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
}
@Test
@@ -190,7 +192,8 @@ public void testReadAllEntries()
"\"nullCount\":{\"name\":0,\"married\":0,\"phones\":0,\"address\":{\"street\":0,\"city\":0,\"state\":0,\"zip\":0},\"income\":0}" +
"}"),
Optional.empty(),
- null));
+ null,
+ Optional.empty()));
// RemoveFileEntry
assertThat(entries).element(3).extracting(DeltaLakeTransactionLogEntry::getRemove).isEqualTo(
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
index 3f15432f43d8..93ffb6f299f5 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java
@@ -179,7 +179,8 @@ public void testCheckpointWriteReadJsonRoundtrip()
Optional.empty(),
ImmutableMap.of(
"someTag", "someValue",
- "otherTag", "otherValue"));
+ "otherTag", "otherValue"),
+ Optional.empty());
RemoveFileEntry removeFileEntry = new RemoveFileEntry(
"removeFilePath",
@@ -315,7 +316,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip()
.buildOrThrow()))),
ImmutableMap.of(
"someTag", "someValue",
- "otherTag", "otherValue"));
+ "otherTag", "otherValue"),
+ Optional.empty());
RemoveFileEntry removeFileEntry = new RemoveFileEntry(
"removeFilePath",
@@ -391,7 +393,8 @@ public void testDisablingRowStatistics()
"row", RowBlock.fromFieldBlocks(1, Optional.empty(), minMaxRowFieldBlocks).getSingleValueBlock(0))),
Optional.of(ImmutableMap.of(
"row", RowBlock.fromFieldBlocks(1, Optional.empty(), nullCountRowFieldBlocks).getSingleValueBlock(0))))),
- ImmutableMap.of());
+ ImmutableMap.of(),
+ Optional.empty());
CheckpointEntries entries = new CheckpointEntries(
metadataEntry,
@@ -429,7 +432,8 @@ private AddFileEntry makeComparable(AddFileEntry original)
original.isDataChange(),
original.getStatsString(),
makeComparable(original.getStats()),
- original.getTags());
+ original.getTags(),
+ original.getDeletionVector());
}
private Optional<DeltaLakeFileStatistics> makeComparable(Optional<? extends DeltaLakeFileStatistics> original)
diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md
new file mode 100644
index 000000000000..f30f3b1279ae
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/README.md
@@ -0,0 +1,13 @@
+Data generated using Databricks 12.2:
+
+```sql
+CREATE TABLE default.test_deletion_vectors (
+ a INT,
+ b INT)
+USING delta
+LOCATION 's3://trino-ci-test/test_deletion_vectors'
+TBLPROPERTIES ('delta.enableDeletionVectors' = true);
+
+INSERT INTO default.test_deletion_vectors VALUES (1, 11), (2, 22);
+DELETE FROM default.test_deletion_vectors WHERE a = 2;
+```
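+
+The `DELETE` re-adds the data file with a `deletionVector` descriptor (relative `u` storage type) whose bitmap is the `deletion_vector_*.bin` file stored alongside the Parquet data file; see the `_delta_log` JSON checked in next to this README.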
diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json
new file mode 100644
index 000000000000..4a5d53407173
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000000.json
@@ -0,0 +1,3 @@
+{"commitInfo":{"timestamp":1682326581374,"operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.enableDeletionVectors\":\"true\"}"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"2cbfa481-d2b0-4f59-83f9-1261492dfd46"}}
+{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}
+{"metaData":{"id":"32f26f4b-95ba-4980-b209-0132e949b3e4","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true"},"createdTime":1682326580906}}
diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json
new file mode 100644
index 000000000000..7a5e8e6418b8
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000001.json
@@ -0,0 +1,2 @@
+{"commitInfo":{"timestamp":1682326587253,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"796"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"99cd5421-a1b9-40c6-8063-7298ec935fd6"}}
+{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json
new file mode 100644
index 000000000000..00f135f1c8d2
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/_delta_log/00000000000000000002.json
@@ -0,0 +1,3 @@
+{"commitInfo":{"timestamp":1682326592314,"operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.test_deletion_vectors_vsipbnhjjg.a = 2)\"]"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"0","numRemovedBytes":"0","numCopiedRows":"0","numDeletionVectorsAdded":"1","numDeletionVectorsRemoved":"0","numAddedChangeFiles":"0","executionTimeMs":"2046","numDeletedRows":"1","scanTimeMs":"1335","numAddedFiles":"0","numAddedBytes":"0","rewriteTimeMs":"709"},"engineInfo":"Databricks-Runtime/12.2.x-scala2.12","txnId":"219ffc4f-ff84-49d6-98a3-b0b105ce2a1e"}}
+{"remove":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","deletionTimestamp":1682326592313,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":796,"tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
+{"add":{"path":"part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet","partitionValues":{},"size":796,"modificationTime":1682326588000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"a\":1,\"b\":11},\"maxValues\":{\"a\":2,\"b\":22},\"nullCount\":{\"a\":0,\"b\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1682326588000000","MIN_INSERTION_TIME":"1682326588000000","MAX_INSERTION_TIME":"1682326588000000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"R7QFX3rGXPFLhHGq&7g<","offset":1,"sizeInBytes":34,"cardinality":1}}}
diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin
new file mode 100644
index 000000000000..66b4b7369d9f
Binary files /dev/null and b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/deletion_vector_a52eda8c-0a57-4636-814b-9c165388f7ca.bin differ
diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet
new file mode 100644
index 000000000000..b4fbdc1f40bd
Binary files /dev/null and b/plugin/trino-delta-lake/src/test/resources/databricks/deletion_vectors/part-00000-0aa47759-3062-4e53-94c8-2e20a0796fee-c000.snappy.parquet differ
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java
index fedc2b2d88b6..516de06f0ef3 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDatabricksDelete.java
@@ -13,9 +13,15 @@
*/
package io.trino.tests.product.deltalake;
+import com.google.common.collect.ImmutableList;
+import io.trino.tempto.assertions.QueryAssert.Row;
+import io.trino.testing.DataProviders;
import io.trino.testng.services.Flaky;
import org.testng.annotations.Test;
+import java.util.List;
+import java.util.function.Consumer;
+
import static io.trino.tempto.assertions.QueryAssert.Row.row;
import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.assertions.QueryAssert.assertThat;
@@ -65,7 +71,6 @@ public void testDeleteOnAppendOnlyTableFails()
@Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
public void testDeletionVectors()
{
- // TODO https://github.com/trinodb/trino/issues/16903 Add support for deletionVectors reader features
String tableName = "test_deletion_vectors_" + randomNameSuffix();
onDelta().executeQuery("" +
"CREATE TABLE default." + tableName +
@@ -79,28 +84,315 @@ public void testDeletionVectors()
assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
.containsOnly(row(1, 11));
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+ .containsOnly(row(1, 11));
+
+ // Reinsert the deleted row and verify that the row appears correctly
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (2, 22)");
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+ .containsOnly(row(1, 11), row(2, 22));
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+ .containsOnly(row(1, 11), row(2, 22));
+
+ // Execute a DELETE statement that doesn't delete any rows
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = -1");
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+ .containsOnly(row(1, 11), row(2, 22));
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+ .containsOnly(row(1, 11), row(2, 22));
+ // Verify other statements
assertThat(onTrino().executeQuery("SHOW TABLES FROM delta.default"))
.contains(row(tableName));
- assertThat(onTrino().executeQuery("SELECT comment FROM information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'"))
- .hasNoRows();
- assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
- .hasMessageMatching(".* Table .* does not exist");
- assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default.\"" + tableName + "$history\""))
- .hasMessageMatching(".* Table .* does not exist");
- assertQueryFailure(() -> onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName))
- .hasMessageMatching(".* Table .* does not exist");
- assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE delta.default." + tableName))
- .hasMessageMatching(".* Table .* does not exist");
+ assertThat(onTrino().executeQuery("SELECT column_name FROM delta.information_schema.columns WHERE table_schema = 'default' AND table_name = '" + tableName + "'"))
+ .contains(row("a"), row("b"));
+ assertThat(onTrino().executeQuery("SELECT version, operation FROM delta.default.\"" + tableName + "$history\""))
+ .contains(row(0, "CREATE TABLE"), row(1, "WRITE"), row(2, "DELETE"));
+ assertThat(onTrino().executeQuery("SHOW COLUMNS FROM delta.default." + tableName))
+ .contains(row("a", "integer", "", ""), row("b", "integer", "", ""));
+ assertThat(onTrino().executeQuery("DESCRIBE delta.default." + tableName))
+ .contains(row("a", "integer", "", ""), row("b", "integer", "", ""));
+
+ // TODO https://github.com/trinodb/trino/issues/17063 Use Delta Deletion Vectors for row-level deletes
assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO delta.default." + tableName + " VALUES (3, 33)"))
- .hasMessageMatching(".* Table .* does not exist");
+ .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM delta.default." + tableName))
- .hasMessageMatching(".* Table .* does not exist");
+ .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("UPDATE delta.default." + tableName + " SET a = 3"))
- .hasMessageMatching(".* Table .* does not exist");
+ .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
assertQueryFailure(() -> onTrino().executeQuery("MERGE INTO delta.default." + tableName + " t USING delta.default." + tableName + " s " +
"ON (t.a = s.a) WHEN MATCHED THEN UPDATE SET b = -1"))
- .hasMessageMatching(".* Table .* does not exist");
+ .hasMessageMatching(".* Table .* requires Delta Lake writer version 7 which is not supported");
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsWithRandomPrefix()
+ {
+ String tableName = "test_deletion_vectors_random_prefix_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT, b INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.randomizeFilePrefixes' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)");
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2");
+
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+ .containsOnly(row(1, 11));
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+ .containsOnly(row(1, 11));
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsWithCheckpointInterval()
+ {
+ String tableName = "test_deletion_vectors_random_prefix_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT, b INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.checkpointInterval' = 1)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)");
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2");
+
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName))
+ .containsOnly(row(1, 11));
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+ .containsOnly(row(1, 11));
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsMergeDelete()
+ {
+ String tableName = "test_deletion_vectors_merge_delete_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10))");
+ onDelta().executeQuery("MERGE INTO default." + tableName + " t USING default." + tableName + " s " +
+ "ON (t.a = s.a) WHEN MATCHED AND t.a > 5 THEN DELETE");
+
+ List<Row> expected = ImmutableList.of(row(1), row(2), row(3), row(4), row(5));
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsLargeNumbers()
+ {
+ String tableName = "test_deletion_vectors_large_numbers_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 10000))");
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a > 1");
+
+ List<Row> expected = ImmutableList.of(row(1));
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS},
+ dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsAcrossAddFile(boolean partitioned)
+ {
+ String tableName = "test_deletion_vectors_accross_add_file_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT, b INT)" +
+ "USING delta " +
+ (partitioned ? "PARTITIONED BY (a)" : "") +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)");
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)");
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 2 OR a = 4");
+
+ List<Row> expected = ImmutableList.of(row(1, 11), row(3, 33));
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).containsOnly(expected);
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).containsOnly(expected);
+
+ // Verify behavior when the query doesn't read non-partition columns
+ assertThat(onTrino().executeQuery("SELECT count(*) FROM delta.default." + tableName)).containsOnly(row(2));
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsTruncateTable()
+ {
+ testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("TRUNCATE TABLE default." + tableName));
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsDeleteFrom()
+ {
+ testDeletionVectorsDeleteAll(tableName -> onDelta().executeQuery("DELETE FROM default." + tableName));
+ }
+
+ private void testDeletionVectorsDeleteAll(Consumer<String> deleteRow)
+ {
+ String tableName = "test_deletion_vectors_delete_all_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " SELECT explode(sequence(1, 1000))");
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasRowsCount(1000);
+
+ deleteRow.accept(tableName);
+
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).hasNoRows();
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).hasNoRows();
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsOptimize()
+ {
+ String tableName = "test_deletion_vectors_optimize_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT, b INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22)");
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (3,33), (4,44)");
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3");
+
+ List<Row> expected = ImmutableList.of(row(2, 22), row(4, 44));
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+
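+ // Compacting with OPTIMIZE should not resurrect rows already masked by deletion vectors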
+ onDelta().executeQuery("OPTIMIZE default." + tableName);
+
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+ assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsAbsolutePath()
+ {
+ String baseTableName = "test_deletion_vectors_base_absolute_" + randomNameSuffix();
+ String tableName = "test_deletion_vectors_absolute_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + baseTableName +
+ "(a INT, b INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + baseTableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + baseTableName + " VALUES (1,11), (2,22), (3,33), (4,44)");
+ onDelta().executeQuery("DELETE FROM default." + baseTableName + " WHERE a = 1 OR a = 3");
+
+ // The cloned table uses the 'p' (absolute path) storageType for its deletion vector
+ onDelta().executeQuery("CREATE TABLE default." + tableName + " SHALLOW CLONE " + baseTableName);
+
+ List<Row> expected = ImmutableList.of(row(2, 22), row(4, 44));
+ assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
+ // TODO https://github.com/trinodb/trino/issues/17205 Fix below assertion when supporting absolute path
+ assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
+ .hasMessageContaining("Failed to generate splits");
+ }
+ finally {
+ dropDeltaTableWithRetry("default." + baseTableName);
+ dropDeltaTableWithRetry("default." + tableName);
+ }
+ }
+
+ // TODO: Add DELTA_LAKE_OSS group once they support creating a table with deletion vectors
+ @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_EXCLUDE_73, DELTA_LAKE_EXCLUDE_91, DELTA_LAKE_EXCLUDE_104, DELTA_LAKE_EXCLUDE_113, PROFILE_SPECIFIC_TESTS})
+ @Flaky(issue = DATABRICKS_COMMUNICATION_FAILURE_ISSUE, match = DATABRICKS_COMMUNICATION_FAILURE_MATCH)
+ public void testDeletionVectorsWithChangeDataFeed()
+ {
+ String tableName = "test_deletion_vectors_cdf_" + randomNameSuffix();
+ onDelta().executeQuery("" +
+ "CREATE TABLE default." + tableName +
+ "(a INT, b INT)" +
+ "USING delta " +
+ "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " +
+ "TBLPROPERTIES ('delta.enableDeletionVectors' = true, 'delta.enableChangeDataFeed' = true)");
+ try {
+ onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1,11), (2,22), (3,33), (4,44)");
+ onDelta().executeQuery("DELETE FROM default." + tableName + " WHERE a = 1 OR a = 3");
+
+ assertThat(onDelta().executeQuery(
+ "SELECT a, b, _change_type, _commit_version FROM table_changes('default." + tableName + "', 0)"))
+ .containsOnly(
+ row(1, 11, "insert", 1L),
+ row(2, 22, "insert", 1L),
+ row(3, 33, "insert", 1L),
+ row(4, 44, "insert", 1L),
+ row(1, 11, "delete", 2L),
+ row(3, 33, "delete", 2L));
+
+ // TODO Fix table_changes function failure
+ assertQueryFailure(() -> onTrino().executeQuery("SELECT a, b, _change_type, _commit_version FROM TABLE(delta.system.table_changes('default', '" + tableName + "', 0))"))
+ .hasMessageContaining("Change Data Feed is not enabled at version 2. Version contains 'remove' entries without 'cdc' entries");
}
finally {
dropDeltaTableWithRetry("default." + tableName);