Skip to content

Commit

Permalink
Add ORC row position adaptation
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 authored and findepi committed Apr 7, 2022
1 parent 3be29f8 commit fc01be5
Showing 1 changed file with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ public Page getNextPage()
MaskDeletedRowsFunction maskDeletedRowsFunction = deletedRows
.map(deletedRows -> deletedRows.getMaskDeletedRowsFunction(page, startRowId))
.orElseGet(() -> MaskDeletedRowsFunction.noMaskForPage(page));
return getColumnAdaptationsPage(page, maskDeletedRowsFunction, recordReader.getFilePosition());
return getColumnAdaptationsPage(page, maskDeletedRowsFunction, recordReader.getFilePosition(), startRowId);
}

private Page getColumnAdaptationsPage(Page page, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
private Page getColumnAdaptationsPage(Page page, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
Block[] blocks = new Block[columnAdaptations.size()];
for (int i = 0; i < columnAdaptations.size(); i++) {
blocks[i] = columnAdaptations.get(i).block(page, maskDeletedRowsFunction, filePosition);
blocks[i] = columnAdaptations.get(i).block(page, maskDeletedRowsFunction, filePosition, startRowId);
}
return new Page(maskDeletedRowsFunction.getPositionCount(), blocks);
}
Expand Down Expand Up @@ -247,7 +247,7 @@ public long getMemoryUsage()

public interface ColumnAdaptation
{
Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition);
Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId);

static ColumnAdaptation nullColumn(Type type)
{
Expand Down Expand Up @@ -283,6 +283,11 @@ static ColumnAdaptation constantColumn(Block singleValueBlock)
{
return new ConstantAdaptation(singleValueBlock);
}

static ColumnAdaptation positionColumn()
{
return new PositionAdaptation();
}
}

private static class NullColumn
Expand All @@ -300,7 +305,7 @@ public NullColumn(Type type)
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
return new RunLengthEncodedBlock(nullBlock, maskDeletedRowsFunction.getPositionCount());
}
Expand All @@ -326,7 +331,7 @@ public SourceColumn(int index)
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
Block block = sourcePage.getBlock(index);
return new LazyBlock(maskDeletedRowsFunction.getPositionCount(), new MaskingBlockLoader(maskDeletedRowsFunction, block));
Expand Down Expand Up @@ -371,7 +376,7 @@ private static class RowIdAdaptation
implements ColumnAdaptation
{
@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
Block rowBlock = maskDeletedRowsFunction.apply(fromFieldBlocks(
sourcePage.getPositionCount(),
Expand Down Expand Up @@ -403,7 +408,7 @@ public UpdatedRowAdaptation(HiveUpdateProcessor updateProcessor, List<HiveColumn
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
return updateProcessor.createUpdateRowBlock(sourcePage, nonUpdatedSourceChannels, maskDeletedRowsFunction);
}
Expand Down Expand Up @@ -433,14 +438,14 @@ public UpdatedRowAdaptationWithOriginalFiles(long startingRowId, int bucketId, H
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
int positionCount = sourcePage.getPositionCount();
ImmutableList.Builder<Block> originalFilesBlockBuilder = ImmutableList.builder();
originalFilesBlockBuilder.add(
new RunLengthEncodedBlock(ORIGINAL_FILE_TRANSACTION_ID_BLOCK, positionCount),
new RunLengthEncodedBlock(bucketBlock, positionCount),
createOriginalFilesRowIdBlock(startingRowId, filePosition, positionCount));
createRowNumberBlock(startingRowId, filePosition, positionCount));
for (int channel = 0; channel < sourcePage.getChannelCount(); channel++) {
originalFilesBlockBuilder.add(sourcePage.getBlock(channel));
}
Expand All @@ -462,7 +467,7 @@ public OriginalFileRowIdAdaptation(long startingRowId, int bucketId)
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
int positionCount = sourcePage.getPositionCount();
Block rowBlock = maskDeletedRowsFunction.apply(fromFieldBlocks(
Expand All @@ -471,7 +476,7 @@ public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunct
new Block[] {
new RunLengthEncodedBlock(ORIGINAL_FILE_TRANSACTION_ID_BLOCK, positionCount),
new RunLengthEncodedBlock(bucketBlock, positionCount),
createOriginalFilesRowIdBlock(startingRowId, filePosition, positionCount)
createRowNumberBlock(startingRowId, filePosition, positionCount)
}));
return rowBlock;
}
Expand All @@ -490,13 +495,24 @@ public ConstantAdaptation(Block singleValueBlock)
}

@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition)
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
return new RunLengthEncodedBlock(singleValueBlock, sourcePage.getPositionCount());
}
}

private static Block createOriginalFilesRowIdBlock(long startingRowId, long filePosition, int positionCount)
private static class PositionAdaptation
implements ColumnAdaptation
{
@Override
public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId)
{
checkArgument(startRowId.isEmpty(), "startRowId should not be specified when using PositionAdaptation");
return createRowNumberBlock(0, filePosition, sourcePage.getPositionCount());
}
}

private static Block createRowNumberBlock(long startingRowId, long filePosition, int positionCount)
{
long[] translatedRowIds = new long[positionCount];
for (int index = 0; index < positionCount; index++) {
Expand Down

0 comments on commit fc01be5

Please sign in to comment.