Skip to content

Commit

Permalink
Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to co…
Browse files Browse the repository at this point in the history
…nsume the response (#620) (#623)

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit 86b6988)
  • Loading branch information
reta authored Sep 15, 2023
1 parent 03de404 commit ed66cf6
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Fixed
- Fix PutMappingRequest by removing unsupported fields ([#597](https://github.com/opensearch-project/opensearch-java/pull/597))
- [BUG] JarHell caused by latest software.amazon.awssdk 2.20.141 ([#616](https://github.com/opensearch-project/opensearch-java/pull/616))
- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#620](https://github.com/opensearch-project/opensearch-java/pull/620))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,29 @@ protected void data(final ByteBuffer src, final boolean endOfStream) throws IOEx
return;
}

int len = src.limit();
if (len < 0) {
len = 4096;
} else if (len > bufferLimitBytes) {
throw new ContentTooLongException(
"entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]"
);
}

ByteArrayBuffer buffer = bufferRef.get();
if (buffer == null) {
buffer = new ByteArrayBuffer(bufferLimitBytes);
buffer = new ByteArrayBuffer(len);
if (bufferRef.compareAndSet(null, buffer) == false) {
buffer = bufferRef.get();
}
}

int len = src.limit();
if (buffer.length() + len > bufferLimitBytes) {
throw new ContentTooLongException(
"entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]"
);
}

if (len < 0) {
len = 4096;
}

if (src.hasArray()) {
buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
} else {
Expand Down Expand Up @@ -136,4 +140,12 @@ public void releaseResources() {
buffer = null;
}
}

/**
* Gets current byte buffer instance
* @return byte buffer instance
*/
ByteArrayBuffer getBuffer() {
return bufferRef.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.transport.httpclient5.internal;

import org.apache.hc.core5.http.ContentTooLongException;

import com.carrotsearch.randomizedtesting.RandomizedTest;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

public class HeapBufferedAsyncEntityConsumerTest extends RandomizedTest {
private static final int BUFFER_LIMIT = 100 * 1024 * 1024 /* 100Mb */;
private HeapBufferedAsyncEntityConsumer consumer;

@Before
public void setUp() {
consumer = new HeapBufferedAsyncEntityConsumer(BUFFER_LIMIT);
}

@After
public void tearDown() {
consumer.releaseResources();
}

@Test
public void testConsumerAllocatesBufferLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(1000).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(1000));
}

@Test
public void testConsumerAllocatesEmptyBuffer() throws IOException {
consumer.consume(ByteBuffer.allocate(0).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(0));
}

@Test
public void testConsumerExpandsBufferLimits() throws IOException {
consumer.consume(randomByteBufferOfLength(1000).flip());
consumer.consume(randomByteBufferOfLength(2000).flip());
consumer.consume(randomByteBufferOfLength(3000).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(6000));
}

@Test
public void testConsumerAllocatesLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(BUFFER_LIMIT));
}

@Test
public void testConsumerFailsToAllocateOverLimit() throws IOException {
assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT + 1).flip()));
}

@Test
public void testConsumerFailsToExpandOverLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(1).flip()));
}

private static ByteBuffer randomByteBufferOfLength(int length) {
return ByteBuffer.allocate(length).put(randomBytesOfLength(length));
}
}

0 comments on commit ed66cf6

Please sign in to comment.