Skip to content

Commit

Permalink
Make ParquetReader firstRowsOfBlocks required
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 authored and findepi committed Apr 7, 2022
1 parent 53a7672 commit 58fbee9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class ParquetReader

private final Optional<String> fileCreatedBy;
private final List<BlockMetaData> blocks;
private final Optional<List<Long>> firstRowsOfBlocks;
private final List<Long> firstRowsOfBlocks;
private final List<PrimitiveColumnIO> columns;
private final ParquetDataSource dataSource;
private final DateTimeZone timeZone;
Expand All @@ -100,7 +100,7 @@ public class ParquetReader
/**
* Index in the Parquet file of the first row of the current group
*/
private Optional<Long> firstRowIndexInGroup = Optional.empty();
private long firstRowIndexInGroup;
/**
* Index in the current group of the next row
*/
Expand All @@ -124,7 +124,7 @@ public ParquetReader(
Optional<String> fileCreatedBy,
MessageColumnIO messageColumnIO,
List<BlockMetaData> blocks,
Optional<List<Long>> firstRowsOfBlocks,
List<Long> firstRowsOfBlocks,
ParquetDataSource dataSource,
DateTimeZone timeZone,
AggregatedMemoryContext memoryContext,
Expand All @@ -138,7 +138,7 @@ public ParquetReader(
Optional<String> fileCreatedBy,
MessageColumnIO messageColumnIO,
List<BlockMetaData> blocks,
Optional<List<Long>> firstRowsOfBlocks,
List<Long> firstRowsOfBlocks,
ParquetDataSource dataSource,
DateTimeZone timeZone,
AggregatedMemoryContext memoryContext,
Expand All @@ -159,9 +159,7 @@ public ParquetReader(
this.columnReaders = new PrimitiveColumnReader[columns.size()];
this.maxBytesPerCell = new long[columns.size()];

firstRowsOfBlocks.ifPresent(firstRows -> {
checkArgument(blocks.size() == firstRows.size(), "elements of firstRowsOfBlocks must correspond to blocks");
});
checkArgument(blocks.size() == firstRowsOfBlocks.size(), "elements of firstRowsOfBlocks must correspond to blocks");

this.columnIndexStore = columnIndexStore;
this.blockRowRanges = listWithNulls(this.blocks.size());
Expand Down Expand Up @@ -217,8 +215,7 @@ public void close()
*/
public long lastBatchStartRow()
{
long baseIndex = firstRowIndexInGroup.orElseThrow(() -> new IllegalStateException("row index unavailable"));
return baseIndex + nextRowInGroup - batchSize;
return firstRowIndexInGroup + nextRowInGroup - batchSize;
}

public int nextBatch()
Expand Down Expand Up @@ -247,7 +244,7 @@ private boolean advanceToNextRowGroup()
return false;
}
currentBlockMetadata = blocks.get(currentRowGroup);
firstRowIndexInGroup = firstRowsOfBlocks.map(firstRows -> firstRows.get(currentRowGroup));
firstRowIndexInGroup = firstRowsOfBlocks.get(currentRowGroup);
currentGroupRowCount = currentBlockMetadata.getRowCount();
if (filter.isPresent() && options.isUseColumnIndex()) {
if (columnIndexStore.get(currentRowGroup).isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ && predicateMatches(parquetPredicate, block, dataSource, descriptorsByPath, parq
Optional.ofNullable(fileMetaData.getCreatedBy()),
messageColumn,
blocks.build(),
Optional.of(blockStarts.build()),
blockStarts.build(),
dataSource,
timeZone,
newSimpleAggregatedMemoryContext(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ private static ReaderPageSource createParquetPageSource(
Optional.ofNullable(fileMetaData.getCreatedBy()),
messageColumnIO,
blocks,
Optional.of(blockStarts.build()),
blockStarts.build(),
dataSource,
UTC,
memoryContext,
Expand Down

0 comments on commit 58fbee9

Please sign in to comment.