diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java index 5690662ea578..b5027572db10 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java @@ -221,20 +221,20 @@ private List buildPages(ConnectorSession session, List co commitInfoEntries.forEach(commitInfoEntry -> { pagesBuilder.beginRow(); - pagesBuilder.appendBigint(commitInfoEntry.getVersion()); - pagesBuilder.appendTimestampTzMillis(commitInfoEntry.getTimestamp(), timeZoneKey); - write(commitInfoEntry.getUserId(), pagesBuilder); - write(commitInfoEntry.getUserName(), pagesBuilder); - write(commitInfoEntry.getOperation(), pagesBuilder); - if (commitInfoEntry.getOperationParameters() == null) { + pagesBuilder.appendBigint(commitInfoEntry.version()); + pagesBuilder.appendTimestampTzMillis(commitInfoEntry.timestamp(), timeZoneKey); + write(commitInfoEntry.userId(), pagesBuilder); + write(commitInfoEntry.userName(), pagesBuilder); + write(commitInfoEntry.operation(), pagesBuilder); + if (commitInfoEntry.operationParameters() == null) { pagesBuilder.appendNull(); } else { - pagesBuilder.appendVarcharVarcharMap(commitInfoEntry.getOperationParameters()); + pagesBuilder.appendVarcharVarcharMap(commitInfoEntry.operationParameters()); } - write(commitInfoEntry.getClusterId(), pagesBuilder); - pagesBuilder.appendBigint(commitInfoEntry.getReadVersion()); - write(commitInfoEntry.getIsolationLevel(), pagesBuilder); + write(commitInfoEntry.clusterId(), pagesBuilder); + pagesBuilder.appendBigint(commitInfoEntry.readVersion()); + write(commitInfoEntry.isolationLevel(), pagesBuilder); commitInfoEntry.isBlindAppend().ifPresentOrElse(pagesBuilder::appendBoolean, pagesBuilder::appendNull); pagesBuilder.endRow(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 0f4583a6a3db..8f62a5f657ae 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -569,11 +569,11 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa if (protocolEntry == null) { return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable())); } - if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) { - LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.getMinReaderVersion()); + if (protocolEntry.minReaderVersion() > MAX_READER_VERSION) { + LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion()); return null; } - Set unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.getReaderFeatures().orElse(ImmutableSet.of())); + Set unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.readerFeatures().orElse(ImmutableSet.of())); if (!unsupportedReaderFeatures.isEmpty()) { LOG.debug("Skip %s because the table contains unsupported reader features: %s", tableName, unsupportedReaderFeatures); return null; @@ -1461,14 +1461,14 @@ private ProtocolEntry buildProtocolEntryForNewColumn(ProtocolEntry protocolEntry } return new ProtocolEntry( - max(protocolEntry.getMinReaderVersion(), TIMESTAMP_NTZ_SUPPORTED_READER_VERSION), - max(protocolEntry.getMinWriterVersion(), TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION), + max(protocolEntry.minReaderVersion(), TIMESTAMP_NTZ_SUPPORTED_READER_VERSION), + max(protocolEntry.minWriterVersion(), TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION), Optional.of(ImmutableSet.builder() - .addAll(protocolEntry.getReaderFeatures().orElse(ImmutableSet.of())) + .addAll(protocolEntry.readerFeatures().orElse(ImmutableSet.of())) .add(TIMESTAMP_NTZ_FEATURE_NAME) .build()), Optional.of(ImmutableSet.builder() - .addAll(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())) + .addAll(protocolEntry.writerFeatures().orElse(ImmutableSet.of())) .add(TIMESTAMP_NTZ_FEATURE_NAME) .build())); } @@ -2341,7 +2341,7 @@ private void checkWriteSupported(DeltaLakeTableHandle handle) private static void checkUnsupportedWriterFeatures(ProtocolEntry protocolEntry) { - Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())); + Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.writerFeatures().orElse(ImmutableSet.of())); if (!unsupportedWriterFeatures.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Unsupported writer features: " + unsupportedWriterFeatures); } @@ -2357,7 +2357,7 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry) private void checkSupportedWriterVersion(DeltaLakeTableHandle handle) { - int requiredWriterVersion = handle.getProtocolEntry().getMinWriterVersion(); + int requiredWriterVersion = handle.getProtocolEntry().minWriterVersion(); if (requiredWriterVersion > MAX_WRITER_VERSION) { throw new TrinoException( NOT_SUPPORTED, @@ -2528,7 +2528,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta long createdTime = Instant.now().toEpochMilli(); - int requiredWriterVersion = currentProtocolEntry.getMinWriterVersion(); + int requiredWriterVersion = currentProtocolEntry.minWriterVersion(); Optional metadataEntry = Optional.empty(); if (properties.containsKey(CHANGE_DATA_FEED_ENABLED_PROPERTY)) { boolean changeDataFeedEnabled = (Boolean) properties.get(CHANGE_DATA_FEED_ENABLED_PROPERTY) @@ -2550,8 +2550,8 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta long commitVersion = readVersion + 1; Optional protocolEntry = Optional.empty(); - if (requiredWriterVersion != currentProtocolEntry.getMinWriterVersion()) { - protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.getMinReaderVersion(), requiredWriterVersion, currentProtocolEntry.getReaderFeatures(), currentProtocolEntry.getWriterFeatures())); + if (requiredWriterVersion != currentProtocolEntry.minWriterVersion()) { + protocolEntry = Optional.of(new ProtocolEntry(currentProtocolEntry.minReaderVersion(), requiredWriterVersion, currentProtocolEntry.readerFeatures(), currentProtocolEntry.writerFeatures())); } try { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java index 1bc17e633828..14a2a9755bcb 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePropertiesTable.java @@ -107,17 +107,17 @@ private List buildPages(MetadataEntry metadataEntry, ProtocolEntry protoco pagesBuilder.beginRow(); pagesBuilder.appendVarchar(MIN_READER_VERSION_KEY); - pagesBuilder.appendVarchar(String.valueOf(protocolEntry.getMinReaderVersion())); + pagesBuilder.appendVarchar(String.valueOf(protocolEntry.minReaderVersion())); pagesBuilder.endRow(); pagesBuilder.beginRow(); pagesBuilder.appendVarchar(MIN_WRITER_VERSION_KEY); - pagesBuilder.appendVarchar(String.valueOf(protocolEntry.getMinWriterVersion())); + pagesBuilder.appendVarchar(String.valueOf(protocolEntry.minWriterVersion())); pagesBuilder.endRow(); ImmutableSet.builder() - .addAll(protocolEntry.getReaderFeatures().orElseGet(ImmutableSet::of)) - .addAll(protocolEntry.getWriterFeatures().orElseGet(ImmutableSet::of)) + .addAll(protocolEntry.readerFeatures().orElseGet(ImmutableSet::of)) + .addAll(protocolEntry.writerFeatures().orElseGet(ImmutableSet::of)) .build().forEach(feature -> { pagesBuilder.beginRow(); pagesBuilder.appendVarchar(DELTA_FEATURE_PREFIX + feature); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java index a1d9c0da30d8..dc6446a02d9b 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesSplitSource.java @@ -102,7 +102,7 @@ private Stream prepareSplits(long currentVersion, long tableRead cdcEntry.getPath(), cdcEntry.getCanonicalPartitionValues())); } - if (entry.getRemove() != null && entry.getRemove().isDataChange()) { + if (entry.getRemove() != null && entry.getRemove().dataChange()) { containsRemoveEntry = true; } } @@ -154,9 +154,9 @@ private TableChangesSplit mapToDeltaLakeTableChangesSplit( path, length, canonicalPartitionValues, - commitInfoEntry.getTimestamp(), + commitInfoEntry.timestamp(), source, - commitInfoEntry.getVersion()); + commitInfoEntry.version()); } @Override diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index b0b410db391b..4cec45f44e08 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -183,10 +183,10 @@ private void doVacuum( TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), handle.getReadVersion()); ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); - if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) { - throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion())); + if (protocolEntry.minWriterVersion() > MAX_WRITER_VERSION) { + throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.minWriterVersion())); } - Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())); + Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.writerFeatures().orElse(ImmutableSet.of())); if (!unsupportedWriterFeatures.isEmpty()) { throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures)); } @@ -221,7 +221,7 @@ private void doVacuum( .collect(toImmutableList())) .map(DeltaLakeTransactionLogEntry::getRemove) .filter(Objects::nonNull) - .map(RemoveFileEntry::getPath)) + .map(RemoveFileEntry::path)) .peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path)) .collect(toImmutableSet()); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CommitInfoEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CommitInfoEntry.java index 2800fb5fb9d0..44a99a59ae61 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CommitInfoEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/CommitInfoEntry.java @@ -13,129 +13,28 @@ */ package io.trino.plugin.deltalake.transactionlog; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - import java.util.Map; -import java.util.Objects; import java.util.Optional; -import static java.lang.String.format; - -public class CommitInfoEntry +import static java.util.Objects.requireNonNull; + +public record CommitInfoEntry( + long version, + long timestamp, + String userId, + String userName, + String operation, + Map operationParameters, + Job job, + Notebook notebook, + String clusterId, + long readVersion, + String isolationLevel, + Optional isBlindAppend) { - private final long version; - private final long timestamp; - private final String userId; - private final String userName; - private final String operation; - private final Map operationParameters; - private final Job job; - private final Notebook notebook; - private final String clusterId; - private final long readVersion; - private final String isolationLevel; - private final Optional isBlindAppend; - - @JsonCreator - public CommitInfoEntry( - @JsonProperty("version") long version, - @JsonProperty("timestamp") long timestamp, - @JsonProperty("userId") String userId, - @JsonProperty("userName") String userName, - @JsonProperty("operation") String operation, - @JsonProperty("operationParameters") Map operationParameters, - @JsonProperty("job") Job job, - @JsonProperty("notebook") Notebook notebook, - @JsonProperty("clusterId") String clusterId, - @JsonProperty("readVersion") long readVersion, - @JsonProperty("isolationLevel") String isolationLevel, - @JsonProperty("isBlindAppend") Optional isBlindAppend) - { - this.version = version; - this.timestamp = timestamp; - this.userId = userId; - this.userName = userName; - this.operation = operation; - this.operationParameters = operationParameters; - this.job = job; - this.notebook = notebook; - this.clusterId = clusterId; - this.readVersion = readVersion; - this.isolationLevel = isolationLevel; - this.isBlindAppend = isBlindAppend; - } - - @JsonProperty - public long getVersion() - { - return version; - } - - @JsonProperty - public long getTimestamp() - { - return timestamp; - } - - @JsonProperty - public String getUserId() - { - return userId; - } - - @JsonProperty - public String getUserName() - { - return userName; - } - - @JsonProperty - public String getOperation() - { - return operation; - } - - @JsonProperty - public Map getOperationParameters() - { - return operationParameters; - } - - @JsonProperty - public Job getJob() - { - return job; - } - - @JsonProperty - public Notebook getNotebook() - { - return notebook; - } - - @JsonProperty - public String getClusterId() - { - return clusterId; - } - - @JsonProperty - public long getReadVersion() + public CommitInfoEntry { - return readVersion; - } - - @JsonProperty - public String getIsolationLevel() - { - return isolationLevel; - } - - @JsonProperty("isBlindAppend") - public Optional isBlindAppend() - { - return isBlindAppend; + requireNonNull(isBlindAppend, "isBlindAppend is null"); } public CommitInfoEntry withVersion(long version) @@ -143,168 +42,7 @@ public CommitInfoEntry withVersion(long version) return new CommitInfoEntry(version, timestamp, userId, userName, operation, operationParameters, job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend); } - @Override - public String toString() - { - return format("CommitInfoEntry{version=%d, timestamp=%d, userId=%s, userName=%s, operation=%s, operationParameters=%s, " + - "job=%s, notebook=%s, clusterId=%s, readVersion=%d, isolationLevel=%s, isBlindAppend=%b}", - version, timestamp, userId, userName, operation, operationParameters, - job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CommitInfoEntry other = (CommitInfoEntry) o; - return this.version == other.version && - this.timestamp == other.timestamp && - Objects.equals(this.userId, other.userId) && - Objects.equals(this.userName, other.userName) && - Objects.equals(this.operation, other.operation) && - Objects.equals(this.operationParameters, other.operationParameters) && - Objects.equals(this.job, other.job) && - Objects.equals(this.notebook, other.notebook) && - Objects.equals(this.clusterId, other.clusterId) && - this.readVersion == other.readVersion && - Objects.equals(this.isolationLevel, other.isolationLevel) && - Objects.equals(this.isBlindAppend, other.isBlindAppend); - } - - @Override - public int hashCode() - { - return Objects.hash(version, timestamp, userId, userName, operation, operationParameters, job, notebook, clusterId, readVersion, isolationLevel, isBlindAppend); - } - - public static class Job - { - private final String jobId; - private final String jobName; - private final String runId; - private final String jobOwnerId; - private final String triggerType; - - @JsonCreator - public Job( - @JsonProperty("jobId") String jobId, - @JsonProperty("jobName") String jobName, - @JsonProperty("runId") String runId, - @JsonProperty("jobOwnerId") String jobOwnerId, - @JsonProperty("triggerType") String triggerType) - { - this.jobId = jobId; - this.jobName = jobName; - this.runId = runId; - this.jobOwnerId = jobOwnerId; - this.triggerType = triggerType; - } + public record Job(String jobId, String jobName, String runId, String jobOwnerId, String triggerType) {} - @JsonProperty - public String getJobId() - { - return jobId; - } - - @JsonProperty - public String getJobName() - { - return jobName; - } - - @JsonProperty - public String getRunId() - { - return runId; - } - - @JsonProperty - public String getJobOwnerId() - { - return jobOwnerId; - } - - @JsonProperty - public String getTriggerType() - { - return triggerType; - } - - @Override - public String toString() - { - return format("CommitInfoEntry.Job{jobId=%s, jobName=%s, runId=%s, jobOwnerId=%s, triggerType=%s}", - jobId, jobName, runId, jobOwnerId, triggerType); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Job other = (Job) o; - return Objects.equals(this.jobId, other.jobId) && - Objects.equals(this.jobName, other.jobName) && - Objects.equals(this.runId, other.runId) && - Objects.equals(this.jobOwnerId, other.jobOwnerId) && - Objects.equals(this.triggerType, other.triggerType); - } - - @Override - public int hashCode() - { - return Objects.hash(jobId, jobName, runId, jobOwnerId, triggerType); - } - } - - public static class Notebook - { - private final String notebookId; - - @JsonCreator - public Notebook(@JsonProperty("notebookId") String notebookId) - { - this.notebookId = notebookId; - } - - @JsonProperty - public String getNotebookId() - { - return notebookId; - } - - @Override - public String toString() - { - return format("CommitInfoEntry.Notebook{notebookId=%s}", notebookId); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Notebook other = (Notebook) o; - return Objects.equals(this.notebookId, other.notebookId); - } - - @Override - public int hashCode() - { - return Objects.hash(notebookId); - } - } + public record Notebook(String notebookId) {} } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeDataFileCacheEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeDataFileCacheEntry.java index bc5f2f9d56f9..8736f06a9304 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeDataFileCacheEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeDataFileCacheEntry.java @@ -57,8 +57,8 @@ public DeltaLakeDataFileCacheEntry withUpdatesApplied(List options; - - @JsonCreator - public Format( - @JsonProperty("provider") String provider, - @JsonProperty("options") Map options) - { - this.provider = provider; - this.options = options; - } - - @JsonProperty - public String getProvider() - { - return provider; - } - - @JsonProperty - public Map getOptions() - { - return options; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Format format = (Format) o; - return Objects.equals(provider, format.provider) && - Objects.equals(options, format.options); - } - - @Override - public int hashCode() - { - return Objects.hash(provider, options); - } - - @Override - public String toString() - { - return format("MetadataEntry.Format{provider=%s, options=%s}", provider, options); - } - } + public record Format(String provider, Map options) {} } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java index 7159b22e6b61..a94ac55a318a 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/ProtocolEntry.java @@ -13,67 +13,33 @@ */ package io.trino.plugin.deltalake.transactionlog; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; -import java.util.Objects; import java.util.Optional; import java.util.Set; -import static java.lang.String.format; +import static java.util.Objects.requireNonNull; -public class ProtocolEntry +public record ProtocolEntry( + int minReaderVersion, + int minWriterVersion, + // The delta protocol documentation mentions that readerFeatures & writerFeatures is Array[String], but their actual implementation is Set + Optional> readerFeatures, + Optional> writerFeatures) { private static final int MIN_VERSION_SUPPORTS_READER_FEATURES = 3; private static final int MIN_VERSION_SUPPORTS_WRITER_FEATURES = 7; - private final int minReaderVersion; - private final int minWriterVersion; - private final Optional> readerFeatures; - private final Optional> writerFeatures; - - @JsonCreator - public ProtocolEntry( - @JsonProperty("minReaderVersion") int minReaderVersion, - @JsonProperty("minWriterVersion") int minWriterVersion, - // The delta protocol documentation mentions that readerFeatures & writerFeatures is Array[String], but their actual implementation is Set - @JsonProperty("readerFeatures") Optional> readerFeatures, - @JsonProperty("writerFeatures") Optional> writerFeatures) + public ProtocolEntry { - this.minReaderVersion = minReaderVersion; - this.minWriterVersion = minWriterVersion; if (minReaderVersion < MIN_VERSION_SUPPORTS_READER_FEATURES && readerFeatures.isPresent()) { throw new IllegalArgumentException("readerFeatures must not exist when minReaderVersion is less than " + MIN_VERSION_SUPPORTS_READER_FEATURES); } if (minWriterVersion < MIN_VERSION_SUPPORTS_WRITER_FEATURES && writerFeatures.isPresent()) { throw new IllegalArgumentException("writerFeatures must not exist when minWriterVersion is less than " + MIN_VERSION_SUPPORTS_WRITER_FEATURES); } - this.readerFeatures = readerFeatures; - this.writerFeatures = writerFeatures; - } - - @JsonProperty - public int getMinReaderVersion() - { - return minReaderVersion; - } - - @JsonProperty - public int getMinWriterVersion() - { - return minWriterVersion; - } - - @JsonProperty - public Optional> getReaderFeatures() - { - return readerFeatures; - } - - @JsonProperty - public Optional> getWriterFeatures() - { - return writerFeatures; + readerFeatures = requireNonNull(readerFeatures, "readerFeatures is null").map(ImmutableSet::copyOf); + writerFeatures = requireNonNull(writerFeatures, "writerFeatures is null").map(ImmutableSet::copyOf); } public boolean supportsReaderFeatures() @@ -95,37 +61,4 @@ public boolean writerFeaturesContains(String featureName) { return writerFeatures.map(features -> features.contains(featureName)).orElse(false); } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ProtocolEntry that = (ProtocolEntry) o; - return minReaderVersion == that.minReaderVersion && - minWriterVersion == that.minWriterVersion && - readerFeatures.equals(that.readerFeatures) && - writerFeatures.equals(that.writerFeatures); - } - - @Override - public int hashCode() - { - return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); - } - - @Override - public String toString() - { - return format( - "ProtocolEntry{minReaderVersion=%d, minWriterVersion=%d, readerFeatures=%s, writerFeatures=%s}", - minReaderVersion, - minWriterVersion, - readerFeatures, - writerFeatures); - } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java index d4cd89494892..0a690a1def82 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/RemoveFileEntry.java @@ -13,88 +13,20 @@ */ package io.trino.plugin.deltalake.transactionlog; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nullable; import java.util.Map; -import java.util.Objects; -public class RemoveFileEntry -{ - private final String path; - private final Map partitionValues; - private final long deletionTimestamp; - private final boolean dataChange; - - @JsonCreator - public RemoveFileEntry( - @JsonProperty("path") String path, - @JsonProperty("partitionValues") @Nullable Map partitionValues, - @JsonProperty("deletionTimestamp") long deletionTimestamp, - @JsonProperty("dataChange") boolean dataChange) - { - this.path = path; - this.partitionValues = partitionValues; - this.deletionTimestamp = deletionTimestamp; - this.dataChange = dataChange; - } - - @JsonProperty - public String getPath() - { - return path; - } - - @Nullable - @JsonProperty - public Map getPartitionValues() - { - return partitionValues; - } +import static java.util.Objects.requireNonNull; - @JsonProperty - public long getDeletionTimestamp() - { - return deletionTimestamp; - } - - @JsonProperty - public boolean isDataChange() - { - return dataChange; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - RemoveFileEntry that = (RemoveFileEntry) o; - return deletionTimestamp == that.deletionTimestamp && - dataChange == that.dataChange && - Objects.equals(path, that.path) && - Objects.equals(partitionValues, that.partitionValues); - } - - @Override - public int hashCode() - { - return Objects.hash(path, partitionValues, deletionTimestamp, dataChange); - } - - @Override - public String toString() +public record RemoveFileEntry( + String path, + @Nullable Map partitionValues, + long deletionTimestamp, + boolean dataChange) +{ + public RemoveFileEntry { - return "RemoveFileEntry{" + - "path='" + path + '\'' + - ", partitionValues=" + partitionValues + - ", deletionTimestamp=" + deletionTimestamp + - ", dataChange=" + dataChange + - '}'; + requireNonNull(path, "path is null"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java index 2db893522fe3..548ed5d7e652 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TableSnapshot.java @@ -92,7 +92,7 @@ public static TableSnapshot load( int domainCompactionThreshold) throws IOException { - Optional lastCheckpointVersion = lastCheckpoint.map(LastCheckpoint::getVersion); + Optional lastCheckpointVersion = lastCheckpoint.map(LastCheckpoint::version); TransactionLogTail transactionLogTail = TransactionLogTail.loadNewTail(fileSystem, tableLocation, lastCheckpointVersion, Optional.empty()); return new TableSnapshot( @@ -114,7 +114,7 @@ public Optional getUpdatedSnapshot(TrinoFileSystem fileSystem, Op Optional lastCheckpoint = readLastCheckpoint(fileSystem, tableLocation); if (lastCheckpoint.isPresent()) { long ourCheckpointVersion = getLastCheckpointVersion().orElse(0L); - if (ourCheckpointVersion != lastCheckpoint.get().getVersion()) { + if (ourCheckpointVersion != lastCheckpoint.get().version()) { // There is a new checkpoint in the table, load anew return Optional.of(TableSnapshot.load( table, @@ -219,7 +219,7 @@ public Stream getCheckpointTransactionLogEntries( public Optional getLastCheckpointVersion() { - return lastCheckpoint.map(LastCheckpoint::getVersion); + return lastCheckpoint.map(LastCheckpoint::version); } private CheckpointEntryIterator getCheckpointTransactionLogEntries( @@ -275,13 +275,13 @@ private List getCheckpointPartPaths(LastCheckpoint checkpoint) { Location transactionLogDir = Location.of(getTransactionLogDir(tableLocation)); ImmutableList.Builder paths = ImmutableList.builder(); - if (checkpoint.getParts().isEmpty()) { - paths.add(transactionLogDir.appendPath("%020d.checkpoint.parquet".formatted(checkpoint.getVersion()))); + if (checkpoint.parts().isEmpty()) { + paths.add(transactionLogDir.appendPath("%020d.checkpoint.parquet".formatted(checkpoint.version()))); } else { - int partsCount = checkpoint.getParts().get(); + int partsCount = checkpoint.parts().get(); for (int i = 1; i <= partsCount; i++) { - paths.add(transactionLogDir.appendPath("%020d.checkpoint.%010d.%010d.parquet".formatted(checkpoint.getVersion(), i, partsCount))); + paths.add(transactionLogDir.appendPath("%020d.checkpoint.%010d.%010d.parquet".formatted(checkpoint.version(), i, partsCount))); } } return paths.build(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionEntry.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionEntry.java index 4b77d59ae3d7..41ca6c25d3e1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionEntry.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionEntry.java @@ -13,72 +13,12 @@ */ package io.trino.plugin.deltalake.transactionlog; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; +import static java.util.Objects.requireNonNull; -import java.util.Objects; - -import static java.lang.String.format; - -public class TransactionEntry +public record TransactionEntry(String appId, long version, long lastUpdated) { - private final String appId; - private final long version; - private final long lastUpdated; - - @JsonCreator - public TransactionEntry( - @JsonProperty("appId") String appId, - @JsonProperty("version") long version, - @JsonProperty("lastUpdated") long lastUpdated) - { - this.appId = appId; - this.version = version; - this.lastUpdated = lastUpdated; - } - - @JsonProperty - public String getAppId() - { - return appId; - } - - @JsonProperty - public long getVersion() - { - return version; - } - - @JsonProperty - public long getLastUpdated() - { - return lastUpdated; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TransactionEntry that = (TransactionEntry) o; - return version == that.version && - lastUpdated == that.lastUpdated && - Objects.equals(appId, that.appId); - } - - @Override - public int hashCode() - { - return Objects.hash(appId, version, lastUpdated); - } - - @Override - public String toString() + public TransactionEntry { - return format("TransactionEntry{appId=%s, version=%d, lastUpdated=%d}", appId, version, lastUpdated); + requireNonNull(appId, "appId is null"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java index 3d23008b0ee5..c64ad57c7a2f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/TransactionLogAccess.java @@ -355,7 +355,7 @@ private Stream activeAddEntries(Stream entry.getVersion()) { + if (currentEntry != null && currentEntry.version() > entry.version()) { return; } - transactionEntries.put(entry.getAppId(), entry); + transactionEntries.put(entry.appId(), entry); } private void handleAddFileEntry(@Nullable AddFileEntry entry) @@ -78,8 +78,8 @@ private void handleRemoveFileEntry(@Nullable RemoveFileEntry entry) if (entry == null) { return; } - removeFileEntries.put(entry.getPath(), entry); - addFileEntries.remove(entry.getPath()); + removeFileEntries.put(entry.path(), entry); + addFileEntries.remove(entry.path()); } public CheckpointEntries build() diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntries.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntries.java index 48f88b94def0..e20a1ba64c34 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntries.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntries.java @@ -20,98 +20,29 @@ import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TransactionEntry; -import java.math.BigInteger; -import java.util.Objects; import java.util.Set; -import java.util.StringJoiner; import static java.util.Objects.requireNonNull; -public class CheckpointEntries +public record CheckpointEntries( + MetadataEntry metadataEntry, + ProtocolEntry protocolEntry, + Set transactionEntries, + Set addFileEntries, + Set removeFileEntries) { - private final MetadataEntry metadataEntry; - private final ProtocolEntry protocolEntry; - private final Set transactionEntries; - private final Set addFileEntries; - private final Set removeFileEntries; - - CheckpointEntries( - MetadataEntry metadataEntry, - ProtocolEntry protocolEntry, - Set transactionEntries, - Set addFileEntries, - Set removeFileEntries) - { - this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null"); - this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null"); - this.transactionEntries = ImmutableSet.copyOf(transactionEntries); - this.addFileEntries = ImmutableSet.copyOf(addFileEntries); - this.removeFileEntries = ImmutableSet.copyOf(removeFileEntries); - } - - public MetadataEntry getMetadataEntry() - { - return metadataEntry; - } - - public ProtocolEntry getProtocolEntry() - { - return protocolEntry; - } - - public Set getTransactionEntries() - { - return transactionEntries; - } - - public Set getAddFileEntries() + public CheckpointEntries { - return addFileEntries; + requireNonNull(metadataEntry, "metadataEntry is null"); + requireNonNull(protocolEntry, "protocolEntry is null"); + transactionEntries = ImmutableSet.copyOf(requireNonNull(transactionEntries, "transactionEntries is null")); + addFileEntries = ImmutableSet.copyOf(requireNonNull(addFileEntries, "addFileEntries is null")); + removeFileEntries = ImmutableSet.copyOf(requireNonNull(removeFileEntries, "removeFileEntries is null")); } - public Set getRemoveFileEntries() - { - return removeFileEntries; - } - - public BigInteger size() + public long size() { // The additional 2 are for the MetadataEntry and ProtocolEntry - return BigInteger.valueOf(transactionEntries.size() + addFileEntries.size() + removeFileEntries.size() + 2); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CheckpointEntries that = (CheckpointEntries) o; - return Objects.equals(metadataEntry, that.metadataEntry) - && Objects.equals(protocolEntry, that.protocolEntry) - && Objects.equals(transactionEntries, that.transactionEntries) - && Objects.equals(addFileEntries, that.addFileEntries) - && Objects.equals(removeFileEntries, that.removeFileEntries); - } - - @Override - public int hashCode() - { - return Objects.hash(metadataEntry, protocolEntry, transactionEntries, addFileEntries, removeFileEntries); - } - - @Override - public String toString() - { - return new StringJoiner(", ", CheckpointEntries.class.getSimpleName() + "[", "]") - .add("metadataEntry=" + metadataEntry) - .add("protocolEntry=" + protocolEntry) - .add("transactionEntries=" + transactionEntries) - .add("addFileEntries=" + addFileEntries) - .add("removeFileEntries=" + removeFileEntries) - .toString(); + return transactionEntries.size() + addFileEntries.size() + removeFileEntries.size() + 2; } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java index c9b32621bd52..d7efc4cffc22 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointWriter.java @@ -105,19 +105,19 @@ public CheckpointWriter(TypeManager typeManager, CheckpointSchemaManager checkpo public void write(CheckpointEntries entries, TrinoOutputFile outputFile) throws IOException { - Map configuration = entries.getMetadataEntry().getConfiguration(); + Map configuration = entries.metadataEntry().getConfiguration(); boolean writeStatsAsJson = Boolean.parseBoolean(configuration.getOrDefault(DELTA_CHECKPOINT_WRITE_STATS_AS_JSON_PROPERTY, "true")); // The default value is false in https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-format, but Databricks defaults to true boolean writeStatsAsStruct = Boolean.parseBoolean(configuration.getOrDefault(DELTA_CHECKPOINT_WRITE_STATS_AS_STRUCT_PROPERTY, "true")); - ProtocolEntry protocolEntry = entries.getProtocolEntry(); + ProtocolEntry protocolEntry = entries.protocolEntry(); RowType metadataEntryType = checkpointSchemaManager.getMetadataEntryType(); - RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.getReaderFeatures().isPresent(), protocolEntry.getWriterFeatures().isPresent()); + RowType protocolEntryType = checkpointSchemaManager.getProtocolEntryType(protocolEntry.readerFeatures().isPresent(), protocolEntry.writerFeatures().isPresent()); RowType txnEntryType = checkpointSchemaManager.getTxnEntryType(); RowType addEntryType = checkpointSchemaManager.getAddEntryType( - entries.getMetadataEntry(), - entries.getProtocolEntry(), + entries.metadataEntry(), + entries.protocolEntry(), alwaysTrue(), writeStatsAsJson, writeStatsAsStruct, @@ -151,19 +151,19 @@ public void write(CheckpointEntries entries, TrinoOutputFile outputFile) PageBuilder pageBuilder = new PageBuilder(columnTypes); - writeMetadataEntry(pageBuilder, metadataEntryType, entries.getMetadataEntry()); - writeProtocolEntry(pageBuilder, protocolEntryType, entries.getProtocolEntry()); - for (TransactionEntry transactionEntry : entries.getTransactionEntries()) { + writeMetadataEntry(pageBuilder, metadataEntryType, entries.metadataEntry()); + writeProtocolEntry(pageBuilder, protocolEntryType, entries.protocolEntry()); + for (TransactionEntry transactionEntry : entries.transactionEntries()) { writeTransactionEntry(pageBuilder, txnEntryType, transactionEntry); } - List partitionColumns = extractPartitionColumns(entries.getMetadataEntry(), entries.getProtocolEntry(), typeManager); + List partitionColumns = extractPartitionColumns(entries.metadataEntry(), entries.protocolEntry(), typeManager); List partitionValuesParsedFieldTypes = partitionColumns.stream() .map(column -> RowType.field(column.getColumnName(), column.getType())) .collect(toImmutableList()); - for (AddFileEntry addFileEntry : entries.getAddFileEntries()) { - writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.getMetadataEntry(), entries.getProtocolEntry(), partitionColumns, partitionValuesParsedFieldTypes, writeStatsAsJson, writeStatsAsStruct); + for (AddFileEntry addFileEntry : entries.addFileEntries()) { + writeAddFileEntry(pageBuilder, addEntryType, addFileEntry, entries.metadataEntry(), entries.protocolEntry(), partitionColumns, partitionValuesParsedFieldTypes, writeStatsAsJson, writeStatsAsStruct); } - for (RemoveFileEntry removeFileEntry : entries.getRemoveFileEntries()) { + for (RemoveFileEntry removeFileEntry : entries.removeFileEntries()) { writeRemoveFileEntry(pageBuilder, removeEntryType, removeFileEntry); } // Not writing commit infos for now. DB does not keep them in the checkpoints by default @@ -182,8 +182,8 @@ private void writeMetadataEntry(PageBuilder pageBuilder, RowType entryType, Meta RowType formatType = getInternalRowType(entryType, 3, "format"); ((RowBlockBuilder) fieldBuilders.get(3)).buildEntry(formatBlockBuilders -> { - writeString(formatBlockBuilders.get(0), formatType, 0, "provider", metadataEntry.getFormat().getProvider()); - writeStringMap(formatBlockBuilders.get(1), formatType, 1, "options", metadataEntry.getFormat().getOptions()); + writeString(formatBlockBuilders.get(0), formatType, 0, "provider", metadataEntry.getFormat().provider()); + writeStringMap(formatBlockBuilders.get(1), formatType, 1, "options", metadataEntry.getFormat().options()); }); writeString(fieldBuilders.get(4), entryType, 4, "schemaString", metadataEntry.getSchemaString()); @@ -201,19 +201,19 @@ private void writeProtocolEntry(PageBuilder pageBuilder, RowType entryType, Prot pageBuilder.declarePosition(); ((RowBlockBuilder) pageBuilder.getBlockBuilder(PROTOCOL_BLOCK_CHANNEL)).buildEntry(fieldBuilders -> { int fieldId = 0; - writeLong(fieldBuilders.get(fieldId), entryType, fieldId, "minReaderVersion", (long) protocolEntry.getMinReaderVersion()); + writeLong(fieldBuilders.get(fieldId), entryType, fieldId, "minReaderVersion", (long) protocolEntry.minReaderVersion()); fieldId++; - writeLong(fieldBuilders.get(fieldId), entryType, fieldId, "minWriterVersion", (long) protocolEntry.getMinWriterVersion()); + writeLong(fieldBuilders.get(fieldId), entryType, fieldId, "minWriterVersion", (long) protocolEntry.minWriterVersion()); fieldId++; - if (protocolEntry.getReaderFeatures().isPresent()) { - writeStringList(fieldBuilders.get(fieldId), entryType, fieldId, "readerFeatures", protocolEntry.getReaderFeatures().get().stream().collect(toImmutableList())); + if (protocolEntry.readerFeatures().isPresent()) { + writeStringList(fieldBuilders.get(fieldId), entryType, fieldId, "readerFeatures", protocolEntry.readerFeatures().get().stream().collect(toImmutableList())); fieldId++; } - if (protocolEntry.getWriterFeatures().isPresent()) { - writeStringList(fieldBuilders.get(fieldId), entryType, fieldId, "writerFeatures", protocolEntry.getWriterFeatures().get().stream().collect(toImmutableList())); + if (protocolEntry.writerFeatures().isPresent()) { + writeStringList(fieldBuilders.get(fieldId), entryType, fieldId, "writerFeatures", protocolEntry.writerFeatures().get().stream().collect(toImmutableList())); } }); @@ -225,9 +225,9 @@ private void writeTransactionEntry(PageBuilder pageBuilder, RowType entryType, T { pageBuilder.declarePosition(); ((RowBlockBuilder) pageBuilder.getBlockBuilder(TXN_BLOCK_CHANNEL)).buildEntry(fieldBuilders -> { - writeString(fieldBuilders.get(0), entryType, 0, "appId", transactionEntry.getAppId()); - writeLong(fieldBuilders.get(1), entryType, 1, "version", transactionEntry.getVersion()); - writeLong(fieldBuilders.get(2), entryType, 2, "lastUpdated", transactionEntry.getLastUpdated()); + writeString(fieldBuilders.get(0), entryType, 0, "appId", transactionEntry.appId()); + writeLong(fieldBuilders.get(1), entryType, 1, "version", transactionEntry.version()); + writeLong(fieldBuilders.get(2), entryType, 2, "lastUpdated", transactionEntry.lastUpdated()); }); // null for others @@ -462,10 +462,10 @@ private void writeRemoveFileEntry(PageBuilder pageBuilder, RowType entryType, Re { pageBuilder.declarePosition(); ((RowBlockBuilder) pageBuilder.getBlockBuilder(REMOVE_BLOCK_CHANNEL)).buildEntry(fieldBuilders -> { - writeString(fieldBuilders.get(0), entryType, 0, "path", removeFileEntry.getPath()); - writeStringMap(fieldBuilders.get(1), entryType, 1, "partitionValues", removeFileEntry.getPartitionValues()); - writeLong(fieldBuilders.get(2), entryType, 2, "deletionTimestamp", removeFileEntry.getDeletionTimestamp()); - writeBoolean(fieldBuilders.get(3), entryType, 3, "dataChange", removeFileEntry.isDataChange()); + writeString(fieldBuilders.get(0), entryType, 0, "path", removeFileEntry.path()); + writeStringMap(fieldBuilders.get(1), entryType, 1, "partitionValues", removeFileEntry.partitionValues()); + writeLong(fieldBuilders.get(2), entryType, 2, "deletionTimestamp", removeFileEntry.deletionTimestamp()); + writeBoolean(fieldBuilders.get(3), entryType, 3, "dataChange", removeFileEntry.dataChange()); }); // null for others diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/LastCheckpoint.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/LastCheckpoint.java index 52bfedd3d999..3754e17692db 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/LastCheckpoint.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/LastCheckpoint.java @@ -13,79 +13,14 @@ */ package io.trino.plugin.deltalake.transactionlog.checkpoint; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.math.BigInteger; -import java.util.Objects; import java.util.Optional; -import static com.google.common.base.MoreObjects.toStringHelper; import static java.util.Objects.requireNonNull; -public class LastCheckpoint +public record LastCheckpoint(long version, long size, Optional parts) { - private final long version; - private final BigInteger size; - private final Optional parts; - - @JsonCreator - public LastCheckpoint( - @JsonProperty("version") long version, - @JsonProperty("size") BigInteger size, - @JsonProperty("parts") Optional parts) - { - this.version = version; - this.size = requireNonNull(size, "size is null"); - this.parts = requireNonNull(parts, "parts is null"); - } - - @JsonProperty - public long getVersion() - { - return version; - } - - @JsonProperty - public BigInteger getSize() - { - return size; - } - - @JsonProperty - public Optional getParts() - { - return parts; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LastCheckpoint that = (LastCheckpoint) o; - return version == that.version && - size.equals(that.size) && - parts.equals(that.parts); - } - - @Override - public int hashCode() - { - return Objects.hash(version, size, parts); - } - - @Override - public String toString() + public LastCheckpoint { - return toStringHelper(this) - .addValue(version) - .add("size", size) - .add("parts", parts) - .toString(); + requireNonNull(parts, "parts is null"); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java index 238310d40190..81a5175a517c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java @@ -118,7 +118,7 @@ public static Optional> getEntriesFromJson(lo String line = reader.readLine(); while (line != null) { DeltaLakeTransactionLogEntry deltaLakeTransactionLogEntry = parseJson(line); - if (deltaLakeTransactionLogEntry.getCommitInfo() != null && deltaLakeTransactionLogEntry.getCommitInfo().getVersion() == 0L) { + if (deltaLakeTransactionLogEntry.getCommitInfo() != null && deltaLakeTransactionLogEntry.getCommitInfo().version() == 0L) { // In case that the commit info version is missing, use the version from the transaction log file name deltaLakeTransactionLogEntry = deltaLakeTransactionLogEntry.withCommitInfo(deltaLakeTransactionLogEntry.getCommitInfo().withVersion(entryNumber)); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java index 70ae104b8f6d..23d783f86c3d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/TransactionLogWriter.java @@ -98,7 +98,7 @@ public void flush() String transactionLogLocation = getTransactionLogDir(tableLocation); CommitInfoEntry commitInfo = requireNonNull(commitInfoEntry.get().getCommitInfo(), "commitInfoEntry.get().getCommitInfo() is null"); - Location logEntry = getTransactionLogJsonEntryPath(transactionLogLocation, commitInfo.getVersion()); + Location logEntry = getTransactionLogJsonEntryPath(transactionLogLocation, commitInfo.version()); ByteArrayOutputStream bos = new ByteArrayOutputStream(); writeEntry(bos, commitInfoEntry.get()); @@ -106,7 +106,7 @@ public void flush() writeEntry(bos, entry); } - String clusterId = commitInfoEntry.get().getCommitInfo().getClusterId(); + String clusterId = commitInfoEntry.get().getCommitInfo().clusterId(); logSynchronizer.write(session, clusterId, logEntry, bos.toByteArray()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index bacffed3df4d..4f2f2a055ed7 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -683,10 +683,10 @@ private void testTrinoCreateTableWithTimestampNtz(ZoneId sessionZone, Consumer transactionLogs = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); ProtocolEntry protocolEntry = transactionLogs.get(1).getProtocol(); assertThat(protocolEntry).isNotNull(); - assertThat(protocolEntry.getMinReaderVersion()).isEqualTo(3); - assertThat(protocolEntry.getMinWriterVersion()).isEqualTo(7); - assertThat(protocolEntry.getReaderFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz"))); - assertThat(protocolEntry.getWriterFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz"))); + assertThat(protocolEntry.minReaderVersion()).isEqualTo(3); + assertThat(protocolEntry.minWriterVersion()).isEqualTo(7); + assertThat(protocolEntry.readerFeatures()).hasValue(ImmutableSet.of("timestampNtz")); + assertThat(protocolEntry.writerFeatures()).hasValue(ImmutableSet.of("timestampNtz")); // Insert rows and verify results assertUpdate(session, @@ -876,18 +876,18 @@ public void testAddTimestampNtzColumn() List transactionLogsByCreateTable = getEntriesFromJson(0, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); ProtocolEntry protocolEntryByCreateTable = transactionLogsByCreateTable.get(1).getProtocol(); assertThat(protocolEntryByCreateTable).isNotNull(); - assertThat(protocolEntryByCreateTable.getMinReaderVersion()).isEqualTo(1); - assertThat(protocolEntryByCreateTable.getMinWriterVersion()).isEqualTo(2); - assertThat(protocolEntryByCreateTable.getReaderFeatures()).isEmpty(); - assertThat(protocolEntryByCreateTable.getWriterFeatures()).isEmpty(); + assertThat(protocolEntryByCreateTable.minReaderVersion()).isEqualTo(1); + assertThat(protocolEntryByCreateTable.minWriterVersion()).isEqualTo(2); + assertThat(protocolEntryByCreateTable.readerFeatures()).isEmpty(); + assertThat(protocolEntryByCreateTable.writerFeatures()).isEmpty(); List transactionLogsByAddColumn = getEntriesFromJson(1, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow(); ProtocolEntry protocolEntryByAddColumn = transactionLogsByAddColumn.get(1).getProtocol(); assertThat(protocolEntryByAddColumn).isNotNull(); - assertThat(protocolEntryByAddColumn.getMinReaderVersion()).isEqualTo(3); - assertThat(protocolEntryByAddColumn.getMinWriterVersion()).isEqualTo(7); - assertThat(protocolEntryByAddColumn.getReaderFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz"))); - assertThat(protocolEntryByAddColumn.getWriterFeatures()).isEqualTo(Optional.of(ImmutableSet.of("timestampNtz"))); + assertThat(protocolEntryByAddColumn.minReaderVersion()).isEqualTo(3); + assertThat(protocolEntryByAddColumn.minWriterVersion()).isEqualTo(7); + assertThat(protocolEntryByAddColumn.readerFeatures()).hasValue(ImmutableSet.of("timestampNtz")); + assertThat(protocolEntryByAddColumn.writerFeatures()).hasValue(ImmutableSet.of("timestampNtz")); assertUpdate("DROP TABLE " + tableName); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestReadJsonTransactionLog.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestReadJsonTransactionLog.java index 45f546e075eb..d760db28fa53 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestReadJsonTransactionLog.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestReadJsonTransactionLog.java @@ -25,12 +25,10 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; -import java.math.BigInteger; import java.net.URISyntaxException; import java.nio.file.Files; import java.util.Arrays; import java.util.Objects; -import java.util.Optional; import java.util.stream.Stream; import static com.google.common.base.Verify.verify; @@ -68,7 +66,7 @@ public void testRemove() .map(this::deserialize) .map(DeltaLakeTransactionLogEntry::getRemove) .filter(Objects::nonNull) - .map(RemoveFileEntry::getPath) + .map(RemoveFileEntry::path) .filter(Objects::nonNull) .count()).isEqualTo(6); @@ -76,7 +74,7 @@ public void testRemove() .map(this::deserialize) .map(DeltaLakeTransactionLogEntry::getRemove) .filter(Objects::nonNull) - .map(RemoveFileEntry::getPath) + .map(RemoveFileEntry::path) .filter(Objects::nonNull) .count()).isEqualTo(6); } @@ -86,9 +84,9 @@ public void testReadLastCheckpointFile() throws JsonProcessingException { LastCheckpoint lastCheckpoint = objectMapper.readValue("{\"version\":10,\"size\":17}", LastCheckpoint.class); - assertThat(lastCheckpoint.getVersion()).isEqualTo(10L); - assertThat(lastCheckpoint.getSize()).isEqualTo(BigInteger.valueOf(17L)); - assertThat(lastCheckpoint.getParts()).isEqualTo(Optional.empty()); + assertThat(lastCheckpoint.version()).isEqualTo(10L); + assertThat(lastCheckpoint.size()).isEqualTo(17); + assertThat(lastCheckpoint.parts()).isEmpty(); } @Test @@ -96,9 +94,9 @@ public void testReadLastCheckpointFileForMultipart() throws JsonProcessingException { LastCheckpoint lastCheckpoint = objectMapper.readValue("{\"version\":237580,\"size\":658573,\"parts\":2}", LastCheckpoint.class); - assertThat(lastCheckpoint.getVersion()).isEqualTo(237580L); - assertThat(lastCheckpoint.getSize()).isEqualTo(BigInteger.valueOf(658573L)); - assertThat(lastCheckpoint.getParts()).isEqualTo(Optional.of(2)); + assertThat(lastCheckpoint.version()).isEqualTo(237580L); + assertThat(lastCheckpoint.size()).isEqualTo(658573L); + assertThat(lastCheckpoint.parts()).hasValue(2); } private Stream readJsonTransactionLogs(String location) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java index 26982d35534f..64eb9f370f9d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestTransactionLogAccess.java @@ -180,8 +180,8 @@ public void testGetMetadataEntry() assertThat(metadataEntry.getLowercasePartitionColumns()).containsOnly("age"); MetadataEntry.Format format = metadataEntry.getFormat(); - assertThat(format.getOptions().keySet().size()).isEqualTo(0); - assertThat(format.getProvider()).isEqualTo("parquet"); + assertThat(format.options().keySet()).isEmpty(); + assertThat(format.provider()).isEqualTo("parquet"); assertThat(tableSnapshot.getCachedMetadata()).isEqualTo(Optional.of(metadataEntry)); } @@ -376,8 +376,8 @@ private void testAllGetMetadataEntry(String tableName, String resourcePath) assertThat(metadataEntry.getOriginalPartitionColumns()).containsOnly("age"); MetadataEntry.Format format = metadataEntry.getFormat(); - assertThat(format.getOptions().keySet().size()).isEqualTo(0); - assertThat(format.getProvider()).isEqualTo("parquet"); + assertThat(format.options().keySet()).isEmpty(); + assertThat(format.provider()).isEqualTo("parquet"); } @Test @@ -421,8 +421,8 @@ private void testAllGetRemoveEntries(String tableName, String resourcePath) setupTransactionLogAccessFromResources(tableName, resourcePath); try (Stream removeEntries = transactionLogAccess.getRemoveEntries(SESSION, tableSnapshot)) { - Set removedPaths = removeEntries.map(RemoveFileEntry::getPath).collect(Collectors.toSet()); - Set expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::getPath).collect(Collectors.toSet()); + Set removedPaths = removeEntries.map(RemoveFileEntry::path).collect(Collectors.toSet()); + Set expectedPaths = EXPECTED_REMOVE_ENTRIES.stream().map(RemoveFileEntry::path).collect(Collectors.toSet()); assertThat(removedPaths).isEqualTo(expectedPaths); } @@ -446,8 +446,8 @@ private void testAllGetProtocolEntries(String tableName, String resourcePath) try (Stream protocolEntryStream = transactionLogAccess.getProtocolEntries(SESSION, tableSnapshot)) { List protocolEntries = protocolEntryStream.toList(); assertThat(protocolEntries.size()).isEqualTo(1); - assertThat(protocolEntries.get(0).getMinReaderVersion()).isEqualTo(1); - assertThat(protocolEntries.get(0).getMinWriterVersion()).isEqualTo(2); + assertThat(protocolEntries.get(0).minReaderVersion()).isEqualTo(1); + assertThat(protocolEntries.get(0).minWriterVersion()).isEqualTo(2); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java index f357207a4656..ef7333cf7132 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TestCheckpointWriter.java @@ -196,11 +196,11 @@ public void testCheckpointWriteReadJsonRoundtrip() writer.write(entries, createOutputFile(targetPath)); CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, protocolEntry, true); - assertThat(readEntries.getTransactionEntries()).isEqualTo(entries.getTransactionEntries()); - assertThat(readEntries.getRemoveFileEntries()).isEqualTo(entries.getRemoveFileEntries()); - assertThat(readEntries.getMetadataEntry()).isEqualTo(entries.getMetadataEntry()); - assertThat(readEntries.getProtocolEntry()).isEqualTo(entries.getProtocolEntry()); - assertThat(readEntries.getAddFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())).isEqualTo(entries.getAddFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())); + assertThat(readEntries.transactionEntries()).isEqualTo(entries.transactionEntries()); + assertThat(readEntries.removeFileEntries()).isEqualTo(entries.removeFileEntries()); + assertThat(readEntries.metadataEntry()).isEqualTo(entries.metadataEntry()); + assertThat(readEntries.protocolEntry()).isEqualTo(entries.protocolEntry()); + assertThat(readEntries.addFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())).isEqualTo(entries.addFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())); } @Test @@ -333,11 +333,11 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip() writer.write(entries, createOutputFile(targetPath)); CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, protocolEntry, true); - assertThat(readEntries.getTransactionEntries()).isEqualTo(entries.getTransactionEntries()); - assertThat(readEntries.getRemoveFileEntries()).isEqualTo(entries.getRemoveFileEntries()); - assertThat(readEntries.getMetadataEntry()).isEqualTo(entries.getMetadataEntry()); - assertThat(readEntries.getProtocolEntry()).isEqualTo(entries.getProtocolEntry()); - assertThat(readEntries.getAddFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())).isEqualTo(entries.getAddFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())); + assertThat(readEntries.transactionEntries()).isEqualTo(entries.transactionEntries()); + assertThat(readEntries.removeFileEntries()).isEqualTo(entries.removeFileEntries()); + assertThat(readEntries.metadataEntry()).isEqualTo(entries.metadataEntry()); + assertThat(readEntries.protocolEntry()).isEqualTo(entries.protocolEntry()); + assertThat(readEntries.addFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())).isEqualTo(entries.addFileEntries().stream().map(this::makeComparable).collect(toImmutableSet())); } @Test @@ -401,7 +401,7 @@ public void testDisablingRowStatistics() writer.write(entries, createOutputFile(targetPath)); CheckpointEntries readEntries = readCheckpoint(targetPath, metadataEntry, protocolEntry, false); - AddFileEntry addFileEntry = getOnlyElement(readEntries.getAddFileEntries()); + AddFileEntry addFileEntry = getOnlyElement(readEntries.addFileEntries()); assertThat(addFileEntry.getStats()).isPresent(); DeltaLakeParquetFileStatistics fileStatistics = (DeltaLakeParquetFileStatistics) addFileEntry.getStats().get();