Skip to content

Commit

Permalink
Rename CdfFileEntry to CdcEntry
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinsbd authored and ebyhr committed May 16, 2023
1 parent ebed7dc commit 50ea0ba
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.trino.plugin.deltalake.statistics.DeltaLakeTableStatisticsProvider;
import io.trino.plugin.deltalake.statistics.ExtendedStatistics;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdcEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
Expand Down Expand Up @@ -1565,7 +1565,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
.collect(partitioningBy(dataFile -> dataFile.getDataFileType() == DATA));

List<DataFileInfo> newFiles = ImmutableList.copyOf(split.get(true));
List<DataFileInfo> cdfFiles = ImmutableList.copyOf(split.get(false));
List<DataFileInfo> cdcFiles = ImmutableList.copyOf(split.get(false));

if (mergeHandle.getInsertTableHandle().isRetriesEnabled()) {
cleanExtraOutputFilesForUpdate(session, handle.getLocation(), allFiles);
Expand All @@ -1592,8 +1592,8 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg

long writeTimestamp = Instant.now().toEpochMilli();

if (!cdfFiles.isEmpty()) {
appendCdfFileEntries(transactionLogWriter, cdfFiles, handle.getMetadataEntry().getOriginalPartitionColumns());
if (!cdcFiles.isEmpty()) {
appendCdcFilesInfos(transactionLogWriter, cdcFiles, handle.getMetadataEntry().getOriginalPartitionColumns());
}

for (String file : oldFiles) {
Expand Down Expand Up @@ -1621,21 +1621,21 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg
}
}

private static void appendCdfFileEntries(
private static void appendCdcFilesInfos(
TransactionLogWriter transactionLogWriter,
List<DataFileInfo> cdfFilesInfos,
List<DataFileInfo> cdcFilesInfos,
List<String> partitionColumnNames)
{
for (DataFileInfo info : cdfFilesInfos) {
for (DataFileInfo info : cdcFilesInfos) {
// using Hashmap because partition values can be null
Map<String, String> partitionValues = new HashMap<>();
for (int i = 0; i < partitionColumnNames.size(); i++) {
partitionValues.put(partitionColumnNames.get(i), info.getPartitionValues().get(i));
}
partitionValues = unmodifiableMap(partitionValues);

transactionLogWriter.appendCdfFileEntry(
new CdfFileEntry(
transactionLogWriter.appendCdcEntry(
new CdcEntry(
toUriFormat(info.getPath()),
partitionValues,
info.getSize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdcEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -92,15 +92,15 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize) // TODO
boolean containsCdcEntry = false;
boolean containsRemoveEntry = false;
for (DeltaLakeTransactionLogEntry entry : entries) {
CdfFileEntry cdfFileEntry = entry.getCDC();
if (cdfFileEntry != null) {
CdcEntry cdcEntry = entry.getCDC();
if (cdcEntry != null) {
containsCdcEntry = true;
splits.add(mapToDeltaLakeTableChangesSplit(
commitInfo,
CDF_FILE,
cdfFileEntry.getSize(),
cdfFileEntry.getPath(),
cdfFileEntry.getCanonicalPartitionValues()));
cdcEntry.getSize(),
cdcEntry.getPath(),
cdcEntry.getCanonicalPartitionValues()));
}
if (entry.getRemove() != null && entry.getRemove().isDataChange()) {
containsRemoveEntry = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues;
import static java.lang.String.format;

public class CdfFileEntry
public class CdcEntry
{
private final String path;
private final Map<String, String> partitionValues;
Expand All @@ -32,7 +32,7 @@ public class CdfFileEntry
private final boolean dataChange;

@JsonCreator
public CdfFileEntry(
public CdcEntry(
@JsonProperty("path") String path,
@JsonProperty("partitionValues") Map<String, String> partitionValues,
@JsonProperty("size") long size)
Expand Down Expand Up @@ -77,7 +77,7 @@ public boolean isDataChange()
@Override
public String toString()
{
return format("CdfFileEntry{path=%s, partitionValues=%s, size=%d, dataChange=%b}",
return format("CdcEntry{path=%s, partitionValues=%s, size=%d, dataChange=%b}",
path, partitionValues, size, dataChange);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DeltaLakeTransactionLogEntry
private final MetadataEntry metaData;
private final ProtocolEntry protocol;
private final CommitInfoEntry commitInfo;
private final CdfFileEntry cdfFileEntry;
private final CdcEntry cdcEntry;

private DeltaLakeTransactionLogEntry(
TransactionEntry txn,
Expand All @@ -37,15 +37,15 @@ private DeltaLakeTransactionLogEntry(
MetadataEntry metaData,
ProtocolEntry protocol,
CommitInfoEntry commitInfo,
CdfFileEntry cdfFileEntry)
CdcEntry cdcEntry)
{
this.txn = txn;
this.add = add;
this.remove = remove;
this.metaData = metaData;
this.protocol = protocol;
this.commitInfo = commitInfo;
this.cdfFileEntry = cdfFileEntry;
this.cdcEntry = cdcEntry;
}

@JsonCreator
Expand All @@ -56,9 +56,9 @@ public static DeltaLakeTransactionLogEntry fromJson(
@JsonProperty("metaData") MetadataEntry metaData,
@JsonProperty("protocol") ProtocolEntry protocol,
@JsonProperty("commitInfo") CommitInfoEntry commitInfo,
@JsonProperty("cdc") CdfFileEntry cdfFileEntry) // TODO rename CdfFileEntry to CdcEntry https://github.com/trinodb/trino/issues/17183
@JsonProperty("cdc") CdcEntry cdcEntry)
{
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdfFileEntry);
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
}

public static DeltaLakeTransactionLogEntry transactionEntry(TransactionEntry transaction)
Expand Down Expand Up @@ -97,10 +97,10 @@ public static DeltaLakeTransactionLogEntry removeFileEntry(RemoveFileEntry remov
return new DeltaLakeTransactionLogEntry(null, null, removeFileEntry, null, null, null, null);
}

public static DeltaLakeTransactionLogEntry cdfFileEntry(CdfFileEntry cdfFileEntry)
public static DeltaLakeTransactionLogEntry cdcEntry(CdcEntry cdcEntry)
{
requireNonNull(cdfFileEntry, "cdfFileEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdfFileEntry);
requireNonNull(cdcEntry, "cdcEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdcEntry);
}

@Nullable
Expand Down Expand Up @@ -147,19 +147,19 @@ public CommitInfoEntry getCommitInfo()

@Nullable
@JsonProperty
public CdfFileEntry getCDC()
public CdcEntry getCDC()
{
return cdfFileEntry;
return cdcEntry;
}

public DeltaLakeTransactionLogEntry withCommitInfo(CommitInfoEntry commitInfo)
{
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdfFileEntry);
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
}

@Override
public String toString()
{
return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdfFileEntry);
return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.airlift.json.ObjectMapperProvider;
import io.trino.filesystem.Location;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdfFileEntry;
import io.trino.plugin.deltalake.transactionlog.CdcEntry;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
Expand Down Expand Up @@ -81,9 +81,9 @@ public void appendRemoveFileEntry(RemoveFileEntry removeFileEntry)
entries.add(DeltaLakeTransactionLogEntry.removeFileEntry(removeFileEntry));
}

public void appendCdfFileEntry(CdfFileEntry cdfFileEntry)
public void appendCdcEntry(CdcEntry cdcEntry)
{
entries.add(DeltaLakeTransactionLogEntry.cdfFileEntry(cdfFileEntry));
entries.add(DeltaLakeTransactionLogEntry.cdcEntry(cdcEntry));
}

public boolean isUnsafe()
Expand Down

0 comments on commit 50ea0ba

Please sign in to comment.