
Commit

Cleanup ParquetPageSourceFactory
dain committed Sep 12, 2020
1 parent 95dedea commit 66d0a2e
Showing 3 changed files with 6 additions and 16 deletions.
HdfsParquetDataSource.java

@@ -26,7 +26,6 @@
 import io.prestosql.plugin.hive.FileFormatDataSourceStats;
 import io.prestosql.spi.PrestoException;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -217,17 +216,7 @@ private <K> Map<K, ChunkReader> readLargeDiskRanges(Map<K, DiskRange> diskRanges
         return slices.build();
     }
 
-    public static HdfsParquetDataSource buildHdfsParquetDataSource(
-            FSDataInputStream inputStream,
-            Path path,
-            long fileSize,
-            FileFormatDataSourceStats stats,
-            ParquetReaderOptions options)
-    {
-        return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
-    }
-
-    public static List<DiskRange> mergeAdjacentDiskRanges(Collection<DiskRange> diskRanges, DataSize maxMergeDistance, DataSize maxReadSize)
+    private static List<DiskRange> mergeAdjacentDiskRanges(Collection<DiskRange> diskRanges, DataSize maxMergeDistance, DataSize maxReadSize)
     {
         // sort ranges by start offset
         List<DiskRange> ranges = new ArrayList<>(diskRanges);
ParquetPageSourceFactory.java

@@ -19,6 +19,7 @@
 import io.prestosql.parquet.Field;
 import io.prestosql.parquet.ParquetCorruptionException;
 import io.prestosql.parquet.ParquetDataSource;
+import io.prestosql.parquet.ParquetDataSourceId;
 import io.prestosql.parquet.ParquetReaderOptions;
 import io.prestosql.parquet.RichColumnDescriptor;
 import io.prestosql.parquet.predicate.Predicate;
@@ -81,7 +82,6 @@
 import static io.prestosql.plugin.hive.HiveSessionProperties.isUseParquetColumnNames;
 import static io.prestosql.plugin.hive.ReaderProjections.projectBaseColumns;
 import static io.prestosql.plugin.hive.ReaderProjections.projectSufficientColumns;
-import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
 import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
 import static io.prestosql.plugin.hive.util.HiveUtil.getDeserializerClassName;
 import static java.lang.String.format;
@@ -180,7 +180,7 @@ public static ReaderPageSourceWithProjections createPageSource(
             ParquetMetadata parquetMetadata = MetadataReader.readFooter(inputStream, path, fileSize);
             FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
             fileSchema = fileMetaData.getSchema();
-            dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, stats, options);
+            dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
 
             Optional<MessageType> message = projectSufficientColumns(columns)
                     .map(ReaderProjections::getReaderColumns)
IcebergPageSourceProvider.java

@@ -28,6 +28,7 @@
 import io.prestosql.parquet.Field;
 import io.prestosql.parquet.ParquetCorruptionException;
 import io.prestosql.parquet.ParquetDataSource;
+import io.prestosql.parquet.ParquetDataSourceId;
 import io.prestosql.parquet.ParquetReaderOptions;
 import io.prestosql.parquet.RichColumnDescriptor;
 import io.prestosql.parquet.predicate.Predicate;
@@ -40,6 +41,7 @@
 import io.prestosql.plugin.hive.orc.OrcPageSource;
 import io.prestosql.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
 import io.prestosql.plugin.hive.orc.OrcReaderConfig;
+import io.prestosql.plugin.hive.parquet.HdfsParquetDataSource;
 import io.prestosql.plugin.hive.parquet.ParquetPageSource;
 import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
 import io.prestosql.spi.PrestoException;
@@ -88,7 +90,6 @@
 import static io.prestosql.parquet.ParquetTypeUtils.getParquetTypeByName;
 import static io.prestosql.parquet.predicate.PredicateUtils.buildPredicate;
 import static io.prestosql.parquet.predicate.PredicateUtils.predicateMatches;
-import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
 import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
 import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
 import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
@@ -360,7 +361,7 @@ private static ConnectorPageSource createParquetPageSource(
             FileStatus fileStatus = hdfsEnvironment.doAs(user, () -> fileSystem.getFileStatus(path));
             long fileSize = fileStatus.getLen();
             FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
-            dataSource = buildHdfsParquetDataSource(inputStream, path, fileSize, fileFormatDataSourceStats, options);
+            dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, fileFormatDataSourceStats, options);
             ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(user, () -> MetadataReader.readFooter(fileSystem, path, fileSize));
             FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
             MessageType fileSchema = fileMetaData.getSchema();
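
Taken together, the three diffs show a single refactoring: the buildHdfsParquetDataSource static factory, which did nothing beyond forwarding to the constructor, is deleted, and its two call sites (the Hive ParquetPageSourceFactory and the Iceberg page source provider) construct HdfsParquetDataSource directly. With the factory gone, the Path import in HdfsParquetDataSource.java is no longer needed and mergeAdjacentDiskRanges can drop from public to private. A minimal sketch of the resulting call pattern, assuming a caller that already holds a Hadoop FileSystem and knows the file size; the helper name openParquetDataSource and the wrapper class are hypothetical, not part of this commit:

    import java.io.IOException;

    import io.prestosql.parquet.ParquetDataSource;
    import io.prestosql.parquet.ParquetDataSourceId;
    import io.prestosql.parquet.ParquetReaderOptions;
    import io.prestosql.plugin.hive.FileFormatDataSourceStats;
    import io.prestosql.plugin.hive.parquet.HdfsParquetDataSource;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class ParquetDataSources
    {
        private ParquetDataSources() {}

        // Hypothetical helper mirroring the pattern now inlined at both call
        // sites: open the file and hand the stream straight to the constructor
        // instead of going through the removed buildHdfsParquetDataSource.
        static ParquetDataSource openParquetDataSource(
                FileSystem fileSystem,
                Path path,
                long fileSize,
                FileFormatDataSourceStats stats,
                ParquetReaderOptions options)
                throws IOException
        {
            FSDataInputStream inputStream = fileSystem.open(path);
            // The data source id is just the file path rendered as a string;
            // it identifies the file in subsequent error messages.
            return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, stats, options);
        }
    }

Since the factory added no behavior over the constructor, inlining it removes a level of indirection, at the small cost of repeating the ParquetDataSourceId construction at each call site.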
