Implement Dereference pushdown for the Delta Lake connector
krvikash authored and findepi committed May 8, 2023
1 parent 35eda7d commit 43e90f4
Showing 39 changed files with 1,391 additions and 247 deletions.
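Dereference pushdown lets queries that access nested fields of row columns (for example ``SELECT address.city FROM t``) read only the projected leaf fields from Parquet instead of decoding the whole row; the changes below thread an optional ``DeltaLakeColumnProjectionInfo`` through ``DeltaLakeColumnHandle`` to represent the projected field path.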
6 changes: 6 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -190,6 +190,9 @@ configure processing of Parquet files.
    * - ``parquet_writer_batch_size``
      - Maximum number of rows processed by the Parquet writer in a batch.
      - ``10000``
+   * - ``projection_pushdown_enabled``
+     - Read only projected fields from row columns while performing ``SELECT`` queries
+     - ``true``
 
 .. _delta-lake-type-mapping:
 
@@ -930,3 +933,6 @@ connector.
       for structural data types. The equivalent catalog session property is
       ``parquet_optimized_nested_reader_enabled``.
     - ``true``
+   * - ``delta.projection-pushdown-enabled``
+     - Read only projected fields from row columns while performing ``SELECT`` queries
+     - ``true``
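A minimal sketch of exercising the new session property through the Trino JDBC driver (requires ``trino-jdbc`` on the classpath); the catalog name ``delta``, schema ``example``, table ``events``, and its row-typed ``address`` column are assumptions for illustration, not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ProjectionPushdownDemo
{
    public static void main(String[] args)
            throws Exception
    {
        // Hypothetical deployment: catalog "delta", schema "example"
        try (Connection connection = DriverManager.getConnection("jdbc:trino://localhost:8080/delta/example", "admin", null);
                Statement statement = connection.createStatement()) {
            // Default (projection_pushdown_enabled=true): only the 'city' leaf of the
            // row column 'address' is decoded from the Parquet files
            try (ResultSet resultSet = statement.executeQuery("SELECT address.city FROM events")) {
                while (resultSet.next()) {
                    System.out.println(resultSet.getString(1));
                }
            }
            // Disable for this session to read whole row columns instead
            statement.execute("SET SESSION delta.projection_pushdown_enabled = false");
        }
    }
}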
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/AbstractDeltaLakePageSink.java
@@ -155,16 +155,17 @@ public AbstractDeltaLakePageSink(
             DeltaLakeColumnHandle column = inputColumns.get(inputIndex);
             switch (column.getColumnType()) {
                 case PARTITION_KEY:
-                    int partitionPosition = canonicalToOriginalPartitionPositions.get(column.getName());
+                    int partitionPosition = canonicalToOriginalPartitionPositions.get(column.getColumnName());
                     partitionColumnInputIndex[partitionPosition] = inputIndex;
-                    originalPartitionColumnNames[partitionPosition] = canonicalToOriginalPartitionColumns.get(column.getName());
-                    partitionColumnTypes[partitionPosition] = column.getType();
+                    originalPartitionColumnNames[partitionPosition] = canonicalToOriginalPartitionColumns.get(column.getColumnName());
+                    partitionColumnTypes[partitionPosition] = column.getBaseType();
                     break;
                 case REGULAR:
+                    verify(column.isBaseColumn(), "Unexpected dereference: %s", column);
                     dataColumnHandles.add(column);
                     dataColumnsInputIndex.add(inputIndex);
-                    dataColumnNames.add(column.getPhysicalName());
-                    dataColumnTypes.add(column.getPhysicalType());
+                    dataColumnNames.add(column.getBasePhysicalColumnName());
+                    dataColumnTypes.add(column.getBasePhysicalType());
                     break;
                 case SYNTHESIZED:
                     processSynthesizedColumn(column);
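The new ``verify`` call encodes a write-path invariant: dereference projections are created only on the read path, so a handle reaching the page sink must always be a base column. A self-contained sketch of the failure mode, using a hypothetical ``Handle`` record in place of the real ``DeltaLakeColumnHandle`` (requires Guava):

import static com.google.common.base.Verify.verify;

import java.util.Optional;

public class WritePathInvariantDemo
{
    // Hypothetical stand-in for DeltaLakeColumnHandle: a present projection marks a dereference
    record Handle(String baseColumnName, Optional<String> projectedField)
    {
        boolean isBaseColumn()
        {
            return projectedField.isEmpty();
        }
    }

    public static void main(String[] args)
    {
        Handle dereference = new Handle("address", Optional.of("city"));
        // Throws com.google.common.base.VerifyException: Unexpected dereference: ...
        verify(dereference.isBaseColumn(), "Unexpected dereference: %s", dereference);
    }
}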
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeBucketFunction.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BLOCK_POSITION;
 import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
@@ -37,7 +38,8 @@ public class DeltaLakeBucketFunction
     public DeltaLakeBucketFunction(TypeOperators typeOperators, List<DeltaLakeColumnHandle> partitioningColumns, int bucketCount)
     {
         this.hashCodeInvokers = partitioningColumns.stream()
-                .map(DeltaLakeColumnHandle::getType)
+                .peek(column -> verify(column.isBaseColumn(), "Unexpected dereference: %s", column))
+                .map(DeltaLakeColumnHandle::getBaseType)
                 .map(type -> typeOperators.getHashCodeOperator(type, simpleConvention(FAIL_ON_NULL, BLOCK_POSITION)))
                 .collect(toImmutableList());
         this.bucketCount = bucketCount;
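The bucket function applies the same invariant inside a stream pipeline: ``peek`` validates each partitioning column before its base type is mapped to a hash-code operator. A standalone sketch of the ``peek``-then-``map`` pattern, again with a hypothetical ``Handle`` record:

import static com.google.common.base.Verify.verify;

import java.util.List;

public class PeekVerifyDemo
{
    record Handle(String name, boolean baseColumn) {}

    public static void main(String[] args)
    {
        List<String> names = List.of(new Handle("order_date", true), new Handle("region", true)).stream()
                // Fail fast with a descriptive message before mapping each element
                .peek(handle -> verify(handle.baseColumn(), "Unexpected dereference: %s", handle))
                .map(Handle::name)
                .toList();
        System.out.println(names); // [order_date, region]
    }
}

``Stream.peek`` is documented mainly as a debugging aid; it is safe here because the terminal collect operation consumes every element, so the check runs for each partitioning column.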
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeColumnHandle.java
@@ -14,6 +14,7 @@
 package io.trino.plugin.deltalake;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.spi.connector.ColumnHandle;
@@ -23,10 +24,13 @@
 import java.util.Optional;
 import java.util.OptionalInt;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.slice.SizeOf.estimatedSizeOf;
 import static io.airlift.slice.SizeOf.instanceSize;
 import static io.airlift.slice.SizeOf.sizeOf;
 import static io.trino.plugin.deltalake.DeltaHiveTypeTranslator.toHiveType;
+import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
 import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.RowType.field;
@@ -56,64 +60,68 @@ public class DeltaLakeColumnHandle
     public static final String FILE_MODIFIED_TIME_COLUMN_NAME = "$file_modified_time";
     public static final Type FILE_MODIFIED_TIME_TYPE = TIMESTAMP_TZ_MILLIS;
 
-    private final String name;
-    private final Type type;
-    private final OptionalInt fieldId;
+    private final String baseColumnName;
+    private final Type baseType;
+    private final OptionalInt baseFieldId;
     // Hold field names in Parquet files
     // The value is same as 'name' when the column mapping mode is none
     // The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. col-6707cc9e-f3aa-4e6b-b8ef-1b03d3475680
-    private final String physicalName;
+    private final String basePhysicalColumnName;
     // Hold type in Parquet files
     // The value is same as 'type' when the column mapping mode is none
     // The value is same as 'delta.columnMapping.physicalName' when the column mapping mode is id or name. e.g. row(col-5924c8b3-04cf-4146-abb5-2c229e7ff708 integer)
-    private final Type physicalType;
+    private final Type basePhysicalType;
     private final DeltaLakeColumnType columnType;
+    private final Optional<DeltaLakeColumnProjectionInfo> projectionInfo;
 
     @JsonCreator
     public DeltaLakeColumnHandle(
-            @JsonProperty("name") String name,
-            @JsonProperty("type") Type type,
-            @JsonProperty("fieldId") OptionalInt fieldId,
-            @JsonProperty("physicalName") String physicalName,
-            @JsonProperty("physicalType") Type physicalType,
-            @JsonProperty("columnType") DeltaLakeColumnType columnType)
-    {
-        this.name = requireNonNull(name, "name is null");
-        this.type = requireNonNull(type, "type is null");
-        this.fieldId = requireNonNull(fieldId, "fieldId is null");
-        this.physicalName = requireNonNull(physicalName, "physicalName is null");
-        this.physicalType = requireNonNull(physicalType, "physicalType is null");
+            @JsonProperty("baseColumnName") String baseColumnName,
+            @JsonProperty("baseType") Type baseType,
+            @JsonProperty("baseFieldId") OptionalInt baseFieldId,
+            @JsonProperty("basePhysicalColumnName") String basePhysicalColumnName,
+            @JsonProperty("basePhysicalType") Type basePhysicalType,
+            @JsonProperty("columnType") DeltaLakeColumnType columnType,
+            @JsonProperty("projectionInfo") Optional<DeltaLakeColumnProjectionInfo> projectionInfo)
+    {
+        this.baseColumnName = requireNonNull(baseColumnName, "baseColumnName is null");
+        this.baseType = requireNonNull(baseType, "baseType is null");
+        this.baseFieldId = requireNonNull(baseFieldId, "baseFieldId is null");
+        this.basePhysicalColumnName = requireNonNull(basePhysicalColumnName, "basePhysicalColumnName is null");
+        this.basePhysicalType = requireNonNull(basePhysicalType, "basePhysicalType is null");
         this.columnType = requireNonNull(columnType, "columnType is null");
+        checkArgument(projectionInfo.isEmpty() || columnType == REGULAR, "Projection info present for column type: %s", columnType);
+        this.projectionInfo = projectionInfo;
     }
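A hedged construction example under the new signature, assuming the connector classes are on the classpath: a plain ``INTEGER`` column named ``age`` (illustrative) with column mapping mode ``none``, where logical and physical names coincide:

import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.spi.type.IntegerType.INTEGER;

import java.util.Optional;
import java.util.OptionalInt;

public class ColumnHandleExample
{
    public static void main(String[] args)
    {
        DeltaLakeColumnHandle ageColumn = new DeltaLakeColumnHandle(
                "age",               // baseColumnName
                INTEGER,             // baseType
                OptionalInt.empty(), // baseFieldId (present only under 'id' column mapping)
                "age",               // basePhysicalColumnName (differs under 'id'/'name' mapping)
                INTEGER,             // basePhysicalType
                REGULAR,             // columnType
                Optional.empty());   // empty projectionInfo marks a base column
        System.out.println(ageColumn.isBaseColumn()); // true
    }
}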

     @JsonProperty
-    public String getName()
+    public String getBaseColumnName()
     {
-        return name;
+        return baseColumnName;
     }
 
     @JsonProperty
-    public Type getType()
+    public Type getBaseType()
     {
-        return type;
+        return baseType;
     }
 
     @JsonProperty
-    public OptionalInt getFieldId()
+    public OptionalInt getBaseFieldId()
     {
-        return fieldId;
+        return baseFieldId;
     }
 
     @JsonProperty
-    public String getPhysicalName()
+    public String getBasePhysicalColumnName()
     {
-        return physicalName;
+        return basePhysicalColumnName;
     }
 
     @JsonProperty
-    public Type getPhysicalType()
+    public Type getBasePhysicalType()
     {
-        return physicalType;
+        return basePhysicalType;
     }
 
     @JsonProperty
@@ -122,6 +130,12 @@ public DeltaLakeColumnType getColumnType()
         return columnType;
     }
 
+    @JsonProperty
+    public Optional<DeltaLakeColumnProjectionInfo> getProjectionInfo()
+    {
+        return projectionInfo;
+    }
+
     @Override
     public boolean equals(Object obj)
     {
@@ -132,64 +146,88 @@ public boolean equals(Object obj)
             return false;
         }
         DeltaLakeColumnHandle other = (DeltaLakeColumnHandle) obj;
-        return Objects.equals(this.name, other.name) &&
-                Objects.equals(this.type, other.type) &&
-                Objects.equals(this.fieldId, other.fieldId) &&
-                Objects.equals(this.physicalName, other.physicalName) &&
-                Objects.equals(this.physicalType, other.physicalType) &&
-                this.columnType == other.columnType;
+        return Objects.equals(this.baseColumnName, other.baseColumnName) &&
+                Objects.equals(this.baseType, other.baseType) &&
+                Objects.equals(this.baseFieldId, other.baseFieldId) &&
+                Objects.equals(this.basePhysicalColumnName, other.basePhysicalColumnName) &&
+                Objects.equals(this.basePhysicalType, other.basePhysicalType) &&
+                this.columnType == other.columnType &&
+                Objects.equals(this.projectionInfo, other.projectionInfo);
     }
 
+    @JsonIgnore
+    public String getColumnName()
+    {
+        checkState(isBaseColumn(), "Unexpected dereference: %s", this);
+        return baseColumnName;
+    }
+
+    @JsonIgnore
+    public String getQualifiedPhysicalName()
+    {
+        return projectionInfo.map(projectionInfo -> basePhysicalColumnName + "#" + projectionInfo.getPartialName())
+                .orElse(basePhysicalColumnName);
+    }
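The qualified physical name disambiguates a dereferenced leaf from its base column. A standalone sketch of the formatting rule above; the partial name ``phone`` is illustrative, and the exact string produced by ``DeltaLakeColumnProjectionInfo.getPartialName()`` is defined in that class, not shown here:

import java.util.Optional;

public class QualifiedNameDemo
{
    // Mirrors getQualifiedPhysicalName(): append '#' plus the projection's partial name
    static String qualifiedPhysicalName(String basePhysicalColumnName, Optional<String> partialName)
    {
        return partialName.map(name -> basePhysicalColumnName + "#" + name)
                .orElse(basePhysicalColumnName);
    }

    public static void main(String[] args)
    {
        String physical = "col-6707cc9e-f3aa-4e6b-b8ef-1b03d3475680"; // example from the field comments above
        System.out.println(qualifiedPhysicalName(physical, Optional.empty()));     // base column: unchanged
        System.out.println(qualifiedPhysicalName(physical, Optional.of("phone"))); // dereference: ...#phone
    }
}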

     public long getRetainedSizeInBytes()
     {
         // type is not accounted for as the instances are cached (by TypeRegistry) and shared
         return INSTANCE_SIZE
-                + estimatedSizeOf(name)
-                + sizeOf(fieldId)
-                + estimatedSizeOf(physicalName);
+                + estimatedSizeOf(baseColumnName)
+                + sizeOf(baseFieldId)
+                + estimatedSizeOf(basePhysicalColumnName)
+                + projectionInfo.map(DeltaLakeColumnProjectionInfo::getRetainedSizeInBytes).orElse(0L);
     }

+    @JsonIgnore
+    public boolean isBaseColumn()
+    {
+        return projectionInfo.isEmpty();
+    }

     @Override
     public int hashCode()
     {
-        return Objects.hash(name, type, fieldId, physicalName, physicalType, columnType);
+        return Objects.hash(baseColumnName, baseType, baseFieldId, basePhysicalColumnName, basePhysicalType, columnType, projectionInfo);
     }
 
     @Override
     public String toString()
     {
-        return name + ":" + type.getDisplayName() + ":" + columnType;
+        return getQualifiedPhysicalName() +
+                ":" + projectionInfo.map(DeltaLakeColumnProjectionInfo::getType).orElse(baseType).getDisplayName() +
+                ":" + columnType;
     }

     public HiveColumnHandle toHiveColumnHandle()
     {
         return new HiveColumnHandle(
-                physicalName, // this name is used for accessing Parquet files, so it should be physical name
+                basePhysicalColumnName, // this name is used for accessing Parquet files, so it should be physical name
                 0, // hiveColumnIndex; we provide fake value because we always find columns by name
-                toHiveType(physicalType),
-                physicalType,
-                Optional.empty(),
+                toHiveType(basePhysicalType),
+                basePhysicalType,
+                projectionInfo.map(DeltaLakeColumnProjectionInfo::toHiveColumnProjectionInfo),
                 columnType.toHiveColumnType(),
                 Optional.empty());
     }

     public static DeltaLakeColumnHandle pathColumnHandle()
     {
-        return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED);
+        return new DeltaLakeColumnHandle(PATH_COLUMN_NAME, PATH_TYPE, OptionalInt.empty(), PATH_COLUMN_NAME, PATH_TYPE, SYNTHESIZED, Optional.empty());
     }
 
     public static DeltaLakeColumnHandle fileSizeColumnHandle()
     {
-        return new DeltaLakeColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, OptionalInt.empty(), FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, SYNTHESIZED);
+        return new DeltaLakeColumnHandle(FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, OptionalInt.empty(), FILE_SIZE_COLUMN_NAME, FILE_SIZE_TYPE, SYNTHESIZED, Optional.empty());
     }
 
     public static DeltaLakeColumnHandle fileModifiedTimeColumnHandle()
     {
-        return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED);
+        return new DeltaLakeColumnHandle(FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, OptionalInt.empty(), FILE_MODIFIED_TIME_COLUMN_NAME, FILE_MODIFIED_TIME_TYPE, SYNTHESIZED, Optional.empty());
     }
 
     public static DeltaLakeColumnHandle mergeRowIdColumnHandle()
     {
-        return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, OptionalInt.empty(), ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, SYNTHESIZED);
+        return new DeltaLakeColumnHandle(ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, OptionalInt.empty(), ROW_ID_COLUMN_NAME, MERGE_ROW_ID_TYPE, SYNTHESIZED, Optional.empty());
     }
 }
