Support reading deletion vectors in Delta Lake
ebyhr committed Jul 4, 2023
1 parent dd4bcb0 commit 865c973
Showing 30 changed files with 1,047 additions and 53 deletions.
5 changes: 5 additions & 0 deletions plugin/trino-delta-lake/pom.xml
@@ -144,6 +144,11 @@
<artifactId>trino-hive</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive-formats</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-parquet</artifactId>
@@ -218,6 +218,11 @@ public HiveColumnHandle toHiveColumnHandle()
Optional.empty());
}

public static DeltaLakeColumnHandle rowIdColumnHandle()
{
return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, BIGINT, OptionalInt.empty(), ROW_ID_COLUMN_NAME, BIGINT, SYNTHESIZED, Optional.empty());
}

public static DeltaLakeColumnHandle pathColumnHandle()
{
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
@@ -1411,7 +1411,8 @@ private static void appendAddFileEntries(TransactionLogWriter transactionLogWrit
dataChange,
Optional.of(serializeStatsAsJson(info.getStatistics())),
Optional.empty(),
ImmutableMap.of()));
ImmutableMap.of(),
Optional.empty()));
}
}

@@ -15,6 +15,7 @@

import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.trino.plugin.deltalake.delete.RowPredicate;
import io.trino.plugin.hive.ReaderProjectionsAdapter;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
@@ -33,6 +34,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.Throwables.throwIfInstanceOf;
import static io.airlift.slice.Slices.utf8Slice;
@@ -63,6 +65,7 @@ public class DeltaLakePageSource
private final Block partitionsBlock;
private final ConnectorPageSource delegate;
private final Optional<ReaderProjectionsAdapter> projectionsAdapter;
private final Supplier<Optional<RowPredicate>> deletePredicate;

public DeltaLakePageSource(
List<DeltaLakeColumnHandle> columns,
@@ -73,7 +76,8 @@ public DeltaLakePageSource(
Optional<ReaderProjectionsAdapter> projectionsAdapter,
String path,
long fileSize,
long fileModifiedTime)
long fileModifiedTime,
Supplier<Optional<RowPredicate>> deletePredicate)
{
int size = columns.size();
requireNonNull(partitionKeys, "partitionKeys is null");
@@ -131,6 +135,7 @@ else if (missingColumnNames.contains(column.getBaseColumnName())) {
this.rowIdIndex = rowIdIndex;
this.pathBlock = pathBlock;
this.partitionsBlock = partitionsBlock;
this.deletePredicate = requireNonNull(deletePredicate, "deletePredicate is null");
}

@Override
@@ -168,6 +173,11 @@ public Page getNextPage()
if (projectionsAdapter.isPresent()) {
dataPage = projectionsAdapter.get().adaptPage(dataPage);
}
Optional<RowPredicate> deleteFilterPredicate = deletePredicate.get();
if (deleteFilterPredicate.isPresent()) {
dataPage = deleteFilterPredicate.get().filterPage(dataPage);
}

int batchSize = dataPage.getPositionCount();
Block[] blocks = new Block[prefilledBlocks.length];
for (int i = 0; i < prefilledBlocks.length; i++) {
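With this change, DeltaLakePageSource consults the memoized delete predicate for every page it produces: when the split carries a deletion vector, positions marked as deleted are dropped from the data page before the prefilled path, partition, and row-ID blocks are appended. The RowPredicate interface itself is not part of this excerpt, so the following is only a minimal sketch of such a position filter, assuming a per-position test combined with Page#getPositions.

import io.trino.spi.Page;

@FunctionalInterface
public interface RowPredicate
{
    boolean test(Page page, int position);

    // Keep only the positions that pass the predicate; return the page unchanged
    // when none of its rows were deleted.
    default Page filterPage(Page page)
    {
        int positionCount = page.getPositionCount();
        int[] retained = new int[positionCount];
        int retainedCount = 0;
        for (int position = 0; position < positionCount; position++) {
            if (test(page, position)) {
                retained[retainedCount++] = position;
            }
        }
        if (retainedCount == positionCount) {
            return page;
        }
        return page.getPositions(retained, 0, retainedCount);
    }
}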
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.base.Suppliers;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -24,6 +25,9 @@
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.reader.MetadataReader;
import io.trino.plugin.deltalake.delete.PositionDeleteFilter;
import io.trino.plugin.deltalake.delete.RowPredicate;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.hive.FileFormatDataSourceStats;
import io.trino.plugin.hive.HiveColumnHandle;
@@ -35,6 +39,7 @@
import io.trino.plugin.hive.parquet.ParquetReaderConfig;
import io.trino.plugin.hive.parquet.TrinoParquetDataSource;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.LongArrayBlock;
import io.trino.spi.connector.ColumnHandle;
@@ -56,13 +61,15 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.joda.time.DateTimeZone;
import org.roaringbitmap.RoaringBitmap;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -71,12 +78,15 @@
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.ROW_ID_COLUMN_NAME;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowIdColumnHandle;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedNestedReaderEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetOptimizedReaderEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isParquetUseColumnIndex;
import static io.trino.plugin.deltalake.delete.DeletionVectors.readDeletionVectors;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.getColumnMappingMode;
import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.PARQUET_ROW_INDEX_COLUMN;
@@ -134,6 +144,11 @@ public ConnectorPageSource createPageSource(
.map(DeltaLakeColumnHandle.class::cast)
.collect(toImmutableList());

List<DeltaLakeColumnHandle> requiredColumns = ImmutableList.<DeltaLakeColumnHandle>builderWithExpectedSize(deltaLakeColumns.size() + 1)
.addAll(deltaLakeColumns)
.add(rowIdColumnHandle())
.build();

List<DeltaLakeColumnHandle> regularColumns = deltaLakeColumns.stream()
.filter(column -> (column.getColumnType() == REGULAR) || column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))
.collect(toImmutableList());
@@ -166,6 +181,7 @@ public ConnectorPageSource createPageSource(
if (filteredSplitPredicate.isAll() &&
split.getStart() == 0 && split.getLength() == split.getFileSize() &&
split.getFileRowCount().isPresent() &&
split.getDeletionVector().isEmpty() &&
(regularColumns.isEmpty() || onlyRowIdColumn(regularColumns))) {
return new DeltaLakePageSource(
deltaLakeColumns,
@@ -176,7 +192,8 @@
Optional.empty(),
split.getPath(),
split.getFileSize(),
split.getFileModifiedTime());
split.getFileModifiedTime(),
Optional::empty);
}

Location location = Location.of(split.getPath());
@@ -201,6 +218,9 @@
hiveColumnHandles::add,
() -> missingColumnNames.add(column.getBaseColumnName()));
}
if (split.getDeletionVector().isPresent() && !regularColumns.contains(rowIdColumnHandle())) {
hiveColumnHandles.add(PARQUET_ROW_INDEX_COLUMN);
}

TupleDomain<HiveColumnHandle> parquetPredicate = getParquetTupleDomain(filteredSplitPredicate.simplify(domainCompactionThreshold), columnMappingMode, parquetFieldIdToName);

@@ -224,6 +244,14 @@
column -> ((HiveColumnHandle) column).getType(),
HivePageSourceProvider::getProjection));

Supplier<Optional<RowPredicate>> deletePredicate = Suppliers.memoize(() -> {
if (split.getDeletionVector().isEmpty()) {
return Optional.empty();
}
PositionDeleteFilter deleteFilter = readDeletes(session, Location.of(split.getTableLocation()), split.getDeletionVector().get());
return Optional.of(deleteFilter.createPredicate(requiredColumns));
});

return new DeltaLakePageSource(
deltaLakeColumns,
missingColumnNames.build(),
@@ -233,7 +261,29 @@
projectionsAdapter,
split.getPath(),
split.getFileSize(),
split.getFileModifiedTime());
split.getFileModifiedTime(),
deletePredicate);
}

private PositionDeleteFilter readDeletes(
ConnectorSession session,
Location tableLocation,
DeletionVectorEntry deletionVector)
{
try {
RoaringBitmap[] deletedRows = readDeletionVectors(
fileSystemFactory.create(session),
tableLocation,
deletionVector.storageType(),
deletionVector.pathOrInlineDv(),
deletionVector.offset(),
deletionVector.sizeInBytes(),
deletionVector.cardinality());
return new PositionDeleteFilter(deletedRows);
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Failed to read deletion vectors", e);
}
}

public Map<Integer, String> loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options)
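readDeletes above decodes the split's deletion vector into an array of RoaringBitmaps and wraps it in a PositionDeleteFilter, whose createPredicate(requiredColumns) binds the bitmaps to the synthesized row-ID channel (fed by PARQUET_ROW_INDEX_COLUMN). That class is not shown in this excerpt; the sketch below assumes one 32-bit bitmap per 2^32 range of row indexes and a channel layout that matches the supplied column list.

import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.spi.Page;
import org.roaringbitmap.RoaringBitmap;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowIdColumnHandle;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;

public final class PositionDeleteFilter
{
    private final RoaringBitmap[] deletedRows; // assumed: one bitmap per 2^32 block of row indexes

    public PositionDeleteFilter(RoaringBitmap[] deletedRows)
    {
        this.deletedRows = requireNonNull(deletedRows, "deletedRows is null");
    }

    public RowPredicate createPredicate(List<DeltaLakeColumnHandle> columns)
    {
        // Assumes the page channels line up with the supplied column handles
        int rowIdChannel = columns.indexOf(rowIdColumnHandle());
        checkArgument(rowIdChannel >= 0, "columns must contain the row ID column");
        return (page, position) -> {
            long rowIndex = BIGINT.getLong(page.getBlock(rowIdChannel), position);
            int high = (int) (rowIndex >>> 32);
            // Keep the row unless a bitmap marks its file-wide index as deleted
            return high >= deletedRows.length || !deletedRows[high].contains((int) rowIndex);
        };
    }
}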
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.SizeOf;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ConnectorSplit;
@@ -41,34 +42,40 @@ public class DeltaLakeSplit
{
private static final int INSTANCE_SIZE = instanceSize(DeltaLakeSplit.class);

private final String tableLocation;
private final String path;
private final long start;
private final long length;
private final long fileSize;
private final Optional<Long> fileRowCount;
private final long fileModifiedTime;
private final Optional<DeletionVectorEntry> deletionVector;
private final SplitWeight splitWeight;
private final TupleDomain<DeltaLakeColumnHandle> statisticsPredicate;
private final Map<String, Optional<String>> partitionKeys;

@JsonCreator
public DeltaLakeSplit(
@JsonProperty("tableLocation") String tableLocation,
@JsonProperty("path") String path,
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("fileSize") long fileSize,
@JsonProperty("rowCount") Optional<Long> fileRowCount,
@JsonProperty("fileModifiedTime") long fileModifiedTime,
@JsonProperty("deletionVector") Optional<DeletionVectorEntry> deletionVector,
@JsonProperty("splitWeight") SplitWeight splitWeight,
@JsonProperty("statisticsPredicate") TupleDomain<DeltaLakeColumnHandle> statisticsPredicate,
@JsonProperty("partitionKeys") Map<String, Optional<String>> partitionKeys)
{
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.path = requireNonNull(path, "path is null");
this.start = start;
this.length = length;
this.fileSize = fileSize;
this.fileRowCount = requireNonNull(fileRowCount, "rowCount is null");
this.fileModifiedTime = fileModifiedTime;
this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");
this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
this.statisticsPredicate = requireNonNull(statisticsPredicate, "statisticsPredicate is null");
this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null");
@@ -94,6 +101,12 @@ public SplitWeight getSplitWeight()
return splitWeight;
}

@JsonProperty
public String getTableLocation()
{
return tableLocation;
}

@JsonProperty
public String getPath()
{
@@ -130,6 +143,12 @@ public long getFileModifiedTime()
return fileModifiedTime;
}

@JsonProperty
public Optional<DeletionVectorEntry> getDeletionVector()
{
return deletionVector;
}

/**
* A TupleDomain representing the min/max statistics from the file this split was generated from. This does not contain any partitioning information.
*/
@@ -149,8 +168,10 @@ public Map<String, Optional<String>> getPartitionKeys()
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ estimatedSizeOf(tableLocation)
+ estimatedSizeOf(path)
+ sizeOf(fileRowCount, value -> LONG_INSTANCE_SIZE)
+ sizeOf(deletionVector, DeletionVectorEntry::sizeInBytes)
+ splitWeight.getRetainedSizeInBytes()
+ statisticsPredicate.getRetainedSizeInBytes(DeltaLakeColumnHandle::getRetainedSizeInBytes)
+ estimatedSizeOf(partitionKeys, SizeOf::estimatedSizeOf, value -> sizeOf(value, SizeOf::estimatedSizeOf));
@@ -170,11 +191,13 @@ public Object getInfo()
public String toString()
{
return toStringHelper(this)
.add("tableLocation", tableLocation)
.add("path", path)
.add("start", start)
.add("length", length)
.add("fileSize", fileSize)
.add("rowCount", fileRowCount)
.add("deletionVector", deletionVector)
.add("statisticsPredicate", statisticsPredicate)
.add("partitionKeys", partitionKeys)
.toString();
@@ -193,15 +216,17 @@ public boolean equals(Object o)
return start == that.start &&
length == that.length &&
fileSize == that.fileSize &&
tableLocation.equals(that.tableLocation) &&
path.equals(that.path) &&
fileRowCount.equals(that.fileRowCount) &&
deletionVector.equals(that.deletionVector) &&
Objects.equals(statisticsPredicate, that.statisticsPredicate) &&
Objects.equals(partitionKeys, that.partitionKeys);
}

@Override
public int hashCode()
{
return Objects.hash(path, start, length, fileSize, fileRowCount, statisticsPredicate, partitionKeys);
return Objects.hash(path, start, length, fileSize, fileRowCount, deletionVector, statisticsPredicate, partitionKeys);
}
}
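DeltaLakeSplit now carries the add action's optional deletion vector through JSON serialization, equals/hashCode, and retained-size accounting. The accessors used in this commit (storageType, pathOrInlineDv, offset, sizeInBytes, cardinality) mirror the add.deletionVector fields of the Delta Lake protocol; the record below is a sketch of that shape, and the exact field types are assumptions.

import java.util.OptionalInt;

public record DeletionVectorEntry(
        String storageType,     // "u" = stored relative to the table location, "p" = absolute path, "i" = inline
        String pathOrInlineDv,  // path information, or the encoded vector itself when inline
        OptionalInt offset,     // byte offset of the vector within its file, when stored on disk
        int sizeInBytes,        // serialized size of the deletion vector
        long cardinality)       // number of deleted row indexes it encodes
{
}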
@@ -227,6 +227,7 @@ private Stream<DeltaLakeSplit> getSplits(

return splitsForFile(
session,
tableHandle.location(),
addAction,
splitPath,
addAction.getCanonicalPartitionValues(),
@@ -273,6 +274,7 @@ private static boolean pathMatchesPredicate(Domain pathDomain, String path)

private List<DeltaLakeSplit> splitsForFile(
ConnectorSession session,
String tableLocation,
AddFileEntry addFileEntry,
String splitPath,
Map<String, Optional<String>> partitionKeys,
@@ -285,12 +287,14 @@ private List<DeltaLakeSplit> splitsForFile(
if (!splittable) {
// remainingInitialSplits is not used when !splittable
return ImmutableList.of(new DeltaLakeSplit(
tableLocation,
splitPath,
0,
fileSize,
fileSize,
addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords),
addFileEntry.getModificationTime(),
addFileEntry.getDeletionVector(),
SplitWeight.standard(),
statisticsPredicate,
partitionKeys));
Expand All @@ -309,12 +313,14 @@ private List<DeltaLakeSplit> splitsForFile(
long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);

splits.add(new DeltaLakeSplit(
tableLocation,
splitPath,
currentOffset,
splitSize,
fileSize,
Optional.empty(),
addFileEntry.getModificationTime(),
addFileEntry.getDeletionVector(),
SplitWeight.fromProportion(Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0)),
statisticsPredicate,
partitionKeys));
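In splitsForFile above, every sub-split of a data file carries the same tableLocation and the same deletion vector entry (a deletion vector marks row indexes of the whole file, so each split needs the full entry), while its weight is a clamped proportion of the maximum split size. A small worked example of that clamp, using illustrative values rather than the connector's configured defaults:

import io.trino.spi.SplitWeight;

class SplitWeightExample
{
    public static void main(String[] args)
    {
        long fileSize = 200L << 20;              // 200 MB data file
        long maxSplitSize = 64L << 20;           // 64 MB maximum split size
        double minimumAssignedSplitWeight = 0.05;
        long currentOffset = 192L << 20;         // final iteration of the splitting loop

        long splitSize = Math.min(maxSplitSize, fileSize - currentOffset); // 8 MB tail split
        double proportion = Math.min(Math.max((double) splitSize / maxSplitSize, minimumAssignedSplitWeight), 1.0);
        // 8 / 64 = 0.125, above the 0.05 floor, so this tail split is assigned weight 0.125
        SplitWeight weight = SplitWeight.fromProportion(proportion);
        System.out.println(weight);
    }
}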
