Skip to content

Commit

Permalink
Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to co…
Browse files Browse the repository at this point in the history
…nsume the response (opensearch-project#9993)

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored and vikasvb90 committed Oct 10, 2023
1 parent ac23601 commit 9937bfb
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,29 @@ protected void data(final ByteBuffer src, final boolean endOfStream) throws IOEx
return;
}

int len = src.limit();
if (len < 0) {
len = 4096;
} else if (len > bufferLimitBytes) {
throw new ContentTooLongException(
"entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]"
);
}

ByteArrayBuffer buffer = bufferRef.get();
if (buffer == null) {
buffer = new ByteArrayBuffer(bufferLimitBytes);
buffer = new ByteArrayBuffer(len);
if (bufferRef.compareAndSet(null, buffer) == false) {
buffer = bufferRef.get();
}
}

int len = src.limit();
if (buffer.length() + len > bufferLimitBytes) {
throw new ContentTooLongException(
"entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]"
);
}

if (len < 0) {
len = 4096;
}

if (src.hasArray()) {
buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
} else {
Expand Down Expand Up @@ -136,4 +140,12 @@ public void releaseResources() {
buffer = null;
}
}

/**
* Gets current byte buffer instance
* @return byte buffer instance
*/
ByteArrayBuffer getBuffer() {
return bufferRef.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.nio;

import org.apache.hc.core5.http.ContentTooLongException;
import org.opensearch.client.RestClientTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

public class HeapBufferedAsyncEntityConsumerTests extends RestClientTestCase {
private static final int BUFFER_LIMIT = 100 * 1024 * 1024 /* 100Mb */;
private HeapBufferedAsyncEntityConsumer consumer;

@Before
public void setUp() {
consumer = new HeapBufferedAsyncEntityConsumer(BUFFER_LIMIT);
}

@After
public void tearDown() {
consumer.releaseResources();
}

public void testConsumerAllocatesBufferLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(1000).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(1000));
}

public void testConsumerAllocatesEmptyBuffer() throws IOException {
consumer.consume(ByteBuffer.allocate(0).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(0));
}

public void testConsumerExpandsBufferLimits() throws IOException {
consumer.consume(randomByteBufferOfLength(1000).flip());
consumer.consume(randomByteBufferOfLength(2000).flip());
consumer.consume(randomByteBufferOfLength(3000).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(6000));
}

public void testConsumerAllocatesLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(BUFFER_LIMIT));
}

public void testConsumerFailsToAllocateOverLimit() throws IOException {
assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT + 1).flip()));
}

public void testConsumerFailsToExpandOverLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(1).flip()));
}

private static ByteBuffer randomByteBufferOfLength(int length) {
return ByteBuffer.allocate(length).put(randomBytesOfLength(length));
}
}

0 comments on commit 9937bfb

Please sign in to comment.