Reuse compressionBuffer when serializing pages #13232
Conversation
typo in commit message: elpased -> elapsed
@yingsu00 Nice.
CC: @pettyjamesm
if ((((double) compressionBuffer.remaining()) / uncompressedSize) <= MINIMUM_COMPRESSION_RATIO) {
slice = Slices.wrappedBuffer(compressionBuffer);
int maxCompressedSize = compressor.get().maxCompressedLength(uncompressedSize);
ensureCompressionBufferCapacity(maxCompressedSize);
would you use Arrays.ensureCapacity?
Hi Masha, I couldn't find this method in Arrays. Did you mean ArrayList.ensureCapacity? We're not using ArrayList here though.
You may need to rebase to latest master. The class is com.facebook.presto.orc.reader.Arrays.
Oh I see. But this class is in presto-orc, and PagesSerde is in presto-main, which can't import that class. It seems this class could be moved to presto-spi or an airlift-equivalent package, since other operators might be able to use its methods.
That's a good point. How would you feel about moving it to presto-arrays?
Actually, presto-arrays looks like a better place.
Moved it to presto-arrays. #13284
Once it's in, I'll change the ensureCompressionBufferCapacity() call to use the Arrays class.
Updated the PR to use Arrays.ensureCapacity()
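(For context, a minimal sketch of what the updated helper might look like, assuming Arrays.ensureCapacity(byte[], int) returns a buffer of at least the requested length; this is illustrative, not the PR's exact code.)

    // Illustrative only: grow the cached buffer via the shared Arrays helper.
    // The exact signature of Arrays.ensureCapacity is assumed, not verified here.
    private void ensureCompressionBufferCapacity(int capacity)
    {
        compressionBuffer = Arrays.ensureCapacity(compressionBuffer, capacity);
    }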
@yingsu00 I feel like this change is worth mentioning in the release notes. Would you add a line in the PR's description?
@Setup
public void setup()
{
    RANDOM.nextBytes(byteValues);
It's worth noting that using random bytes for the test data will trigger LZ4's "uncompressible fast-path", so it's likely your benchmark isn't accurately including normal compression performance characteristics.
Regarding byte[] vs ByteBuffer, I also found the performance to be comparable between the two and using either should be fine.
Thanks for pointing out the "uncompressible fast-path". I changed it to interleaved runs of random values and zeros, and now the benchmark results are:
Benchmark Mode Cnt Score Error Units
BenchmarkCompressToByteBuffer.compressToByteArray avgt 60 181.677 ± 9.927 us/op
BenchmarkCompressToByteBuffer.compressToByteBuffer avgt 60 189.371 ± 3.747 us/op
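(As an aside, one way such interleaved test data could be generated in the benchmark's @Setup is sketched below; the run length and helper name are assumptions, not the PR's actual code.)

    // Illustrative sketch: alternate runs of random bytes with runs of zero bytes
    // so LZ4 does real compression work instead of taking its incompressible fast-path.
    // Uses java.util.Random; RUN_LENGTH and buildInterleavedData are made-up names.
    private static final Random RANDOM = new Random(42);
    private static final int RUN_LENGTH = 64;

    private static byte[] buildInterleavedData(int size)
    {
        byte[] data = new byte[size];
        byte[] randomRun = new byte[RUN_LENGTH];
        for (int offset = 0; offset < size; offset += 2 * RUN_LENGTH) {
            RANDOM.nextBytes(randomRun);
            System.arraycopy(randomRun, 0, data, offset, Math.min(RUN_LENGTH, size - offset));
            // the following RUN_LENGTH bytes stay zero, forming the compressible run
        }
        return data;
    }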
slice = Slices.wrappedBuffer(compressionBuffer);
int maxCompressedSize = compressor.get().maxCompressedLength(uncompressedSize);
ensureCompressionBufferCapacity(maxCompressedSize);
int compressedSize = compressor.get().compress((byte[]) slice.getBase(), 0, uncompressedSize, compressionBuffer, 0, maxCompressedSize);
While using 0 as the offset may be safe in this particular context, you might consider changing it to this code so it's more resilient to future changes or someone copy/pasting this code without understanding it.
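(The suggested snippet itself isn't preserved in this view. One plausible shape, assuming the airlift Slice in use exposes getAddress() and that the offset should be derived from the slice rather than hard-coded, might be the following; this is an illustration, not the reviewer's exact suggestion.)

    // Hypothetical illustration: derive the array offset from the slice instead of assuming 0.
    // Whether this matches the reviewer's actual suggestion is not known from this view.
    int inputOffset = (int) (slice.getAddress() - sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET);
    int compressedSize = compressor.get().compress(
            (byte[]) slice.getBase(),
            inputOffset,
            uncompressedSize,
            compressionBuffer,
            0,
            maxCompressedSize);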
@@ -47,6 +47,8 @@
private final Optional<Decompressor> decompressor;
private final Optional<SpillCipher> spillCipher;

private byte[] compressionBuffer;
My worry about this approach is basically the same as I mentioned on a previous PR: this buffer will grow to the compressor.maxCompressedLength(uncompressedSize) of the largest input it receives and will not be eligible for garbage collection until the whole PagesSerde instance is. This will be true for all PagesSerde instances, so while adding this buffer reduces GC pressure by allocating less (and spending less time zeroing memory), it risks increasing overall memory usage and causing out-of-memory problems when heap space is constrained.
This is less of a concern if all pages serialized are small, but serializing one large page will have a lasting effect on the untracked memory footprint of a PagesSerde.
I see a few options to mitigate this problem:
- Restrict the maximum size of serialized pages to set an upper bound on memory pressure.
- Restrict the maximum size of the compressionBuffer. If maxCompressedLength is greater than this limit, a new byte array is created and not stored for future use.
- Add a method that allows callers to signal that the buffer should be released. This will require code changes for all usages of PagesSerde but would make it easier to account for query memory tracking.
- Pass a shared buffer pool into the PagesSerde#serialize method. This would allow buffer reuse between instances (even fewer total allocations) and decouple the lifespan of the buffer from the lifespan of the PagesSerde.
@mbasmanova thoughts on the options mentioned above?
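(To make the second option concrete, a rough sketch is below; the constant, its value, and the method name are assumptions, not code from the PR.)

    // Sketch of option 2: cap the size of the retained buffer. If the required
    // capacity exceeds the cap, hand back a throwaway array instead of growing
    // the cached one. MAX_CACHED_COMPRESSION_BUFFER_SIZE is an assumed constant.
    private static final int MAX_CACHED_COMPRESSION_BUFFER_SIZE = 1024 * 1024;

    private byte[] compressionBufferFor(int maxCompressedSize)
    {
        if (maxCompressedSize > MAX_CACHED_COMPRESSION_BUFFER_SIZE) {
            // too large to keep around: allocate, use, and let the GC reclaim it
            return new byte[maxCompressedSize];
        }
        if (compressionBuffer == null || compressionBuffer.length < maxCompressedSize) {
            compressionBuffer = new byte[maxCompressedSize];
        }
        return compressionBuffer;
    }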
This is a legit point. "Restrict the maximum size" sounds like a reasonable approach. Can we profile the distribution of maxCompressedLength in production? This can give us the insight to decide the max size we may need.
@pettyjamesm @highker I'm seeing that we limit the size of the pages being serialized to 1MB. Perhaps we could add a checkArgument(page.getSizeInBytes() <= DEFAULT_MAX_PAGE_SIZE_IN_BYTES) to the serialize method. What do you think?
public static final int DEFAULT_MAX_PAGE_SIZE_IN_BYTES = 1024 * 1024;
List<SerializedPage> serializedPages = splitPage(page, DEFAULT_MAX_PAGE_SIZE_IN_BYTES).stream()
.map(serde::serialize)
.collect(toImmutableList());
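(A minimal sketch of that guard, assuming Guava's checkArgument and that the constant is visible from PagesSerde; illustrative only.)

    import static com.google.common.base.Preconditions.checkArgument;

    // Illustrative guard only; where DEFAULT_MAX_PAGE_SIZE_IN_BYTES lives is an assumption.
    private static void checkSerializablePageSize(Page page)
    {
        checkArgument(
                page.getSizeInBytes() <= DEFAULT_MAX_PAGE_SIZE_IN_BYTES,
                "page size %s exceeds the maximum of %s bytes",
                page.getSizeInBytes(),
                DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
    }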
Hi all, thanks for your inputs.
Currently, PagesSerde objects are used by PartitionedOutputOperator, TaskOutputOperator, ExchangeOperator, MergeOperator, and the Query class. The first two operators deal with serialization, while the last three deal with deserialization and are not in the scope of this PR. Looking at the use cases of PartitionedOutputOperator and TaskOutputOperator, I don't think keeping the compression buffer as a member of PagesSerde is a big problem:
- There is only one PagesSerde object per operator, not per table column or per destination partition, and there's only one compressionBuffer per PagesSerde.
- The sizes of the SerializedPages being compressed are fairly uniform, close to DEFAULT_MAX_PAGE_SIZE_IN_BYTES (1MB by default). For the new OptimizedPartitionedOutputOperator this size is almost always close to DEFAULT_MAX_PAGE_SIZE_IN_BYTES. The old PartitionedOutputOperator has slightly higher variance, but the size is still quite predictable. This is because we always accumulate data up to this size before serialization and flushing happen, except for the last page. And the compressed size should be less than or equal to the uncompressed size, which is bounded by 1MB almost all of the time. There are only a fixed number of threads on each worker node, so the overhead would be on the order of hundreds of MB per node.
- The PagesSerde objects only live through these operators, not the global lifetime of the Presto process. The PagesSerde objects and the compression buffer will be collected after the lifetime of these operators (stage execution) ends. If we don't keep the compression buffer as a member, the operators would nonetheless keep allocating this 1MB buffer for every SerializedPage over and over again throughout the lifetime of the operator, while the proposed change only maintains one 1MB buffer during the same period. Sure, the compression buffer becomes long-lived memory and can't be GCed after the compression is done and before the operator dies, but the overhead is small and predictable, as discussed in the previous two points. I would expect the thousands or even millions of allocations of this buffer to contribute more to OOM problems than materializing this buffer does.
I agree we should track the memory in systemMemoryContext. I will make the change in the next iteration.
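(A rough sketch of what that tracking could look like, assuming a LocalMemoryContext is handed to the PagesSerde by its operator; the wiring shown here is an assumption, not the PR's actual change.)

    // Sketch only: report the retained compression buffer to a memory context so that
    // system memory accounting sees it. Passing the context in via the constructor is assumed.
    private final LocalMemoryContext memoryContext;
    private byte[] compressionBuffer;

    private void ensureCompressionBufferCapacity(int maxCompressedSize)
    {
        if (compressionBuffer == null || compressionBuffer.length < maxCompressedSize) {
            compressionBuffer = new byte[maxCompressedSize];
            memoryContext.setBytes(compressionBuffer.length);
        }
    }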
With the limitation on size and the addition of memory tracking, I'm no longer especially concerned with the approach.
I'd rather see a buffer-reuse abstraction passed in, but that can come later. I'm still in favor of that strategy for a couple of reasons:
- it can easily extend the benefit to encryption as well as compression (assuming the encryption implementation in prestosql is ported back over)
- it can reduce the total allocation rate by sharing and recycling buffers between instances, as well as centrally manage the amount of memory used
- some deployments may be less interested in the performance benefit and more averse to the (admittedly low in this case) risk of cross-query data access. Those deployments could opt to use a "never reuse" pool.
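(A hypothetical shape for such an abstraction is sketched below; none of these names exist in the codebase, and this is purely illustrative.)

    // Hypothetical buffer-reuse abstraction; names are invented for illustration.
    public interface CompressionBufferAllocator
    {
        byte[] acquire(int minimumSize);

        void release(byte[] buffer);
    }

    // "Never reuse" policy: every acquire allocates a fresh array and release is a no-op,
    // for deployments that prefer zero reuse risk over fewer allocations.
    public class NeverReuseBufferAllocator
            implements CompressionBufferAllocator
    {
        @Override
        public byte[] acquire(int minimumSize)
        {
            return new byte[minimumSize];
        }

        @Override
        public void release(byte[] buffer)
        {
            // intentionally a no-op
        }
    }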
Hi @pettyjamesm, by "buffer reuse abstraction passed", did you mean passing a shared buffer pool into the PagesSerde#serialize method? We have some work on the global array pool, and hopefully we'll have time to pick it up this half. For this PR, I can create a config property "experimental.repartition_reuse_compression_buffer" and default it to true.
However, for this PR there is no cross-query data access from reusing the compression buffer. The buffer is owned by the PagesSerde object, which is in turn owned by the operator, and the operator is created per query and not reused by many queries.
Yeah, that's correct. I don't think adding the config flag is necessary in this PR, since the PagesSerde instance already has an implicit per-query lifespan, but the caller clearly has more context about the lifespan of buffers than the PagesSerde does when the buffer is embedded as a field, as you've done here. For instance, with a shared compression buffer there's no need to copy out of the shared buffer in the case of, say, spill to disk: the spiller knows it can be released after being written to disk, but the PagesSerde doesn't know that context.
Thanks @pettyjamesm. With the global array pool, things will make more sense, but that requires much more careful work. Back to this PR: do you think we need to do anything else?
Nope, I’m satisfied with the current state of tracking the buffer memory and enforcing an upper bound on its size.
Left some small comments; the only major concern is the memory hazard this shared buffer may pose.
one comment only
f6039bb to e8429f7
@mbasmanova Masha, I have rebased and updated this PR to use Arrays.ensureCapacity. Will you please take a look and merge it if you think it looks good? Thank you!
@yingsu00 Thank you.
BenchmarkPartitionedOutputOperator shows 8% gain in elapsed time:
Always allocate compressionBuffer (existing behavior):
Benchmark Mode Cnt Score Error Units
BenchmarkPartitionedOutputOperator.addPage avgt 20 4749.447 ± 380.463 ms/op
Reuse compressionBuffer (current commit):
Benchmark Mode Cnt Score Error Units
BenchmarkPartitionedOutputOperator.addPage avgt 30 4380.171 ± 148.011 ms/op
BenchmarkCompressToByteBuffer shows using ByteBuffer and byte[] as compression
buffer has similar performance. We used byte[] because the code is cleaner.
Benchmark Mode Cnt Score Error Units
BenchmarkCompressToByteBuffer.compressToByteArray avgt 60 181.677 ± 9.927 us/op
BenchmarkCompressToByteBuffer.compressToByteBuffer avgt 60 189.371 ± 3.747 us/op