Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Dereference pushdown for the Delta Lake connector #17085

Merged
merged 4 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ configure processing of Parquet files.
* - ``parquet_writer_batch_size``
krvikash marked this conversation as resolved.
Show resolved Hide resolved
- Maximum number of rows processed by the Parquet writer in a batch.
- ``10000``
* - ``projection_pushdown_enabled``
- Read only the projected fields from row columns when performing ``SELECT`` queries.
- ``true``

.. _delta-lake-type-mapping:

Expand Down Expand Up @@ -930,3 +933,6 @@ connector.
for structural data types. The equivalent catalog session property is
``parquet_optimized_nested_reader_enabled``.
- ``true``
* - ``delta.projection-pushdown-enabled``
- Read only the projected fields from row columns when performing ``SELECT`` queries.
- ``true``
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,17 @@ public AbstractDeltaLakePageSink(
DeltaLakeColumnHandle column = inputColumns.get(inputIndex);
switch (column.getColumnType()) {
case PARTITION_KEY:
int partitionPosition = canonicalToOriginalPartitionPositions.get(column.getName());
int partitionPosition = canonicalToOriginalPartitionPositions.get(column.getColumnName());
partitionColumnInputIndex[partitionPosition] = inputIndex;
originalPartitionColumnNames[partitionPosition] = canonicalToOriginalPartitionColumns.get(column.getName());
partitionColumnTypes[partitionPosition] = column.getType();
originalPartitionColumnNames[partitionPosition] = canonicalToOriginalPartitionColumns.get(column.getColumnName());
partitionColumnTypes[partitionPosition] = column.getBaseType();
break;
case REGULAR:
verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
dataColumnHandles.add(column);
dataColumnsInputIndex.add(inputIndex);
dataColumnNames.add(column.getPhysicalName());
dataColumnTypes.add(column.getPhysicalType());
dataColumnNames.add(column.getBasePhysicalColumnName());
krvikash marked this conversation as resolved.
Show resolved Hide resolved
dataColumnTypes.add(column.getBasePhysicalType());
break;
case SYNTHESIZED:
processSynthesizedColumn(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BLOCK_POSITION;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
Expand All @@ -37,7 +38,8 @@ public class DeltaLakeBucketFunction
public DeltaLakeBucketFunction(TypeOperators typeOperators, List<DeltaLakeColumnHandle> partitioningColumns, int bucketCount)
{
this.hashCodeInvokers = partitioningColumns.stream()
.map(DeltaLakeColumnHandle::getType)
.peek(column -> verify(column.isBaseColumn(), "Unexpected dereference: %s", column))
.map(DeltaLakeColumnHandle::getBaseType)
krvikash marked this conversation as resolved.
Show resolved Hide resolved
.map(type -> typeOperators.getHashCodeOperator(type, simpleConvention(FAIL_ON_NULL, BLOCK_POSITION)))
.collect(toImmutableList());
this.bucketCount = bucketCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.deltalake;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.spi.connector.ColumnHandle;
Expand All @@ -23,9 +24,13 @@
import java.util.Optional;
import java.util.OptionalInt;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.RowType.field;
Expand Down Expand Up @@ -55,64 +60,68 @@ public class DeltaLakeColumnHandle
public static final String FILE_MODIFIED_TIME_COLUMN_NAME = "$file_modified_time";
public static final Type FILE_MODIFIED_TIME_TYPE = TIMESTAMP_TZ_MILLIS;

private final String name;
private final Type type;
private final OptionalInt fieldId;
private final String baseColumnName;
private final Type baseType;
private final OptionalInt baseFieldId;
// Holds the field name used in Parquet files
// The value is the same as the logical column name when the column mapping mode is none
// The value is the same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name, e.g. col-6707cc9e-f3aa-4e6b-b8ef-1b03d3475680
private final String physicalName;
private final String basePhysicalColumnName;
// Holds the type used in Parquet files
// The value is the same as the logical type when the column mapping mode is none
// When the column mapping mode is id or name, field names inside the type are the 'delta.columnMapping.physicalName' values, e.g. row(col-5924c8b3-04cf-4146-abb5-2c229e7ff708 integer)
private final Type physicalType;
private final Type basePhysicalType;
private final DeltaLakeColumnType columnType;
private final Optional<DeltaLakeColumnProjectionInfo> projectionInfo;
findepi marked this conversation as resolved.
Show resolved Hide resolved
krvikash marked this conversation as resolved.
Show resolved Hide resolved

@JsonCreator
public DeltaLakeColumnHandle(
@JsonProperty("name") String name,
@JsonProperty("type") Type type,
@JsonProperty("fieldId") OptionalInt fieldId,
@JsonProperty("physicalName") String physicalName,
@JsonProperty("physicalType") Type physicalType,
@JsonProperty("columnType") DeltaLakeColumnType columnType)
{
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
this.fieldId = requireNonNull(fieldId, "fieldId is null");
this.physicalName = requireNonNull(physicalName, "physicalName is null");
this.physicalType = requireNonNull(physicalType, "physicalType is null");
@JsonProperty("baseColumnName") String baseColumnName,
@JsonProperty("baseType") Type baseType,
@JsonProperty("baseFieldId") OptionalInt baseFieldId,
@JsonProperty("basePhysicalColumnName") String basePhysicalColumnName,
@JsonProperty("basePhysicalType") Type basePhysicalType,
@JsonProperty("columnType") DeltaLakeColumnType columnType,
krvikash marked this conversation as resolved.
Show resolved Hide resolved
@JsonProperty("projectionInfo") Optional<DeltaLakeColumnProjectionInfo> projectionInfo)
{
this.baseColumnName = requireNonNull(baseColumnName, "baseColumnName is null");
this.baseType = requireNonNull(baseType, "baseType is null");
this.baseFieldId = requireNonNull(baseFieldId, "baseFieldId is null");
this.basePhysicalColumnName = requireNonNull(basePhysicalColumnName, "basePhysicalColumnName is null");
this.basePhysicalType = requireNonNull(basePhysicalType, "basePhysicalType is null");
this.columnType = requireNonNull(columnType, "columnType is null");
checkArgument(projectionInfo.isEmpty() || columnType == REGULAR, "Projection info present for column type: %s", columnType);
this.projectionInfo = projectionInfo;
}

@JsonProperty
public String getName()
public String getBaseColumnName()
{
return name;
return baseColumnName;
}

@JsonProperty
public Type getType()
public Type getBaseType()
{
return type;
return baseType;
}

@JsonProperty
public OptionalInt getFieldId()
public OptionalInt getBaseFieldId()
{
return fieldId;
return baseFieldId;
}

@JsonProperty
public String getPhysicalName()
public String getBasePhysicalColumnName()
{
return physicalName;
return basePhysicalColumnName;
}

@JsonProperty
public Type getPhysicalType()
public Type getBasePhysicalType()
{
return physicalType;
return basePhysicalType;
}

@JsonProperty
Expand All @@ -121,6 +130,12 @@ public DeltaLakeColumnType getColumnType()
return columnType;
}

/**
 * Returns the projection (dereference) information attached to this handle.
 *
 * @return the projection info, or an empty {@code Optional} when this handle
 *         refers to a base column (see {@link #isBaseColumn()})
 */
@JsonProperty
public Optional<DeltaLakeColumnProjectionInfo> getProjectionInfo()
{
    return this.projectionInfo;
}

@Override
public boolean equals(Object obj)
{
Expand All @@ -131,56 +146,88 @@ public boolean equals(Object obj)
return false;
}
DeltaLakeColumnHandle other = (DeltaLakeColumnHandle) obj;
return Objects.equals(this.name, other.name) &&
Objects.equals(this.type, other.type) &&
Objects.equals(this.fieldId, other.fieldId) &&
Objects.equals(this.physicalName, other.physicalName) &&
Objects.equals(this.physicalType, other.physicalType) &&
this.columnType == other.columnType;
return Objects.equals(this.baseColumnName, other.baseColumnName) &&
Objects.equals(this.baseType, other.baseType) &&
Objects.equals(this.baseFieldId, other.baseFieldId) &&
Objects.equals(this.basePhysicalColumnName, other.basePhysicalColumnName) &&
Objects.equals(this.basePhysicalType, other.basePhysicalType) &&
this.columnType == other.columnType &&
Objects.equals(this.projectionInfo, other.projectionInfo);
}

/**
 * Returns the name of this column, which is only defined for base columns.
 * Excluded from JSON serialization.
 *
 * @return the base column name
 * @throws IllegalStateException if this handle carries projection info,
 *         i.e. it is a dereference rather than a base column
 */
@JsonIgnore
public String getColumnName()
{
    boolean baseColumn = isBaseColumn();
    checkState(baseColumn, "Unexpected dereference: %s", this);
    return this.baseColumnName;
}

@JsonIgnore
public String getQualifiedPhysicalName()
{
return projectionInfo.map(projectionInfo -> basePhysicalColumnName + "#" + projectionInfo.getPartialName())
krvikash marked this conversation as resolved.
Show resolved Hide resolved
.orElse(basePhysicalColumnName);
}

public long getRetainedSizeInBytes()
{
// type is not accounted for as the instances are cached (by TypeRegistry) and shared
return INSTANCE_SIZE + estimatedSizeOf(name);
return INSTANCE_SIZE
+ estimatedSizeOf(baseColumnName)
+ sizeOf(baseFieldId)
+ estimatedSizeOf(basePhysicalColumnName)
+ projectionInfo.map(DeltaLakeColumnProjectionInfo::getRetainedSizeInBytes).orElse(0L);
krvikash marked this conversation as resolved.
Show resolved Hide resolved
}

/**
 * Tells whether this handle refers to a whole top-level column rather than
 * a projected (dereferenced) part of one. Excluded from JSON serialization.
 *
 * @return {@code true} when no projection info is present
 */
@JsonIgnore
public boolean isBaseColumn()
{
    return !projectionInfo.isPresent();
}

@Override
public int hashCode()
{
return Objects.hash(name, type, fieldId, physicalName, physicalType, columnType);
return Objects.hash(baseColumnName, baseType, baseFieldId, basePhysicalColumnName, basePhysicalType, columnType, projectionInfo);
}

@Override
public String toString()
{
return name + ":" + type.getDisplayName() + ":" + columnType;
return getQualifiedPhysicalName() +
":" + projectionInfo.map(DeltaLakeColumnProjectionInfo::getType).orElse(baseType).getDisplayName() +
":" + columnType;
}

public HiveColumnHandle toHiveColumnHandle()
{
return new HiveColumnHandle(
physicalName, // this name is used for accessing Parquet files, so it should be physical name
basePhysicalColumnName, // this name is used for accessing Parquet files, so it should be physical name
0, // hiveColumnIndex; we provide fake value because we always find columns by name
findepi marked this conversation as resolved.
Show resolved Hide resolved
toHiveType(physicalType),
physicalType,
Optional.empty(),
toHiveType(basePhysicalType),
basePhysicalType,
projectionInfo.map(DeltaLakeColumnProjectionInfo::toHiveColumnProjectionInfo),
columnType.toHiveColumnType(),
Optional.empty());
}

public static DeltaLakeColumnHandle pathColumnHandle()
{
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED);
return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
}

public static DeltaLakeColumnHandle fileSizeColumnHandle()
{
return new DeltaLakeColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, OptionalInt.empty(), FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, SYNTHESIZED);
return new DeltaLakeColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, OptionalInt.empty(), FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, SYNTHESIZED, Optional.empty());
}

public static DeltaLakeColumnHandle fileModifiedTimeColumnHandle()
{
return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED);
return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED, Optional.empty());
}

/**
 * Creates the handle for the merge row ID column.
 *
 * @return a {@code SYNTHESIZED} handle named {@code ROW_ID_COLUMN_NAME} of type
 *         {@code MERGE_ROW_ID_TYPE}, with no field ID and no projection info
 */
public static DeltaLakeColumnHandle mergeRowIdColumnHandle()
{
    return new DeltaLakeColumnHandle(
            ROW_ID_COLUMN_NAME,    // baseColumnName
            MERGE_ROW_ID_TYPE,     // baseType
            OptionalInt.empty(),   // baseFieldId
            ROW_ID_COLUMN_NAME,    // basePhysicalColumnName
            MERGE_ROW_ID_TYPE,     // basePhysicalType
            SYNTHESIZED,           // columnType
            Optional.empty());     // projectionInfo
}
}
Loading