Skip to content

Commit

Permalink
Optimize writing RLE runs in parquet column descriptors
Browse files Browse the repository at this point in the history
Use information about the nullability of Blocks to write RLE runs
for repetition and definition levels more efficiently in the Parquet writer

BenchmarkParquetFormat#write UNCOMPRESSED
                          Before                          After
LINEITEM                  293.0MB/s ± 2869.6kB/s (0.96%)  312.9MB/s ± 2869.2kB/s (0.90%) (N = 10, α = 99.9%)
MAP_VARCHAR_DOUBLE        345.4MB/s ± 3275.7kB/s (0.93%)  359.6MB/s ± 5555.4kB/s (1.51%) (N = 10, α = 99.9%)
LARGE_MAP_VARCHAR_DOUBLE  402.0MB/s ± 6815.6kB/s (1.66%)  448.6MB/s ± 4808.3kB/s (1.05%) (N = 10, α = 99.9%)
MAP_INT_DOUBLE            606.2MB/s ± 2136.1kB/s (0.34%)  676.1MB/s ± 5620.8kB/s (0.81%) (N = 10, α = 99.9%)
LARGE_ARRAY_VARCHAR       257.8MB/s ± 9303.4kB/s (3.52%)  275.7MB/s ± 2583.1kB/s (0.91%) (N = 10, α = 99.9%)
  • Loading branch information
