
Commit

Emphasize the fact that base columns get projected
findinpath authored and raunaqmorarka committed Jul 8, 2023
1 parent 2edcf87 commit 570c6f8
Showing 1 changed file with 21 additions and 21 deletions.
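Note on the diff below: the change is purely a rename. Each reader path (ORC, Parquet, Avro) first projects the requested column handles onto their base, i.e. top-level, columns and only reads those from the file; the old names projectColumns, columnProjections, and readColumns obscured that. As a minimal, self-contained sketch of the idea only — the ColumnHandle record and projectBase helper are hypothetical stand-ins, not the IcebergColumnHandle, ReaderColumns, or projectBaseColumns code touched by this commit:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BaseColumnProjectionSketch
{
    // Hypothetical stand-in for a column handle: baseId identifies the top-level column,
    // path holds the nested-field ids of a dereference such as address.city.
    record ColumnHandle(int baseId, List<Integer> path) {}

    // Collapse the requested handles to distinct base columns and record, for each
    // requested handle, the index of the base column that will serve it.
    static Map.Entry<List<Integer>, int[]> projectBase(List<ColumnHandle> columns)
    {
        List<Integer> baseColumnIds = new ArrayList<>();
        Map<Integer, Integer> indexByBaseId = new HashMap<>();
        int[] handleToBaseIndex = new int[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            int baseId = columns.get(i).baseId();
            Integer index = indexByBaseId.get(baseId);
            if (index == null) {
                index = baseColumnIds.size();
                baseColumnIds.add(baseId);
                indexByBaseId.put(baseId, index);
            }
            handleToBaseIndex[i] = index;
        }
        return Map.entry(baseColumnIds, handleToBaseIndex);
    }
}
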
@@ -568,21 +568,21 @@ private static ReaderPageSourceWithRowPositions createOrcPageSource(
         Map<IcebergColumnHandle, Domain> effectivePredicateDomains = effectivePredicate.getDomains()
                 .orElseThrow(() -> new IllegalArgumentException("Effective predicate is none"));

-        Optional<ReaderColumns> columnProjections = projectColumns(columns);
+        Optional<ReaderColumns> baseColumnProjections = projectBaseColumns(columns);
         Map<Integer, List<List<Integer>>> projectionsByFieldId = columns.stream()
                 .collect(groupingBy(
                         column -> column.getBaseColumnIdentity().getId(),
                         mapping(IcebergColumnHandle::getPath, toUnmodifiableList())));

-        List<IcebergColumnHandle> readColumns = columnProjections
+        List<IcebergColumnHandle> readBaseColumns = baseColumnProjections
                 .map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
                 .orElse(columns);
-        List<OrcColumn> fileReadColumns = new ArrayList<>(readColumns.size());
-        List<Type> fileReadTypes = new ArrayList<>(readColumns.size());
-        List<ProjectedLayout> projectedLayouts = new ArrayList<>(readColumns.size());
-        List<ColumnAdaptation> columnAdaptations = new ArrayList<>(readColumns.size());
+        List<OrcColumn> fileReadColumns = new ArrayList<>(readBaseColumns.size());
+        List<Type> fileReadTypes = new ArrayList<>(readBaseColumns.size());
+        List<ProjectedLayout> projectedLayouts = new ArrayList<>(readBaseColumns.size());
+        List<ColumnAdaptation> columnAdaptations = new ArrayList<>(readBaseColumns.size());

-        for (IcebergColumnHandle column : readColumns) {
+        for (IcebergColumnHandle column : readBaseColumns) {
             verify(column.isBaseColumn(), "Column projections must be based from a root column");
             OrcColumn orcColumn = fileColumnsByIcebergId.get(column.getId());
@@ -659,7 +659,7 @@ else if (orcColumn != null) {
                 memoryUsage,
                 INITIAL_BATCH_SIZE,
                 exception -> handleException(orcDataSourceId, exception),
-                new IdBasedFieldMapperFactory(readColumns));
+                new IdBasedFieldMapperFactory(readBaseColumns));

         return new ReaderPageSourceWithRowPositions(
                 new ReaderPageSource(
@@ -672,7 +672,7 @@ else if (orcColumn != null) {
                         memoryUsage,
                         stats,
                         reader.getCompressionKind()),
-                columnProjections),
+                baseColumnProjections),
                 recordReader.getStartRowPosition(),
                 recordReader.getEndRowPosition());
     }
@@ -906,12 +906,12 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
                 .filter(field -> field.getId() != null)
                 .collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));

-        Optional<ReaderColumns> columnProjections = projectColumns(regularColumns);
-        List<IcebergColumnHandle> readColumns = columnProjections
+        Optional<ReaderColumns> baseColumnProjections = projectBaseColumns(regularColumns);
+        List<IcebergColumnHandle> readBaseColumns = baseColumnProjections
                 .map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
                 .orElse(regularColumns);

-        List<org.apache.parquet.schema.Type> parquetFields = readColumns.stream()
+        List<org.apache.parquet.schema.Type> parquetFields = readBaseColumns.stream()
                 .map(column -> parquetIdToField.get(column.getId()))
                 .collect(toList());

@@ -947,8 +947,8 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
         int parquetSourceChannel = 0;

         ImmutableList.Builder<Field> parquetColumnFieldsBuilder = ImmutableList.builder();
-        for (int columnIndex = 0; columnIndex < readColumns.size(); columnIndex++) {
-            IcebergColumnHandle column = readColumns.get(columnIndex);
+        for (int columnIndex = 0; columnIndex < readBaseColumns.size(); columnIndex++) {
+            IcebergColumnHandle column = readBaseColumns.get(columnIndex);
             if (column.isIsDeletedColumn()) {
                 pageSourceBuilder.addConstantColumn(nativeValueToBlock(BOOLEAN, false));
             }
@@ -1014,7 +1014,7 @@ else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
         return new ReaderPageSourceWithRowPositions(
                 new ReaderPageSource(
                         pageSourceBuilder.build(parquetReader),
-                        columnProjections),
+                        baseColumnProjections),
                 startRowPosition,
                 endRowPosition);
     }
@@ -1054,16 +1054,16 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource(
         ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder = ConstantPopulatingPageSource.builder();
         int avroSourceChannel = 0;

-        Optional<ReaderColumns> columnProjections = projectColumns(columns);
+        Optional<ReaderColumns> baseColumnProjections = projectBaseColumns(columns);

-        List<IcebergColumnHandle> readColumns = columnProjections
+        List<IcebergColumnHandle> readBaseColumns = baseColumnProjections
                 .map(readerColumns -> (List<IcebergColumnHandle>) readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList()))
                 .orElse(columns);

         InputFile file = new ForwardingInputFile(inputFile);
         OptionalLong fileModifiedTime = OptionalLong.empty();
         try {
-            if (readColumns.stream().anyMatch(IcebergColumnHandle::isFileModifiedTimeColumn)) {
+            if (readBaseColumns.stream().anyMatch(IcebergColumnHandle::isFileModifiedTimeColumn)) {
                 fileModifiedTime = OptionalLong.of(inputFile.lastModified().toEpochMilli());
             }
         }
@@ -1087,7 +1087,7 @@ private static ReaderPageSourceWithRowPositions createAvroPageSource(
         ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
         ImmutableList.Builder<Boolean> rowIndexChannels = ImmutableList.builder();

-        for (IcebergColumnHandle column : readColumns) {
+        for (IcebergColumnHandle column : readBaseColumns) {
             verify(column.isBaseColumn(), "Column projections must be based from a root column");
             org.apache.avro.Schema.Field field = fileColumnsByIcebergId.get(column.getId());

@@ -1138,7 +1138,7 @@ else if (field == null) {
                         columnTypes.build(),
                         rowIndexChannels.build(),
                         newSimpleAggregatedMemoryContext())),
-                columnProjections),
+                baseColumnProjections),
                 Optional.empty(),
                 Optional.empty());
     }
@@ -1246,7 +1246,7 @@ public ProjectedLayout getFieldLayout(OrcColumn orcColumn)
     /**
      * Creates a mapping between the input {@code columns} and base columns if required.
      */
-    public static Optional<ReaderColumns> projectColumns(List<IcebergColumnHandle> columns)
+    public static Optional<ReaderColumns> projectBaseColumns(List<IcebergColumnHandle> columns)
     {
         requireNonNull(columns, "columns is null");

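The javadoc on projectBaseColumns in the last hunk describes it as creating a mapping between the input columns and base columns when required. A short usage of the hypothetical sketch from the top of this page (again, not the Trino API) shows what that mapping looks like when two handles dereference fields of the same struct:

    // Could be a main method inside BaseColumnProjectionSketch above.
    public static void main(String[] args)
    {
        // Requested handles: a (base id 1), b.c (base id 2, path [5]), b.d (base id 2, path [6]).
        List<ColumnHandle> columns = List.of(
                new ColumnHandle(1, List.of()),
                new ColumnHandle(2, List.of(5)),
                new ColumnHandle(2, List.of(6)));
        Map.Entry<List<Integer>, int[]> projection = projectBase(columns);
        // projection.getKey()   -> [1, 2]    : only the two base columns a and b are read from the file
        // projection.getValue() -> [0, 1, 1] : b.c and b.d are both served by the single read of b
    }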