diff --git a/presto-main/src/main/java/com/facebook/presto/execution/buffer/PagesSerde.java b/presto-main/src/main/java/com/facebook/presto/execution/buffer/PagesSerde.java
index 74b77228f00c3..467d1cd4f2088 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/buffer/PagesSerde.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/buffer/PagesSerde.java
@@ -28,14 +28,17 @@
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import static com.facebook.presto.array.Arrays.ensureCapacity;
 import static com.facebook.presto.execution.buffer.PageCodecMarker.COMPRESSED;
 import static com.facebook.presto.execution.buffer.PageCodecMarker.ENCRYPTED;
 import static com.facebook.presto.execution.buffer.PagesSerdeUtil.readRawPage;
 import static com.facebook.presto.execution.buffer.PagesSerdeUtil.writeRawPage;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.SizeOf.sizeOf;
 import static java.lang.Math.toIntExact;
 import static java.util.Objects.requireNonNull;
+import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
 
 @NotThreadSafe
 public class PagesSerde
@@ -47,6 +50,8 @@ public class PagesSerde
     private final Optional<Decompressor> decompressor;
     private final Optional<SpillCipher> spillCipher;
 
+    private byte[] compressionBuffer;
+
     public PagesSerde(BlockEncodingSerde blockEncodingSerde, Optional<Compressor> compressor, Optional<Decompressor> decompressor, Optional<SpillCipher> spillCipher)
     {
         this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
@@ -62,15 +67,23 @@ public SerializedPage serialize(Page page)
         SliceOutput serializationBuffer = new DynamicSliceOutput(toIntExact(page.getSizeInBytes() + Integer.BYTES)); // block length is an int
         writeRawPage(page, serializationBuffer, blockEncodingSerde);
         Slice slice = serializationBuffer.slice();
+
         int uncompressedSize = serializationBuffer.size();
         byte markers = PageCodecMarker.none();
 
         if (compressor.isPresent()) {
-            ByteBuffer compressionBuffer = ByteBuffer.allocate(compressor.get().maxCompressedLength(uncompressedSize));
-            compressor.get().compress(slice.toByteBuffer(), compressionBuffer);
-            compressionBuffer.flip();
-            if ((((double) compressionBuffer.remaining()) / uncompressedSize) <= MINIMUM_COMPRESSION_RATIO) {
-                slice = Slices.wrappedBuffer(compressionBuffer);
+            int maxCompressedSize = compressor.get().maxCompressedLength(uncompressedSize);
+            compressionBuffer = ensureCapacity(compressionBuffer, maxCompressedSize);
+            int compressedSize = compressor.get().compress(
+                    (byte[]) slice.getBase(),
+                    (int) (slice.getAddress() - ARRAY_BYTE_BASE_OFFSET),
+                    uncompressedSize,
+                    compressionBuffer,
+                    0,
+                    maxCompressedSize);
+
+            if (compressedSize / (double) uncompressedSize <= MINIMUM_COMPRESSION_RATIO) {
+                slice = Slices.copyOf(Slices.wrappedBuffer(compressionBuffer, 0, compressedSize));
                 markers = COMPRESSED.set(markers);
             }
         }
@@ -114,4 +127,14 @@ public Page deserialize(SerializedPage serializedPage)
 
         return readRawPage(serializedPage.getPositionCount(), slice.getInput(), blockEncodingSerde);
     }
+
+    public long getSizeInBytes()
+    {
+        return compressionBuffer == null ? 0 : compressionBuffer.length;
+    }
+
+    public long getRetainedSizeInBytes()
+    {
+        return sizeOf(compressionBuffer);
+    }
 }
diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PartitionedOutputOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/PartitionedOutputOperator.java
index d3b9621b8e5ee..b34c4045efd03 100644
--- a/presto-main/src/main/java/com/facebook/presto/operator/PartitionedOutputOperator.java
+++ b/presto-main/src/main/java/com/facebook/presto/operator/PartitionedOutputOperator.java
@@ -350,7 +350,7 @@ public long getSizeInBytes()
         {
             // We use a foreach loop instead of streams
             // as it has much better performance.
-            long sizeInBytes = 0;
+            long sizeInBytes = serde.getSizeInBytes();
             for (PageBuilder pageBuilder : pageBuilders) {
                 sizeInBytes += pageBuilder.getSizeInBytes();
             }
@@ -362,7 +362,7 @@ public long getSizeInBytes()
          */
        public long getRetainedSizeInBytes()
        {
-            long sizeInBytes = 0;
+            long sizeInBytes = serde.getRetainedSizeInBytes();
             for (PageBuilder pageBuilder : pageBuilders) {
                 sizeInBytes += pageBuilder.getRetainedSizeInBytes();
             }
diff --git a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkCompressToByteBuffer.java b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkCompressToByteBuffer.java
new file mode 100644
index 0000000000000..b93f521470107
--- /dev/null
+++ b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkCompressToByteBuffer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.operator;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+@State(Scope.Thread)
+@OutputTimeUnit(MICROSECONDS)
+@Fork(3)
+@Warmup(iterations = 20, time = 500, timeUnit = MILLISECONDS)
+@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS)
+@BenchmarkMode(Mode.AverageTime)
+public class BenchmarkCompressToByteBuffer
+{
+    @Benchmark
+    public void compressToByteBuffer(BenchmarkData data)
+    {
+        data.byteBuffer.mark();
+        data.COMPRESSOR.compress(data.slice.toByteBuffer(), data.byteBuffer);
+        data.byteBuffer.reset();
+    }
+
+    @Benchmark
+    public void compressToByteArray(BenchmarkData data)
+    {
+        data.COMPRESSOR.compress((byte[]) data.slice.getBase(), 0, data.slice.length(), data.bytes, 0, data.MAX_COMPRESSED_SIZE);
+    }
+
+    @State(Scope.Thread)
+    public static class BenchmarkData
+    {
+        private static final Random RANDOM = new Random(0);
+        private static final Compressor COMPRESSOR = new Lz4Compressor();
+        private static final int UNCOMPRESSED_SIZE = 1_000_000;
+        private static final int MAX_COMPRESSED_SIZE = COMPRESSOR.maxCompressedLength(UNCOMPRESSED_SIZE);
+
+        private final byte[] byteValues = new byte[UNCOMPRESSED_SIZE];
+        private final Slice slice = Slices.wrappedBuffer(byteValues);
+
+        private final byte[] bytes = new byte[MAX_COMPRESSED_SIZE];
+        private final ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_COMPRESSED_SIZE);
+
+        @Setup
+        public void setup()
+        {
+            // Generate discontinuous runs of random values and 0's so that LZ4 does not take its incompressible fast path
+            int runLength = UNCOMPRESSED_SIZE / 10;
+            byte[] randomBytes = new byte[runLength];
+            for (int i = 0; i < 10; i += 2) {
+                RANDOM.nextBytes(randomBytes);
+                System.arraycopy(randomBytes, 0, byteValues, i * runLength, runLength);
+            }
+        }
+    }
+
+    public static void main(String[] args)
+            throws RunnerException
+    {
+        Options options = new OptionsBuilder()
+                .verbosity(VerboseMode.NORMAL)
+                .include(".*" + BenchmarkCompressToByteBuffer.class.getSimpleName() + ".*")
+                .build();
+        new Runner(options).run();
+    }
+}
diff --git a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkPartitionedOutputOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkPartitionedOutputOperator.java
index 7eec1e79f9f13..21833b6c819b2 100644
--- a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkPartitionedOutputOperator.java
+++ b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkPartitionedOutputOperator.java
@@ -37,6 +37,7 @@
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
@@ -73,9 +74,9 @@
 
 @State(Scope.Thread)
 @OutputTimeUnit(MILLISECONDS)
-@Fork(2)
-@Warmup(iterations = 20, time = 500, timeUnit = MILLISECONDS)
-@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS)
+@Fork(3)
+@Warmup(iterations = 10, time = 500, timeUnit = MILLISECONDS)
+@Measurement(iterations = 10, time = 500, timeUnit = MILLISECONDS)
 @BenchmarkMode(Mode.AverageTime)
 public class BenchmarkPartitionedOutputOperator
 {
@@ -101,6 +102,9 @@ public static class BenchmarkData
         private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-EXECUTOR-%s"));
         private static final ScheduledExecutorService SCHEDULER = newScheduledThreadPool(1, daemonThreadsNamed("test-%s"));
 
+        @Param({"true", "false"})
+        private boolean enableCompression;
+
         private final Page dataPage = createPage();
 
         private int getPageCount()
@@ -116,7 +120,7 @@ public Page getDataPage()
         private PartitionedOutputOperator createPartitionedOutputOperator()
         {
             PartitionFunction partitionFunction = new LocalPartitionGenerator(new InterpretedHashGenerator(ImmutableList.of(BIGINT), new int[] {0}), PARTITION_COUNT);
-            PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new BlockEncodingManager(new TypeRegistry()), false);
+            PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new BlockEncodingManager(new TypeRegistry()), enableCompression);
             OutputBuffers buffers = createInitialEmptyOutputBuffers(PARTITIONED);
             for (int partition = 0; partition < PARTITION_COUNT; partition++) {
                 buffers = buffers.withBuffer(new OutputBuffers.OutputBufferId(partition), partition);
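
Note (not part of the patch): the PagesSerde change above replaces per-page ByteBuffer allocation with a reusable byte[] compression buffer, whose size is then reported to PartitionedOutputOperator's memory accounting via the new getSizeInBytes/getRetainedSizeInBytes methods. The following standalone sketch illustrates that byte[]-to-byte[] path using only the airlift Compressor methods that appear in this diff (maxCompressedLength and the array-based compress). The class name and the inline capacity check are illustrative stand-ins for com.facebook.presto.array.Arrays.ensureCapacity, not the actual Presto code.

import io.airlift.compress.Compressor;
import io.airlift.compress.lz4.Lz4Compressor;

import java.util.Arrays;

public class CompressionBufferSketch
{
    private final Compressor compressor = new Lz4Compressor();
    // Reused across calls, mirroring the new PagesSerde.compressionBuffer field
    private byte[] compressionBuffer;

    public byte[] compress(byte[] uncompressed)
    {
        int maxCompressedSize = compressor.maxCompressedLength(uncompressed.length);
        // Grow the reusable buffer only when it is too small (inline stand-in for ensureCapacity)
        if (compressionBuffer == null || compressionBuffer.length < maxCompressedSize) {
            compressionBuffer = new byte[maxCompressedSize];
        }
        int compressedSize = compressor.compress(
                uncompressed, 0, uncompressed.length,
                compressionBuffer, 0, maxCompressedSize);
        // Copy the compressed bytes out so the shared buffer can be overwritten on the next call,
        // analogous to Slices.copyOf(Slices.wrappedBuffer(compressionBuffer, 0, compressedSize))
        return Arrays.copyOf(compressionBuffer, compressedSize);
    }
}

Reusing one byte[] avoids allocating a fresh maxCompressedLength-sized ByteBuffer for every serialized page, which is the allocation the BenchmarkCompressToByteBuffer benchmark compares against the array-based path.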