diff --git a/CHANGELOG.md b/CHANGELOG.md index 7700287009..59bd8831f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Fixed - [BUG] JarHell caused by latest software.amazon.awssdk 2.20.141 ([#616](https://github.com/opensearch-project/opensearch-java/pull/616)) +- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#620](https://github.com/opensearch-project/opensearch-java/pull/620)) ### Security diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java index aa4d73a237..e500cd9466 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java @@ -86,25 +86,29 @@ protected void data(final ByteBuffer src, final boolean endOfStream) throws IOEx return; } + int len = src.limit(); + if (len < 0) { + len = 4096; + } else if (len > bufferLimitBytes) { + throw new ContentTooLongException( + "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" + ); + } + ByteArrayBuffer buffer = bufferRef.get(); if (buffer == null) { - buffer = new ByteArrayBuffer(bufferLimitBytes); + buffer = new ByteArrayBuffer(len); if (bufferRef.compareAndSet(null, buffer) == false) { buffer = bufferRef.get(); } } - int len = src.limit(); if (buffer.length() + len > bufferLimitBytes) { throw new ContentTooLongException( "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" ); } - if (len < 0) { - len = 4096; - } - if (src.hasArray()) { buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining()); } else { @@ -136,4 +140,12 @@ public void releaseResources() { buffer = null; } } + + /** + * Gets current byte buffer instance + * @return byte buffer instance + */ + ByteArrayBuffer getBuffer() { + return bufferRef.get(); + } } diff --git a/java-client/src/test/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumerTest.java b/java-client/src/test/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumerTest.java new file mode 100644 index 0000000000..9da747659a --- /dev/null +++ b/java-client/src/test/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumerTest.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.ContentTooLongException; + +import com.carrotsearch.randomizedtesting.RandomizedTest; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class HeapBufferedAsyncEntityConsumerTest extends RandomizedTest { + private static final int BUFFER_LIMIT = 100 * 1024 * 1024 /* 100Mb */; + private HeapBufferedAsyncEntityConsumer consumer; + + @Before + public void setUp() { + consumer = new HeapBufferedAsyncEntityConsumer(BUFFER_LIMIT); + } + + @After + public void tearDown() { + consumer.releaseResources(); + } + + @Test + public void testConsumerAllocatesBufferLimit() throws IOException { + consumer.consume(randomByteBufferOfLength(1000).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(1000)); + } + + @Test + public void testConsumerAllocatesEmptyBuffer() throws IOException { + consumer.consume(ByteBuffer.allocate(0).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(0)); + } + + @Test + public void testConsumerExpandsBufferLimits() throws IOException { + consumer.consume(randomByteBufferOfLength(1000).flip()); + consumer.consume(randomByteBufferOfLength(2000).flip()); + consumer.consume(randomByteBufferOfLength(3000).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(6000)); + } + + @Test + public void testConsumerAllocatesLimit() throws IOException { + consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(BUFFER_LIMIT)); + } + + @Test + public void testConsumerFailsToAllocateOverLimit() throws IOException { + assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT + 1).flip())); + } + + @Test + public void testConsumerFailsToExpandOverLimit() throws IOException { + consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip()); + assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(1).flip())); + } + + private static ByteBuffer randomByteBufferOfLength(int length) { + return ByteBuffer.allocate(length).put(randomBytesOfLength(length)); + } +}