Skip to content

Commit

Permalink
Reuse compressionBuffer when serializing pages
Browse files Browse the repository at this point in the history
BenchmarkPartitionedOutputOperator shows an ~8% reduction in elapsed time:

Always allocate compressionBuffer (existing behavior):
Benchmark                                   Mode  Cnt     Score     Error  Units
BenchmarkPartitionedOutputOperator.addPage  avgt   20  4749.447 ± 380.463  ms/op

Reuse compressionBuffer (current commit):
Benchmark                                   Mode  Cnt     Score     Error  Units
BenchmarkPartitionedOutputOperator.addPage  avgt   30  4380.171 ± 148.011  ms/op

BenchmarkCompressToByteBuffer shows that using a ByteBuffer or a byte[] as the
compression buffer gives similar performance. We use byte[] because the code is
cleaner.

Benchmark                                          Mode  Cnt    Score   Error  Units
BenchmarkCompressToByteBuffer.compressToByteArray  avgt   60  181.677 ± 9.927  us/op
BenchmarkCompressToByteBuffer.compressToByteBuffer  avgt   60  189.371 ± 3.747  us/op
  • Loading branch information
Ying Su authored and mbasmanova committed Sep 3, 2019
1 parent 97b9c11 commit 60a5a5c
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@
import java.nio.ByteBuffer;
import java.util.Optional;

import static com.facebook.presto.array.Arrays.ensureCapacity;
import static com.facebook.presto.execution.buffer.PageCodecMarker.COMPRESSED;
import static com.facebook.presto.execution.buffer.PageCodecMarker.ENCRYPTED;
import static com.facebook.presto.execution.buffer.PagesSerdeUtil.readRawPage;
import static com.facebook.presto.execution.buffer.PagesSerdeUtil.writeRawPage;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

@NotThreadSafe
public class PagesSerde
Expand All @@ -47,6 +50,8 @@ public class PagesSerde
private final Optional<Decompressor> decompressor;
private final Optional<SpillCipher> spillCipher;

private byte[] compressionBuffer;

public PagesSerde(BlockEncodingSerde blockEncodingSerde, Optional<Compressor> compressor, Optional<Decompressor> decompressor, Optional<SpillCipher> spillCipher)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
Expand All @@ -62,15 +67,23 @@ public SerializedPage serialize(Page page)
SliceOutput serializationBuffer = new DynamicSliceOutput(toIntExact(page.getSizeInBytes() + Integer.BYTES)); // block length is an int
writeRawPage(page, serializationBuffer, blockEncodingSerde);
Slice slice = serializationBuffer.slice();

int uncompressedSize = serializationBuffer.size();
byte markers = PageCodecMarker.none();

if (compressor.isPresent()) {
ByteBuffer compressionBuffer = ByteBuffer.allocate(compressor.get().maxCompressedLength(uncompressedSize));
compressor.get().compress(slice.toByteBuffer(), compressionBuffer);
compressionBuffer.flip();
if ((((double) compressionBuffer.remaining()) / uncompressedSize) <= MINIMUM_COMPRESSION_RATIO) {
slice = Slices.wrappedBuffer(compressionBuffer);
int maxCompressedSize = compressor.get().maxCompressedLength(uncompressedSize);
compressionBuffer = ensureCapacity(compressionBuffer, maxCompressedSize);
int compressedSize = compressor.get().compress(
(byte[]) slice.getBase(),
(int) (slice.getAddress() - ARRAY_BYTE_BASE_OFFSET),
uncompressedSize,
compressionBuffer,
0,
maxCompressedSize);

if (compressedSize / (double) uncompressedSize <= MINIMUM_COMPRESSION_RATIO) {
slice = Slices.copyOf(Slices.wrappedBuffer(compressionBuffer, 0, compressedSize));
markers = COMPRESSED.set(markers);
}
}
Expand Down Expand Up @@ -114,4 +127,14 @@ public Page deserialize(SerializedPage serializedPage)

return readRawPage(serializedPage.getPositionCount(), slice.getInput(), blockEncodingSerde);
}

/**
 * Reports the current length of the reusable compression buffer.
 *
 * @return the length of {@code compressionBuffer} in bytes, or 0 when no buffer has been allocated yet
 */
public long getSizeInBytes()
{
    if (compressionBuffer == null) {
        return 0;
    }
    return compressionBuffer.length;
}

/**
 * Reports the memory retained by the reusable compression buffer.
 *
 * @return the result of {@code SizeOf.sizeOf} applied to {@code compressionBuffer}
 */
