Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta Lake checkpoint cleanups #21195

Merged
merged 8 commits into from
Mar 22, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Convert RemoveFileEntry to record
electrum committed Mar 22, 2024
commit 75d9f505ac6c5504d7aabfc43f09ad6db68f4e0a
Original file line number Diff line number Diff line change
@@ -102,7 +102,7 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
cdcEntry.getPath(),
cdcEntry.getCanonicalPartitionValues()));
}
if (entry.getRemove() != null && entry.getRemove().isDataChange()) {
if (entry.getRemove() != null && entry.getRemove().dataChange()) {
containsRemoveEntry = true;
}
}
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ private void doVacuum(
.collect(toImmutableList()))
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::getPath))
.map(RemoveFileEntry::path))
.peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path))
.collect(toImmutableSet());
}
Original file line number Diff line number Diff line change
@@ -57,8 +57,8 @@ public DeltaLakeDataFileCacheEntry withUpdatesApplied(List<DeltaLakeTransactionL

RemoveFileEntry removeEntry = deltaLakeTransactionLogEntry.getRemove();
if (removeEntry != null) {
activeJsonEntries.remove(removeEntry.getPath());
removedFiles.add(removeEntry.getPath());
activeJsonEntries.remove(removeEntry.path());
removedFiles.add(removeEntry.path());
}
});

Original file line number Diff line number Diff line change
@@ -13,88 +13,20 @@
*/
package io.trino.plugin.deltalake.transactionlog;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;

import java.util.Map;
import java.util.Objects;

public class RemoveFileEntry
{
private final String path;
private final Map<String, String> partitionValues;
private final long deletionTimestamp;
private final boolean dataChange;

@JsonCreator
public RemoveFileEntry(
@JsonProperty("path") String path,
@JsonProperty("partitionValues") @Nullable Map<String, String> partitionValues,
@JsonProperty("deletionTimestamp") long deletionTimestamp,
@JsonProperty("dataChange") boolean dataChange)
{
this.path = path;
this.partitionValues = partitionValues;
this.deletionTimestamp = deletionTimestamp;
this.dataChange = dataChange;
}

@JsonProperty
public String getPath()
{
return path;
}

@Nullable
@JsonProperty
public Map<String, String> getPartitionValues()
{
return partitionValues;
}
import static java.util.Objects.requireNonNull;

@JsonProperty
public long getDeletionTimestamp()
{
return deletionTimestamp;
}

@JsonProperty
public boolean isDataChange()
{
return dataChange;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RemoveFileEntry that = (RemoveFileEntry) o;
return deletionTimestamp == that.deletionTimestamp &&
dataChange == that.dataChange &&
Objects.equals(path, that.path) &&
Objects.equals(partitionValues, that.partitionValues);
}

@Override
public int hashCode()
{
return Objects.hash(path, partitionValues, deletionTimestamp, dataChange);
}

@Override
public String toString()
public record RemoveFileEntry(
String path,
@Nullable Map<String, String> partitionValues,
long deletionTimestamp,
boolean dataChange)
{
public RemoveFileEntry
{
return "RemoveFileEntry{" +
"path='" + path + '\'' +
", partitionValues=" + partitionValues +
", deletionTimestamp=" + deletionTimestamp +
", dataChange=" + dataChange +
'}';
requireNonNull(path, "path is null");
}
}
Original file line number Diff line number Diff line change
@@ -355,7 +355,7 @@ private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntr
addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd());
}
else if (deltaLakeTransactionLogEntry.getRemove() != null) {
removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().getPath());
removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().path());
}
});

Original file line number Diff line number Diff line change
@@ -78,8 +78,8 @@ private void handleRemoveFileEntry(@Nullable RemoveFileEntry entry)
if (entry == null) {
return;
}
removeFileEntries.put(entry.getPath(), entry);
addFileEntries.remove(entry.getPath());
removeFileEntries.put(entry.path(), entry);
addFileEntries.remove(entry.path());
}

public CheckpointEntries build()
Original file line number Diff line number Diff line change
@@ -462,10 +462,10 @@ private void writeRemoveFileEntry(PageBuilder pageBuilder, RowType entryType, Re
{
pageBuilder.declarePosition();
((RowBlockBuilder) pageBuilder.getBlockBuilder(REMOVE_BLOCK_CHANNEL)).buildEntry(fieldBuilders -> {
writeString(fieldBuilders.get(0), entryType, 0, "path", removeFileEntry.getPath());
writeStringMap(fieldBuilders.get(1), entryType, 1, "partitionValues", removeFileEntry.getPartitionValues());
writeLong(fieldBuilders.get(2), entryType, 2, "deletionTimestamp", removeFileEntry.getDeletionTimestamp());
writeBoolean(fieldBuilders.get(3), entryType, 3, "dataChange", removeFileEntry.isDataChange());
writeString(fieldBuilders.get(0), entryType, 0, "path", removeFileEntry.path());
writeStringMap(fieldBuilders.get(1), entryType, 1, "partitionValues", removeFileEntry.partitionValues());
writeLong(fieldBuilders.get(2), entryType, 2, "deletionTimestamp", removeFileEntry.deletionTimestamp());
writeBoolean(fieldBuilders.get(3), entryType, 3, "dataChange", removeFileEntry.dataChange());
});

// null for others
Original file line number Diff line number Diff line change
@@ -66,15 +66,15 @@ public void testRemove()
.map(this::deserialize)
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::getPath)
.map(RemoveFileEntry::path)
.filter(Objects::nonNull)
.count()).isEqualTo(6);

assertThat(readJsonTransactionLogs("deltalake/person/_delta_log")
.map(this::deserialize)
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::getPath)
.map(RemoveFileEntry::path)
.filter(Objects::nonNull)
.count()).isEqualTo(6);
}
Original file line number Diff line number Diff line change
@@ -421,8 +421,8 @@ private void testAllGetRemoveEntries(String tableName, String resourcePath)
setupTransactionLogAccessFromResources(tableName, resourcePath);

try (Stream<RemoveFileEntry> removeEntries = transactionLogAccess.getRemoveEntries(SESSION, tableSnapshot)) {
Set<String> removedPaths = removeEntries.map(RemoveFileEntry::getPath).collect(Collectors.toSet());
Set<String> expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::getPath).collect(Collectors.toSet());
Set<String> removedPaths = removeEntries.map(RemoveFileEntry::path).collect(Collectors.toSet());
Set<String> expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::path).collect(Collectors.toSet());

assertThat(removedPaths).isEqualTo(expectedPaths);
}