diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReaderColumn.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReaderColumn.java deleted file mode 100644 index a2060e273711..000000000000 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReaderColumn.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.parquet.reader; - -import io.trino.parquet.Field; -import io.trino.spi.type.Type; - -import java.util.List; -import java.util.Optional; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static java.util.Objects.requireNonNull; - -/** - * @param type Column type - * @param field Field description. Empty optional will result in column populated with {@code NULL} - * @param isRowIndexColumn Whether column should be populated with the indices of its rows - */ -public record ParquetReaderColumn(Type type, Optional field, boolean isRowIndexColumn) -{ - public static List getParquetReaderFields(List parquetReaderColumns) - { - return parquetReaderColumns.stream() - .filter(column -> !column.isRowIndexColumn()) - .map(ParquetReaderColumn::field) - .filter(Optional::isPresent) - .map(Optional::get) - .collect(toImmutableList()); - } - - public ParquetReaderColumn(Type type, Optional field, boolean isRowIndexColumn) - { - this.type = requireNonNull(type, "type is null"); - this.field = requireNonNull(field, "field is null"); - checkArgument( - !isRowIndexColumn || field.isEmpty(), - "Field info for row index column must be empty Optional"); - this.isRowIndexColumn = isRowIndexColumn; - } -} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java index e5a15bb4269e..283eab238afa 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSource.java @@ -17,7 +17,6 @@ import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.ParquetReaderColumn; import io.trino.spi.Page; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; @@ -25,6 +24,7 @@ import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.metrics.Metrics; +import io.trino.spi.type.Type; import java.io.IOException; import java.io.UncheckedIOException; @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.OptionalLong; +import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.base.util.Closables.closeAllSuppress; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CURSOR_ERROR; @@ -42,20 +43,19 @@ public class ParquetPageSource implements ConnectorPageSource { private final ParquetReader parquetReader; - private final List parquetReaderColumns; - private final boolean areSyntheticColumnsPresent; + private final List columnAdaptations; + private final boolean isColumnAdaptationRequired; private boolean closed; private long completedPositions; - public ParquetPageSource( + private ParquetPageSource( ParquetReader parquetReader, - List parquetReaderColumns) + List columnAdaptations) { this.parquetReader = requireNonNull(parquetReader, "parquetReader is null"); - this.parquetReaderColumns = ImmutableList.copyOf(requireNonNull(parquetReaderColumns, "parquetReaderColumns is null")); - this.areSyntheticColumnsPresent = parquetReaderColumns.stream() - .anyMatch(column -> column.isRowIndexColumn() || column.field().isEmpty()); + this.columnAdaptations = ImmutableList.copyOf(requireNonNull(columnAdaptations, "columnAdaptations is null")); + this.isColumnAdaptationRequired = isColumnAdaptationRequired(columnAdaptations); } @Override @@ -131,29 +131,60 @@ public Metrics getMetrics() return parquetReader.getMetrics(); } + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private final ImmutableList.Builder columns = ImmutableList.builder(); + + private Builder() {} + + public Builder addConstantColumn(Block value) + { + columns.add(new ConstantColumn(value)); + return this; + } + + public Builder addSourceColumn(int sourceChannel) + { + columns.add(new SourceColumn(sourceChannel)); + return this; + } + + public Builder addNullColumn(Type type) + { + columns.add(new NullColumn(type)); + return this; + } + + public Builder addRowIndexColumn() + { + columns.add(new RowIndexColumn()); + return this; + } + + public ConnectorPageSource build(ParquetReader parquetReader) + { + return new ParquetPageSource(parquetReader, this.columns.build()); + } + } + private Page getColumnAdaptationsPage(Page page) { - if (!areSyntheticColumnsPresent) { + if (!isColumnAdaptationRequired) { return page; } if (page == null) { return null; } int batchSize = page.getPositionCount(); - Block[] blocks = new Block[parquetReaderColumns.size()]; - int sourceColumn = 0; - for (int columnIndex = 0; columnIndex < parquetReaderColumns.size(); columnIndex++) { - ParquetReaderColumn column = parquetReaderColumns.get(columnIndex); - if (column.isRowIndexColumn()) { - blocks[columnIndex] = getRowIndexColumn(parquetReader.lastBatchStartRow(), batchSize); - } - else if (column.field().isEmpty()) { - blocks[columnIndex] = RunLengthEncodedBlock.create(column.type(), null, batchSize); - } - else { - blocks[columnIndex] = page.getBlock(sourceColumn); - sourceColumn++; - } + Block[] blocks = new Block[columnAdaptations.size()]; + long startRowId = parquetReader.lastBatchStartRow(); + for (int columnChannel = 0; columnChannel < columnAdaptations.size(); columnChannel++) { + blocks[columnChannel] = columnAdaptations.get(columnChannel).getBlock(page, startRowId); } return new Page(batchSize, blocks); } @@ -169,7 +200,100 @@ static TrinoException handleException(ParquetDataSourceId dataSourceId, Exceptio return new TrinoException(HIVE_CURSOR_ERROR, format("Failed to read Parquet file: %s", dataSourceId), exception); } - private static Block getRowIndexColumn(long baseIndex, int size) + private static boolean isColumnAdaptationRequired(List columnAdaptations) + { + // If no synthetic columns are added and the source columns are in order, no adaptations are required + for (int columnChannel = 0; columnChannel < columnAdaptations.size(); columnChannel++) { + ColumnAdaptation column = columnAdaptations.get(columnChannel); + if (column instanceof SourceColumn) { + int delegateChannel = ((SourceColumn) column).getSourceChannel(); + if (columnChannel != delegateChannel) { + return true; + } + } + else { + return true; + } + } + return false; + } + + private interface ColumnAdaptation + { + Block getBlock(Page sourcePage, long startRowId); + } + + private static class NullColumn + implements ColumnAdaptation + { + private final Block nullBlock; + + private NullColumn(Type type) + { + this.nullBlock = type.createBlockBuilder(null, 1, 0) + .appendNull() + .build(); + } + + @Override + public Block getBlock(Page sourcePage, long startRowId) + { + return RunLengthEncodedBlock.create(nullBlock, sourcePage.getPositionCount()); + } + } + + private static class SourceColumn + implements ColumnAdaptation + { + private final int sourceChannel; + + private SourceColumn(int sourceChannel) + { + checkArgument(sourceChannel >= 0, "sourceChannel is negative"); + this.sourceChannel = sourceChannel; + } + + @Override + public Block getBlock(Page sourcePage, long startRowId) + { + return sourcePage.getBlock(sourceChannel); + } + + public int getSourceChannel() + { + return sourceChannel; + } + } + + private static class ConstantColumn + implements ColumnAdaptation + { + private final Block singleValueBlock; + + private ConstantColumn(Block singleValueBlock) + { + checkArgument(singleValueBlock.getPositionCount() == 1, "ConstantColumnAdaptation singleValueBlock may only contain one position"); + this.singleValueBlock = singleValueBlock; + } + + @Override + public Block getBlock(Page sourcePage, long startRowId) + { + return RunLengthEncodedBlock.create(singleValueBlock, sourcePage.getPositionCount()); + } + } + + private static class RowIndexColumn + implements ColumnAdaptation + { + @Override + public Block getBlock(Page sourcePage, long startRowId) + { + return createRowNumberBlock(startRowId, sourcePage.getPositionCount()); + } + } + + private static Block createRowNumberBlock(long baseIndex, int size) { long[] rowIndices = new long[size]; for (int position = 0; position < size; position++) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index 0a2a5c55c42c..c9b2a23cfa96 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -20,6 +20,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.BloomFilterStore; +import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; @@ -28,7 +29,6 @@ import io.trino.parquet.predicate.TupleDomainParquetPredicate; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.ParquetReaderColumn; import io.trino.parquet.reader.TrinoColumnIndexStore; import io.trino.plugin.hive.AcidInfo; import io.trino.plugin.hive.FileFormatDataSourceStats; @@ -71,7 +71,6 @@ import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.parquet.ParquetTypeUtils.constructField; @@ -81,7 +80,6 @@ import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; -import static io.trino.parquet.reader.ParquetReaderColumn.getParquetReaderFields; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT; @@ -277,24 +275,23 @@ && predicateMatches( .map(HiveColumnHandle.class::cast) .collect(toUnmodifiableList())) .orElse(columns); - List parquetReaderColumns = createParquetReaderColumns(baseColumns, fileSchema, messageColumn, useColumnNames); ParquetDataSourceId dataSourceId = dataSource.getId(); - ConnectorPageSource parquetPageSource = new ParquetPageSource( - new ParquetReader( - Optional.ofNullable(fileMetaData.getCreatedBy()), - getParquetReaderFields(parquetReaderColumns), - blocks.build(), - blockStarts.build(), - dataSource, - timeZone, - newSimpleAggregatedMemoryContext(), - options, - exception -> handleException(dataSourceId, exception), - Optional.of(parquetPredicate), - columnIndexes.build(), - parquetWriteValidation), - parquetReaderColumns); + ParquetDataSource finalDataSource = dataSource; + ParquetReaderProvider parquetReaderProvider = fields -> new ParquetReader( + Optional.ofNullable(fileMetaData.getCreatedBy()), + fields, + blocks.build(), + blockStarts.build(), + finalDataSource, + timeZone, + newSimpleAggregatedMemoryContext(), + options, + exception -> handleException(dataSourceId, exception), + Optional.of(parquetPredicate), + columnIndexes.build(), + parquetWriteValidation); + ConnectorPageSource parquetPageSource = createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider); return new ReaderPageSource(parquetPageSource, readerProjections); } catch (Exception e) { @@ -483,24 +480,45 @@ public static org.apache.parquet.schema.Type getParquetType(HiveColumnHandle col return null; } - public static List createParquetReaderColumns(List baseColumns, MessageType fileSchema, MessageColumnIO messageColumn, boolean useColumnNames) + public interface ParquetReaderProvider { + ParquetReader createParquetReader(List fields) + throws IOException; + } + + public static ConnectorPageSource createParquetPageSource( + List baseColumns, + MessageType fileSchema, + MessageColumnIO messageColumn, + boolean useColumnNames, + ParquetReaderProvider parquetReaderProvider) + throws IOException + { + ParquetPageSource.Builder pageSourceBuilder = ParquetPageSource.builder(); + ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); + int sourceChannel = 0; for (HiveColumnHandle column : baseColumns) { - checkArgument(column == PARQUET_ROW_INDEX_COLUMN || column.getColumnType() == REGULAR, "column type must be REGULAR: %s", column); + if (column == PARQUET_ROW_INDEX_COLUMN) { + pageSourceBuilder.addRowIndexColumn(); + continue; + } + checkArgument(column.getColumnType() == REGULAR, "column type must be REGULAR: %s", column); + org.apache.parquet.schema.Type parquetType = getParquetType(column, fileSchema, useColumnNames); + if (parquetType == null) { + pageSourceBuilder.addNullColumn(column.getBaseType()); + continue; + } + String columnName = useColumnNames ? column.getBaseColumnName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName(); + Optional field = constructField(column.getBaseType(), lookupColumnByName(messageColumn, columnName)); + if (field.isEmpty()) { + pageSourceBuilder.addNullColumn(column.getBaseType()); + continue; + } + parquetColumnFieldsBuilder.add(field.get()); + pageSourceBuilder.addSourceColumn(sourceChannel); + sourceChannel++; } - return baseColumns.stream() - .map(column -> { - boolean isRowIndexColumn = column == PARQUET_ROW_INDEX_COLUMN; - return new ParquetReaderColumn( - column.getBaseType(), - isRowIndexColumn ? Optional.empty() : Optional.ofNullable(getParquetType(column, fileSchema, useColumnNames)) - .flatMap(field -> { - String columnName = useColumnNames ? column.getBaseColumnName() : fileSchema.getFields().get(column.getBaseHiveColumnIndex()).getName(); - return constructField(column.getBaseType(), lookupColumnByName(messageColumn, columnName)); - }), - isRowIndexColumn); - }) - .collect(toImmutableList()); + return pageSourceBuilder.build(parquetReaderProvider.createParquetReader(parquetColumnFieldsBuilder.build())); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java index 8662b504a5df..f41f3ed4b7f8 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestTimestampMicros.java @@ -115,22 +115,23 @@ private ConnectorPageSource createPageSource(ConnectorSession session, File parq ReaderPageSource pageSourceWithProjections = pageSourceFactory.createPageSource( newEmptyConfiguration(), - session, - new Path(parquetFile.toURI()), - 0, - parquetFile.length(), - parquetFile.length(), - schema, - List.of(createBaseColumn(columnName, 0, columnHiveType, columnType, REGULAR, Optional.empty())), - TupleDomain.all(), - Optional.empty(), - OptionalInt.empty(), - false, - AcidTransaction.NO_ACID_TRANSACTION) + session, + new Path(parquetFile.toURI()), + 0, + parquetFile.length(), + parquetFile.length(), + schema, + List.of(createBaseColumn(columnName, 0, columnHiveType, columnType, REGULAR, Optional.empty())), + TupleDomain.all(), + Optional.empty(), + OptionalInt.empty(), + false, + AcidTransaction.NO_ACID_TRANSACTION) .orElseThrow(); - pageSourceWithProjections.getReaderColumns() - .ifPresent(projections -> { throw new IllegalStateException("Unexpected projections: " + projections); }); + pageSourceWithProjections.getReaderColumns().ifPresent(projections -> { + throw new IllegalStateException("Unexpected projections: " + projections); + }); return pageSourceWithProjections.get(); } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 6d8248a643f7..a1dcf5ebddaf 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -24,12 +24,10 @@ import io.trino.parquet.predicate.TupleDomainParquetPredicate; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.ParquetReaderColumn; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; import io.trino.plugin.hive.ReaderColumns; -import io.trino.plugin.hive.parquet.ParquetPageSource; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; import io.trino.spi.TrinoException; @@ -77,9 +75,9 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; -import static io.trino.parquet.reader.ParquetReaderColumn.getParquetReaderFields; import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns; -import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetReaderColumns; +import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.ParquetReaderProvider; +import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.createParquetPageSource; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.getColumnIndexStore; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.getParquetMessageType; import static io.trino.plugin.hive.parquet.ParquetPageSourceFactory.getParquetTupleDomain; @@ -230,14 +228,14 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq .map(HiveColumnHandle.class::cast) .collect(toUnmodifiableList())) .orElse(columns); - List parquetReaderColumns = createParquetReaderColumns(baseColumns, fileSchema, messageColumn, useColumnNames); ParquetDataSourceId dataSourceId = dataSource.getId(); - ParquetReader parquetReader = new ParquetReader( + ParquetDataSource finalDataSource = dataSource; + ParquetReaderProvider parquetReaderProvider = fields -> new ParquetReader( Optional.ofNullable(fileMetaData.getCreatedBy()), - getParquetReaderFields(parquetReaderColumns), + fields, blocks.build(), blockStarts.build(), - dataSource, + finalDataSource, timeZone, newSimpleAggregatedMemoryContext(), options.withBatchColumnReaders(isParquetOptimizedReaderEnabled(session)) @@ -246,10 +244,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq Optional.of(parquetPredicate), columnIndexes.build(), Optional.empty()); - - return new ParquetPageSource( - parquetReader, - parquetReaderColumns); + return createParquetPageSource(baseColumns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider); } catch (IOException | RuntimeException e) { try { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 2096aa7dde75..6d46b71f22c8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -34,6 +34,7 @@ import io.trino.orc.TupleDomainOrcPredicate; import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder; import io.trino.orc.metadata.OrcType; +import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; @@ -41,7 +42,6 @@ import io.trino.parquet.predicate.TupleDomainParquetPredicate; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.ParquetReaderColumn; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.ReaderColumns; import io.trino.plugin.hive.ReaderPageSource; @@ -135,7 +135,6 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.predicate.PredicateUtils.buildPredicate; import static io.trino.parquet.predicate.PredicateUtils.predicateMatches; -import static io.trino.parquet.reader.ParquetReaderColumn.getParquetReaderFields; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_FILE_RECORD_COUNT; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_DATA; import static io.trino.plugin.iceberg.IcebergColumnHandle.TRINO_MERGE_PARTITION_SPEC_ID; @@ -942,73 +941,67 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema); - ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder = ConstantPopulatingPageSource.builder(); + ParquetPageSource.Builder pageSourceBuilder = ParquetPageSource.builder(); int parquetSourceChannel = 0; - ImmutableList.Builder parquetReaderColumnBuilder = ImmutableList.builder(); + ImmutableList.Builder parquetColumnFieldsBuilder = ImmutableList.builder(); for (int columnIndex = 0; columnIndex < readColumns.size(); columnIndex++) { IcebergColumnHandle column = readColumns.get(columnIndex); if (column.isIsDeletedColumn()) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(BOOLEAN, false)); + pageSourceBuilder.addConstantColumn(nativeValueToBlock(BOOLEAN, false)); } else if (partitionKeys.containsKey(column.getId())) { Type trinoType = column.getType(); - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock( + pageSourceBuilder.addConstantColumn(nativeValueToBlock( trinoType, deserializePartitionValue(trinoType, partitionKeys.get(column.getId()).orElse(null), column.getName()))); } else if (column.isPathColumn()) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(inputFile.location()))); + pageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(inputFile.location()))); } else if (column.isFileModifiedTimeColumn()) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(inputFile.lastModified().toEpochMilli(), UTC_KEY))); + pageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(inputFile.lastModified().toEpochMilli(), UTC_KEY))); } else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) { // $row_id is a composite of multiple physical columns, it is assembled by the IcebergPageSource - parquetReaderColumnBuilder.add(new ParquetReaderColumn(column.getType(), Optional.empty(), false)); - constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel); - parquetSourceChannel++; + pageSourceBuilder.addNullColumn(column.getType()); } else if (column.isRowPositionColumn()) { - parquetReaderColumnBuilder.add(new ParquetReaderColumn(BIGINT, Optional.empty(), true)); - constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel); - parquetSourceChannel++; + pageSourceBuilder.addRowIndexColumn(); } else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), fileRecordCount)); + pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), fileRecordCount)); } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId)); + pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), (long) partitionSpecId)); } else if (column.getId() == TRINO_MERGE_PARTITION_DATA) { - constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), utf8Slice(partitionData))); + pageSourceBuilder.addConstantColumn(nativeValueToBlock(column.getType(), utf8Slice(partitionData))); } else { org.apache.parquet.schema.Type parquetField = parquetFields.get(columnIndex); Type trinoType = column.getBaseType(); - if (parquetField == null) { - parquetReaderColumnBuilder.add(new ParquetReaderColumn(trinoType, Optional.empty(), false)); + pageSourceBuilder.addNullColumn(trinoType); + continue; } - else { - // The top level columns are already mapped by name/id appropriately. - ColumnIO columnIO = messageColumnIO.getChild(parquetField.getName()); - parquetReaderColumnBuilder.add(new ParquetReaderColumn( - trinoType, - IcebergParquetColumnIOConverter.constructField(new FieldContext(trinoType, column.getColumnIdentity()), columnIO), - false)); + // The top level columns are already mapped by name/id appropriately. + ColumnIO columnIO = messageColumnIO.getChild(parquetField.getName()); + Optional field = IcebergParquetColumnIOConverter.constructField(new FieldContext(trinoType, column.getColumnIdentity()), columnIO); + if (field.isEmpty()) { + pageSourceBuilder.addNullColumn(trinoType); + continue; } - - constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel); + parquetColumnFieldsBuilder.add(field.get()); + pageSourceBuilder.addSourceColumn(parquetSourceChannel); parquetSourceChannel++; } } - List parquetReaderColumns = parquetReaderColumnBuilder.build(); ParquetDataSourceId dataSourceId = dataSource.getId(); ParquetReader parquetReader = new ParquetReader( Optional.ofNullable(fileMetaData.getCreatedBy()), - getParquetReaderFields(parquetReaderColumns), + parquetColumnFieldsBuilder.build(), blocks, blockStarts.build(), dataSource, @@ -1018,7 +1011,7 @@ else if (column.getId() == TRINO_MERGE_PARTITION_DATA) { exception -> handleException(dataSourceId, exception)); return new ReaderPageSourceWithRowPositions( new ReaderPageSource( - constantPopulatingPageSourceBuilder.build(new ParquetPageSource(parquetReader, parquetReaderColumns)), + pageSourceBuilder.build(parquetReader), columnProjections), startRowPosition, endRowPosition);