Skip to content

Commit

Permalink
Replace class isolation with manual dispatch
Browse files Browse the repository at this point in the history
To avoid block type pollution, use manual dispatch
over Block class instead of class isolation.
Before invoking dedicated PositionsAppender the
input Block is flattened so the dedicated appender
either gets flat Block or RLE Block with flat value.
Each dedicated, type-specific, PositionsAppender is
implemented without using BlockBuilder to increase
performance and to avoid jit inlining issues.

Microbenchmark results:
Benchmark                                   channelCount  enableCompression  nullRate  partitionCount  positionCount  type                                                     baseline  no        isolation      rle  no  isolation  rle  %
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         2               8192           BIGINT                                                   1301.217  485.526   -62.68677707
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT                                                   1324.132  541.814   -59.08157193
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_SKEWED                          1338.207  548.653   -59.00088701
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           DICTIONARY_BIGINT                                        1552.893  649.265   -58.18997188
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_20_PERCENT                      845.798   326.112   -61.44327605
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT           656.293   437.755   -33.2988467
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT           759.82    457.409   -39.80034745
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT           850.399   472.21    -44.47194787
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT          991.474   507.408   -48.82286374
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1  919.57    480.823   -47.71219157
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_RLE                             825.266   359.845   -56.39648307
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BIGINT_PARTITION_CHANNEL_RLE_NULL                        3.438     1.603     -53.37405468
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           LONG_DECIMAL                                             1560.603  633.44    -59.41056117
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           DICTIONARY_LONG_DECIMAL                                  1983.322  757.596   -61.80166408
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           INTEGER                                                  1362.036  508.811   -62.64335157
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           DICTIONARY_INTEGER                                       1548.895  585.053   -62.22771718
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           SMALLINT                                                 1290.688  482.476   -62.61869639
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           DICTIONARY_SMALLINT                                      1531.761  564.357   -63.15632791
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           BOOLEAN                                                  1371.602  493.602   -64.01273839
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           DICTIONARY_BOOLEAN                                       1451.336  595.603   -58.96174284
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           VARCHAR                                                  4027.003  3181.661  -20.99183934
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           DICTIONARY_VARCHAR                                       4226.032  3324.484  -21.33320335
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           ARRAY_BIGINT                                             738.844   505.346   -31.60315303
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           ARRAY_VARCHAR                                            2482.31   2247.198  -9.471500336
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           ARRAY_ARRAY_BIGINT                                       3804.695  3487.887  -8.326764695
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           MAP_BIGINT_BIGINT                                        2389.073  2509.251  5.030319291
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           MAP_BIGINT_MAP_BIGINT_BIGINT                             9909.356  9710.362  -2.008142608
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           ROW_BIGINT_BIGINT                                        993.453   646.287   -34.94538745
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           ROW_ARRAY_BIGINT_ARRAY_BIGINT                            2202.165  2205.541  0.1533036807
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         16              8192           ROW_RLE_BIGINT_BIGINT                                    878.079   788.256   -10.2294896
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0         256             8192           BIGINT                                                   1670.45   943.49    -43.5188123
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       2               8192           BIGINT                                                   1602.083  855.094   -46.62611113
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT                                                   1450.967  695.945   -52.03578028
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_SKEWED                          1749.628  992.395   -43.27965716
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           DICTIONARY_BIGINT                                        1768.296  794.583   -55.06504567
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_20_PERCENT                      1079.607  595.375   -44.85261767
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_20_PERCENT           909.682   708.798   -22.08288171
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_50_PERCENT           1007.795  715.258   -29.02743117
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_80_PERCENT           1108.153  742.894   -32.96106224
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT          1235.82   776.522   -37.16544481
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_DICTIONARY_100_PERCENT_MINUS_1  1163.168  748.659   -35.63621076
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_RLE                             1251.835  778.291   -37.82798851
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BIGINT_PARTITION_CHANNEL_RLE_NULL                        5.362     3.367     -37.20626632
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           LONG_DECIMAL                                             1702.922  772.704   -54.62481546
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           DICTIONARY_LONG_DECIMAL                                  2086.265  889.509   -57.36356599
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           INTEGER                                                  1462.114  636.766   -56.44894995
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           DICTIONARY_INTEGER                                       1765.352  745.925   -57.74638712
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           SMALLINT                                                 1507.112  623.829   -58.60765491
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           DICTIONARY_SMALLINT                                      1718.672  723.018   -57.93158904
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           BOOLEAN                                                  1437.562  563.497   -60.80189933
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           DICTIONARY_BOOLEAN                                       1581.486  654.235   -58.63162873
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           VARCHAR                                                  3723.315  2907.568  -21.90915891
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           DICTIONARY_VARCHAR                                       3869.583  3013.539  -22.12238373
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           ARRAY_BIGINT                                             905.913   692.206   -23.59023438
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           ARRAY_VARCHAR                                            2218.627  1844.282  -16.8728227
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           ARRAY_ARRAY_BIGINT                                       3499.471  3289.49   -6.000364055
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           MAP_BIGINT_BIGINT                                        2331.617  2309.136  -0.9641806523
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           MAP_BIGINT_MAP_BIGINT_BIGINT                             7705.042  7388.883  -4.103274194
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           ROW_BIGINT_BIGINT                                        942.5     908.927   -3.562122016
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           ROW_ARRAY_BIGINT_ARRAY_BIGINT                            2341.541  2318.315  -0.9919108826
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       16              8192           ROW_RLE_BIGINT_BIGINT                                    827.48    813.803   -1.652849616
BenchmarkPartitionedOutputOperator.addPage  1             FALSE              0.2       256             8192           BIGINT                                                   1861.543  1042.849  -43.97932253
BenchmarkPartitionedOutputOperator.addPage  2             FALSE              0         2               8192           BIGINT                                                   1545.104  604.727   -60.8617284
  • Loading branch information
