Add ProtocolEntry to DeltaLakeTableHandle #18914

Merged · 1 commit · Sep 5, 2023
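This change adds a ProtocolEntry field to DeltaLakeTableHandle. The handle now captures the protocol entry when it is created in getTableHandle and carries it through withProjectedColumns, forOptimize, applyFilter, and getStatisticsCollectionMetadata. Call sites that previously re-resolved the entry from the transaction log via the private getProtocolEntry(session, handle) helper (setTableComment, setColumnComment, addColumn, dropColumn, renameColumn, beginInsert, beginMerge, the OPTIMIZE procedure, setTableProperties, and executeDelete) now read handle.getProtocolEntry() instead, so checkWriteSupported and checkSupportedWriterVersion no longer need a ConnectorSession and the helper is removed.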
DeltaLakeMetadata.java
@@ -508,6 +508,7 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
managed,
tableLocation,
metadataEntry,
+ protocolEntry,
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
@@ -1258,7 +1259,7 @@ private static boolean isCreatedBy(Table table, String queryId)
public void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
- checkSupportedWriterVersion(session, handle);
+ checkSupportedWriterVersion(handle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) {
throw new TrinoException(NOT_SUPPORTED, "Setting a table comment with column mapping %s is not supported".formatted(columnMappingMode));
@@ -1280,7 +1281,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
SET_TBLPROPERTIES_OPERATION,
session,
comment,
- getProtocolEntry(session, handle));
+ handle.getProtocolEntry());
transactionLogWriter.flush();
}
catch (Exception e) {
@@ -1294,7 +1295,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) tableHandle;
DeltaLakeColumnHandle deltaLakeColumnHandle = (DeltaLakeColumnHandle) column;
verify(deltaLakeColumnHandle.isBaseColumn(), "Unexpected dereference: %s", column);
- checkSupportedWriterVersion(session, deltaLakeTableHandle);
+ checkSupportedWriterVersion(deltaLakeTableHandle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(deltaLakeTableHandle.getMetadataEntry());
if (columnMappingMode != ID && columnMappingMode != NAME && columnMappingMode != NONE) {
throw new TrinoException(NOT_SUPPORTED, "Setting a column comment with column mapping %s is not supported".formatted(columnMappingMode));
@@ -1324,7 +1325,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
CHANGE_COLUMN_OPERATION,
session,
Optional.ofNullable(deltaLakeTableHandle.getMetadataEntry().getDescription()),
- getProtocolEntry(session, deltaLakeTableHandle));
+ deltaLakeTableHandle.getProtocolEntry());
transactionLogWriter.flush();
}
catch (Exception e) {
@@ -1348,7 +1349,7 @@ public void setViewColumnComment(ConnectorSession session, SchemaTableName viewN
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata newColumnMetadata)
{
DeltaLakeTableHandle handle = checkValidTableHandle(tableHandle);
- checkSupportedWriterVersion(session, handle);
+ checkSupportedWriterVersion(handle);
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
if (changeDataFeedEnabled(handle.getMetadataEntry()) && CHANGE_DATA_FEED_COLUMN_NAMES.contains(newColumnMetadata.getName())) {
throw new TrinoException(NOT_SUPPORTED, "Column name %s is forbidden when change data feed is enabled".formatted(newColumnMetadata.getName()));
@@ -1410,7 +1411,7 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle
ADD_COLUMN_OPERATION,
session,
Optional.ofNullable(handle.getMetadataEntry().getDescription()),
- getProtocolEntry(session, handle));
+ handle.getProtocolEntry());
transactionLogWriter.flush();
}
catch (Exception e) {
@@ -1427,7 +1428,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandl
String dropColumnName = deltaLakeColumn.getBaseColumnName();
MetadataEntry metadataEntry = table.getMetadataEntry();

- checkSupportedWriterVersion(session, table);
+ checkSupportedWriterVersion(table);
ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
if (columnMappingMode != ColumnMappingMode.NAME && columnMappingMode != ColumnMappingMode.ID) {
throw new TrinoException(NOT_SUPPORTED, "Cannot drop column from table using column mapping mode " + columnMappingMode);
@@ -1476,7 +1477,7 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandl
DROP_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
- getProtocolEntry(session, table));
+ table.getProtocolEntry());
transactionLogWriter.flush();
}
catch (Exception e) {
@@ -1508,7 +1509,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHan
verify(deltaLakeColumn.isBaseColumn(), "Unexpected dereference: %s", deltaLakeColumn);
String sourceColumnName = deltaLakeColumn.getBaseColumnName();

- checkSupportedWriterVersion(session, table);
+ checkSupportedWriterVersion(table);
if (changeDataFeedEnabled(table.getMetadataEntry())) {
throw new TrinoException(NOT_SUPPORTED, "Cannot rename column when change data feed is enabled");
}
@@ -1558,7 +1559,7 @@ public void renameColumn(ConnectorSession session, ConnectorTableHan
RENAME_COLUMN_OPERATION,
session,
Optional.ofNullable(metadataEntry.getDescription()),
- getProtocolEntry(session, table));
+ table.getProtocolEntry());
transactionLogWriter.flush();
// Don't update extended statistics because it uses physical column names internally
}
@@ -1671,7 +1672,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
{
DeltaLakeTableHandle table = (DeltaLakeTableHandle) tableHandle;
checkWriteAllowed(session, table);
- checkWriteSupported(session, table);
+ checkWriteSupported(table);

List<DeltaLakeColumnHandle> inputColumns = columns.stream()
.map(handle -> (DeltaLakeColumnHandle) handle)
@@ -1845,7 +1846,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true");
}
checkWriteAllowed(session, handle);
- checkWriteSupported(session, handle);
+ checkWriteSupported(handle);

List<DeltaLakeColumnHandle> inputColumns = getColumns(handle.getMetadataEntry()).stream()
.filter(column -> column.getColumnType() != SYNTHESIZED)
@@ -2054,7 +2055,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.getProcedureHandle();

checkWriteAllowed(session, table);
- checkSupportedWriterVersion(session, table);
+ checkSupportedWriterVersion(table);

return new BeginTableExecuteResult<>(
executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
@@ -2158,9 +2159,9 @@ private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableH
}
}

- private void checkWriteSupported(ConnectorSession session, DeltaLakeTableHandle handle)
+ private void checkWriteSupported(DeltaLakeTableHandle handle)
{
- checkSupportedWriterVersion(session, handle);
+ checkSupportedWriterVersion(handle);
checkUnsupportedGeneratedColumns(handle.getMetadataEntry());
ColumnMappingMode columnMappingMode = getColumnMappingMode(handle.getMetadataEntry());
if (!(columnMappingMode == NONE || columnMappingMode == ColumnMappingMode.NAME || columnMappingMode == ColumnMappingMode.ID)) {
@@ -2180,21 +2181,16 @@ private void checkUnsupportedGeneratedColumns(MetadataEntry metadataEntry)
}
}

- private void checkSupportedWriterVersion(ConnectorSession session, DeltaLakeTableHandle handle)
+ private void checkSupportedWriterVersion(DeltaLakeTableHandle handle)
{
- int requiredWriterVersion = getProtocolEntry(session, handle).getMinWriterVersion();
+ int requiredWriterVersion = handle.getProtocolEntry().getMinWriterVersion();
if (requiredWriterVersion > MAX_WRITER_VERSION) {
throw new TrinoException(
NOT_SUPPORTED,
format("Table %s requires Delta Lake writer version %d which is not supported", handle.getSchemaTableName(), requiredWriterVersion));
}
}

- private ProtocolEntry getProtocolEntry(ConnectorSession session, DeltaLakeTableHandle handle)
- {
- return transactionLogAccess.getProtocolEntry(session, getSnapshot(handle.getSchemaTableName(), handle.getLocation(), session));
- }
-
private TableSnapshot getSnapshot(SchemaTableName schemaTableName, String tableLocation, ConnectorSession session)
{
try {
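The removed helper above shows the motivation: before this change, every writer-version check resolved a fresh TableSnapshot and re-read the protocol entry from the transaction log, while afterwards the check reads the value captured when the handle was built. A minimal sketch of the pattern, using hypothetical stand-in types rather than Trino's actual classes:

// Hypothetical stand-ins for ProtocolEntry and the table handle;
// names and the version limit are illustrative only.
record ProtocolInfo(int minReaderVersion, int minWriterVersion) {}

final class Handle
{
    private static final int MAX_WRITER_VERSION = 4; // illustrative limit

    private final ProtocolInfo protocolEntry; // resolved once, when the handle is built

    Handle(ProtocolInfo protocolEntry)
    {
        this.protocolEntry = protocolEntry;
    }

    // The check is now a pure function of handle state: no session and
    // no transaction-log read per call.
    void checkSupportedWriterVersion()
    {
        if (protocolEntry.minWriterVersion() > MAX_WRITER_VERSION) {
            throw new UnsupportedOperationException(
                    "requires writer version " + protocolEntry.minWriterVersion() + " which is not supported");
        }
    }
}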
@@ -2338,7 +2334,7 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties));
}

- ProtocolEntry currentProtocolEntry = getProtocolEntry(session, handle);
+ ProtocolEntry currentProtocolEntry = handle.getProtocolEntry();

long createdTime = Instant.now().toEpochMilli();

@@ -2582,6 +2578,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableHandle.isManaged(),
tableHandle.getLocation(),
tableHandle.getMetadataEntry(),
+ tableHandle.getProtocolEntry(),
// Do not simplify the enforced constraint, the connector is guaranteeing the constraint will be applied as is.
// The unenforced constraint will still be checked by the engine.
tableHandle.getEnforcedPartitionConstraint()
@@ -2884,6 +2881,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
handle.isManaged(),
handle.getLocation(),
metadata,
+ handle.getProtocolEntry(),
TupleDomain.all(),
TupleDomain.all(),
Optional.empty(),
@@ -3326,7 +3324,7 @@ private OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandl
throw new TrinoException(NOT_SUPPORTED, "Cannot modify rows from a table with '" + APPEND_ONLY_CONFIGURATION_KEY + "' set to true");
}
checkWriteAllowed(session, tableHandle);
- checkWriteSupported(session, tableHandle);
+ checkWriteSupported(tableHandle);

String tableLocation = tableHandle.location();
List<AddFileEntry> activeFiles = getAddFileEntriesMatchingEnforcedPartitionConstraint(session, tableHandle);
DeltaLakeTableHandle.java
@@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+ import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

@@ -46,6 +47,7 @@ public enum WriteType
private final boolean managed;
private final String location;
private final MetadataEntry metadataEntry;
+ private final ProtocolEntry protocolEntry;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint;
private final Optional<WriteType> writeType;
@@ -73,6 +75,7 @@ public DeltaLakeTableHandle(
@JsonProperty("managed") boolean managed,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
@@ -88,6 +91,7 @@ public DeltaLakeTableHandle(
managed,
location,
metadataEntry,
+ protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
ImmutableSet.of(),
@@ -107,6 +111,7 @@ public DeltaLakeTableHandle(
boolean managed,
String location,
MetadataEntry metadataEntry,
+ ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> constraintColumns,
@@ -124,6 +129,7 @@ public DeltaLakeTableHandle(
this.managed = managed;
this.location = requireNonNull(location, "location is null");
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
+ this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
this.nonPartitionConstraint = requireNonNull(nonPartitionConstraint, "nonPartitionConstraint is null");
this.writeType = requireNonNull(writeType, "writeType is null");
@@ -147,6 +153,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
managed,
location,
metadataEntry,
+ protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
@@ -168,6 +175,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
managed,
location,
metadataEntry,
+ protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
@@ -234,6 +242,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

+ @JsonProperty
+ public ProtocolEntry getProtocolEntry()
+ {
+ return protocolEntry;
+ }
+
@JsonProperty
public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
{
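Exposing the new field with @JsonProperty matches the rest of the handle: the constructor parameters all carry @JsonProperty annotations, so the handle round-trips through JSON and the protocol entry captured at planning time travels with it instead of being re-read wherever the handle is consumed. A small round-trip sketch with a hypothetical record, assuming Jackson 2.12+ for record support:

import com.fasterxml.jackson.databind.ObjectMapper;

class RoundTrip
{
    // Hypothetical stand-in for ProtocolEntry.
    record ProtocolInfo(int minReaderVersion, int minWriterVersion) {}

    public static void main(String[] args) throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new ProtocolInfo(1, 2));
        ProtocolInfo copy = mapper.readValue(json, ProtocolInfo.class);
        System.out.println(copy.minWriterVersion()); // prints 2: the entry survives the round trip
    }
}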
@@ -324,6 +338,7 @@ public boolean equals(Object o)
managed == that.managed &&
Objects.equals(location, that.location) &&
Objects.equals(metadataEntry, that.metadataEntry) &&
+ Objects.equals(protocolEntry, that.protocolEntry) &&
Objects.equals(enforcedPartitionConstraint, that.enforcedPartitionConstraint) &&
Objects.equals(nonPartitionConstraint, that.nonPartitionConstraint) &&
Objects.equals(writeType, that.writeType) &&
@@ -344,6 +359,7 @@ public int hashCode()
managed,
location,
metadataEntry,
+ protocolEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
writeType,