raunaqmorarka committed May 28, 2024
1 parent 2cb0023 commit db64b88
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ public ValuesCount writeDefinitionLevels(int positionsCount)
checkValidPosition(offset, positionsCount, block.getPositionCount());
int nonNullsCount = 0;
if (!block.mayHaveNull()) {
for (int position = offset; position < offset + positionsCount; position++) {
encoder.writeInteger(maxDefinitionLevel);
}
encoder.writeRepeatInteger(maxDefinitionLevel, positionsCount);
nonNullsCount = positionsCount;
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ public void writeRepetitionLevels(int parentLevel)
public void writeRepetitionLevels(int parentLevel, int positionsCount)
{
checkValidPosition(offset, positionsCount, block.getPositionCount());
for (int i = 0; i < positionsCount; i++) {
encoder.writeInteger(parentLevel);
}
encoder.writeRepeatInteger(parentLevel, positionsCount);
offset += positionsCount;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ public interface ColumnDescriptorValuesWriter
*/
void writeInteger(int value);

/**
* Encodes a value that is repeated in the input stream, passing the value together with its repetition count.
*
* @param value the value to encode
* @param valueRepetitions number of times the input value is repeated in the input stream
*/
void writeRepeatInteger(int value, int valueRepetitions);

/**
* used to decide whether to move on to the next page
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public void reset() {}
// Empty body: the value is ignored and nothing is written.
@Override
public void writeInteger(int v) {}

// Empty body: both the value and its repetition count are ignored and nothing is written.
@Override
public void writeRepeatInteger(int value, int valueRepetitions) {}

@Override
public BytesInput getBytes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,42 +133,69 @@ private void reset(boolean resetBaos)
public void writeInt(int value)
throws IOException
{
if (value == previousValue) {
// keep track of how many times we've seen this value
// consecutively
++repeatCount;
writeRepeatedInteger(value, 1);
}

if (repeatCount >= 8) {
// we've seen this at least 8 times, we're
// certainly going to write an rle-run,
// so just keep on counting repeats for now
return;
}
public void writeRepeatedInteger(int value, int valueRepetitions)
throws IOException
{
if (valueRepetitions == 0) {
return;
}
else {
// This is a new value, check if it signals the end of
// an rle-run
// Process 1st occurrence of new value
if (value != previousValue) {
// This is a new value, check if it signals the end of an rle-run
if (repeatCount >= 8) {
// it does! write an rle-run
writeRleRun();
}

// this is a new value so we've only seen it once
repeatCount = 1;
valueRepetitions--;
// start tracking this value for repeats
previousValue = value;

bufferedValues[numBufferedValues++] = value;
if (numBufferedValues == 8) {
// we've encountered less than 8 repeated values, so
// either start a new bit-packed-run or append to the
// current bit-packed-run
writeOrAppendBitPackedRun();
// we're going to see this value at least 8 times, so
// just count remaining repeats for an rle-run
if (valueRepetitions >= 8) {
repeatCount = valueRepetitions;
return;
}
}
}

// We have not seen enough repeats to justify an rle-run yet,
// so buffer this value in case we decide to write a bit-packed-run
bufferedValues[numBufferedValues] = value;
++numBufferedValues;
// Process remaining repetitions of value
while (valueRepetitions > 0) {
repeatCount++;
valueRepetitions--;
if (repeatCount >= 8) {
// we've seen this at least 8 times, we're
// certainly going to write an rle-run,
// so just keep on counting repeats for now
repeatCount += valueRepetitions;
return;
}

if (numBufferedValues == 8) {
// we've encountered less than 8 repeated values, so
// either start a new bit-packed-run or append to the
// current bit-packed-run
writeOrAppendBitPackedRun();
bufferedValues[numBufferedValues++] = value;
if (numBufferedValues == 8) {
// we've encountered less than 8 repeated values, so
// either start a new bit-packed-run or append to the
// current bit-packed-run
writeOrAppendBitPackedRun();
if (valueRepetitions >= 8) {
// we're going to see this value at least 8 times, so
// just count remaining repeats for an rle-run
repeatCount = valueRepetitions;
return;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public void writeInteger(int value)
}
}

/**
 * Forwards a repeated value to the underlying encoder.
 *
 * @param value the value to encode
 * @param valueRepetitions number of times the value is repeated
 * @throws ParquetEncodingException wrapping any {@link IOException} raised by the encoder
 */
@Override
public void writeRepeatInteger(int value, int valueRepetitions)
{
    try {
        // Hand the value and its repetition count to the encoder in a single call
        encoder.writeRepeatedInteger(value, valueRepetitions);
    }
    catch (IOException ioFailure) {
        // Rethrow as unchecked, keeping the original exception as the cause
        throw new ParquetEncodingException(ioFailure);
    }
}

@Override
public long getBufferedSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,49 @@ public final class TestData
{
// Private constructor: the class cannot be instantiated from outside.
private TestData() {}

/**
 * Generators of int test data for a given array size and bit width.
 * Every generator seeds its {@link Random} with {@code (long) size * bitWidth},
 * so identical arguments always use the same seed.
 */
public enum UnsignedIntsGenerator
{
    /** Fills every position with its own {@code randomUnsignedInt} draw. */
    RANDOM {
        @Override
        public int[] getData(int size, int bitWidth)
        {
            Random generator = newSeededRandom(size, bitWidth);
            int[] data = new int[size];
            int index = 0;
            while (index < size) {
                data[index++] = randomUnsignedInt(generator, bitWidth);
            }
            return data;
        }
    },
    /** Delegates to {@code generateMixedData} with a max group size of 13. */
    MIXED_AND_GROUPS_SMALL {
        @Override
        public int[] getData(int size, int bitWidth)
        {
            return generateMixedData(newSeededRandom(size, bitWidth), size, 13, bitWidth);
        }
    },
    /** Delegates to {@code generateMixedData} with a max group size of 67. */
    MIXED_AND_GROUPS_LARGE {
        @Override
        public int[] getData(int size, int bitWidth)
        {
            return generateMixedData(newSeededRandom(size, bitWidth), size, 67, bitWidth);
        }
    },
    /** Delegates to {@code generateMixedData} with a max group size of 997. */
    MIXED_AND_GROUPS_HUGE {
        @Override
        public int[] getData(int size, int bitWidth)
        {
            return generateMixedData(newSeededRandom(size, bitWidth), size, 997, bitWidth);
        }
    },
    /**/;

    /**
     * @param size number of values to generate
     * @param bitWidth bit width passed to the underlying value generator
     * @return an array of {@code size} generated values
     */
    public abstract int[] getData(int size, int bitWidth);

    // Shared seed derivation; the cast to long happens before the multiplication
    private static Random newSeededRandom(int size, int bitWidth)
    {
        return new Random((long) size * bitWidth);
    }
}

// Based on org.apache.parquet.schema.Types.BasePrimitiveBuilder.maxPrecision to determine the max decimal precision supported by INT32/INT64
public static int maxPrecision(int numBytes)
{
Expand Down Expand Up @@ -98,29 +141,6 @@ public static boolean[] generateMixedData(Random r, int size, int maxGroupSize)
return result;
}

/**
 * Builds an array of {@code size} ints out of consecutive segments of random length.
 * Each segment is either a single {@code randomInt} value repeated, or a fresh
 * {@code randomInt} value per element; the choice is made with {@code r.nextBoolean()}.
 *
 * @param r source of randomness
 * @param size length of the returned array
 * @param maxGroupSize exclusive upper bound for the length of each segment
 * @param bitWidth bit width passed to {@code randomInt}
 * @return the first {@code size} generated values
 */
public static int[] generateMixedData(Random r, int size, int maxGroupSize, int bitWidth)
{
    IntList buffer = new IntArrayList();
    while (buffer.size() < size) {
        boolean constantRun = r.nextBoolean();
        int runLength = r.nextInt(maxGroupSize);
        // For a constant run the value is drawn once up front, even when the run is empty
        int runValue = constantRun ? randomInt(r, bitWidth) : 0;
        for (int added = 0; added < runLength; added++) {
            buffer.add(constantRun ? runValue : randomInt(r, bitWidth));
        }
    }
    // The last segment may overshoot; copy only the first `size` elements
    int[] output = new int[size];
    buffer.getElements(0, output, 0, size);
    return output;
}

public static Slice randomBigInteger(Random r)
{
BigInteger bigInteger = new BigInteger(126, r);
Expand Down Expand Up @@ -203,6 +223,29 @@ public static byte[][] randomAsciiData(int size, int minLength, int maxLength)
return data;
}

/**
 * Builds an array of {@code size} ints out of consecutive segments of random length.
 * Each segment is either a single {@code randomUnsignedInt} value repeated, or a fresh
 * {@code randomUnsignedInt} value per element; the choice is made with {@code r.nextBoolean()}.
 *
 * @param r source of randomness
 * @param size length of the returned array
 * @param maxGroupSize exclusive upper bound for the length of each segment
 * @param bitWidth bit width passed to {@code randomUnsignedInt}
 * @return the first {@code size} generated values
 */
private static int[] generateMixedData(Random r, int size, int maxGroupSize, int bitWidth)
{
    IntList values = new IntArrayList();
    while (values.size() < size) {
        boolean repeatSingleValue = r.nextBoolean();
        int segmentLength = r.nextInt(maxGroupSize);
        if (!repeatSingleValue) {
            // Every element of the segment gets its own random value
            for (int remaining = segmentLength; remaining > 0; remaining--) {
                values.add(randomUnsignedInt(r, bitWidth));
            }
            continue;
        }
        // One random value for the whole segment (drawn even if the segment is empty)
        int repeated = randomUnsignedInt(r, bitWidth);
        for (int remaining = segmentLength; remaining > 0; remaining--) {
            values.add(repeated);
        }
    }
    // The last segment may overshoot; copy only the first `size` elements
    int[] output = new int[size];
    values.getElements(0, output, 0, size);
    return output;
}

private static int propagateSignBit(int value, int bitsToPad)
{
return value << bitsToPad >> bitsToPad;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Random;

import static io.trino.jmh.Benchmarks.benchmark;
import static io.trino.parquet.reader.TestData.generateMixedData;
import static io.trino.parquet.reader.TestData.randomUnsignedInt;
import static io.trino.parquet.reader.TestData.UnsignedIntsGenerator;
import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -57,7 +55,7 @@ public class BenchmarkRleBitPackingDecoder
private int[] output;

@Param
public DataSet dataSet;
public UnsignedIntsGenerator dataSet;

@Param({
// This encoding is not meant to store big numbers so 2^20 is enough
Expand All @@ -67,49 +65,6 @@ public class BenchmarkRleBitPackingDecoder
})
public int bitWidth;

/**
 * Data sets available to the benchmark for a given array size and bit width.
 * Every data set seeds its {@link Random} with {@code (long) size * bitWidth},
 * so identical arguments always use the same seed.
 */
public enum DataSet
{
    /** Fills every position with its own {@code randomUnsignedInt} draw. */
    RANDOM {
        @Override
        int[] getData(int size, int bitWidth)
        {
            Random generator = newSeededRandom(size, bitWidth);
            int[] data = new int[size];
            int index = 0;
            while (index < size) {
                data[index++] = randomUnsignedInt(generator, bitWidth);
            }
            return data;
        }
    },
    /** Delegates to {@code generateMixedData} with a max group size of 23. */
    MIXED_AND_GROUPS_SMALL {
        @Override
        int[] getData(int size, int bitWidth)
        {
            return generateMixedData(newSeededRandom(size, bitWidth), size, 23, bitWidth);
        }
    },
    /** Delegates to {@code generateMixedData} with a max group size of 127. */
    MIXED_AND_GROUPS_LARGE {
        @Override
        int[] getData(int size, int bitWidth)
        {
            return generateMixedData(newSeededRandom(size, bitWidth), size, 127, bitWidth);
        }
    },
    /** Delegates to {@code generateMixedData} with a max group size of 2111. */
    MIXED_AND_GROUPS_HUGE {
        @Override
        int[] getData(int size, int bitWidth)
        {
            return generateMixedData(newSeededRandom(size, bitWidth), size, 2111, bitWidth);
        }
    },
    /**/;

    /**
     * @param size number of values to generate
     * @param bitWidth bit width passed to the underlying value generator
     * @return an array of {@code size} generated values
     */
    abstract int[] getData(int size, int bitWidth);

    // Shared seed derivation; the cast to long happens before the multiplication
    private static Random newSeededRandom(int size, int bitWidth)
    {
        return new Random((long) size * bitWidth);
    }
}

@Setup
public void setup()
throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

import java.io.IOException;

import static io.trino.parquet.reader.TestData.UnsignedIntsGenerator;

public class TestRleBitPackingDecoderBenchmark
{
@Test
public void testRleBitPackingDecoderBenchmark()
throws IOException
{
for (int bitWidth = 1; bitWidth <= 20; bitWidth++) {
for (BenchmarkRleBitPackingDecoder.DataSet dataSet : BenchmarkRleBitPackingDecoder.DataSet.values()) {
for (UnsignedIntsGenerator dataSet : UnsignedIntsGenerator.values()) {
BenchmarkRleBitPackingDecoder benchmark = new BenchmarkRleBitPackingDecoder();
benchmark.bitWidth = bitWidth;
benchmark.dataSet = dataSet;
Expand Down
Loading

0 comments on commit db64b88

Please sign in to comment.