Skip to content

Commit

Permalink
Fix type of DeltaLakeTableHandle.projectedColumns
Browse files Browse the repository at this point in the history
It's a collection of Delta column handles.
  • Loading branch information
findepi committed May 10, 2023
1 parent 88a16c4 commit e7f11d3
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
DeltaLakeTableHandle table = checkValidTableHandle(tableHandle);
return table.getProjectedColumns()
.map(projectedColumns -> (List<DeltaLakeColumnHandle>) projectedColumns.stream()
.map(DeltaLakeColumnHandle.class::cast) // TODO DeltaLakeTableHandle.projectedColumns should be a collection of DeltaLakeColumnHandle
.collect(toImmutableList()))
.map(projectColumns -> (Collection<DeltaLakeColumnHandle>) projectColumns)
.orElseGet(() -> getColumns(table.getMetadataEntry())).stream()
// This method does not calculate column name for the projected columns
.peek(handle -> checkArgument(handle.isBaseColumn(), "Unsupported projected column: %s", handle))
Expand Down Expand Up @@ -2283,7 +2281,9 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti
// all references are simple variables
if (!isProjectionPushdownEnabled(session)
|| columnProjections.values().stream().allMatch(ProjectedColumnRepresentation::isVariable)) {
Set<ColumnHandle> projectedColumns = ImmutableSet.copyOf(assignments.values());
Set<DeltaLakeColumnHandle> projectedColumns = assignments.values().stream()
.map(DeltaLakeColumnHandle.class::cast)
.collect(toImmutableSet());
// Check if column was projected already in previous call
if (deltaLakeTableHandle.getProjectedColumns().isPresent()
&& deltaLakeTableHandle.getProjectedColumns().get().equals(projectedColumns)) {
Expand All @@ -2306,7 +2306,7 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti

Map<String, Assignment> newAssignments = new HashMap<>();
ImmutableMap.Builder<ConnectorExpression, Variable> newVariablesBuilder = ImmutableMap.builder();
ImmutableSet.Builder<ColumnHandle> projectedColumnsBuilder = ImmutableSet.builder();
ImmutableSet.Builder<DeltaLakeColumnHandle> projectedColumnsBuilder = ImmutableSet.builder();

for (Map.Entry<ConnectorExpression, ProjectedColumnRepresentation> entry : columnProjections.entrySet()) {
ConnectorExpression expression = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private static boolean mayAnyDataColumnProjected(DeltaLakeTableHandle tableHandl
return true;
}
return tableHandle.getProjectedColumns().get().stream()
.map(columnHandle -> ((DeltaLakeColumnHandle) columnHandle).getColumnType())
.map(DeltaLakeColumnHandle::getColumnType)
.anyMatch(DeltaLakeColumnType.REGULAR::equals);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

Expand Down Expand Up @@ -51,7 +50,7 @@ public enum WriteType
private final Optional<WriteType> writeType;
private final long readVersion;

private final Optional<Set<ColumnHandle>> projectedColumns;
private final Optional<Set<DeltaLakeColumnHandle>> projectedColumns;
// UPDATE only: The list of columns being updated
private final Optional<List<DeltaLakeColumnHandle>> updatedColumns;
// UPDATE only: The list of columns which need to be copied when applying updates to the new Parquet file
Expand All @@ -74,7 +73,7 @@ public DeltaLakeTableHandle(
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
@JsonProperty("projectedColumns") Optional<Set<ColumnHandle>> projectedColumns,
@JsonProperty("projectedColumns") Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
@JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
@JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
@JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
Expand Down Expand Up @@ -107,7 +106,7 @@ public DeltaLakeTableHandle(
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<WriteType> writeType,
Optional<Set<ColumnHandle>> projectedColumns,
Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
Optional<List<DeltaLakeColumnHandle>> updatedColumns,
Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
Optional<AnalyzeHandle> analyzeHandle,
Expand All @@ -134,7 +133,7 @@ public DeltaLakeTableHandle(
this.readVersion = readVersion;
}

public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColumns)
public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
{
return new DeltaLakeTableHandle(
schemaName,
Expand Down Expand Up @@ -245,7 +244,7 @@ public Optional<WriteType> getWriteType()

// Projected columns are not needed on workers
@JsonIgnore
public Optional<Set<ColumnHandle>> getProjectedColumns()
public Optional<Set<DeltaLakeColumnHandle>> getProjectedColumns()
{
return projectedColumns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,11 @@ public Object[][] testApplyProjectionProvider()

@Test(dataProvider = "testApplyProjectionProvider")
public void testApplyProjection(
Set<ColumnHandle> inputProjectedColumns,
Set<DeltaLakeColumnHandle> inputProjectedColumns,
Map<String, ColumnHandle> inputAssignments,
List<ConnectorExpression> inputProjections,
List<ConnectorExpression> expectedProjections,
Set<ColumnHandle> expectedProjectedColumns,
Set<DeltaLakeColumnHandle> expectedProjectedColumns,
Map<String, ColumnHandle> expectedAssignments)
{
DeltaLakeMetadata deltaLakeMetadata = deltaLakeMetadataFactory.create(SESSION.getIdentity());
Expand All @@ -455,8 +455,7 @@ public void testApplyProjection(
inputAssignments)
.get();

assertThat(((DeltaLakeTableHandle) projection.getHandle())
.getProjectedColumns())
assertThat(((DeltaLakeTableHandle) projection.getHandle()).getProjectedColumns())
.isEqualTo(Optional.of(expectedProjectedColumns));

assertThat(projection.getProjections())
Expand Down Expand Up @@ -519,7 +518,7 @@ public void testGetInputInfoForUnPartitionedTable()
assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(false)));
}

private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<ColumnHandle> projectedColumns, Set<DeltaLakeColumnHandle> constrainedColumns)
private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<DeltaLakeColumnHandle> projectedColumns, Set<DeltaLakeColumnHandle> constrainedColumns)
{
return new DeltaLakeTableHandle(
"test_schema_name",
Expand Down

0 comments on commit e7f11d3

Please sign in to comment.