Skip to content

Commit

Permalink
Add RLE support to PagePartitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-stec authored and sopel39 committed May 26, 2022
1 parent 8e6ff07 commit 46a36ad
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,29 @@
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VariableWidthType;
import io.trino.type.BlockTypeOperators;

import static java.util.Objects.requireNonNull;

public class PositionsAppenderFactory
{
private final BlockTypeOperators blockTypeOperators;

public PositionsAppenderFactory(BlockTypeOperators blockTypeOperators)
{
this.blockTypeOperators = requireNonNull(blockTypeOperators, "blockTypeOperators is null");
}

public PositionsAppender create(Type type, int expectedPositions, long maxPageSizeInBytes)
{
return new UnnestingPositionsAppender(createPrimitiveAppender(type, expectedPositions, maxPageSizeInBytes));
if (!type.isComparable()) {
return new UnnestingPositionsAppender(createPrimitiveAppender(type, expectedPositions, maxPageSizeInBytes));
}

return new UnnestingPositionsAppender(
new RleAwarePositionsAppender(
blockTypeOperators.getEqualOperator(type),
createPrimitiveAppender(type, expectedPositions, maxPageSizeInBytes)));
}

private PositionsAppender createPrimitiveAppender(Type type, int expectedPositions, long maxPageSizeInBytes)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.type.BlockTypeOperators.BlockPositionEqual;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
* {@link PositionsAppender} that will produce {@link RunLengthEncodedBlock} output if possible,
* that is all inputs are {@link RunLengthEncodedBlock} blocks with the same value.
*/
public class RleAwarePositionsAppender
implements PositionsAppender
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RleAwarePositionsAppender.class).instanceSize();
private static final int NO_RLE = -1;

private final BlockPositionEqual equalOperator;
private final PositionsAppender delegate;

@Nullable
private Block rleValue;

// NO_RLE means flat state, 0 means initial empty state, positive means RLE state and the current RLE position count.
private int rlePositionCount;

public RleAwarePositionsAppender(BlockPositionEqual equalOperator, PositionsAppender delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.equalOperator = requireNonNull(equalOperator, "equalOperator is null");
}

@Override
public void append(IntArrayList positions, Block source)
{
// RleAwarePositionsAppender should be used with FlatteningPositionsAppender that makes sure
// append is called only with flat block
checkArgument(!(source instanceof RunLengthEncodedBlock));
switchToFlat();
delegate.append(positions, source);
}

@Override
public void appendRle(RunLengthEncodedBlock source)
{
if (source.getPositionCount() == 0) {
return;
}

if (rlePositionCount == 0) {
// initial empty state, switch to RLE state
rleValue = source.getValue();
rlePositionCount = source.getPositionCount();
}
else if (rleValue != null) {
// we are in the RLE state
if (equalOperator.equalNullSafe(rleValue, 0, source.getValue(), 0)) {
// the values match. we can just add positions.
this.rlePositionCount += source.getPositionCount();
return;
}
// RLE values do not match. switch to flat state
switchToFlat();
delegate.appendRle(source);
}
else {
// flat state
delegate.appendRle(source);
}
}

@Override
public Block build()
{
Block result;
if (rleValue != null) {
result = new RunLengthEncodedBlock(rleValue, rlePositionCount);
}
else {
result = delegate.build();
}

reset();
return result;
}

private void reset()
{
rleValue = null;
rlePositionCount = 0;
}

@Override
public long getRetainedSizeInBytes()
{
long retainedRleSize = rleValue != null ? rleValue.getRetainedSizeInBytes() : 0;
return INSTANCE_SIZE + retainedRleSize + delegate.getRetainedSizeInBytes();
}

@Override
public long getSizeInBytes()
{
long rleSize = rleValue != null ? rleValue.getSizeInBytes() : 0;
return rleSize + delegate.getSizeInBytes();
}

private void switchToFlat()
{
if (rleValue != null) {
// we are in the RLE state, flatten all RLE blocks
delegate.appendRle(new RunLengthEncodedBlock(rleValue, rlePositionCount));
rleValue = null;
}
rlePositionCount = NO_RLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public class LocalExecutionPlanner
private final BlockTypeOperators blockTypeOperators;
private final TableExecuteContextManager tableExecuteContextManager;
private final ExchangeManagerRegistry exchangeManagerRegistry;
private final PositionsAppenderFactory positionsAppenderFactory = new PositionsAppenderFactory();
private final PositionsAppenderFactory positionsAppenderFactory;

private final NonEvictableCache<FunctionKey, AccumulatorFactory> accumulatorFactoryCache = buildNonEvictableCache(CacheBuilder.newBuilder()
.maximumSize(1000)
Expand Down Expand Up @@ -465,6 +465,7 @@ public LocalExecutionPlanner(
this.blockTypeOperators = requireNonNull(blockTypeOperators, "blockTypeOperators is null");
this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");
this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
this.positionsAppenderFactory = new PositionsAppenderFactory(blockTypeOperators);
}

public LocalExecutionPlan plan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.trino.sql.planner.HashBucketFunction;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingTaskContext;
import io.trino.type.BlockTypeOperators;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -110,7 +111,7 @@
@BenchmarkMode(Mode.AverageTime)
public class BenchmarkPartitionedOutputOperator
{
private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory();
private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators());

@Benchmark
public void addPage(BenchmarkData data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.testing.TestingTaskContext;
import io.trino.type.BlockTypeOperators;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -497,7 +498,7 @@ public void partitionPage(PagePartitioner pagePartitioner, Page page)

public static class PagePartitionerBuilder
{
public static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory();
public static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators());
private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
private final OutputBuffer outputBuffer;
Expand Down
Loading

0 comments on commit 46a36ad

Please sign in to comment.