Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract constants for table feature names in Delta Lake #19083

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode.NONE;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.MAX_COLUMN_ID_CONFIGURATION_KEY;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.TIMESTAMP_NTZ_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.deserializeType;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
Expand Down Expand Up @@ -2288,8 +2289,8 @@ private ProtocolEntry protocolEntryForNewTable(boolean containsTimestampType, Ma
if (containsTimestampType) {
readerVersion = max(readerVersion, TIMESTAMP_NTZ_SUPPORTED_READER_VERSION);
writerVersion = max(writerVersion, TIMESTAMP_NTZ_SUPPORTED_WRITER_VERSION);
readerFeatures.add("timestampNtz");
writerFeatures.add("timestampNtz");
readerFeatures.add(TIMESTAMP_NTZ_FEATURE_NAME);
writerFeatures.add(TIMESTAMP_NTZ_FEATURE_NAME);
}
return new ProtocolEntry(
readerVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,18 +94,27 @@ private DeltaLakeSchemaSupport() {}
private static final String DELETION_VECTORS_CONFIGURATION_KEY = "delta.enableDeletionVectors";

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features
private static final String APPEND_ONLY_FEATURE_NAME = "appendOnly";
private static final String CHANGE_DATA_FEED_FEATURE_NAME = "changeDataFeed";
private static final String CHECK_CONSTRAINTS_FEATURE_NAME = "checkConstraints";
private static final String COLUMN_MAPPING_FEATURE_NAME = "columnMapping";
private static final String DELETION_VECTORS_FEATURE_NAME = "deletionVectors";
private static final String IDENTITY_COLUMNS_FEATURE_NAME = "identityColumns";
private static final String INVARIANTS_FEATURE_NAME = "invariants";
public static final String TIMESTAMP_NTZ_FEATURE_NAME = "timestampNtz";

private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add("columnMapping")
.add("timestampNtz")
.add("deletionVectors")
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.add(DELETION_VECTORS_FEATURE_NAME)
.build();
private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.<String>builder()
.add("appendOnly")
.add("invariants")
.add("checkConstraints")
.add("changeDataFeed")
.add("columnMapping")
.add("timestampNtz")
.add(APPEND_ONLY_FEATURE_NAME)
.add(INVARIANTS_FEATURE_NAME)
.add(CHECK_CONSTRAINTS_FEATURE_NAME)
.add(CHANGE_DATA_FEED_FEATURE_NAME)
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.build();

public enum ColumnMappingMode
Expand Down Expand Up @@ -134,15 +143,15 @@ public enum ColumnMappingMode

public static boolean isAppendOnly(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("appendOnly")) {
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains(APPEND_ONLY_FEATURE_NAME)) {
return false;
}
return parseBoolean(metadataEntry.getConfiguration().getOrDefault(APPEND_ONLY_CONFIGURATION_KEY, "false"));
}

public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("deletionVectors")) {
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains(DELETION_VECTORS_FEATURE_NAME)) {
return false;
}
return parseBoolean(metadataEntry.getConfiguration().get(DELETION_VECTORS_CONFIGURATION_KEY));
Expand All @@ -151,8 +160,8 @@ public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry, Proto
public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsReaderFeatures() || protocolEntry.supportsWriterFeatures()) {
boolean supportsColumnMappingReader = protocolEntry.readerFeaturesContains("columnMapping");
boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains("columnMapping");
boolean supportsColumnMappingReader = protocolEntry.readerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME);
boolean supportsColumnMappingWriter = protocolEntry.writerFeaturesContains(COLUMN_MAPPING_FEATURE_NAME);
int columnMappingEnabled = countTrue(supportsColumnMappingReader, supportsColumnMappingWriter);
checkArgument(
columnMappingEnabled == 0 || columnMappingEnabled == 2,
Expand Down Expand Up @@ -487,7 +496,7 @@ public static Map<String, Boolean> getColumnsNullability(MetadataEntry metadataE

public static Map<String, Boolean> getColumnIdentities(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("identityColumns")) {
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains(IDENTITY_COLUMNS_FEATURE_NAME)) {
return ImmutableMap.of();
}
return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::isIdentityColumn);
Expand All @@ -502,7 +511,7 @@ private static boolean isIdentityColumn(JsonNode node)
public static Map<String, String> getColumnInvariants(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsWriterFeatures()) {
if (!protocolEntry.writerFeaturesContains("invariants")) {
if (!protocolEntry.writerFeaturesContains(INVARIANTS_FEATURE_NAME)) {
return ImmutableMap.of();
}
return getColumnProperties(metadataEntry, DeltaLakeSchemaSupport::getInvariantsWriterFeature);
Expand Down Expand Up @@ -548,7 +557,7 @@ private static String getGeneratedColumnExpressions(JsonNode node)

public static Map<String, String> getCheckConstraints(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("checkConstraints")) {
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains(CHECK_CONSTRAINTS_FEATURE_NAME)) {
return ImmutableMap.of();
}
return metadataEntry.getConfiguration().entrySet().stream()
Expand All @@ -558,7 +567,7 @@ public static Map<String, String> getCheckConstraints(MetadataEntry metadataEntr

public static Optional<Boolean> changeDataFeedEnabled(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains("changeDataFeed")) {
if (protocolEntry.supportsWriterFeatures() && !protocolEntry.writerFeaturesContains(CHANGE_DATA_FEED_FEATURE_NAME)) {
return Optional.empty();
}
String enableChangeDataFeed = metadataEntry.getConfiguration().get(DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY);
Expand Down