Reuse compressionBuffer when serializing pages #13232

Merged
1 commit merged on Sep 3, 2019
@@ -28,14 +28,17 @@
import java.nio.ByteBuffer;
import java.util.Optional;

import static com.facebook.presto.array.Arrays.ensureCapacity;
import static com.facebook.presto.execution.buffer.PageCodecMarker.COMPRESSED;
import static com.facebook.presto.execution.buffer.PageCodecMarker.ENCRYPTED;
import static com.facebook.presto.execution.buffer.PagesSerdeUtil.readRawPage;
import static com.facebook.presto.execution.buffer.PagesSerdeUtil.writeRawPage;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;

@NotThreadSafe
public class PagesSerde
@@ -47,6 +50,8 @@ public class PagesSerde
private final Optional<Decompressor> decompressor;
private final Optional<SpillCipher> spillCipher;

private byte[] compressionBuffer;
Contributor
@pettyjamesm pettyjamesm Aug 15, 2019

My worry about this approach is basically the same as the one I mentioned on a previous PR: this buffer will grow to the compressor.maxCompressedLength(uncompressedSize) of the largest input it receives and will not be eligible for garbage collection until the whole PagesSerde instance is. This will be true for all PagesSerde instances, so while adding this buffer reduces GC pressure by allocating less (and spending less time zeroing memory), it risks increasing overall memory usage and causing out-of-memory problems when heap space is constrained.

This is less of a concern if all pages serialized are small, but serializing one large page will have a lasting effect on the untracked memory footprint of a PagesSerde.

I see a few options to mitigate this problem:

  • Restrict the maximum size of serialized pages to set an upper bound on memory pressure.
  • Restrict the maximum size of the compressionBuffer: if maxCompressedLength is greater than this limit, a new byte array is created and not stored for future use (sketched after this comment).
  • Add a method that allows callers to signal that the buffer should be released. This would require code changes for all usages of PagesSerde but would make it easier to account for the buffer in query memory tracking.
  • Pass a shared buffer pool into the PagesSerde#serialize method. This would allow buffer reuse between instances (even fewer total allocations) and decouple the lifespan of the buffer from the lifespan of the PagesSerde.

@mbasmanova thoughts on the options mentioned above?
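A minimal sketch of the second option above (capping the size of the reused buffer). The helper method, constant name, and cap value are illustrative assumptions, not code from this PR:

// Hypothetical helper inside PagesSerde: reuse the buffer only up to a size cap.
private static final int MAX_REUSED_COMPRESSION_BUFFER_SIZE = 4 * 1024 * 1024; // illustrative cap

private byte[] compressionBuffer;

private byte[] getCompressionBuffer(int maxCompressedLength)
{
    if (maxCompressedLength > MAX_REUSED_COMPRESSION_BUFFER_SIZE) {
        // Oversized request: allocate a one-off buffer and do not retain it for future pages.
        return new byte[maxCompressedLength];
    }
    if (compressionBuffer == null || compressionBuffer.length < maxCompressedLength) {
        compressionBuffer = new byte[maxCompressedLength];
    }
    return compressionBuffer;
}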

Contributor

This is a legit point. "Restrict the maximum size" sounds like a reasonable approach. Can we profile the distribution of maxCompressedLength in production? That would give us the insight needed to decide on the maximum size.

Contributor

@pettyjamesm @highker I see that we limit the size of the pages being serialized to 1MB. Perhaps we could add a checkArgument(page.getSizeInBytes() <= DEFAULT_MAX_PAGE_SIZE_IN_BYTES) to the serialize method. What do you think?

public static final int DEFAULT_MAX_PAGE_SIZE_IN_BYTES = 1024 * 1024;

        List<SerializedPage> serializedPages = splitPage(page, DEFAULT_MAX_PAGE_SIZE_IN_BYTES).stream()
                .map(serde::serialize)
                .collect(toImmutableList());
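A minimal sketch of where such a guard could sit, purely for illustration; it assumes DEFAULT_MAX_PAGE_SIZE_IN_BYTES and Guava's checkArgument are statically imported, and the merged change does not necessarily include this check:

// Hypothetical guard (illustrative only): enforce the existing 1MB page-splitting contract
// before serializing, so the reused compression buffer stays bounded.
private static void checkSerializablePageSize(Page page)
{
    checkArgument(
            page.getSizeInBytes() <= DEFAULT_MAX_PAGE_SIZE_IN_BYTES,
            "page size (%s bytes) exceeds the maximum of %s bytes",
            page.getSizeInBytes(),
            DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
}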

Contributor Author
@yingsu00 yingsu00 Aug 16, 2019

Hi all, thanks for your inputs.

Currently, PagesSerde objects are used by PartitionedOutputOperator, TaskOutputOperator, ExchangeOperator, MergeOperator, and the Query class. The first two operators handle serialization, while the last three handle deserialization and are not in the scope of this PR. Looking at the use cases of PartitionedOutputOperator and TaskOutputOperator, I don't think keeping the compression buffer as a member of PagesSerde is a big problem:

  1. There is only one PagesSerde object per operator, not per table column or per destination partition. And there's only one compressionBuffer per PagesSerde.

  2. The sizes of the SerializedPages being compressed are pretty much uniform, close to DEFAULT_MAX_PAGE_SIZE_IN_BYTES (1MB by default). For the new OptimizedPartitionedOutputOperator this size is almost always close to DEFAULT_MAX_PAGE_SIZE_IN_BYTES. The old PartitionedOutputOperator has slightly higher variance, but the size is still quite predictable, because we always accumulate data up to this size before serialization and flushing happen, except for the last page. And the compressed size should be less than or equal to the uncompressed size, which is bounded by 1MB almost all the time. There are only a fixed number of threads on each worker node, so the overhead would be at the level of hundreds of MB per node.

  3. The PagesSerde objects only live through these operators, not the global lifetime of the Presto process. The PagesSerde objects and the compression buffer will be collected once the lifetime of these operators (stage execution) ends. If we don't keep the compression buffer as a member, the operators would keep allocating this 1MB buffer for every SerializedPage over and over throughout the lifetime of the operator, while the proposed change maintains only one 1MB buffer during the same period of time. Sure, the compression buffer becomes long-lived memory and can't be GCed after the compression is done and before the operator dies, but the overhead is small and predictable, as discussed in the previous two points. I would expect the thousands or even millions of allocations of this buffer to contribute more to OOM problems than materializing this buffer does.

I agree we should track the memory in systemMemoryContext. I will make the change in the next iteration.
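A rough sketch of that memory-tracking idea, assuming a setBytes(long)-style local memory context; the interface here is a stand-in, and the actual wiring in the merged change (exposing getRetainedSizeInBytes() and folding it into the operator's size accounting) may differ:

// Hypothetical sketch (not the merged change): an operator that owns a PagesSerde reports the
// retained size of the reused compression buffer into its system memory context.
// (Imports elided; PagesSerde is the class changed in this PR.)
public final class SerdeMemoryReporter
{
    // Stand-in for a LocalMemoryContext-style API; assumed shape, not the real interface.
    public interface MemoryContext
    {
        void setBytes(long bytes);
    }

    private final MemoryContext systemMemoryContext;
    private final PagesSerde serde;

    public SerdeMemoryReporter(MemoryContext systemMemoryContext, PagesSerde serde)
    {
        this.systemMemoryContext = systemMemoryContext;
        this.serde = serde;
    }

    public void updateMemoryUsage()
    {
        // getRetainedSizeInBytes() covers the compressionBuffer retained by the serde.
        systemMemoryContext.setBytes(serde.getRetainedSizeInBytes());
    }
}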

Contributor

With the limitation on size and the addition of memory tracking, I’m no longer especially concerned with the approach.

I’d rather see a buffer reuse abstraction passed in, but that can come later. I’m still in favor of that strategy for a couple of reasons (a rough sketch follows this list):

  • it can easily extend the benefit to encryption as well as compression (assuming the encryption implementation in prestosql is ported back over)
  • it can reduce the total allocation rate by sharing and recycling buffers between instances, as well as centrally manage the amount of memory used
  • some deployments may be less interested in the performance benefit and more averse to the (admittedly low in this case) risk of cross-query data access. Those deployments could opt to use a “never reuse” pool.
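A rough sketch of what such a buffer-reuse abstraction could look like if passed into PagesSerde#serialize. The interface name, method signatures, and the "never reuse" variant are hypothetical, not part of this PR:

// Hypothetical buffer-reuse abstraction (not part of this PR).
public interface CompressionBufferAllocator
{
    // Returns a buffer of at least minCapacity bytes, possibly recycled from an earlier use.
    byte[] acquire(int minCapacity);

    // Returns the buffer to the pool; a "never reuse" implementation simply drops it.
    void release(byte[] buffer);
}

// A trivial "never reuse" allocator for deployments that prefer fresh buffers over reuse.
class NeverReuseAllocator implements CompressionBufferAllocator
{
    @Override
    public byte[] acquire(int minCapacity)
    {
        return new byte[minCapacity];
    }

    @Override
    public void release(byte[] buffer)
    {
        // intentionally a no-op: the buffer becomes garbage-collectible immediately
    }
}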

Contributor Author

Hi @pettyjamesm, by "buffer reuse abstraction passed", did you mean passing a shared buffer pool into the PagesSerde#serialize method? We have some work on the global array pool, and hopefully we'll have time to pick it up this half. For this PR, I can create a config property "experimental.repartition_reuse_compression_buffer" and default it to true.

However, for this PR there was no cross-query data access from reusing the compression buffer. The buffer is owned by the PagesSerde object, which is in turn owned by the operator, and the operator is created per query and not reused across queries.

Contributor

Yeah, that’s correct. I don’t think adding the config flag is necessary in this PR, since the PagesSerde instance already has an implicit per-query lifespan. But the caller clearly has more context about the lifespan of buffers than the PagesSerde does when the buffer is embedded as a field, as it is here. For instance, with a shared compression buffer there would be no need to copy out of the buffer in the case of, say, spill to disk: the spiller knows the buffer can be released after it has been written to disk, but the PagesSerde doesn’t have that context.

Contributor Author

Thanks @pettyjamesm. With the global array pool things will make more sense, but that requires much more careful work. Back to this PR: do you think we need to do anything else?

Contributor

Nope, I’m satisfied with the current state of tracking the buffer memory and enforcing an upper bound on its size.


public PagesSerde(BlockEncodingSerde blockEncodingSerde, Optional<Compressor> compressor, Optional<Decompressor> decompressor, Optional<SpillCipher> spillCipher)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
@@ -62,15 +67,23 @@ public SerializedPage serialize(Page page)
SliceOutput serializationBuffer = new DynamicSliceOutput(toIntExact(page.getSizeInBytes() + Integer.BYTES)); // block length is an int
writeRawPage(page, serializationBuffer, blockEncodingSerde);
Slice slice = serializationBuffer.slice();

int uncompressedSize = serializationBuffer.size();
byte markers = PageCodecMarker.none();

if (compressor.isPresent()) {
ByteBuffer compressionBuffer = ByteBuffer.allocate(compressor.get().maxCompressedLength(uncompressedSize));
compressor.get().compress(slice.toByteBuffer(), compressionBuffer);
compressionBuffer.flip();
if ((((double) compressionBuffer.remaining()) / uncompressedSize) <= MINIMUM_COMPRESSION_RATIO) {
slice = Slices.wrappedBuffer(compressionBuffer);
int maxCompressedSize = compressor.get().maxCompressedLength(uncompressedSize);
compressionBuffer = ensureCapacity(compressionBuffer, maxCompressedSize);
int compressedSize = compressor.get().compress(
(byte[]) slice.getBase(),
(int) (slice.getAddress() - ARRAY_BYTE_BASE_OFFSET),
uncompressedSize,
compressionBuffer,
0,
maxCompressedSize);

if (compressedSize / (double) uncompressedSize <= MINIMUM_COMPRESSION_RATIO) {
slice = Slices.copyOf(Slices.wrappedBuffer(compressionBuffer, 0, compressedSize));
markers = COMPRESSED.set(markers);
}
}
@@ -114,4 +127,14 @@ public Page deserialize(SerializedPage serializedPage)

return readRawPage(serializedPage.getPositionCount(), slice.getInput(), blockEncodingSerde);
}

public long getSizeInBytes()
{
return compressionBuffer == null ? 0 : compressionBuffer.length;
}

public long getRetainedSizeInBytes()
{
return sizeOf(compressionBuffer);
}
}
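For context, the ensureCapacity call in the serialize hunk above (statically imported from com.facebook.presto.array.Arrays) is assumed to behave roughly like the sketch below: keep the existing array when it is already large enough, otherwise allocate a larger one. This is an assumption for readability, not the actual Presto implementation, which may grow the array with extra headroom.

// Assumed behavior of ensureCapacity, for illustration only.
static byte[] ensureCapacity(byte[] buffer, int capacity)
{
    if (buffer == null || buffer.length < capacity) {
        return new byte[capacity];
    }
    return buffer;
}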
@@ -350,7 +350,7 @@ public long getSizeInBytes()
{
// We use a foreach loop instead of streams
// as it has much better performance.
long sizeInBytes = 0;
long sizeInBytes = serde.getSizeInBytes();
for (PageBuilder pageBuilder : pageBuilders) {
sizeInBytes += pageBuilder.getSizeInBytes();
}
@@ -362,7 +362,7 @@ public long getSizeInBytes()
*/
public long getRetainedSizeInBytes()
{
long sizeInBytes = 0;
long sizeInBytes = serde.getRetainedSizeInBytes();
for (PageBuilder pageBuilder : pageBuilders) {
sizeInBytes += pageBuilder.getRetainedSizeInBytes();
}
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import io.airlift.compress.Compressor;
import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;

import java.nio.ByteBuffer;
import java.util.Random;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@State(Scope.Thread)
@OutputTimeUnit(MICROSECONDS)
@Fork(3)
@Warmup(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
public class BenchmarkCompressToByteBuffer
{
@Benchmark
public void compressToByteBuffer(BenchmarkData data)
{
data.byteBuffer.mark();
data.COMPRESSOR.compress(data.slice.toByteBuffer(), data.byteBuffer);
data.byteBuffer.reset();
}

@Benchmark
public void compressToByteArray(BenchmarkData data)
{
data.COMPRESSOR.compress((byte[]) data.slice.getBase(), 0, data.slice.length(), data.bytes, 0, data.MAX_COMPRESSED_SIZE);
}

@State(Scope.Thread)
public static class BenchmarkData
{
private static final Random RANDOM = new Random(0);
private static final Compressor COMPRESSOR = new Lz4Compressor();
private static final int UNCOMPRESSED_SIZE = 1_000_000;
private static final int MAX_COMPRESSED_SIZE = COMPRESSOR.maxCompressedLength(UNCOMPRESSED_SIZE);

private final byte[] byteValues = new byte[UNCOMPRESSED_SIZE];
private final Slice slice = Slices.wrappedBuffer(byteValues);

private final byte[] bytes = new byte[MAX_COMPRESSED_SIZE];
private final ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_COMPRESSED_SIZE);

@Setup
public void setup()
{
// Generate discontinuous runs of random values and 0's so that LZ4 does not take its incompressible fast path
int runLength = UNCOMPRESSED_SIZE / 10;
byte[] randomBytes = new byte[runLength];
for (int i = 0; i < 10; i += 2) {
RANDOM.nextBytes(randomBytes);
System.arraycopy(randomBytes, 0, byteValues, i * runLength, runLength);
}
}
}

public static void main(String[] args)
throws RunnerException
{
Options options = new OptionsBuilder()
.verbosity(VerboseMode.NORMAL)
.include(".*" + BenchmarkCompressToByteBuffer.class.getSimpleName() + ".*")
.build();
new Runner(options).run();
}
}
@@ -37,6 +37,7 @@
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
@@ -73,9 +74,9 @@

@State(Scope.Thread)
@OutputTimeUnit(MILLISECONDS)
@Fork(2)
@Warmup(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@Measurement(iterations = 20, time = 500, timeUnit = MILLISECONDS)
@Fork(3)
@Warmup(iterations = 10, time = 500, timeUnit = MILLISECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = MILLISECONDS)
@BenchmarkMode(Mode.AverageTime)
public class BenchmarkPartitionedOutputOperator
{
@@ -101,6 +102,9 @@ public static class BenchmarkData
private static final ExecutorService EXECUTOR = newCachedThreadPool(daemonThreadsNamed("test-EXECUTOR-%s"));
private static final ScheduledExecutorService SCHEDULER = newScheduledThreadPool(1, daemonThreadsNamed("test-%s"));

@Param({"true", "false"})
private boolean enableCompression;

private final Page dataPage = createPage();

private int getPageCount()
@@ -116,7 +120,7 @@ public Page getDataPage()
private PartitionedOutputOperator createPartitionedOutputOperator()
{
PartitionFunction partitionFunction = new LocalPartitionGenerator(new InterpretedHashGenerator(ImmutableList.of(BIGINT), new int[] {0}), PARTITION_COUNT);
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new BlockEncodingManager(new TypeRegistry()), false);
PagesSerdeFactory serdeFactory = new PagesSerdeFactory(new BlockEncodingManager(new TypeRegistry()), enableCompression);
OutputBuffers buffers = createInitialEmptyOutputBuffers(PARTITIONED);
for (int partition = 0; partition < PARTITION_COUNT; partition++) {
buffers = buffers.withBuffer(new OutputBuffers.OutputBufferId(partition), partition);