lukasz-stec authored and sopel39 committed May 26, 2022
1 parent e3749a3 commit 65dd2af
Show file tree
Hide file tree
Showing 18 changed files with 1,862 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,174 @@
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.ByteArrayBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
import static java.lang.Math.max;

public class BytePositionsAppender
implements PositionsAppender
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(BytePositionsAppender.class).instanceSize();
private static final Block NULL_VALUE_BLOCK = new ByteArrayBlock(1, Optional.of(new boolean[] {true}), new byte[1]);

private boolean initialized;
private int initialEntryCount;

private int positionCount;
private boolean hasNullValue;
private boolean hasNonNullValue;

// it is assumed that these arrays are the same length
private boolean[] valueIsNull = new boolean[0];
private byte[] values = new byte[0];

private long retainedSizeInBytes;
private long sizeInBytes;

public BytePositionsAppender(int expectedEntries)
{
this.initialEntryCount = max(expectedEntries, 1);

updateRetainedSize();
}

@Override
public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder)
public void append(IntArrayList positions, Block block)
{
if (positions.isEmpty()) {
return;
}
// performance of this method depends on block being always the same, flat type
checkArgument(block instanceof ByteArrayBlock);
int[] positionArray = positions.elements();
int positionsSize = positions.size();
ensureCapacity(positionCount + positionsSize);

if (block.mayHaveNull()) {
for (int i = 0; i < positions.size(); i++) {
for (int i = 0; i < positionsSize; i++) {
int position = positionArray[i];
if (block.isNull(position)) {
blockBuilder.appendNull();
boolean isNull = block.isNull(position);
int positionIndex = positionCount + i;
if (isNull) {
valueIsNull[positionIndex] = true;
hasNullValue = true;
}
else {
blockBuilder.writeByte(block.getByte(position, 0)).closeEntry();
values[positionIndex] = block.getByte(position, 0);
hasNonNullValue = true;
}
}
positionCount += positionsSize;
}
else {
for (int i = 0; i < positions.size(); i++) {
blockBuilder.writeByte(block.getByte(positionArray[i], 0)).closeEntry();
for (int i = 0; i < positionsSize; i++) {
int position = positionArray[i];
values[positionCount + i] = block.getByte(position, 0);
}
positionCount += positionsSize;
hasNonNullValue = true;
}

updateSize(positionsSize);
}

@Override
public void appendRle(RunLengthEncodedBlock block)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
int sourcePosition = 0;
ensureCapacity(positionCount + rlePositionCount);
if (block.isNull(sourcePosition)) {
Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true);
hasNullValue = true;
}
else {
byte value = block.getByte(sourcePosition, 0);
Arrays.fill(values, positionCount, positionCount + rlePositionCount, value);
hasNonNullValue = true;
}
positionCount += rlePositionCount;

updateSize(rlePositionCount);
}

@Override
public Block build()
{
if (!hasNonNullValue) {
return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
}
ByteArrayBlock result = new ByteArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
reset();
return result;
}

@Override
public long getRetainedSizeInBytes()
{
return retainedSizeInBytes;
}

@Override
public long getSizeInBytes()
{
return sizeInBytes;
}

private void reset()
{
initialEntryCount = calculateBlockResetSize(positionCount);
initialized = false;
valueIsNull = new boolean[0];
values = new byte[0];
positionCount = 0;
sizeInBytes = 0;
hasNonNullValue = false;
hasNullValue = false;
updateRetainedSize();
}

private void ensureCapacity(int capacity)
{
if (values.length >= capacity) {
return;
}

int newSize;
if (initialized) {
newSize = calculateNewArraySize(values.length);
}
else {
newSize = initialEntryCount;
initialized = true;
}
newSize = Math.max(newSize, capacity);

valueIsNull = Arrays.copyOf(valueIsNull, newSize);
values = Arrays.copyOf(values, newSize);
updateRetainedSize();
}

private void updateSize(long positionsSize)
{
sizeInBytes += ByteArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize;
}

private void updateRetainedSize()
{
retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,187 @@
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.Int128ArrayBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
import static java.lang.Math.max;

public class Int128PositionsAppender
implements PositionsAppender
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(Int128PositionsAppender.class).instanceSize();
private static final Block NULL_VALUE_BLOCK = new Int128ArrayBlock(1, Optional.of(new boolean[] {true}), new long[2]);

private boolean initialized;
private int initialEntryCount;

private int positionCount;
private boolean hasNullValue;
private boolean hasNonNullValue;

// it is assumed that these arrays are the same length
private boolean[] valueIsNull = new boolean[0];
private long[] values = new long[0];

private long retainedSizeInBytes;
private long sizeInBytes;

public Int128PositionsAppender(int expectedEntries)
{
this.initialEntryCount = max(expectedEntries, 1);

updateRetainedSize();
}

@Override
public void appendTo(IntArrayList positions, Block block, BlockBuilder blockBuilder)
public void append(IntArrayList positions, Block block)
{
if (positions.isEmpty()) {
return;
}
// performance of this method depends on block being always the same, flat type
checkArgument(block instanceof Int128ArrayBlock);
int[] positionArray = positions.elements();
int positionsSize = positions.size();
ensureCapacity(positionCount + positionsSize);

if (block.mayHaveNull()) {
for (int i = 0; i < positions.size(); i++) {
int positionIndex = positionCount * 2;
for (int i = 0; i < positionsSize; i++) {
int position = positionArray[i];
if (block.isNull(position)) {
blockBuilder.appendNull();
boolean isNull = block.isNull(position);

if (isNull) {
valueIsNull[positionCount + i] = true;
hasNullValue = true;
}
else {
blockBuilder.writeLong(block.getLong(position, 0));
blockBuilder.writeLong(block.getLong(position, SIZE_OF_LONG));
blockBuilder.closeEntry();
values[positionIndex] = block.getLong(position, 0);
values[positionIndex + 1] = block.getLong(position, SIZE_OF_LONG);
hasNonNullValue = true;
}
positionIndex += 2;
}
positionCount += positionsSize;
}
else {
for (int i = 0; i < positions.size(); i++) {
int positionIndex = positionCount * 2;
for (int i = 0; i < positionsSize; i++) {
int position = positionArray[i];
blockBuilder.writeLong(block.getLong(position, 0));
blockBuilder.writeLong(block.getLong(position, SIZE_OF_LONG));
blockBuilder.closeEntry();
values[positionIndex] = block.getLong(position, 0);
values[positionIndex + 1] = block.getLong(position, SIZE_OF_LONG);
positionIndex += 2;
}
positionCount += positionsSize;
hasNonNullValue = true;
}

updateSize(positionsSize);
}

@Override
public void appendRle(RunLengthEncodedBlock block)
{
int rlePositionCount = block.getPositionCount();
if (rlePositionCount == 0) {
return;
}
int sourcePosition = 0;
ensureCapacity(positionCount + rlePositionCount);
if (block.isNull(sourcePosition)) {
Arrays.fill(valueIsNull, positionCount, positionCount + rlePositionCount, true);
hasNullValue = true;
}
else {
long valueHigh = block.getLong(sourcePosition, 0);
long valueLow = block.getLong(sourcePosition, SIZE_OF_LONG);
int positionIndex = positionCount * 2;
for (int i = 0; i < rlePositionCount; i++) {
values[positionIndex] = valueHigh;
values[positionIndex + 1] = valueLow;
positionIndex += 2;
}
hasNonNullValue = true;
}
positionCount += rlePositionCount;

updateSize(rlePositionCount);
}

@Override
public Block build()
{
if (!hasNonNullValue) {
return new RunLengthEncodedBlock(NULL_VALUE_BLOCK, positionCount);
}
Int128ArrayBlock result = new Int128ArrayBlock(positionCount, hasNullValue ? Optional.of(valueIsNull) : Optional.empty(), values);
reset();
return result;
}

@Override
public long getRetainedSizeInBytes()
{
return retainedSizeInBytes;
}

@Override
public long getSizeInBytes()
{
return sizeInBytes;
}

private void reset()
{
initialEntryCount = calculateBlockResetSize(positionCount);
initialized = false;
valueIsNull = new boolean[0];
values = new long[0];
positionCount = 0;
sizeInBytes = 0;
hasNonNullValue = false;
hasNullValue = false;
updateRetainedSize();
}

private void ensureCapacity(int capacity)
{
if (valueIsNull.length >= capacity) {
return;
}

int newSize;
if (initialized) {
newSize = calculateNewArraySize(valueIsNull.length);
}
else {
newSize = initialEntryCount;
initialized = true;
}
newSize = Math.max(newSize, capacity);

valueIsNull = Arrays.copyOf(valueIsNull, newSize);
values = Arrays.copyOf(values, newSize * 2);
updateRetainedSize();
}

private void updateSize(long positionsSize)
{
sizeInBytes += Int128ArrayBlock.SIZE_IN_BYTES_PER_POSITION * positionsSize;
}

private void updateRetainedSize()
{
retainedSizeInBytes = INSTANCE_SIZE + sizeOf(valueIsNull) + sizeOf(values);
}
}
Loading

0 comments on commit 65dd2af

Please sign in to comment.