public long getRetainedSizeInBytes()
{
    long retainedBytes = sizeOf(compressionBuffer);
    return retainedBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public long getSizeInBytes()
{
// We use a foreach loop instead of streams
// as it has much better performance.
long sizeInBytes = 0;
long sizeInBytes = serde.getSizeInBytes();
for (PageBuilder pageBuilder : pageBuilders) {
sizeInBytes += pageBuilder.getSizeInBytes();
}
Expand All @@ -362,7 +362,7 @@ public long getSizeInBytes()
*/
public long getRetainedSizeInBytes()
{
long sizeInBytes = 0;
long sizeInBytes = serde.getRetainedSizeInBytes();
for (PageBuilder pageBuilder : pageBuilders) {
sizeInBytes += pageBuilder.getRetainedSizeInBytes();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import io.airlift.compress.Compressor;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;

import java.nio.ByteBuffer;
import java.util.Random;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * JMH micro-benchmark that compares LZ4 compression into a {@link ByteBuffer}
 * with compression into a plain {@code byte[]}.
 *
 * <p>Both benchmarks compress the same {@link BenchmarkData#UNCOMPRESSED_SIZE}-byte
 * input and write into a pre-allocated output of
 * {@link BenchmarkData#MAX_COMPRESSED_SIZE} bytes, so no allocation of the
 * output buffer happens inside the measured methods.
 */
@State(Scope.Thread)
@OutputTimeUnit(MICROSECONDS)
@Fork(3)
@Warmup(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
public class BenchmarkCompressToByteBuffer
{
    /**
     * Compresses the input slice into the pre-allocated {@link ByteBuffer}.
     *
     * @param data per-thread benchmark state holding the input and output buffers
     */
    @Benchmark
    public void compressToByteBuffer(BenchmarkData data)
    {
        // Mark the output position before compressing and restore it afterwards,
        // so that every invocation writes from the same starting position.
        data.byteBuffer.mark();
        BenchmarkData.COMPRESSOR.compress(data.slice.toByteBuffer(), data.byteBuffer);
        data.byteBuffer.reset();
    }

    /**
     * Compresses the input slice's backing array into the pre-allocated {@code byte[]}.
     *
     * @param data per-thread benchmark state holding the input and output buffers
     */
    @Benchmark
    public void compressToByteArray(BenchmarkData data)
    {
        // The slice wraps byteValues directly, so its base object is that array and the data starts at offset 0.
        BenchmarkData.COMPRESSOR.compress(
                (byte[]) data.slice.getBase(),
                0,
                data.slice.length(),
                data.bytes,
                0,
                BenchmarkData.MAX_COMPRESSED_SIZE);
    }

    /**
     * Per-thread input and output buffers shared by both benchmark methods.
     */
    @State(Scope.Thread)
    public static class BenchmarkData
    {
        // Fixed seed keeps the generated input reproducible across runs.
        private static final Random RANDOM = new Random(0);
        private static final Compressor COMPRESSOR = new Lz4Compressor();
        private static final int UNCOMPRESSED_SIZE = 1_000_000;
        private static final int MAX_COMPRESSED_SIZE = COMPRESSOR.maxCompressedLength(UNCOMPRESSED_SIZE);
        // Number of equally sized runs the input is divided into.
        private static final int RUN_COUNT = 10;

        private final byte[] byteValues = new byte[UNCOMPRESSED_SIZE];
        private final Slice slice = Slices.wrappedBuffer(byteValues);

        private final byte[] bytes = new byte[MAX_COMPRESSED_SIZE];
        private final ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_COMPRESSED_SIZE);

        /**
         * Fills every even-numbered run of the input with random bytes and
         * leaves the odd-numbered runs as zeros.
         */
        @Setup
        public void setup()
        {
            // Alternate runs of random bytes and zeros so that LZ4 does not take its fast path for incompressible input.
            int runLength = UNCOMPRESSED_SIZE / RUN_COUNT;
            byte[] randomBytes = new byte[runLength];
            for (int run = 0; run < RUN_COUNT; run += 2) {
                RANDOM.nextBytes(randomBytes);
                System.arraycopy(randomBytes, 0, byteValues, run * runLength, runLength);
            }
        }
    }

    /**
     * Runs every benchmark in this class through the JMH runner.
     *
     * @param args unused
     * @throws RunnerException if the JMH runner fails
     */
    public static void main(String[] args)
            throws RunnerException
    {
        Options options = new OptionsBuilder()
                .verbosity(VerboseMode.NORMAL)
                .include(".*" + BenchmarkCompressToByteBuffer.class.getSimpleName() + ".*")
                .build();
        new Runner(options).run();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
Expand Down Expand Up @@ -73,9 +74,9 @@

@State(Scope.Thread)
@OutputTimeUnit(MILLISECONDS)
@Fork(2)
@Warmup(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@Fork(3)
@Warmup(iterations = 10, time = 500, timeUnit = MILLISECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
public class BenchmarkPartitionedOutputOperator
{
Expand All @@ -101,6 +102,9 @@ public static class BenchmarkData
private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-EXECUTOR-%s"));
private static final ScheduledExecutorService SCHEDULER = newScheduledThreadPool(1, daemonThreadsNamed("test-%s"));

@Param({"true", "false"})
private boolean enableCompression;

private final Page dataPage = createPage();

private int getPageCount()
Expand All @@ -116,7 +120,7 @@ public Page getDataPage()
private PartitionedOutputOperator createPartitionedOutputOperator()
{
PartitionFunction partitionFunction = new LocalPartitionGenerator(new InterpretedHashGenerator(ImmutableList.of(BIGINT), new int[] {0}), PARTITION_COUNT);
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new BlockEncodingManager(new TypeRegistry()), false);
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new BlockEncodingManager(new TypeRegistry()), enableCompression);
OutputBuffers buffers = createInitialEmptyOutputBuffers(PARTITIONED);
for (int partition = 0; partition < PARTITION_COUNT; partition++) {
buffers = buffers.withBuffer(new OutputBuffers.OutputBufferId(partition), partition);
Expand Down

0 comments on commit 60a5a5c

Please sign in to comment.