Skip to content

Commit

Permalink
Convert RemoveFileEntry to record
Browse files: browse the repository at this point in the history
  • Loading branch information
electrum committed Mar 22, 2024
1 parent b614b67 commit 309c7d8
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private Stream<ConnectorSplit> prepareSplits(long currentVersion, long tableRead
cdcEntry.getPath(),
cdcEntry.getCanonicalPartitionValues()));
}
if (entry.getRemove() != null && entry.getRemove().isDataChange()) {
if (entry.getRemove() != null && entry.getRemove().dataChange()) {
containsRemoveEntry = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void doVacuum(
.collect(toImmutableList()))
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::getPath))
.map(RemoveFileEntry::path))
.peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path))
.collect(toImmutableSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public DeltaLakeDataFileCacheEntry withUpdatesApplied(List<DeltaLakeTransactionL

RemoveFileEntry removeEntry = deltaLakeTransactionLogEntry.getRemove();
if (removeEntry != null) {
activeJsonEntries.remove(removeEntry.getPath());
removedFiles.add(removeEntry.getPath());
activeJsonEntries.remove(removeEntry.path());
removedFiles.add(removeEntry.path());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,88 +13,20 @@
*/
package io.trino.plugin.deltalake.transactionlog;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.annotation.Nullable;

import java.util.Map;
import java.util.Objects;

public class RemoveFileEntry
{
private final String path;
private final Map<String, String> partitionValues;
private final long deletionTimestamp;
private final boolean dataChange;

@JsonCreator
public RemoveFileEntry(
@JsonProperty("path") String path,
@JsonProperty("partitionValues") @Nullable Map<String, String> partitionValues,
@JsonProperty("deletionTimestamp") long deletionTimestamp,
@JsonProperty("dataChange") boolean dataChange)
{
this.path = path;
this.partitionValues = partitionValues;
this.deletionTimestamp = deletionTimestamp;
this.dataChange = dataChange;
}

@JsonProperty
public String getPath()
{
return path;
}

@Nullable
@JsonProperty
public Map<String, String> getPartitionValues()
{
return partitionValues;
}
import static java.util.Objects.requireNonNull;

@JsonProperty
public long getDeletionTimestamp()
{
return deletionTimestamp;
}

@JsonProperty
public boolean isDataChange()
{
return dataChange;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RemoveFileEntry that = (RemoveFileEntry) o;
return deletionTimestamp == that.deletionTimestamp &&
dataChange == that.dataChange &&
Objects.equals(path, that.path) &&
Objects.equals(partitionValues, that.partitionValues);
}

@Override
public int hashCode()
{
return Objects.hash(path, partitionValues, deletionTimestamp, dataChange);
}

@Override
public String toString()
public record RemoveFileEntry(
String path,
@Nullable Map<String, String> partitionValues,
long deletionTimestamp,
boolean dataChange)
{
public RemoveFileEntry
{
return "RemoveFileEntry{" +
"path='" + path + '\'' +
", partitionValues=" + partitionValues +
", deletionTimestamp=" + deletionTimestamp +
", dataChange=" + dataChange +
'}';
requireNonNull(path, "path is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private Stream<AddFileEntry> activeAddEntries(Stream<DeltaLakeTransactionLogEntr
addFilesInTransaction.put(deltaLakeTransactionLogEntry.getAdd().getPath(), deltaLakeTransactionLogEntry.getAdd());
}
else if (deltaLakeTransactionLogEntry.getRemove() != null) {
removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().getPath());
removedFilesInTransaction.add(deltaLakeTransactionLogEntry.getRemove().path());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private void handleRemoveFileEntry(@Nullable RemoveFileEntry entry)
if (entry == null) {
return;
}
removeFileEntries.put(entry.getPath(), entry);
addFileEntries.remove(entry.getPath());
removeFileEntries.put(entry.path(), entry);
addFileEntries.remove(entry.path());
}

public CheckpointEntries build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,10 @@ private void writeRemoveFileEntry(PageBuilder pageBuilder, RowType entryType, Re
{
pageBuilder.declarePosition();
((RowBlockBuilder) pageBuilder.getBlockBuilder(REMOVE_BLOCK_CHANNEL)).buildEntry(fieldBuilders -> {
writeString(fieldBuilders.get(0), entryType, 0, "path", removeFileEntry.getPath());
writeStringMap(fieldBuilders.get(1), entryType, 1, "partitionValues", removeFileEntry.getPartitionValues());
writeLong(fieldBuilders.get(2), entryType, 2, "deletionTimestamp", removeFileEntry.getDeletionTimestamp());
writeBoolean(fieldBuilders.get(3), entryType, 3, "dataChange", removeFileEntry.isDataChange());
writeString(fieldBuilders.get(0), entryType, 0, "path", removeFileEntry.path());
writeStringMap(fieldBuilders.get(1), entryType, 1, "partitionValues", removeFileEntry.partitionValues());
writeLong(fieldBuilders.get(2), entryType, 2, "deletionTimestamp", removeFileEntry.deletionTimestamp());
writeBoolean(fieldBuilders.get(3), entryType, 3, "dataChange", removeFileEntry.dataChange());
});

// null for others
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ public void testRemove()
.map(this::deserialize)
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::getPath)
.map(RemoveFileEntry::path)
.filter(Objects::nonNull)
.count()).isEqualTo(6);

assertThat(readJsonTransactionLogs("deltalake/person/_delta_log")
.map(this::deserialize)
.map(DeltaLakeTransactionLogEntry::getRemove)
.filter(Objects::nonNull)
.map(RemoveFileEntry::getPath)
.map(RemoveFileEntry::path)
.filter(Objects::nonNull)
.count()).isEqualTo(6);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ private void testAllGetRemoveEntries(String tableName, String resourcePath)
setupTransactionLogAccessFromResources(tableName, resourcePath);

try (Stream<RemoveFileEntry> removeEntries = transactionLogAccess.getRemoveEntries(SESSION, tableSnapshot)) {
Set<String> removedPaths = removeEntries.map(RemoveFileEntry::getPath).collect(Collectors.toSet());
Set<String> expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::getPath).collect(Collectors.toSet());
Set<String> removedPaths = removeEntries.map(RemoveFileEntry::path).collect(Collectors.toSet());
Set<String> expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::path).collect(Collectors.toSet());

assertThat(removedPaths).isEqualTo(expectedPaths);
}
Expand Down

0 comments on commit 309c7d8

Please sign in to comment.