From 66d0a2e087baa9ebb8de743728b3a6a207c68791 Mon Sep 17 00:00:00 2001
From: Dain Sundstrom
Date: Fri, 11 Sep 2020 11:02:07 -0700
Subject: [PATCH] Cleanup ParquetPageSourceFactory

---
 .../plugin/hive/parquet/HdfsParquetDataSource.java | 13 +------------
 .../hive/parquet/ParquetPageSourceFactory.java     |  4 ++--
 .../plugin/iceberg/IcebergPageSourceProvider.java  |  5 +++--
 3 files changed, 6 insertions(+), 16 deletions(-)

diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/HdfsParquetDataSource.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/HdfsParquetDataSource.java
index bda6919ede75..c809787cdb8a 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/HdfsParquetDataSource.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/HdfsParquetDataSource.java
@@ -26,7 +26,6 @@
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
 import io.prestosql.spi.PrestoException;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -217,17 +216,7 @@ private <K> Map<K, Slice> readLargeDiskRanges(Map<K, DiskRange> diskRanges)
         return slices.build();
     }
 
-    public static HdfsParquetDataSource buildHdfsParquetDataSource(
-            FSDataInputStream inputStream,
-            Path path,
-            long fileSize,
-            FileFormatDataSourceStats stats,
-            ParquetReaderOptions options)
-    {
-        return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
-    }
-
-    public static List<DiskRange> mergeAdjacentDiskRanges(Collection<DiskRange> diskRanges, DataSize maxMergeDistance, DataSize maxReadSize)
+    private static List<DiskRange> mergeAdjacentDiskRanges(Collection<DiskRange> diskRanges, DataSize maxMergeDistance, DataSize maxReadSize)
     {
         // sort ranges by start offset
         List<DiskRange> ranges = new ArrayList<>(diskRanges);
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java
index 5fdd6f290066..89326215598e 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -19,6 +19,7 @@
 import io.prestosql.parquet.Field;
 import io.prestosql.parquet.ParquetCorruptionException;
 import io.prestosql.parquet.ParquetDataSource;
+import io.prestosql.parquet.ParquetDataSourceId;
 import io.prestosql.parquet.ParquetReaderOptions;
 import io.prestosql.parquet.RichColumnDescriptor;
 import io.prestosql.parquet.predicate.Predicate;
@@ -81,7 +82,6 @@
 import static io.prestosql.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
 import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
 import static io.prestosql.plugin.hive.ReaderProjections.projectSufficientColumns;
-import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
 import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
 import static io.prestosql.plugin.hive.util.HiveUtil.getDeserializerClassName;
 import static java.lang.String.format;
@@ -180,7 +180,7 @@ public static ReaderPageSourceWithProjections createPageSource(
             ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
             FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
             fileSchema = fileMetaData.getSchema();
-            dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats, options);
+            dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
 
             Optional<MessageType> message = projectSufficientColumns(columns)
                     .map(ReaderProjections::getReaderColumns)
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
index 07bca114759b..9a4548ec7d16 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
@@ -28,6 +28,7 @@
 import io.prestosql.parquet.Field;
 import io.prestosql.parquet.ParquetCorruptionException;
 import io.prestosql.parquet.ParquetDataSource;
+import io.prestosql.parquet.ParquetDataSourceId;
 import io.prestosql.parquet.ParquetReaderOptions;
 import io.prestosql.parquet.RichColumnDescriptor;
 import io.prestosql.parquet.predicate.Predicate;
@@ -40,6 +41,7 @@
 import io.prestosql.plugin.hive.orc.OrcPageSource;
 import io.prestosql.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
 import io.prestosql.plugin.hive.orc.OrcReaderConfig;
+import io.prestosql.plugin.hive.parquet.HdfsParquetDataSource;
 import io.prestosql.plugin.hive.parquet.ParquetPageSource;
 import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
 import io.prestosql.spi.PrestoException;
@@ -88,7 +90,6 @@
 import static io.prestosql.parquet.ParquetTypeUtils.getParquetTypeByName;
 import static io.prestosql.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.prestosql.parquet.predicate.PredicateUtils.predicateMatches;
-import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
 import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
 import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
 import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
@@ -360,7 +361,7 @@ private static ConnectorPageSource createParquetPageSource(
             FileStatus fileStatus = hdfsEnvironment.doAs(user, () -> fileSystem.getFileStatus(path));
             long fileSize = fileStatus.getLen();
             FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
-            dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, fileFormatDataSourceStats, options);
+            dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, fileFormatDataSourceStats, options);
             ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(fileSystem, path, fileSize));
             FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
             MessageType fileSchema = fileMetaData.getSchema();
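
Reviewer note: a minimal sketch of how a call site changes with this patch. The
identifiers (inputStream, path, fileSize, stats, options) are taken from the
ParquetPageSourceFactory hunk above; the enclosing method is assumed, not part of
the patch.

    // Before: the one-line static factory removed by this patch
    //   ParquetDataSource dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats, options);

    // After: invoke the HdfsParquetDataSource constructor directly; the
    // ParquetDataSourceId is just the Hadoop Path rendered as a string
    ParquetDataSource dataSource = new HdfsParquetDataSource(
            new ParquetDataSourceId(path.toString()),
            fileSize,
            inputStream,
            stats,
            options);

Since the factory did nothing beyond wrapping the path in a ParquetDataSourceId,
inlining it removes an indirection, drops the Path import from
HdfsParquetDataSource, and lets mergeAdjacentDiskRanges become private.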