Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix type of DeltaLakeTableHandle.projectedColumns #17365

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
DeltaLakeTableHandle table = checkValidTableHandle(tableHandle);
return table.getProjectedColumns()
.map(projectedColumns -> (List<DeltaLakeColumnHandle>) projectedColumns.stream()
.map(DeltaLakeColumnHandle.class::cast) // TODO DeltaLakeTableHandle.projectedColumns should be a collection of DeltaLakeColumnHandle
.collect(toImmutableList()))
.map(projectColumns -> (Collection<DeltaLakeColumnHandle>) projectColumns)
.orElseGet(() -> getColumns(table.getMetadataEntry())).stream()
// This method does not calculate column name for the projected columns
.peek(handle -> checkArgument(handle.isBaseColumn(), "Unsupported projected column: %s", handle))
Expand Down Expand Up @@ -2283,7 +2281,9 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti
// all references are simple variables
if (!isProjectionPushdownEnabled(session)
|| columnProjections.values().stream().allMatch(ProjectedColumnRepresentation::isVariable)) {
Set<ColumnHandle> projectedColumns = ImmutableSet.copyOf(assignments.values());
Set<DeltaLakeColumnHandle> projectedColumns = assignments.values().stream()
.map(DeltaLakeColumnHandle.class::cast)
.collect(toImmutableSet());
// Check if column was projected already in previous call
if (deltaLakeTableHandle.getProjectedColumns().isPresent()
&& deltaLakeTableHandle.getProjectedColumns().get().equals(projectedColumns)) {
Expand All @@ -2306,7 +2306,7 @@ public Optional<ProjectionApplicationResult<ConnectorTableHandle>> applyProjecti

Map<String, Assignment> newAssignments = new HashMap<>();
ImmutableMap.Builder<ConnectorExpression, Variable> newVariablesBuilder = ImmutableMap.builder();
ImmutableSet.Builder<ColumnHandle> projectedColumnsBuilder = ImmutableSet.builder();
ImmutableSet.Builder<DeltaLakeColumnHandle> projectedColumnsBuilder = ImmutableSet.builder();

for (Map.Entry<ConnectorExpression, ProjectedColumnRepresentation> entry : columnProjections.entrySet()) {
ConnectorExpression expression = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private static boolean mayAnyDataColumnProjected(DeltaLakeTableHandle tableHandl
return true;
}
return tableHandle.getProjectedColumns().get().stream()
.map(columnHandle -> ((DeltaLakeColumnHandle) columnHandle).getColumnType())
.map(DeltaLakeColumnHandle::getColumnType)
.anyMatch(DeltaLakeColumnType.REGULAR::equals);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

Expand Down Expand Up @@ -51,7 +50,7 @@ public enum WriteType
private final Optional<WriteType> writeType;
private final long readVersion;

private final Optional<Set<ColumnHandle>> projectedColumns;
private final Optional<Set<DeltaLakeColumnHandle>> projectedColumns;
// UPDATE only: The list of columns being updated
private final Optional<List<DeltaLakeColumnHandle>> updatedColumns;
// UPDATE only: The list of columns which need to be copied when applying updates to the new Parquet file
Expand All @@ -74,7 +73,7 @@ public DeltaLakeTableHandle(
@JsonProperty("enforcedPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
@JsonProperty("nonPartitionConstraint") TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
@JsonProperty("writeType") Optional<WriteType> writeType,
@JsonProperty("projectedColumns") Optional<Set<ColumnHandle>> projectedColumns,
@JsonProperty("projectedColumns") Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
@JsonProperty("updatedColumns") Optional<List<DeltaLakeColumnHandle>> updatedColumns,
@JsonProperty("updateRowIdColumns") Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
@JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
Expand Down Expand Up @@ -107,7 +106,7 @@ public DeltaLakeTableHandle(
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<WriteType> writeType,
Optional<Set<ColumnHandle>> projectedColumns,
Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
Optional<List<DeltaLakeColumnHandle>> updatedColumns,
Optional<List<DeltaLakeColumnHandle>> updateRowIdColumns,
Optional<AnalyzeHandle> analyzeHandle,
Expand All @@ -134,7 +133,7 @@ public DeltaLakeTableHandle(
this.readVersion = readVersion;
}

public DeltaLakeTableHandle withProjectedColumns(Set<ColumnHandle> projectedColumns)
public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
{
return new DeltaLakeTableHandle(
schemaName,
Expand Down Expand Up @@ -245,7 +244,7 @@ public Optional<WriteType> getWriteType()

// Projected columns are not needed on workers
@JsonIgnore
public Optional<Set<ColumnHandle>> getProjectedColumns()
public Optional<Set<DeltaLakeColumnHandle>> getProjectedColumns()
{
return projectedColumns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,11 @@ public Object[][] testApplyProjectionProvider()

@Test(dataProvider = "testApplyProjectionProvider")
public void testApplyProjection(
Set<ColumnHandle> inputProjectedColumns,
Set<DeltaLakeColumnHandle> inputProjectedColumns,
Map<String, ColumnHandle> inputAssignments,
List<ConnectorExpression> inputProjections,
List<ConnectorExpression> expectedProjections,
Set<ColumnHandle> expectedProjectedColumns,
Set<DeltaLakeColumnHandle> expectedProjectedColumns,
Map<String, ColumnHandle> expectedAssignments)
{
DeltaLakeMetadata deltaLakeMetadata = deltaLakeMetadataFactory.create(SESSION.getIdentity());
Expand All @@ -455,8 +455,7 @@ public void testApplyProjection(
inputAssignments)
.get();

assertThat(((DeltaLakeTableHandle) projection.getHandle())
.getProjectedColumns())
assertThat(((DeltaLakeTableHandle) projection.getHandle()).getProjectedColumns())
.isEqualTo(Optional.of(expectedProjectedColumns));

assertThat(projection.getProjections())
Expand Down Expand Up @@ -519,7 +518,7 @@ public void testGetInputInfoForUnPartitionedTable()
assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(false)));
}

private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<ColumnHandle> projectedColumns, Set<DeltaLakeColumnHandle> constrainedColumns)
private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set<DeltaLakeColumnHandle> projectedColumns, Set<DeltaLakeColumnHandle> constrainedColumns)
{
return new DeltaLakeTableHandle(
"test_schema_name",
Expand Down