Skip to content

Commit

Permalink
Add support for v2Checkpoint in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 29, 2024
1 parent 11afaf7 commit f1773d4
Show file tree
Hide file tree
Showing 43 changed files with 606 additions and 40 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ features](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#table-featur
- Readers only
* - Timestamp without time zone
- Readers and writers
* - V2 checkpoint
- Readers only
:::

No other features are supported.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata
public record CheckpointMetadataEntry(long version, Optional<Map<String, String>> tags)
{
public CheckpointMetadataEntry
{
checkArgument(version > 0, "version is not positive: %s", version);
requireNonNull(tags, "tags is null");
tags = tags.map(ImmutableMap::copyOf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ private DeltaLakeSchemaSupport() {}
private static final String IDENTITY_COLUMNS_FEATURE_NAME = "identityColumns";
private static final String INVARIANTS_FEATURE_NAME = "invariants";
public static final String TIMESTAMP_NTZ_FEATURE_NAME = "timestampNtz";
public static final String V2_CHECKPOINT_FEATURE_NAME = "v2Checkpoint";

private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.add(DELETION_VECTORS_FEATURE_NAME)
.add(V2_CHECKPOINT_FEATURE_NAME)
.build();
private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.<String>builder()
.add(APPEND_ONLY_FEATURE_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class DeltaLakeTransactionLogEntry
private final ProtocolEntry protocol;
private final CommitInfoEntry commitInfo;
private final CdcEntry cdcEntry;
private final SidecarEntry sidecar;
private final CheckpointMetadataEntry checkpointMetadata;

private DeltaLakeTransactionLogEntry(
TransactionEntry txn,
Expand All @@ -38,7 +40,9 @@ private DeltaLakeTransactionLogEntry(
MetadataEntry metaData,
ProtocolEntry protocol,
CommitInfoEntry commitInfo,
CdcEntry cdcEntry)
CdcEntry cdcEntry,
SidecarEntry sidecar,
CheckpointMetadataEntry checkpointMetadata)
{
this.txn = txn;
this.add = add;
Expand All @@ -47,6 +51,8 @@ private DeltaLakeTransactionLogEntry(
this.protocol = protocol;
this.commitInfo = commitInfo;
this.cdcEntry = cdcEntry;
this.sidecar = sidecar;
this.checkpointMetadata = checkpointMetadata;
}

@JsonCreator
Expand All @@ -57,51 +63,59 @@ public static DeltaLakeTransactionLogEntry fromJson(
@JsonProperty("metaData") MetadataEntry metaData,
@JsonProperty("protocol") ProtocolEntry protocol,
@JsonProperty("commitInfo") CommitInfoEntry commitInfo,
@JsonProperty("cdc") CdcEntry cdcEntry)
@JsonProperty("cdc") CdcEntry cdcEntry,
@JsonProperty("sidecar") SidecarEntry sidecarEntry,
@JsonProperty("checkpointMetadata") CheckpointMetadataEntry checkpointMetadata)
{
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecarEntry, checkpointMetadata);
}

public static DeltaLakeTransactionLogEntry transactionEntry(TransactionEntry transaction)
{
requireNonNull(transaction, "transaction is null");
return new DeltaLakeTransactionLogEntry(transaction, null, null, null, null, null, null);
return new DeltaLakeTransactionLogEntry(transaction, null, null, null, null, null, null, null, null);
}

public static DeltaLakeTransactionLogEntry commitInfoEntry(CommitInfoEntry commitInfo)
{
requireNonNull(commitInfo, "commitInfo is null");
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, commitInfo, null);
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, commitInfo, null, null, null);
}

public static DeltaLakeTransactionLogEntry protocolEntry(ProtocolEntry protocolEntry)
{
requireNonNull(protocolEntry, "protocolEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, null, null, protocolEntry, null, null);
return new DeltaLakeTransactionLogEntry(null, null, null, null, protocolEntry, null, null, null, null);
}

public static DeltaLakeTransactionLogEntry metadataEntry(MetadataEntry metadataEntry)
{
requireNonNull(metadataEntry, "metadataEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, null, metadataEntry, null, null, null);
return new DeltaLakeTransactionLogEntry(null, null, null, metadataEntry, null, null, null, null, null);
}

public static DeltaLakeTransactionLogEntry addFileEntry(AddFileEntry addFileEntry)
{
requireNonNull(addFileEntry, "addFileEntry is null");
return new DeltaLakeTransactionLogEntry(null, addFileEntry, null, null, null, null, null);
return new DeltaLakeTransactionLogEntry(null, addFileEntry, null, null, null, null, null, null, null);
}

public static DeltaLakeTransactionLogEntry removeFileEntry(RemoveFileEntry removeFileEntry)
{
requireNonNull(removeFileEntry, "removeFileEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, removeFileEntry, null, null, null, null);
return new DeltaLakeTransactionLogEntry(null, null, removeFileEntry, null, null, null, null, null, null);
}

public static DeltaLakeTransactionLogEntry cdcEntry(CdcEntry cdcEntry)
{
requireNonNull(cdcEntry, "cdcEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdcEntry);
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, cdcEntry, null, null);
}

public static DeltaLakeTransactionLogEntry sidecarEntry(SidecarEntry sidecarEntry)
{
requireNonNull(sidecarEntry, "sidecarEntry is null");
return new DeltaLakeTransactionLogEntry(null, null, null, null, null, null, null, sidecarEntry, null);
}

@Nullable
Expand Down Expand Up @@ -153,9 +167,23 @@ public CdcEntry getCDC()
return cdcEntry;
}

@Nullable
@JsonProperty
public SidecarEntry getSidecar()
{
return sidecar;
}

@Nullable
@JsonProperty
public CheckpointMetadataEntry getCheckpointMetadata()
{
return checkpointMetadata;
}

public DeltaLakeTransactionLogEntry withCommitInfo(CommitInfoEntry commitInfo)
{
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
return new DeltaLakeTransactionLogEntry(txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata);
}

@Override
Expand All @@ -174,18 +202,20 @@ public boolean equals(Object o)
Objects.equals(metaData, that.metaData) &&
Objects.equals(protocol, that.protocol) &&
Objects.equals(commitInfo, that.commitInfo) &&
Objects.equals(cdcEntry, that.cdcEntry);
Objects.equals(cdcEntry, that.cdcEntry) &&
Objects.equals(sidecar, that.sidecar) &&
Objects.equals(checkpointMetadata, that.checkpointMetadata);
}

@Override
public int hashCode()
{
return Objects.hash(txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
return Objects.hash(txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata);
}

@Override
public String toString()
{
return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdcEntry);
return String.format("DeltaLakeTransactionLogEntry{%s, %s, %s, %s, %s, %s, %s, %s, %s}", txn, add, remove, metaData, protocol, commitInfo, cdcEntry, sidecar, checkpointMetadata);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog;

import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
public record SidecarEntry(String path, long sizeInBytes, long modificationTime, Optional<Map<String, String>> tags)
{
public SidecarEntry
{
checkArgument(sizeInBytes > 0, "sizeInBytes is not positive: %s", sizeInBytes);
requireNonNull(path, "path is null");
requireNonNull(tags, "tags is null");
tags = tags.map(ImmutableMap::copyOf);
}
}
Loading

0 comments on commit f1773d4

Please sign in to comment.