Skip to content

Commit

Permalink
fixup! Support reading deletion vectors in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 25, 2023
1 parent 05aca34 commit a527e66
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
Expand Down Expand Up @@ -159,9 +158,9 @@ public List<DeltaLakeTransactionLogEntry> getJsonTransactionLogEntries()
return logTail.getFileEntries();
}

public Map<Long, List<DeltaLakeTransactionLogEntry>> getJsonTransactionLogVersionAndEntries()
public List<Transaction> getTransactions()
{
return logTail.getVersionAndFileEntries();
return logTail.getTransactions();
}

public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableList;

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public record Transaction(long transactionId, List<DeltaLakeTransactionLogEntry> transactionEntries)
{
public Transaction
{
checkArgument(transactionId >= 0, "transactionId must be >= 0");
transactionEntries = ImmutableList.copyOf(requireNonNull(transactionEntries, "transactionEntries is null"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
import static io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator.EntryType.REMOVE;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static java.lang.String.format;
import static java.util.Map.Entry.comparingByKey;
import static java.util.Objects.requireNonNull;

public class TransactionLogAccess
Expand Down Expand Up @@ -285,18 +284,18 @@ public static ImmutableList<DeltaLakeColumnMetadata> columnsWithStats(List<Delta
.collect(toImmutableList());
}

private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntry> checkpointEntries, Map<Long, List<DeltaLakeTransactionLogEntry>> jsonEntries)
private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntry> checkpointEntries, List<Transaction> transactions)
{
Map<String, AddFileEntry> activeJsonEntries = new LinkedHashMap<>();
HashSet<String> removedFiles = new HashSet<>();

// The json entries containing the last few entries in the log need to be applied on top of the parquet snapshot:
// - Any files which have been removed need to be excluded
// - Any files with newer add actions need to be updated with the most recent metadata
jsonEntries.entrySet().stream().sorted(comparingByKey()).forEachOrdered(deltaLakeTransactionLogEntries -> {
transactions.forEach(transaction -> {
Map<String, AddFileEntry> addFilesInTransaction = new LinkedHashMap<>();
Set<String> removedFilesInTransaction = new HashSet<>();
deltaLakeTransactionLogEntries.getValue().forEach(deltaLakeTransactionLogEntry -> {
transaction.transactionEntries().forEach(deltaLakeTransactionLogEntry -> {
if (deltaLakeTransactionLogEntry.getAdd() != null) {
addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd());
}
Expand Down Expand Up @@ -374,19 +373,19 @@ public Stream<CommitInfoEntry> getCommitInfoEntries(TableSnapshot tableSnapshot,
private <T> Stream<T> getEntries(
TableSnapshot tableSnapshot,
Set<CheckpointEntryIterator.EntryType> entryTypes,
BiFunction<Stream<DeltaLakeTransactionLogEntry>, Map<Long, List<DeltaLakeTransactionLogEntry>>, Stream<T>> entryMapper,
BiFunction<Stream<DeltaLakeTransactionLogEntry>, List<Transaction>, Stream<T>> entryMapper,
ConnectorSession session,
TrinoFileSystem fileSystem,
FileFormatDataSourceStats stats)
{
try {
Map<Long, List<DeltaLakeTransactionLogEntry>> jsonEntries = tableSnapshot.getJsonTransactionLogVersionAndEntries();
List<Transaction> transactions = tableSnapshot.getTransactions();
Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats);

return entryMapper.apply(
checkpointEntries,
jsonEntries);
transactions);
}
catch (IOException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error reading transaction log for " + tableSnapshot.getTable(), e);
Expand All @@ -407,7 +406,7 @@ private <T> Stream<T> getEntries(
return getEntries(
tableSnapshot,
ImmutableSet.of(entryType),
(checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.values().stream().flatMap(Collection::stream))),
(checkpointStream, jsonStream) -> entryMapper.apply(Stream.concat(checkpointStream, jsonStream.stream().map(Transaction::transactionEntries).flatMap(Collection::stream))),
session,
fileSystem,
stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@
package io.trino.plugin.deltalake.transactionlog.checkpoint;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MissingTransactionLogException;
import io.trino.plugin.deltalake.transactionlog.Transaction;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -41,12 +40,12 @@ public class TransactionLogTail
{
private static final int JSON_LOG_ENTRY_READ_BUFFER_SIZE = 1024 * 1024;

private final Map<Long, List<DeltaLakeTransactionLogEntry>> entries;
private final List<Transaction> entries;
private final long version;

private TransactionLogTail(Map<Long, List<DeltaLakeTransactionLogEntry>> entries, long version)
private TransactionLogTail(List<Transaction> entries, long version)
{
this.entries = ImmutableMap.copyOf(requireNonNull(entries, "entries is null"));
this.entries = ImmutableList.copyOf(requireNonNull(entries, "entries is null"));
this.version = version;
}

Expand All @@ -67,7 +66,7 @@ public static TransactionLogTail loadNewTail(
Optional<Long> endVersion)
throws IOException
{
ImmutableMap.Builder<Long, List<DeltaLakeTransactionLogEntry>> entriesBuilder = ImmutableMap.builder();
ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();

long version = startVersion.orElse(0L);
long entryNumber = startVersion.map(start -> start + 1).orElse(0L);
Expand All @@ -79,7 +78,7 @@ public static TransactionLogTail loadNewTail(
while (!endOfTail) {
results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem);
if (results.isPresent()) {
entriesBuilder.put(entryNumber, results.get());
entriesBuilder.add(new Transaction(entryNumber, results.get()));
version = entryNumber;
entryNumber++;
}
Expand All @@ -95,13 +94,13 @@ public static TransactionLogTail loadNewTail(
}
}

return new TransactionLogTail(entriesBuilder.buildOrThrow(), version);
return new TransactionLogTail(entriesBuilder.build(), version);
}

public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation)
throws IOException
{
ImmutableMap.Builder<Long, List<DeltaLakeTransactionLogEntry>> entriesBuilder = ImmutableMap.builder();
ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();

long newVersion = version;

Expand All @@ -112,9 +111,9 @@ public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, S
if (results.isPresent()) {
if (version == newVersion) {
// initialize entriesBuilder with entries we have already read
entriesBuilder.putAll(entries);
entriesBuilder.addAll(entries);
}
entriesBuilder.put(newVersion + 1, results.get());
entriesBuilder.add(new Transaction(newVersion + 1, results.get()));
newVersion++;
}
else {
Expand All @@ -125,7 +124,7 @@ public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, S
if (newVersion == version) {
return Optional.empty();
}
return Optional.of(new TransactionLogTail(entriesBuilder.buildOrThrow(), newVersion));
return Optional.of(new TransactionLogTail(entriesBuilder.build(), newVersion));
}

public static Optional<List<DeltaLakeTransactionLogEntry>> getEntriesFromJson(long entryNumber, String transactionLogDir, TrinoFileSystem fileSystem)
Expand Down Expand Up @@ -157,13 +156,10 @@ public static Optional<List<DeltaLakeTransactionLogEntry>> getEntriesFromJson(lo

public List<DeltaLakeTransactionLogEntry> getFileEntries()
{
return entries.values().stream().flatMap(Collection::stream).collect(toImmutableList());
return entries.stream().map(Transaction::transactionEntries).flatMap(Collection::stream).collect(toImmutableList());
}

/**
* @return transaction log entries with version keys
*/
public Map<Long, List<DeltaLakeTransactionLogEntry>> getVersionAndFileEntries()
public List<Transaction> getTransactions()
{
return entries;
}
Expand Down

0 comments on commit a527e66

Please sign in to comment.