Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for v2Checkpoint in Delta Lake #19507

Merged
merged 2 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ features](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#table-featur
- Readers only
* - Timestamp without time zone
- Readers and writers
* - V2 checkpoint
- Readers only
:::

No other features are supported.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
public record CheckpointMetadataEntry(long version, Optional<Map<String, String>> tags)
{
public CheckpointMetadataEntry
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
{
checkArgument(version > 0, "version is not positive: %s", version);
requireNonNull(tags, "tags is null");
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
tags = tags.map(ImmutableMap::copyOf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ private DeltaLakeSchemaSupport() {}
private static final String IDENTITY_COLUMNS_FEATURE_NAME = "identityColumns";
private static final String INVARIANTS_FEATURE_NAME = "invariants";
public static final String TIMESTAMP_NTZ_FEATURE_NAME = "timestampNtz";
public static final String V2_CHECKPOINT_FEATURE_NAME = "v2Checkpoint";

// Names of the table features this class lists as supported for reading.
// NOTE(review): how the set is enforced is not visible in this hunk - confirm at the usage site.
private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.add(DELETION_VECTORS_FEATURE_NAME)
.add(V2_CHECKPOINT_FEATURE_NAME)
.build();
private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.<String>builder()
.add(APPEND_ONLY_FEATURE_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class DeltaLakeTransactionLogEntry
private final ProtocolEntry protocol;
private final CommitInfoEntry commitInfo;
private final CdcEntry cdcEntry;
private final SidecarEntry sidecar;
private final CheckpointMetadataEntry checkpointMetadata;

private DeltaLakeTransactionLogEntry(
TransactionEntry txn,
Expand All @@ -38,7 +40,9 @@ private DeltaLakeTransactionLogEntry(
MetadataEntry metaData,
ProtocolEntry protocol,
CommitInfoEntry commitInfo,
CdcEntry cdcEntry)
CdcEntry cdcEntry,
SidecarEntry sidecar,
CheckpointMetadataEntry checkpointMetadata)
{
this.txn = txn;
this.add = add;
Expand All @@ -47,6 +51,8 @@ private DeltaLakeTransactionLogEntry(
this.protocol = protocol;
this.commitInfo = commitInfo;
this.cdcEntry = cdcEntry;
this.sidecar = sidecar;
this.checkpointMetadata = checkpointMetadata;
}

@JsonCreator
Expand All @@ -57,51 +63,59 @@ public static DeltaLakeTransactionLogEntry fromJson(
@JsonProperty("metaData") MetadataEntry metaData,
@JsonProperty("protocol") ProtocolEntry protocol,
@JsonProperty("commitInfo") CommitInfoEntry commitInfo,
@JsonProperty("cdc") CdcEntry cdcEntry)
@JsonProperty("cdc") CdcEntry cdcEntry,
@JsonProperty("sidecar") SidecarEntry sidecarEntry,
@JsonProperty("checkpointMetadata") CheckpointMetadataEntry checkpointMetadata)
{
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecarEntry, checkpointMetadata);
}

/**
 * Creates a log entry that carries only a transaction entry.
 *
 * @param transaction the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code txn}
 * @throws NullPointerException if {@code transaction} is null
 */
public static DeltaLakeTransactionLogEntry transactionEntry(TransactionEntry transaction)
{
    requireNonNull(transaction, "transaction is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(transaction, null, null, null, null, null, null, null, null);
}

/**
 * Creates a log entry that carries only a commit-info entry.
 *
 * @param commitInfo the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code commitInfo}
 * @throws NullPointerException if {@code commitInfo} is null
 */
public static DeltaLakeTransactionLogEntry commitInfoEntry(CommitInfoEntry commitInfo)
{
    requireNonNull(commitInfo, "commitInfo is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(null, null, null, null, null, commitInfo, null, null, null);
}

/**
 * Creates a log entry that carries only a protocol entry.
 *
 * @param protocolEntry the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code protocol}
 * @throws NullPointerException if {@code protocolEntry} is null
 */
public static DeltaLakeTransactionLogEntry protocolEntry(ProtocolEntry protocolEntry)
{
    requireNonNull(protocolEntry, "protocolEntry is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(null, null, null, null, protocolEntry, null, null, null, null);
}

/**
 * Creates a log entry that carries only a metadata entry.
 *
 * @param metadataEntry the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code metaData}
 * @throws NullPointerException if {@code metadataEntry} is null
 */
public static DeltaLakeTransactionLogEntry metadataEntry(MetadataEntry metadataEntry)
{
    requireNonNull(metadataEntry, "metadataEntry is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(null, null, null, metadataEntry, null, null, null, null, null);
}

/**
 * Creates a log entry that carries only an add-file entry.
 *
 * @param addFileEntry the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code add}
 * @throws NullPointerException if {@code addFileEntry} is null
 */
public static DeltaLakeTransactionLogEntry addFileEntry(AddFileEntry addFileEntry)
{
    requireNonNull(addFileEntry, "addFileEntry is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(null, addFileEntry, null, null, null, null, null, null, null);
}

/**
 * Creates a log entry that carries only a remove-file entry.
 *
 * @param removeFileEntry the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code remove}
 * @throws NullPointerException if {@code removeFileEntry} is null
 */
public static DeltaLakeTransactionLogEntry removeFileEntry(RemoveFileEntry removeFileEntry)
{
    requireNonNull(removeFileEntry, "removeFileEntry is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(null, null, removeFileEntry, null, null, null, null, null, null);
}

/**
 * Creates a log entry that carries only a CDC entry.
 *
 * @param cdcEntry the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code cdcEntry}
 * @throws NullPointerException if {@code cdcEntry} is null
 */
public static DeltaLakeTransactionLogEntry cdcEntry(CdcEntry cdcEntry)
{
    requireNonNull(cdcEntry, "cdcEntry is null");
    // Argument order: txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata.
    return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdcEntry, null, null);
}

/**
 * Creates a log entry that carries only a sidecar entry.
 *
 * @param sidecarEntry the entry to wrap; must not be null
 * @return a log entry whose only populated field is {@code sidecar}
 * @throws NullPointerException if {@code sidecarEntry} is null
 */
public static DeltaLakeTransactionLogEntry sidecarEntry(SidecarEntry sidecarEntry)
{
    SidecarEntry checkedSidecar = requireNonNull(sidecarEntry, "sidecarEntry is null");
    return new DeltaLakeTransactionLogEntry(
            null, // txn
            null, // add
            null, // remove
            null, // metaData
            null, // protocol
            null, // commitInfo
            null, // cdcEntry
            checkedSidecar,
            null); // checkpointMetadata
}

@Nullable
Expand Down Expand Up @@ -153,9 +167,23 @@ public CdcEntry getCDC()
return cdcEntry;
}

/**
 * Returns the sidecar entry held by this log entry.
 *
 * @return the sidecar entry, or {@code null} when this log entry does not carry one
 */
@Nullable
@JsonProperty
public SidecarEntry getSidecar()
{
return sidecar;
}

/**
 * Returns the checkpoint metadata entry held by this log entry.
 *
 * @return the checkpoint metadata entry, or {@code null} when this log entry does not carry one
 */
@Nullable
@JsonProperty
public CheckpointMetadataEntry getCheckpointMetadata()
{
return checkpointMetadata;
}

/**
 * Returns a copy of this log entry with the commit info replaced.
 *
 * @param commitInfo the commit info for the copy; passed through without a null check
 * @return a new log entry with every other field, including {@code sidecar} and
 *         {@code checkpointMetadata}, carried over from this instance
 */
public DeltaLakeTransactionLogEntry withCommitInfo(CommitInfoEntry commitInfo)
{
    return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata);
}

ebyhr marked this conversation as resolved.
Show resolved Hide resolved
@Override
Expand All @@ -174,18 +202,20 @@ public boolean equals(Object o)
Objects.equals(metaData, that.metaData) &&
Objects.equals(protocol, that.protocol) &&
Objects.equals(commitInfo, that.commitInfo) &&
Objects.equals(cdcEntry, that.cdcEntry);
Objects.equals(cdcEntry, that.cdcEntry) &&
Objects.equals(sidecar, that.sidecar) &&
Objects.equals(checkpointMetadata, that.checkpointMetadata);
}

/**
 * Computes a hash over all nine entry fields, including {@code sidecar} and
 * {@code checkpointMetadata}.
 */
@Override
public int hashCode()
{
    return Objects.hash(txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata);
}

/**
 * Renders all nine entry fields in constructor order; absent entries print as {@code null}.
 */
@Override
public String toString()
{
    return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
public record SidecarEntry(String path, long sizeInBytes, long modificationTime, Optional<Map<String, String>> tags)
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
{
public SidecarEntry
{
checkArgument(sizeInBytes > 0, "sizeInBytes is not positive: %s", sizeInBytes);
requireNonNull(path, "path is null");
requireNonNull(tags, "tags is null");
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
tags = tags.map(ImmutableMap::copyOf);
}
}
Loading
Loading