diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 3ab6f5048b89..ae90d0e8997f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -45,7 +45,7 @@ import io.trino.plugin.deltalake.statistics.DeltaLakeTableStatisticsProvider; import io.trino.plugin.deltalake.statistics.ExtendedStatistics; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; -import io.trino.plugin.deltalake.transactionlog.CdfFileEntry; +import io.trino.plugin.deltalake.transactionlog.CdcEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; @@ -1565,7 +1565,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg .collect(partitioningBy(dataFile -> dataFile.getDataFileType() == DATA)); List newFiles = ImmutableList.copyOf(split.get(true)); - List cdfFiles = ImmutableList.copyOf(split.get(false)); + List cdcFiles = ImmutableList.copyOf(split.get(false)); if (mergeHandle.getInsertTableHandle().isRetriesEnabled()) { cleanExtraOutputFilesForUpdate(session, handle.getLocation(), allFiles); @@ -1592,8 +1592,8 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg long writeTimestamp = Instant.now().toEpochMilli(); - if (!cdfFiles.isEmpty()) { - appendCdfFileEntries(transactionLogWriter, cdfFiles, handle.getMetadataEntry().getOriginalPartitionColumns()); + if (!cdcFiles.isEmpty()) { + appendCdcFilesInfos(transactionLogWriter, cdcFiles, handle.getMetadataEntry().getOriginalPartitionColumns()); } for (String file : oldFiles) { @@ -1621,12 +1621,12 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg } } - private static void appendCdfFileEntries( + private static void appendCdcFilesInfos( TransactionLogWriter transactionLogWriter, - List cdfFilesInfos, + List cdcFilesInfos, List partitionColumnNames) { - for (DataFileInfo info : cdfFilesInfos) { + for (DataFileInfo info : cdcFilesInfos) { // using Hashmap because partition values can be null Map partitionValues = new HashMap<>(); for (int i = 0; i < partitionColumnNames.size(); i++) { @@ -1634,8 +1634,8 @@ private static void appendCdfFileEntries( } partitionValues = unmodifiableMap(partitionValues); - transactionLogWriter.appendCdfFileEntry( - new CdfFileEntry( + transactionLogWriter.appendCdcEntry( + new CdcEntry( toUriFormat(info.getPath()), partitionValues, info.getSize())); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java index f36bc4b37a36..17535a33efe7 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java @@ -18,7 +18,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; -import io.trino.plugin.deltalake.transactionlog.CdfFileEntry; +import io.trino.plugin.deltalake.transactionlog.CdcEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.spi.TrinoException; @@ -92,15 +92,15 @@ public CompletableFuture getNextBatch(int maxSize) // TODO boolean containsCdcEntry = false; boolean containsRemoveEntry = false; for (DeltaLakeTransactionLogEntry entry : entries) { - CdfFileEntry cdfFileEntry = entry.getCDC(); - if (cdfFileEntry != null) { + CdcEntry cdcEntry = entry.getCDC(); + if (cdcEntry != null) { containsCdcEntry = true; splits.add(mapToDeltaLakeTableChangesSplit( commitInfo, CDF_FILE, - cdfFileEntry.getSize(), - cdfFileEntry.getPath(), - cdfFileEntry.getCanonicalPartitionValues())); + cdcEntry.getSize(), + cdcEntry.getPath(), + cdcEntry.getCanonicalPartitionValues())); } if (entry.getRemove() != null && entry.getRemove().isDataChange()) { containsRemoveEntry = true; diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CdfFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CdcEntry.java similarity index 94% rename from plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CdfFileEntry.java rename to plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CdcEntry.java index d558ae89d05a..e8230630d654 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CdfFileEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CdcEntry.java @@ -23,7 +23,7 @@ import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.canonicalizePartitionValues; import static java.lang.String.format; -public class CdfFileEntry +public class CdcEntry { private final String path; private final Map partitionValues; @@ -32,7 +32,7 @@ public class CdfFileEntry private final boolean dataChange; @JsonCreator - public CdfFileEntry( + public CdcEntry( @JsonProperty("path") String path, @JsonProperty("partitionValues") Map partitionValues, @JsonProperty("size") long size) @@ -77,7 +77,7 @@ public boolean isDataChange() @Override public String toString() { - return format("CdfFileEntry{path=%s, partitionValues=%s, size=%d, dataChange=%b}", + return format("CdcEntry{path=%s, partitionValues=%s, size=%d, dataChange=%b}", path, partitionValues, size, dataChange); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java index 0659969fe9b2..91c992403fdb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogEntry.java @@ -28,7 +28,7 @@ public class DeltaLakeTransactionLogEntry private final MetadataEntry metaData; private final ProtocolEntry protocol; private final CommitInfoEntry commitInfo; - private final CdfFileEntry cdfFileEntry; + private final CdcEntry cdcEntry; private DeltaLakeTransactionLogEntry( TransactionEntry txn, @@ -37,7 +37,7 @@ private DeltaLakeTransactionLogEntry( MetadataEntry metaData, ProtocolEntry protocol, CommitInfoEntry commitInfo, - CdfFileEntry cdfFileEntry) + CdcEntry cdcEntry) { this.txn = txn; this.add = add; @@ -45,7 +45,7 @@ private DeltaLakeTransactionLogEntry( this.metaData = metaData; this.protocol = protocol; this.commitInfo = commitInfo; - this.cdfFileEntry = cdfFileEntry; + this.cdcEntry = cdcEntry; } @JsonCreator @@ -56,9 +56,9 @@ public static DeltaLakeTransactionLogEntry fromJson( @JsonProperty("metaData") MetadataEntry metaData, @JsonProperty("protocol") ProtocolEntry protocol, @JsonProperty("commitInfo") CommitInfoEntry commitInfo, - @JsonProperty("cdc") CdfFileEntry cdfFileEntry) // TODO rename CdfFileEntry to CdcEntry https://github.com/trinodb/trino/issues/17183 + @JsonProperty("cdc") CdcEntry cdcEntry) { - return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdfFileEntry); + return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry); } public static DeltaLakeTransactionLogEntry transactionEntry(TransactionEntry transaction) @@ -97,10 +97,10 @@ public static DeltaLakeTransactionLogEntry removeFileEntry(RemoveFileEntry remov return new DeltaLakeTransactionLogEntry(null, null, removeFileEntry, null, null, null, null); } - public static DeltaLakeTransactionLogEntry cdfFileEntry(CdfFileEntry cdfFileEntry) + public static DeltaLakeTransactionLogEntry cdcEntry(CdcEntry cdcEntry) { - requireNonNull(cdfFileEntry, "cdfFileEntry is null"); - return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdfFileEntry); + requireNonNull(cdcEntry, "cdcEntry is null"); + return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdcEntry); } @Nullable @@ -147,19 +147,19 @@ public CommitInfoEntry getCommitInfo() @Nullable @JsonProperty - public CdfFileEntry getCDC() + public CdcEntry getCDC() { - return cdfFileEntry; + return cdcEntry; } public DeltaLakeTransactionLogEntry withCommitInfo(CommitInfoEntry commitInfo) { - return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdfFileEntry); + return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry); } @Override public String toString() { - return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdfFileEntry); + return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdcEntry); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java index e16f87e372c1..70ae104b8f6d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java @@ -17,7 +17,7 @@ import io.airlift.json.ObjectMapperProvider; import io.trino.filesystem.Location; import io.trino.plugin.deltalake.transactionlog.AddFileEntry; -import io.trino.plugin.deltalake.transactionlog.CdfFileEntry; +import io.trino.plugin.deltalake.transactionlog.CdcEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; @@ -81,9 +81,9 @@ public void appendRemoveFileEntry(RemoveFileEntry removeFileEntry) entries.add(DeltaLakeTransactionLogEntry.removeFileEntry(removeFileEntry)); } - public void appendCdfFileEntry(CdfFileEntry cdfFileEntry) + public void appendCdcEntry(CdcEntry cdcEntry) { - entries.add(DeltaLakeTransactionLogEntry.cdfFileEntry(cdfFileEntry)); + entries.add(DeltaLakeTransactionLogEntry.cdcEntry(cdcEntry)); } public boolean isUnsafe()