From fc01be53ac8872f8ded85314208684bc35844dbd Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Mon, 4 Apr 2022 13:12:10 -0400 Subject: [PATCH] Add ORC row position adaptation --- .../trino/plugin/hive/orc/OrcPageSource.java | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java index 1d64dd472d41..dd6e63e664c4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSource.java @@ -177,14 +177,14 @@ public Page getNextPage() MaskDeletedRowsFunction maskDeletedRowsFunction = deletedRows .map(deletedRows -> deletedRows.getMaskDeletedRowsFunction(page, startRowId)) .orElseGet(() -> MaskDeletedRowsFunction.noMaskForPage(page)); - return getColumnAdaptationsPage(page, maskDeletedRowsFunction, recordReader.getFilePosition()); + return getColumnAdaptationsPage(page, maskDeletedRowsFunction, recordReader.getFilePosition(), startRowId); } - private Page getColumnAdaptationsPage(Page page, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition) + private Page getColumnAdaptationsPage(Page page, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) { Block[] blocks = new Block[columnAdaptations.size()]; for (int i = 0; i < columnAdaptations.size(); i++) { - blocks[i] = columnAdaptations.get(i).block(page, maskDeletedRowsFunction, filePosition); + blocks[i] = columnAdaptations.get(i).block(page, maskDeletedRowsFunction, filePosition, startRowId); } return new Page(maskDeletedRowsFunction.getPositionCount(), blocks); } @@ -247,7 +247,7 @@ public long getMemoryUsage() public interface ColumnAdaptation { - Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition); + Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId); static ColumnAdaptation nullColumn(Type type) { @@ -283,6 +283,11 @@ static ColumnAdaptation constantColumn(Block singleValueBlock) { return new ConstantAdaptation(singleValueBlock); } + + static ColumnAdaptation positionColumn() + { + return new PositionAdaptation(); + } } private static class NullColumn @@ -300,7 +305,7 @@ public NullColumn(Type type) } @Override - public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition) + public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) { return new RunLengthEncodedBlock(nullBlock, maskDeletedRowsFunction.getPositionCount()); } @@ -326,7 +331,7 @@ public SourceColumn(int index) } @Override - public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition) + public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) { Block block = sourcePage.getBlock(index); return new LazyBlock(maskDeletedRowsFunction.getPositionCount(), new MaskingBlockLoader(maskDeletedRowsFunction, block)); @@ -371,7 +376,7 @@ private static class RowIdAdaptation implements ColumnAdaptation { @Override - public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition) + public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) { Block rowBlock = maskDeletedRowsFunction.apply(fromFieldBlocks( sourcePage.getPositionCount(), @@ -403,7 +408,7 @@ public UpdatedRowAdaptation(HiveUpdateProcessor updateProcessor, List originalFilesBlockBuilder = ImmutableList.builder(); originalFilesBlockBuilder.add( new RunLengthEncodedBlock(ORIGINAL_FILE_TRANSACTION_ID_BLOCK, positionCount), new RunLengthEncodedBlock(bucketBlock, positionCount), - createOriginalFilesRowIdBlock(startingRowId, filePosition, positionCount)); + createRowNumberBlock(startingRowId, filePosition, positionCount)); for (int channel = 0; channel < sourcePage.getChannelCount(); channel++) { originalFilesBlockBuilder.add(sourcePage.getBlock(channel)); } @@ -462,7 +467,7 @@ public OriginalFileRowIdAdaptation(long startingRowId, int bucketId) } @Override - public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition) + public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) { int positionCount = sourcePage.getPositionCount(); Block rowBlock = maskDeletedRowsFunction.apply(fromFieldBlocks( @@ -471,7 +476,7 @@ public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunct new Block[] { new RunLengthEncodedBlock(ORIGINAL_FILE_TRANSACTION_ID_BLOCK, positionCount), new RunLengthEncodedBlock(bucketBlock, positionCount), - createOriginalFilesRowIdBlock(startingRowId, filePosition, positionCount) + createRowNumberBlock(startingRowId, filePosition, positionCount) })); return rowBlock; } @@ -490,13 +495,24 @@ public ConstantAdaptation(Block singleValueBlock) } @Override - public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition) + public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) { return new RunLengthEncodedBlock(singleValueBlock, sourcePage.getPositionCount()); } } - private static Block createOriginalFilesRowIdBlock(long startingRowId, long filePosition, int positionCount) + private static class PositionAdaptation + implements ColumnAdaptation + { + @Override + public Block block(Page sourcePage, MaskDeletedRowsFunction maskDeletedRowsFunction, long filePosition, OptionalLong startRowId) + { + checkArgument(startRowId.isEmpty(), "startRowId should not be specified when using PositionAdaptation"); + return createRowNumberBlock(0, filePosition, sourcePage.getPositionCount()); + } + } + + private static Block createRowNumberBlock(long startingRowId, long filePosition, int positionCount) { long[] translatedRowIds = new long[positionCount]; for (int index = 0; index < positionCount; index++) {