Add support for writer version 7 in Delta Lake #18423

Merged 1 commit on Sep 14, 2023
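This PR threads the transaction log's ProtocolEntry through the connector's insert, optimize, and merge handles so that write paths can consult the table protocol directly: the minimum writer version and, for writer version 7, the set of named writer features that replaces version-implied capabilities. For orientation, a simplified sketch of the protocol entry's shape follows; getMinWriterVersion() and getWriterFeatures() mirror accessors used in this diff, while the remaining fields and the constructor are assumptions rather than Trino's actual class.

import java.util.Optional;
import java.util.Set;

// Simplified, assumption-labeled sketch of a Delta protocol entry.
public class ProtocolEntry
{
    private final int minReaderVersion;
    private final int minWriterVersion;
    // Per the Delta protocol, readerFeatures appear at reader version 3
    // and writerFeatures at writer version 7.
    private final Optional<Set<String>> readerFeatures;
    private final Optional<Set<String>> writerFeatures;

    public ProtocolEntry(
            int minReaderVersion,
            int minWriterVersion,
            Optional<Set<String>> readerFeatures,
            Optional<Set<String>> writerFeatures)
    {
        this.minReaderVersion = minReaderVersion;
        this.minWriterVersion = minWriterVersion;
        this.readerFeatures = readerFeatures;
        this.writerFeatures = writerFeatures;
    }

    public int getMinReaderVersion()
    {
        return minReaderVersion;
    }

    public int getMinWriterVersion()
    {
        return minWriterVersion;
    }

    public Optional<Set<String>> getReaderFeatures()
    {
        return readerFeatures;
    }

    public Optional<Set<String>> getWriterFeatures()
    {
        return writerFeatures;
    }
}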
----------------------------------------
@@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+ import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.SchemaTableName;

@@ -30,6 +31,7 @@ public class DeltaLakeInsertTableHandle
private final SchemaTableName tableName;
private final String location;
private final MetadataEntry metadataEntry;
+ private final ProtocolEntry protocolEntry;
private final List<DeltaLakeColumnHandle> inputColumns;
private final long readVersion;
private final boolean retriesEnabled;
@@ -39,12 +41,14 @@ public DeltaLakeInsertTableHandle(
@JsonProperty("tableName") SchemaTableName tableName,
@JsonProperty("location") String location,
@JsonProperty("metadataEntry") MetadataEntry metadataEntry,
@JsonProperty("protocolEntry") ProtocolEntry protocolEntry,
@JsonProperty("inputColumns") List<DeltaLakeColumnHandle> inputColumns,
@JsonProperty("readVersion") long readVersion,
@JsonProperty("retriesEnabled") boolean retriesEnabled)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
+ this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.location = requireNonNull(location, "location is null");
this.readVersion = readVersion;
@@ -69,6 +73,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

+ @JsonProperty
+ public ProtocolEntry getProtocolEntry()
+ {
+ return protocolEntry;
+ }

@JsonProperty
public List<DeltaLakeColumnHandle> getInputColumns()
{

[Large diffs are not rendered by default.]

----------------------------------------
@@ -21,6 +21,7 @@
import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle;
import io.trino.plugin.deltalake.procedure.DeltaTableOptimizeHandle;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+ import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.hive.NodeVersion;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.connector.ConnectorInsertTableHandle;
@@ -119,7 +120,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
{
DeltaLakeInsertTableHandle tableHandle = (DeltaLakeInsertTableHandle) insertTableHandle;
MetadataEntry metadataEntry = tableHandle.getMetadataEntry();
- DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager);
+ DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, tableHandle.getProtocolEntry(), typeManager);
return new DeltaLakePageSink(
typeManager.getTypeOperators(),
tableHandle.getInputColumns(),
@@ -142,7 +143,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHa
switch (executeHandle.getProcedureId()) {
case OPTIMIZE:
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.getProcedureHandle();
- DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), typeManager);
+ DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry(), typeManager);
return new DeltaLakePageSink(
typeManager.getTypeOperators(),
optimizeHandle.getTableColumns(),
@@ -167,7 +168,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
DeltaLakeMergeTableHandle merge = (DeltaLakeMergeTableHandle) mergeHandle;
DeltaLakeInsertTableHandle tableHandle = merge.getInsertTableHandle();
ConnectorPageSink pageSink = createPageSink(transactionHandle, session, tableHandle, pageSinkId);
- DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), typeManager);
+ DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);

return new DeltaLakeMergeSink(
typeManager.getTypeOperators(),
@@ -183,7 +184,7 @@ public ConnectorMergeSink createMergeSink(ConnectorTransaction
tableHandle.getInputColumns(),
domainCompactionThreshold,
() -> createCdfPageSink(merge, session),
- changeDataFeedEnabled(tableHandle.getMetadataEntry()),
+ changeDataFeedEnabled(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry()).orElse(false),
parquetSchemaMapping);
}

@@ -192,8 +193,9 @@ private DeltaLakeCdfPageSink createCdfPageSink(
ConnectorSession session)
{
MetadataEntry metadataEntry = mergeTableHandle.getTableHandle().getMetadataEntry();
+ ProtocolEntry protocolEntry = mergeTableHandle.getTableHandle().getProtocolEntry();
Set<String> partitionKeys = mergeTableHandle.getTableHandle().getMetadataEntry().getOriginalPartitionColumns().stream().collect(toImmutableSet());
List<DeltaLakeColumnHandle> tableColumns = extractSchema(metadataEntry, typeManager).stream()
List<DeltaLakeColumnHandle> tableColumns = extractSchema(metadataEntry, protocolEntry, typeManager).stream()
.map(metadata -> new DeltaLakeColumnHandle(
metadata.getName(),
metadata.getType(),
@@ -216,7 +218,7 @@
.build();
Location tableLocation = Location.of(mergeTableHandle.getTableHandle().getLocation());

- DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, typeManager, true);
+ DeltaLakeParquetSchemaMapping parquetSchemaMapping = createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, true);

return new DeltaLakeCdfPageSink(
typeManager.getTypeOperators(),
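Two patterns repeat throughout this file: createParquetSchemaMapping and extractSchema now accept the protocol entry alongside the metadata entry, and changeDataFeedEnabled returns an Optional that the merge sink resolves with .orElse(false). A minimal sketch of the latter's plausible semantics follows; the property and feature names come from the Delta protocol, but the body is an assumption, not the PR's exact implementation.

// Sketch only: returns Optional.empty() when CDF is not declared, so each
// call site chooses its own default (hence .orElse(false) at the sink above).
public static Optional<Boolean> changeDataFeedEnabled(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
    // Writer version 7 tables declare capabilities as named features; a table
    // that never enabled CDF simply does not list "changeDataFeed".
    if (protocolEntry.getMinWriterVersion() >= 7
            && !protocolEntry.getWriterFeatures().orElse(ImmutableSet.of()).contains("changeDataFeed")) {
        return Optional.empty();
    }
    return Optional.ofNullable(metadataEntry.getConfiguration().get("delta.enableChangeDataFeed"))
            .map(Boolean::parseBoolean);
}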
----------------------------------------
@@ -148,11 +148,11 @@ public ConnectorPageSource createPageSource(
.collect(toImmutableList());

Map<String, Optional<String>> partitionKeys = split.getPartitionKeys();
- ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry());
+ ColumnMappingMode columnMappingMode = getColumnMappingMode(table.getMetadataEntry(), table.getProtocolEntry());
Optional<List<String>> partitionValues = Optional.empty();
if (deltaLakeColumns.stream().anyMatch(column -> column.getBaseColumnName().equals(ROW_ID_COLUMN_NAME))) {
partitionValues = Optional.of(new ArrayList<>());
- for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), typeManager)) {
+ for (DeltaLakeColumnMetadata column : extractSchema(table.getMetadataEntry(), table.getProtocolEntry(), typeManager)) {
Optional<String> value = switch (columnMappingMode) {
case NONE:
yield partitionKeys.get(column.getName());
----------------------------------------
@@ -21,6 +21,7 @@
import io.airlift.json.ObjectMapperProvider;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+ import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.spi.Location;
import io.trino.spi.TrinoException;
import io.trino.spi.type.DecimalType;
@@ -86,14 +87,14 @@ public final class DeltaLakeParquetSchemas

private DeltaLakeParquetSchemas() {}

- public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager)
+ public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager)
{
- return createParquetSchemaMapping(metadataEntry, typeManager, false);
+ return createParquetSchemaMapping(metadataEntry, protocolEntry, typeManager, false);
}

- public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, TypeManager typeManager, boolean addChangeDataFeedFields)
+ public static DeltaLakeParquetSchemaMapping createParquetSchemaMapping(MetadataEntry metadataEntry, ProtocolEntry protocolEntry, TypeManager typeManager, boolean addChangeDataFeedFields)
{
- DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry);
+ DeltaLakeSchemaSupport.ColumnMappingMode columnMappingMode = getColumnMappingMode(metadataEntry, protocolEntry);
return createParquetSchemaMapping(
metadataEntry.getSchemaString(),
typeManager,
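The recurring two-argument getColumnMappingMode reflects that, for feature-based protocols, column mapping is gated by the columnMapping table feature rather than by the delta.columnMapping.mode property alone. A plausible sketch under that assumption; the body is illustrative and not copied from DeltaLakeSchemaSupport.

// Assumed shape: consult the protocol's writer features before trusting the
// table property. Requires java.util.Locale and Guava's ImmutableSet.
public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, ProtocolEntry protocol)
{
    if (protocol.getMinWriterVersion() >= 7
            && !protocol.getWriterFeatures().orElse(ImmutableSet.of()).contains("columnMapping")) {
        // A version 7 table that does not declare the feature cannot be in 'id' or 'name' mode.
        return ColumnMappingMode.NONE;
    }
    String mode = metadata.getConfiguration().getOrDefault("delta.columnMapping.mode", "none");
    return ColumnMappingMode.valueOf(mode.toUpperCase(Locale.ENGLISH));
}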
----------------------------------------
@@ -173,7 +173,7 @@ private Stream<DeltaLakeSplit> getSplits(
.map(DeltaLakeColumnHandle.class::cast))
.map(DeltaLakeColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), typeManager);
List<DeltaLakeColumnMetadata> schema = extractSchema(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry(), typeManager);
List<DeltaLakeColumnMetadata> predicatedColumns = schema.stream()
.filter(column -> predicatedColumnNames.contains(column.getName()))
.collect(toImmutableList());
----------------------------------------
@@ -19,6 +19,7 @@
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
+ import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;

import java.util.List;
import java.util.Optional;
@@ -30,6 +31,7 @@ public class DeltaTableOptimizeHandle
extends DeltaTableProcedureHandle
{
private final MetadataEntry metadataEntry;
+ private final ProtocolEntry protocolEntry;
private final List<DeltaLakeColumnHandle> tableColumns;
private final List<String> originalPartitionColumns;
private final DataSize maxScannedFileSize;
@@ -39,13 +41,15 @@ public class DeltaTableOptimizeHandle
@JsonCreator
public DeltaTableOptimizeHandle(
MetadataEntry metadataEntry,
+ ProtocolEntry protocolEntry,
List<DeltaLakeColumnHandle> tableColumns,
List<String> originalPartitionColumns,
DataSize maxScannedFileSize,
Optional<Long> currentVersion,
boolean retriesEnabled)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
+ this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.tableColumns = ImmutableList.copyOf(requireNonNull(tableColumns, "tableColumns is null"));
this.originalPartitionColumns = ImmutableList.copyOf(requireNonNull(originalPartitionColumns, "originalPartitionColumns is null"));
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
@@ -58,6 +62,7 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
checkState(this.currentVersion.isEmpty(), "currentVersion already set");
return new DeltaTableOptimizeHandle(
metadataEntry,
+ protocolEntry,
tableColumns,
originalPartitionColumns,
maxScannedFileSize,
@@ -71,6 +76,12 @@ public MetadataEntry getMetadataEntry()
return metadataEntry;
}

+ @JsonProperty
+ public ProtocolEntry getProtocolEntry()
+ {
+ return protocolEntry;
+ }

@JsonProperty
public List<DeltaLakeColumnHandle> getTableColumns()
{
----------------------------------------
@@ -14,6 +14,7 @@
package io.trino.plugin.deltalake.procedure;

import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.airlift.log.Logger;
@@ -60,6 +61,7 @@
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
+ import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.unsupportedWriterFeatures;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.TRANSACTION_LOG_DIRECTORY;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -175,11 +177,15 @@ private void doVacuum(
accessControl.checkCanDeleteFromTable(null, tableName);

TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(tableName, handle.getLocation(), session);
- // TODO https://github.com/trinodb/trino/issues/15873 Check writer features when supporting writer version 7
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion()));
}
Set<String> unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of()));
if (!unsupportedWriterFeatures.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures));
}

String tableLocation = tableSnapshot.getTableLocation();
String transactionLogDir = getTransactionLogDir(tableLocation);
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
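The vacuum path keeps the existing MAX_WRITER_VERSION guard and adds a feature-level guard for writer version 7 tables. A minimal sketch of what unsupportedWriterFeatures plausibly computes; the SUPPORTED_WRITER_FEATURES constant below is an illustrative assumption, since the real list lives in DeltaLakeSchemaSupport and is not part of this diff.

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Set;

// Hypothetical supported set, for illustration only.
private static final Set<String> SUPPORTED_WRITER_FEATURES = ImmutableSet.of(
        "appendOnly", "invariants", "checkConstraints", "changeDataFeed", "columnMapping");

public static Set<String> unsupportedWriterFeatures(Set<String> writerFeatures)
{
    // Everything the table requires minus everything the connector can honor.
    return Sets.difference(writerFeatures, SUPPORTED_WRITER_FEATURES).immutableCopy();
}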
----------------------------------------
@@ -76,7 +76,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
double numRecords = 0L;

MetadataEntry metadata = tableHandle.getMetadataEntry();
List<DeltaLakeColumnMetadata> columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, typeManager);
List<DeltaLakeColumnMetadata> columnMetadata = DeltaLakeSchemaSupport.extractSchema(metadata, tableHandle.getProtocolEntry(), typeManager);
List<DeltaLakeColumnHandle> columns = columnMetadata.stream()
.map(columnMeta -> new DeltaLakeColumnHandle(
columnMeta.getName(),