diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java
index 45c41eda4977..001cb9abcc10 100644
--- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java
+++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java
@@ -109,7 +109,7 @@ public RetainableByteBuffer decode(ByteBuffer compressed)
RetainableByteBuffer result = acquire(length);
for (RetainableByteBuffer buffer : _inflateds)
{
- BufferUtil.append(result.getByteBuffer(), buffer.getByteBuffer());
+ result.append(buffer);
buffer.release();
}
_inflateds.clear();
diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java
index da534a781aaa..9a86f5623eae 100644
--- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java
+++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java
@@ -1114,7 +1114,7 @@ else if (type != HttpTokens.Type.SPACE && type != HttpTokens.Type.HTAB)
if (state == State.EPILOGUE)
notifyComplete();
else
- throw new EOFException("unexpected EOF");
+ throw new EOFException("unexpected EOF in " + state);
}
}
catch (Throwable x)
diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
index d3425e341dcd..ac383bec0bb8 100644
--- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
+++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java
@@ -478,6 +478,12 @@ public boolean canRetain()
return retainable.canRetain();
}
+ @Override
+ public boolean isRetained()
+ {
+ return retainable.isRetained();
+ }
+
@Override
public void retain()
{
@@ -541,7 +547,7 @@ public boolean release()
private void put(ByteBuffer source)
{
- BufferUtil.append(delegate.getByteBuffer(), source);
+ delegate.append(source);
}
}
}
diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
index dc3ecf7d5275..ccc5892f49a2 100644
--- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
+++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java
@@ -438,7 +438,7 @@ public default void onClosed(Stream stream)
/**
*
A {@link Retainable} wrapper of a {@link DataFrame}.
*/
- public abstract static class Data implements Retainable
+ abstract class Data implements Retainable
{
public static Data eof(int streamId)
{
diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java
index 711e705ba55d..c58570c1459b 100644
--- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java
+++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java
@@ -44,7 +44,6 @@
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ArrayByteBufferPool;
-import org.eclipse.jetty.io.ByteBufferAggregator;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
@@ -63,8 +62,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RawHTTP2ProxyTest
@@ -245,7 +242,7 @@ public void onDataAvailable(Stream stream)
CountDownLatch latch1 = new CountDownLatch(1);
Stream stream1 = clientSession.newStream(new HeadersFrame(request1, null, false), new Stream.Listener()
{
- private final ByteBufferAggregator aggregator = new ByteBufferAggregator(client.getByteBufferPool(), true, data1.length, data1.length * 2);
+ private final RetainableByteBuffer.Aggregator aggregator = new RetainableByteBuffer.Aggregator(client.getByteBufferPool(), true, data1.length * 2);
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
@@ -262,17 +259,15 @@ public void onDataAvailable(Stream stream)
DataFrame frame = data.frame();
if (LOGGER.isDebugEnabled())
LOGGER.debug("CLIENT1 received {}", frame);
- assertFalse(aggregator.aggregate(frame.getByteBuffer()));
+ assertTrue(aggregator.append(frame.getByteBuffer()));
data.release();
if (!data.frame().isEndStream())
{
stream.demand();
return;
}
- RetainableByteBuffer buffer = aggregator.takeRetainableByteBuffer();
- assertNotNull(buffer);
- assertEquals(buffer1.slice(), buffer.getByteBuffer());
- buffer.release();
+ assertEquals(buffer1.slice(), aggregator.getByteBuffer());
+ aggregator.release();
latch1.countDown();
}
}).get(5, TimeUnit.SECONDS);
diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java
index 29542da3d1eb..d311d2b6f038 100644
--- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java
+++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java
@@ -445,6 +445,12 @@ public boolean canRetain()
return retainable.canRetain();
}
+ @Override
+ public boolean isRetained()
+ {
+ return retainable.isRetained();
+ }
+
@Override
public void retain()
{
diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java
index d2e83791404a..9953e2ea04e8 100644
--- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java
+++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java
@@ -384,7 +384,7 @@ public default void onFailure(Stream.Server stream, long error, Throwable failur
*
* @see Stream#readData()
*/
- public abstract static class Data implements Retainable
+ abstract class Data implements Retainable
{
public static final Data EOF = new EOFData();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
index 0fab64e92ae3..71aeda393a56 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
@@ -666,9 +666,19 @@ public Tracking(int minCapacity, int maxCapacity, int maxBucketSize)
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}
+ public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
+ {
+ this(minCapacity, factor, maxCapacity, maxBucketSize, 0L, 0L);
+ }
+
public Tracking(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
- super(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
+ this(minCapacity, -1, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
+ }
+
+ public Tracking(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
+ {
+ super(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory);
}
@Override
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
index 2096be6e91bb..8bfdfe5a15ba 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAccumulator.java
@@ -28,9 +28,9 @@
*
* The method {@link #ensureBuffer(int, int)} is used to write directly to the last buffer stored in the buffer list,
* if there is less than a certain amount of space available in that buffer then a new one will be allocated and returned instead.
- * @see #ensureBuffer(int, int)
+ * @deprecated use {@link RetainableByteBuffer.Accumulator}
*/
-// TODO: rename to *Aggregator to avoid confusion with RBBP.Accumulator?
+@Deprecated(forRemoval = true)
public class ByteBufferAccumulator implements AutoCloseable
{
private final List _buffers = new ArrayList<>();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
index 1dd074703e25..7914c6792128 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferAggregator.java
@@ -26,7 +26,9 @@
* Once the buffer is full, the aggregator will not aggregate any more bytes until its buffer is taken out,
* after which a new aggregate/take buffer cycle can start.
*
The buffers are taken from the supplied {@link ByteBufferPool} or freshly allocated if one is not supplied.
+ * @deprecated Use {@link ByteBufferAccumulator}
*/
+@Deprecated
public class ByteBufferAggregator
{
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferAggregator.class);
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
index 04df7e921ebe..4d24091dea2c 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferCallbackAccumulator.java
@@ -25,7 +25,9 @@
* these into a single {@link ByteBuffer} or byte array and succeed the callbacks.
*
*
This class is not thread safe and callers must do mutual exclusion.
+ * @deprecated use {@link ByteBufferAccumulator}
*/
+@Deprecated
public class ByteBufferCallbackAccumulator
{
private final List _entries = new ArrayList<>();
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
index f9d6965c2bc6..a46ea1f23dc0 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
@@ -127,12 +127,15 @@ private Buffer(ByteBuffer byteBuffer)
/**
*
Accumulates a sequence of {@link RetainableByteBuffer} that
- * are typically created during the generation of protocol bytes.
+ * are typically created during the generation of protocol bytes.
+ * The accumulated buffers are then used individually rather than
+ * as a single buffer (like {@link RetainableByteBuffer.Accumulator}.
*
{@code RetainableByteBuffer}s can be either
* {@link #append(RetainableByteBuffer) appended} to the sequence,
* or {@link #insert(int, RetainableByteBuffer) inserted} at a
* specific position in the sequence, and then
* {@link #release() released} when they are consumed.
+ * @see RetainableByteBuffer.Accumulator
*/
class Accumulator
{
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java
index 0358f8583969..7adad1f6bc1e 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ChunkAccumulator.java
@@ -27,7 +27,9 @@
/**
* An accumulator of {@link Content.Chunk}s used to facilitate minimal copy
* aggregation of multiple chunks.
+ * @deprecated use {@link Content.Source#asRetainableByteBuffer(Content.Source, ByteBufferPool, boolean, int)} instead.
*/
+@Deprecated
public class ChunkAccumulator
{
private static final ByteBufferPool NON_POOLING = new ByteBufferPool.NonPooling();
@@ -109,7 +111,7 @@ public RetainableByteBuffer take(ByteBufferPool pool, boolean direct)
for (Chunk chunk : _chunks)
{
offset += chunk.remaining();
- BufferUtil.append(buffer.getByteBuffer(), chunk.getByteBuffer());
+ buffer.append(chunk);
chunk.release();
}
assert offset == _length;
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
index 0a6fe5921c98..7e4dc5507d1f 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java
@@ -13,6 +13,7 @@
package org.eclipse.jetty.io;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,6 +34,7 @@
import org.eclipse.jetty.io.internal.ContentCopier;
import org.eclipse.jetty.io.internal.ContentSourceByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceConsumer;
+import org.eclipse.jetty.io.internal.ContentSourceRetainableByteBuffer;
import org.eclipse.jetty.io.internal.ContentSourceString;
import org.eclipse.jetty.util.Blocker;
import org.eclipse.jetty.util.BufferUtil;
@@ -192,7 +194,7 @@ static ByteBuffer asByteBuffer(Source source) throws IOException
*/
static CompletableFuture asByteArrayAsync(Source source, int maxSize)
{
- return new ChunkAccumulator().readAll(source, maxSize);
+ return asRetainableByteBuffer(source, null, false, maxSize).thenApply(rbb -> rbb.getByteBuffer().array());
}
/**
@@ -230,7 +232,31 @@ static CompletableFuture asByteBufferAsync(Source source, int maxSiz
*/
static CompletableFuture asRetainableByteBuffer(Source source, ByteBufferPool pool, boolean direct, int maxSize)
{
- return new ChunkAccumulator().readAll(source, pool, direct, maxSize);
+ Promise.Completable promise = new Promise.Completable<>()
+ {
+ @Override
+ public void succeeded(RetainableByteBuffer result)
+ {
+ result.retain();
+ super.succeeded(result);
+ }
+ };
+ asRetainableByteBuffer(source, pool, direct, maxSize, promise);
+ return promise;
+ }
+
+ /**
+ *
Reads, non-blocking, the whole content source into a {@link RetainableByteBuffer}.
+ *
+ * @param source the source to read
+ * @param pool The {@link ByteBufferPool} to acquire the buffer from, or null for a non {@link Retainable} buffer
+ * @param direct True if the buffer should be direct.
+ * @param maxSize The maximum size to read, or -1 for no limit
+ * @param promise the promise to notify when the whole content has been read into a RetainableByteBuffer.
+ */
+ static void asRetainableByteBuffer(Source source, ByteBufferPool pool, boolean direct, int maxSize, Promise promise)
+ {
+ new ContentSourceRetainableByteBuffer(source, pool, direct, maxSize, promise).run();
}
/**
@@ -470,6 +496,43 @@ default boolean rewind()
*/
public interface Sink
{
+ /**
+ *
Wraps the given {@link OutputStream} as a {@link Sink}.
+ * @param out The stream to wrap
+ * @return A sink wrapping the stream
+ */
+ static Sink from(OutputStream out)
+ {
+ return new Sink()
+ {
+ boolean closed;
+
+ @Override
+ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
+ {
+ if (closed)
+ {
+ callback.failed(new EOFException());
+ return;
+ }
+ try
+ {
+ BufferUtil.writeTo(byteBuffer, out);
+ if (last)
+ {
+ closed = true;
+ out.close();
+ }
+ callback.succeeded();
+ }
+ catch (Throwable t)
+ {
+ callback.failed(t);
+ }
+ }
+ };
+ }
+
/**
*
Wraps the given content sink with a buffering sink.
*
@@ -561,7 +624,7 @@ static void write(Sink sink, boolean last, String utf8Content, Callback callback
* to release the {@code ByteBuffer} back into a pool), or the
* {@link #release()} method overridden.
*/
- public interface Chunk extends Retainable
+ public interface Chunk extends RetainableByteBuffer
{
/**
*
An empty, non-last, chunk.
@@ -804,11 +867,6 @@ static boolean isFailure(Chunk chunk, boolean last)
return chunk != null && chunk.getFailure() != null && chunk.isLast() == last;
}
- /**
- * @return the ByteBuffer of this Chunk
- */
- ByteBuffer getByteBuffer();
-
/**
* Get a failure (which may be from a {@link Source#fail(Throwable) failure} or
* a {@link Source#fail(Throwable, boolean) warning}), if any, associated with the chunk.
@@ -831,68 +889,6 @@ default Throwable getFailure()
*/
boolean isLast();
- /**
- * @return the number of bytes remaining in this Chunk
- */
- default int remaining()
- {
- return getByteBuffer().remaining();
- }
-
- /**
- * @return whether this Chunk has remaining bytes
- */
- default boolean hasRemaining()
- {
- return getByteBuffer().hasRemaining();
- }
-
- /**
- *
Copies the bytes from this Chunk to the given byte array.
- *
- * @param bytes the byte array to copy the bytes into
- * @param offset the offset within the byte array
- * @param length the maximum number of bytes to copy
- * @return the number of bytes actually copied
- */
- default int get(byte[] bytes, int offset, int length)
- {
- ByteBuffer b = getByteBuffer();
- if (b == null || !b.hasRemaining())
- return 0;
- length = Math.min(length, b.remaining());
- b.get(bytes, offset, length);
- return length;
- }
-
- /**
- *
Skips, advancing the ByteBuffer position, the given number of bytes.
- *
- * @param length the maximum number of bytes to skip
- * @return the number of bytes actually skipped
- */
- default int skip(int length)
- {
- if (length == 0)
- return 0;
- ByteBuffer byteBuffer = getByteBuffer();
- length = Math.min(byteBuffer.remaining(), length);
- byteBuffer.position(byteBuffer.position() + length);
- return length;
- }
-
- /**
- * @return an immutable version of this Chunk
- */
- default Chunk asReadOnly()
- {
- if (getByteBuffer().isReadOnly())
- return this;
- if (canRetain())
- return asChunk(getByteBuffer().asReadOnlyBuffer(), isLast(), this);
- return from(getByteBuffer().asReadOnlyBuffer(), isLast());
- }
-
/**
*
Implementations of this interface may process {@link Chunk}s being copied by the
* {@link Content#copy(Source, Sink, Processor, Callback)} method, so that
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java
index 0e32781f6279..f9b1ab6bf1e4 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Retainable.java
@@ -62,6 +62,15 @@ default boolean canRetain()
return false;
}
+ /**
+ * @return whether this instance is retained
+ * @see ReferenceCounter#isRetained()
+ */
+ default boolean isRetained()
+ {
+ return false;
+ }
+
/**
*
Retains this resource, potentially incrementing a reference count if there are resources that will be released.
A pooled {@link ByteBuffer} which maintains a reference count that is
@@ -84,7 +89,7 @@ public ByteBuffer getByteBuffer()
@Override
public boolean isRetained()
{
- throw new UnsupportedOperationException();
+ return retainable.isRetained();
}
@Override
@@ -108,10 +113,31 @@ public boolean release()
}
/**
- * @return whether this instance is retained
- * @see ReferenceCounter#isRetained()
+ *
Returns a {@code RetainableByteBuffer} that wraps
+ * the given {@code ByteBuffer} and {@link Runnable} releaser.
+ *
+ * @param byteBuffer the {@code ByteBuffer} to wrap
+ * @param releaser a {@link Runnable} to call when the buffer is released.
+ * @return a {@code RetainableByteBuffer}
*/
- boolean isRetained();
+ static RetainableByteBuffer wrap(ByteBuffer byteBuffer, Runnable releaser)
+ {
+ return new AbstractRetainableByteBuffer(byteBuffer)
+ {
+ {
+ acquire();
+ }
+
+ @Override
+ public boolean release()
+ {
+ boolean released = super.release();
+ if (released)
+ releaser.run();
+ return released;
+ }
+ };
+ }
/**
* Get the wrapped, not {@code null}, {@code ByteBuffer}.
@@ -119,6 +145,22 @@ public boolean release()
*/
ByteBuffer getByteBuffer();
+ /**
+ * Creates a copy of this RetainableByteBuffer that is entirely independent, but
+ * backed by the same memory space, i.e.: modifying the ByteBuffer of the original
+ * also modifies the ByteBuffer of the copy and vice-versa.
+ * @return A copy of this RetainableByteBuffer
+ */
+ default RetainableByteBuffer copy()
+ {
+ return new AbstractRetainableByteBuffer(BufferUtil.copy(getByteBuffer()))
+ {
+ {
+ acquire();
+ }
+ };
+ }
+
/**
* @return whether the {@code ByteBuffer} is direct
*/
@@ -152,6 +194,23 @@ default int capacity()
}
/**
+ * @return the number of bytes left for appending in the {@code ByteBuffer}
+ */
+ default int space()
+ {
+ return capacity() - remaining();
+ }
+
+ /**
+ * @return whether the {@code ByteBuffer} has remaining bytes left for appending
+ */
+ default boolean isFull()
+ {
+ return space() == 0;
+ }
+
+ /**
+ * Clears the contained byte buffer to be empty in flush mode.
* @see BufferUtil#clear(ByteBuffer)
*/
default void clear()
@@ -159,6 +218,477 @@ default void clear()
BufferUtil.clear(getByteBuffer());
}
+ /**
+ * Copies the contents of the given byte buffer at the end of this buffer.
+ * @param bytes the byte buffer to copy from.
+ * @return true if all bytes of the given buffer were copied, false otherwise.
+ * @throws ReadOnlyBufferException if the buffer is read only
+ * @see BufferUtil#append(ByteBuffer, ByteBuffer)
+ */
+ default boolean append(ByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ BufferUtil.append(getByteBuffer(), bytes);
+ return !bytes.hasRemaining();
+ }
+
+ /**
+ * Copies the contents of the given retainable byte buffer at the end of this buffer.
+ * @param bytes the retainable byte buffer to copy from.
+ * @return true if all bytes of the given buffer were copied, false otherwise.
+ * @throws ReadOnlyBufferException if the buffer is read only
+ * @see BufferUtil#append(ByteBuffer, ByteBuffer)
+ */
+ default boolean append(RetainableByteBuffer bytes) throws ReadOnlyBufferException
+ {
+ return bytes.remaining() == 0 || append(bytes.getByteBuffer());
+ }
+
+ /**
+ *
Copies the bytes from this Chunk to the given byte array.
+ *
+ * @param bytes the byte array to copy the bytes into
+ * @param offset the offset within the byte array
+ * @param length the maximum number of bytes to copy
+ * @return the number of bytes actually copied
+ */
+ default int get(byte[] bytes, int offset, int length)
+ {
+ ByteBuffer b = getByteBuffer();
+ if (b == null || !b.hasRemaining())
+ return 0;
+ length = Math.min(length, b.remaining());
+ b.get(bytes, offset, length);
+ return length;
+ }
+
+ /**
+ *
Skips, advancing the ByteBuffer position, the given number of bytes.
+ *
+ * @param length the maximum number of bytes to skip
+ * @return the number of bytes actually skipped
+ */
+ default int skip(int length)
+ {
+ if (length == 0)
+ return 0;
+ ByteBuffer byteBuffer = getByteBuffer();
+ length = Math.min(byteBuffer.remaining(), length);
+ byteBuffer.position(byteBuffer.position() + length);
+ return length;
+ }
+
+ /**
+ * Copies the contents of this retainable byte buffer at the end of the given byte buffer.
+ * @param toInfillMode the destination buffer.
+ * @see ByteBuffer#put(ByteBuffer)
+ */
+ default void putTo(ByteBuffer toInfillMode)
+ {
+ toInfillMode.put(getByteBuffer());
+ }
+
+ /**
+ * Asynchronously copies the contents of this retainable byte buffer into given sink.
+ * @param sink the destination sink.
+ * @param last true if this is the last write.
+ * @param callback the callback to call upon the write completion.
+ * @see org.eclipse.jetty.io.Content.Sink#write(boolean, ByteBuffer, Callback)
+ */
+ default void writeTo(Content.Sink sink, boolean last, Callback callback)
+ {
+ sink.write(last, getByteBuffer(), callback);
+ }
+
+ /**
+ * An aggregating {@link RetainableByteBuffer} that may grow when content is appended to it.
+ */
+ class Aggregator implements RetainableByteBuffer
+ {
+ private final ByteBufferPool _pool;
+ private final boolean _direct;
+ private final int _growBy;
+ private final int _maxCapacity;
+ private RetainableByteBuffer _buffer;
+
+ /**
+ * Construct an aggregating {@link RetainableByteBuffer} that may grow when content is appended to it.
+ * {@link RetainableByteBuffer}s with zero-copy if the {@link #append(RetainableByteBuffer)} API is used
+ * @param pool The pool from which to allocate buffers
+ * @param direct true if direct buffers should be used
+ * @param maxCapacity The maximum requested length of the accumulated buffers or -1 for 2GB limit.
+ * Note that the pool may provide a buffer that exceeds this capacity.
+ */
+ public Aggregator(ByteBufferPool pool, boolean direct, int maxCapacity)
+ {
+ this(pool, direct, -1, maxCapacity);
+ }
+
+ /**
+ * Construct an aggregating {@link RetainableByteBuffer} that may grow when content is appended to it.
+ * {@link RetainableByteBuffer}s with zero-copy if the {@link #append(RetainableByteBuffer)} API is used
+ * @param pool The pool from which to allocate buffers
+ * @param direct true if direct buffers should be used
+ * @param growBy the size to grow the buffer by or <= 0 for a heuristic
+ * @param maxCapacity The maximum requested length of the accumulated buffers or -1 for 2GB limit.
+ * Note that the pool may provide a buffer that exceeds this capacity.
+ */
+ public Aggregator(ByteBufferPool pool, boolean direct, int growBy, int maxCapacity)
+ {
+ _pool = pool == null ? new ByteBufferPool.NonPooling() : pool;
+ _direct = direct;
+ _maxCapacity = maxCapacity <= 0 ? Integer.MAX_VALUE : maxCapacity;
+
+ if (growBy <= 0)
+ {
+ _buffer = _pool.acquire(Math.min(1024, _maxCapacity), _direct);
+ _growBy = Math.min(_maxCapacity, _buffer.capacity());
+ }
+ else
+ {
+ if (growBy > _maxCapacity)
+ throw new IllegalArgumentException("growBy(%d) must be <= maxCapacity(%d)".formatted(growBy, _maxCapacity));
+
+ _growBy = growBy;
+ _buffer = _pool.acquire(_growBy, _direct);
+ }
+ }
+
+ @Override
+ public void clear()
+ {
+ if (isRetained())
+ {
+ _buffer.release();
+ _buffer = _pool.acquire(_growBy, _direct);
+ }
+ else
+ {
+ BufferUtil.clear(_buffer.getByteBuffer());
+ }
+ }
+
+ @Override
+ public boolean canRetain()
+ {
+ return _buffer.canRetain();
+ }
+
+ @Override
+ public boolean isRetained()
+ {
+ return _buffer.isRetained();
+ }
+
+ @Override
+ public void retain()
+ {
+ _buffer.retain();
+ }
+
+ @Override
+ public boolean release()
+ {
+ return _buffer.release();
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer()
+ {
+ return _buffer.getByteBuffer();
+ }
+
+ @Override
+ public RetainableByteBuffer copy()
+ {
+ RetainableByteBuffer buffer = _buffer;
+ buffer.retain();
+ return new AbstractRetainableByteBuffer(buffer.getByteBuffer().slice())
+ {
+ {
+ acquire();
+ }
+
+ @Override
+ public boolean release()
+ {
+ if (super.release())
+ {
+ buffer.release();
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ @Override
+ public int capacity()
+ {
+ return Math.max(_buffer.capacity(), _maxCapacity);
+ }
+
+ @Override
+ public boolean append(ByteBuffer bytes)
+ {
+ ensureSpace(bytes.remaining());
+ return RetainableByteBuffer.super.append(bytes);
+ }
+
+ @Override
+ public boolean append(RetainableByteBuffer bytes)
+ {
+ ensureSpace(bytes.remaining());
+ return RetainableByteBuffer.super.append(bytes);
+ }
+
+ private void ensureSpace(int spaceNeeded)
+ {
+ int capacity = _buffer.capacity();
+ int space = capacity - _buffer.remaining();
+ if (spaceNeeded <= space || capacity >= _maxCapacity)
+ return;
+
+ int newCapacity = Math.multiplyExact(1 + Math.addExact(capacity, spaceNeeded) / _growBy, _growBy);
+ if (newCapacity > _maxCapacity)
+ {
+ newCapacity = Math.addExact(capacity, spaceNeeded - space);
+ if (newCapacity > _maxCapacity)
+ newCapacity = _maxCapacity;
+ }
+
+ RetainableByteBuffer ensured = _pool.acquire(newCapacity, _direct);
+ ensured.append(_buffer);
+ _buffer.release();
+ _buffer = ensured;
+ }
+ }
+
+ /**
+ * An accumulating {@link RetainableByteBuffer} that may internally accumulate multiple other
+ * {@link RetainableByteBuffer}s with zero-copy if the {@link #append(RetainableByteBuffer)} API is used
+ */
+ class Accumulator implements RetainableByteBuffer
+ {
+ private final ReferenceCounter _retainable = new ReferenceCounter(1);
+ private final ByteBufferPool _pool;
+ private final boolean _direct;
+ private final long _maxLength;
+ private final List _buffers = new ArrayList<>();
+
+ /**
+ * Construct an accumulating {@link RetainableByteBuffer} that may internally accumulate multiple other
+ * {@link RetainableByteBuffer}s with zero-copy if the {@link #append(RetainableByteBuffer)} API is used
+ * @param pool The pool from which to allocate buffers
+ * @param direct true if direct buffers should be used
+ * @param maxLength The maximum length of the accumulated buffers or -1 for 2GB limit
+ */
+ public Accumulator(ByteBufferPool pool, boolean direct, long maxLength)
+ {
+ _pool = pool == null ? new ByteBufferPool.NonPooling() : pool;
+ _direct = direct;
+ _maxLength = maxLength < 0 ? Long.MAX_VALUE : maxLength;
+ }
+
+ @Override
+ public ByteBuffer getByteBuffer()
+ {
+ return switch (_buffers.size())
+ {
+ case 0 -> RetainableByteBuffer.EMPTY.getByteBuffer();
+ case 1 -> _buffers.get(0).getByteBuffer();
+ default ->
+ {
+ RetainableByteBuffer combined = copy(true);
+ _buffers.add(combined);
+ yield combined.getByteBuffer();
+ }
+ };
+ }
+
+ @Override
+ public RetainableByteBuffer copy()
+ {
+ return copy(false);
+ }
+
+ private RetainableByteBuffer copy(boolean take)
+ {
+ int length = remaining();
+ RetainableByteBuffer combinedBuffer = _pool.acquire(length, _direct);
+ ByteBuffer byteBuffer = combinedBuffer.getByteBuffer();
+ BufferUtil.flipToFill(byteBuffer);
+ for (RetainableByteBuffer buffer : _buffers)
+ {
+ byteBuffer.put(buffer.getByteBuffer().slice());
+ if (take)
+ buffer.release();
+ }
+ BufferUtil.flipToFlush(byteBuffer, 0);
+ if (take)
+ _buffers.clear();
+ return combinedBuffer;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @return {@link Integer#MAX_VALUE} if the length of this {@code Accumulator} is greater than {@link Integer#MAX_VALUE}
+ */
+ @Override
+ public int remaining()
+ {
+ long remainingLong = remainingLong();
+ return remainingLong > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(remainingLong);
+ }
+
+ public long remainingLong()
+ {
+ long length = 0;
+ for (RetainableByteBuffer buffer : _buffers)
+ length += buffer.remaining();
+ return length;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @return {@link Integer#MAX_VALUE} if the maxLength of this {@code Accumulator} is greater than {@link Integer#MAX_VALUE}.
+ */
+ @Override
+ public int capacity()
+ {
+ long capacityLong = capacityLong();
+ return capacityLong > Integer.MAX_VALUE ? Integer.MAX_VALUE : Math.toIntExact(capacityLong);
+ }
+
+ public long capacityLong()
+ {
+ return _maxLength;
+ }
+
+ @Override
+ public boolean canRetain()
+ {
+ return _retainable.canRetain();
+ }
+
+ @Override
+ public boolean isRetained()
+ {
+ return _retainable.isRetained();
+ }
+
+ @Override
+ public void retain()
+ {
+ _retainable.retain();
+ }
+
+ @Override
+ public boolean release()
+ {
+ if (_retainable.release())
+ {
+ clear();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void clear()
+ {
+ for (RetainableByteBuffer buffer : _buffers)
+ buffer.release();
+ _buffers.clear();
+ }
+
+ @Override
+ public boolean append(ByteBuffer bytes)
+ {
+ int remaining = bytes.remaining();
+ if (remaining == 0)
+ return true;
+
+ long currentlyRemaining = _maxLength - remainingLong();
+ if (currentlyRemaining >= remaining)
+ {
+ RetainableByteBuffer rbb = RetainableByteBuffer.wrap(bytes.slice());
+ bytes.position(bytes.limit());
+ _buffers.add(rbb);
+ return true;
+ }
+ else
+ {
+ ByteBuffer slice = bytes.slice();
+ slice.limit((int)(slice.position() + currentlyRemaining));
+ RetainableByteBuffer rbb = RetainableByteBuffer.wrap(slice);
+ bytes.position((int)(bytes.position() + currentlyRemaining));
+ _buffers.add(rbb);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean append(RetainableByteBuffer retainableBytes)
+ {
+ ByteBuffer bytes = retainableBytes.getByteBuffer();
+ int remaining = bytes.remaining();
+ if (remaining == 0)
+ return true;
+
+ long currentlyRemaining = _maxLength - remainingLong();
+ if (currentlyRemaining >= remaining)
+ {
+ retainableBytes.retain();
+ RetainableByteBuffer rbb = RetainableByteBuffer.wrap(bytes.slice(), retainableBytes);
+ bytes.position(bytes.limit());
+ _buffers.add(rbb);
+ return true;
+ }
+ else
+ {
+ retainableBytes.retain();
+ ByteBuffer slice = bytes.slice();
+ slice.limit((int)(slice.position() + currentlyRemaining));
+ RetainableByteBuffer rbb = RetainableByteBuffer.wrap(slice, retainableBytes);
+ bytes.position((int)(bytes.position() + currentlyRemaining));
+ _buffers.add(rbb);
+ return false;
+ }
+ }
+
+ @Override
+ public void putTo(ByteBuffer toInfillMode)
+ {
+ for (RetainableByteBuffer buffer : _buffers)
+ buffer.putTo(toInfillMode);
+ }
+
+ @Override
+ public void writeTo(Content.Sink sink, boolean last, Callback callback)
+ {
+ switch (_buffers.size())
+ {
+ case 0 -> callback.succeeded();
+ case 1 -> _buffers.get(0).writeTo(sink, last, callback);
+ default -> new IteratingNestedCallback(callback)
+ {
+ private int i = 0;
+
+ @Override
+ protected Action process()
+ {
+ if (i < _buffers.size())
+ {
+ _buffers.get(i).writeTo(sink, last && ++i == _buffers.size(), this);
+ return Action.SCHEDULED;
+ }
+ return Action.SUCCEEDED;
+ }
+ }.iterate();
+ }
+ }
+ }
+
/**
* A wrapper for {@link RetainableByteBuffer} instances
*/
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
index 6bd5eeebc32a..d4db343868a2 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/AsyncContent.java
@@ -301,6 +301,12 @@ public boolean canRetain()
return referenceCounter != null;
}
+ @Override
+ public boolean isRetained()
+ {
+ return canRetain() && referenceCounter.isRetained();
+ }
+
@Override
public void retain()
{
@@ -330,5 +336,17 @@ public void failed(Throwable x)
{
callback.failed(x);
}
+
+ @Override
+ public String toString()
+ {
+ return "%s@%x[rc=%s,l=%b,b=%s]".formatted(
+ getClass().getSimpleName(),
+ hashCode(),
+ referenceCounter == null ? "-" : referenceCounter.get(),
+ isLast(),
+ BufferUtil.toDetailString(getByteBuffer())
+ );
+ }
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
index acb85e2ddb60..d709df155946 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java
@@ -17,7 +17,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
-import org.eclipse.jetty.io.ByteBufferAggregator;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
@@ -43,15 +42,13 @@ public class BufferedContentSink implements Content.Sink
private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class);
- private static final int START_BUFFER_SIZE = 1024;
-
+ private final Flusher _flusher = new Flusher();
private final Content.Sink _delegate;
private final ByteBufferPool _bufferPool;
private final boolean _direct;
private final int _maxBufferSize;
private final int _maxAggregationSize;
- private final Flusher _flusher;
- private ByteBufferAggregator _aggregator;
+ private RetainableByteBuffer _aggregator;
private boolean _firstWrite = true;
private boolean _lastWritten;
@@ -68,7 +65,13 @@ public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boo
_direct = direct;
_maxBufferSize = maxBufferSize;
_maxAggregationSize = maxAggregationSize;
- _flusher = new Flusher(delegate);
+ }
+
+ private void releaseAggregator()
+ {
+ if (_aggregator != null)
+ _aggregator.release();
+ _aggregator = null;
}
@Override
@@ -99,14 +102,13 @@ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
{
// current buffer can be aggregated
if (_aggregator == null)
- _aggregator = new ByteBufferAggregator(_bufferPool, _direct, Math.min(START_BUFFER_SIZE, _maxBufferSize), _maxBufferSize);
- aggregateAndFlush(last, current, callback);
- }
- else
- {
- // current buffer is greater than the max aggregation size
- flush(last, current, callback);
+ _aggregator = new RetainableByteBuffer.Aggregator(_bufferPool, _direct, _maxBufferSize);
+ aggregate(last, current, callback);
+ return;
}
+
+ // current buffer is greater than the max aggregation size
+ _flusher.flush(last, current, callback);
}
/**
@@ -115,154 +117,57 @@ public void write(boolean last, ByteBuffer byteBuffer, Callback callback)
*/
public void flush(Callback callback)
{
- flush(false, FLUSH_BUFFER, callback);
- }
-
- /**
- * Flushes the aggregated buffer if something was aggregated, then flushes the
- * given buffer, bypassing the aggregator.
- */
- private void flush(boolean last, ByteBuffer currentBuffer, Callback callback)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("given buffer is greater than _maxBufferSize");
-
- RetainableByteBuffer aggregatedBuffer = _aggregator == null ? null : _aggregator.takeRetainableByteBuffer();
- if (aggregatedBuffer == null)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("nothing aggregated, flushing current buffer {}", currentBuffer);
- _flusher.offer(last, currentBuffer, callback);
- }
- else if (BufferUtil.hasContent(currentBuffer))
- {
- if (LOG.isDebugEnabled())
- LOG.debug("flushing aggregated buffer {}", aggregatedBuffer);
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
- {
- @Override
- public void succeeded()
- {
- super.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("succeeded writing aggregated buffer, flushing current buffer {}", currentBuffer);
- _flusher.offer(last, currentBuffer, callback);
- }
-
- @Override
- public void failed(Throwable x)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("failure writing aggregated buffer", x);
- super.failed(x);
- callback.failed(x);
- }
- });
- }
- else
- {
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), Callback.from(aggregatedBuffer::release, callback));
- }
+ _flusher.flush(false, FLUSH_BUFFER, callback);
}
/**
* Aggregates the given buffer, flushing the aggregated buffer if necessary.
*/
- private void aggregateAndFlush(boolean last, ByteBuffer currentBuffer, Callback callback)
+ private void aggregate(boolean last, ByteBuffer byteBuffer, Callback callback)
{
- boolean full = _aggregator.aggregate(currentBuffer);
- boolean empty = !currentBuffer.hasRemaining();
- boolean flush = full || currentBuffer == FLUSH_BUFFER;
+ boolean full = !_aggregator.append(byteBuffer) || _aggregator.isFull();
+ boolean empty = !byteBuffer.hasRemaining();
+ boolean flush = full || byteBuffer == FLUSH_BUFFER;
boolean complete = last && empty;
if (LOG.isDebugEnabled())
- LOG.debug("aggregated current buffer, full={}, complete={}, bytes left={}, aggregator={}", full, complete, currentBuffer.remaining(), _aggregator);
- if (complete)
- {
- RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
- if (aggregatedBuffer != null)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("complete; writing aggregated buffer as the last one: {} bytes", aggregatedBuffer.remaining());
- _flusher.offer(true, aggregatedBuffer.getByteBuffer(), Callback.from(callback, aggregatedBuffer::release));
- }
- else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("complete; no aggregated buffer, writing last empty buffer");
- _flusher.offer(true, BufferUtil.EMPTY_BUFFER, callback);
- }
- }
- else if (flush)
- {
- RetainableByteBuffer aggregatedBuffer = _aggregator.takeRetainableByteBuffer();
- if (LOG.isDebugEnabled())
- LOG.debug("writing aggregated buffer: {} bytes, then {}", aggregatedBuffer.remaining(), currentBuffer.remaining());
-
- if (BufferUtil.hasContent(currentBuffer))
- {
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), new Callback.Nested(Callback.from(aggregatedBuffer::release))
- {
- @Override
- public void succeeded()
- {
- super.succeeded();
- if (LOG.isDebugEnabled())
- LOG.debug("written aggregated buffer, writing remaining of current: {} bytes{}", currentBuffer.remaining(), (last ? " (last write)" : ""));
- if (last)
- _flusher.offer(true, currentBuffer, callback);
- else
- aggregateAndFlush(false, currentBuffer, callback);
- }
-
- @Override
- public void failed(Throwable x)
- {
- if (LOG.isDebugEnabled())
- LOG.debug("failure writing aggregated buffer", x);
- super.failed(x);
- callback.failed(x);
- }
- });
- }
- else
- {
- _flusher.offer(false, aggregatedBuffer.getByteBuffer(), Callback.from(aggregatedBuffer::release, callback));
- }
- }
+ LOG.debug("aggregated current buffer, full={}, complete={}, bytes left={}, aggregator={}", full, complete, byteBuffer.remaining(), _aggregator);
+ if (complete || flush)
+ _flusher.flush(last, byteBuffer, callback);
else
- {
- if (LOG.isDebugEnabled())
- LOG.debug("buffer fully aggregated, delaying writing - aggregator: {}", _aggregator);
- _flusher.offer(callback);
- }
+ _flusher.serialize(callback);
}
- private static class Flusher extends IteratingCallback
+ private class Flusher extends IteratingCallback
{
- private static final ByteBuffer COMPLETE_CALLBACK = BufferUtil.allocate(0);
+ private enum Scheduled
+ {
+ FLUSHING_AGGREGATION,
+ FLUSHING_BUFFER,
+ }
- private final Content.Sink _sink;
+ private Scheduled _scheduled;
+ private boolean _flush;
private boolean _last;
- private ByteBuffer _buffer;
+ private ByteBuffer _byteBuffer;
private Callback _callback;
private boolean _lastWritten;
- Flusher(Content.Sink sink)
+ private void flush(boolean last, ByteBuffer byteBuffer, Callback callback)
{
- _sink = sink;
- }
-
- void offer(Callback callback)
- {
- offer(false, COMPLETE_CALLBACK, callback);
+ if (_callback != null)
+ throw new WritePendingException();
+ _flush = true;
+ _last = last;
+ _byteBuffer = byteBuffer;
+ _callback = callback;
+ iterate();
}
- void offer(boolean last, ByteBuffer byteBuffer, Callback callback)
+ private void serialize(Callback callback)
{
if (_callback != null)
throw new WritePendingException();
- _last = last;
- _buffer = byteBuffer;
+ _flush = false;
_callback = callback;
iterate();
}
@@ -270,37 +175,79 @@ void offer(boolean last, ByteBuffer byteBuffer, Callback callback)
@Override
protected Action process()
{
- if (_lastWritten)
- return Action.SUCCEEDED;
- if (_callback == null)
- return Action.IDLE;
- if (_buffer != COMPLETE_CALLBACK)
+ if (_scheduled != null)
{
- _lastWritten = _last;
- _sink.write(_last, _buffer, this);
+ switch (_scheduled)
+ {
+ case FLUSHING_AGGREGATION ->
+ {
+ _aggregator.clear();
+ if (_byteBuffer != null && _byteBuffer.remaining() <= _maxAggregationSize && !_last)
+ {
+ _aggregator.append(_byteBuffer);
+ _byteBuffer = null;
+ _flush = false;
+ }
+ }
+
+ case FLUSHING_BUFFER ->
+ _byteBuffer = null;
+ }
+ _scheduled = null;
}
- else
+
+ if (_flush && _aggregator != null && _aggregator.hasRemaining())
{
- succeeded();
+ boolean last = _last && BufferUtil.isEmpty(_byteBuffer);
+ _lastWritten |= last;
+ _aggregator.writeTo(_delegate, last, this);
+ _scheduled = Scheduled.FLUSHING_AGGREGATION;
+ return Action.SCHEDULED;
}
- return Action.SCHEDULED;
+
+ if (_flush && (BufferUtil.hasContent(_byteBuffer) || _byteBuffer == FLUSH_BUFFER))
+ {
+ ByteBuffer buffer = _byteBuffer;
+ _byteBuffer = null;
+ _lastWritten |= _last;
+ _delegate.write(_last, buffer, this);
+ _scheduled = Scheduled.FLUSHING_BUFFER;
+ return Action.SCHEDULED;
+ }
+
+ if (_last && !_lastWritten)
+ {
+ _lastWritten = true;
+ _delegate.write(_last, BufferUtil.EMPTY_BUFFER, this);
+ return Action.SCHEDULED;
+ }
+
+ Callback callback = _callback;
+ if (callback != null)
+ {
+ _callback = null;
+ callback.succeeded();
+ this.succeeded();
+ return Action.SCHEDULED;
+ }
+
+ return _last ? Action.SUCCEEDED : Action.IDLE;
}
@Override
- public void succeeded()
+ protected void onCompleteFailure(Throwable cause)
{
- _buffer = null;
+ releaseAggregator();
Callback callback = _callback;
_callback = null;
- callback.succeeded();
- super.succeeded();
+ if (callback != null)
+ callback.failed(cause);
}
@Override
- protected void onCompleteFailure(Throwable cause)
+ protected void onCompleteSuccess()
{
- _buffer = null;
- _callback.failed(cause);
+ releaseAggregator();
}
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java
index 821782fd32fe..29f40d25c4f2 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ByteBufferChunk.java
@@ -65,6 +65,12 @@ public WithReferenceCount(ByteBuffer byteBuffer, boolean last)
super(byteBuffer, last);
}
+ @Override
+ public boolean isRetained()
+ {
+ return references.isRetained();
+ }
+
@Override
public boolean canRetain()
{
@@ -148,6 +154,12 @@ public WithRetainable(ByteBuffer byteBuffer, boolean last, Retainable retainable
this.retainable = retainable;
}
+ @Override
+ public boolean isRetained()
+ {
+ return retainable.isRetained();
+ }
+
@Override
public boolean canRetain()
{
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java
index 9db5923b287f..79936a6c3e0f 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceByteBuffer.java
@@ -15,13 +15,13 @@
import java.nio.ByteBuffer;
-import org.eclipse.jetty.io.ByteBufferAccumulator;
import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Promise;
public class ContentSourceByteBuffer implements Runnable
{
- private final ByteBufferAccumulator accumulator = new ByteBufferAccumulator();
+ private final RetainableByteBuffer.Accumulator accumulator = new RetainableByteBuffer.Accumulator(null, false, -1);
private final Content.Source source;
private final Promise promise;
@@ -52,12 +52,18 @@ public void run()
return;
}
- accumulator.copyBuffer(chunk.getByteBuffer());
+ accumulator.append(chunk);
chunk.release();
if (chunk.isLast())
{
- promise.succeeded(accumulator.takeByteBuffer());
+ RetainableByteBuffer copy = accumulator.copy();
+ accumulator.release();
+
+ // We know the accumulator is not using a pool, so whilst we release after succeeded, it is safe
+ // for the promise to retain the ByteBuffer after the call.
+ promise.succeeded(copy.getByteBuffer());
+ copy.release();
return;
}
}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceRetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceRetainableByteBuffer.java
new file mode 100644
index 000000000000..65015cd2d64b
--- /dev/null
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentSourceRetainableByteBuffer.java
@@ -0,0 +1,75 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io.internal;
+
+import org.eclipse.jetty.io.ByteBufferPool;
+import org.eclipse.jetty.io.Content;
+import org.eclipse.jetty.io.RetainableByteBuffer;
+import org.eclipse.jetty.util.Promise;
+
+public class ContentSourceRetainableByteBuffer implements Runnable
+{
+ private final RetainableByteBuffer.Accumulator accumulator;
+ private final Content.Source source;
+ private final Promise promise;
+
+ public ContentSourceRetainableByteBuffer(Content.Source source, ByteBufferPool pool, boolean direct, int maxSize, Promise promise)
+ {
+ this.source = source;
+ this.accumulator = new RetainableByteBuffer.Accumulator(pool, direct, maxSize);
+ this.promise = promise;
+ }
+
+ @Override
+ public void run()
+ {
+ while (true)
+ {
+ Content.Chunk chunk = source.read();
+
+ if (chunk == null)
+ {
+ source.demand(this);
+ return;
+ }
+
+ if (Content.Chunk.isFailure(chunk))
+ {
+ promise.failed(chunk.getFailure());
+ if (!chunk.isLast())
+ source.fail(chunk.getFailure());
+ return;
+ }
+
+ boolean appended = accumulator.append(chunk);
+ chunk.release();
+
+ if (!appended)
+ {
+ IllegalStateException ise = new IllegalStateException("Max size (" + accumulator.capacity() + ") exceeded");
+ promise.failed(ise);
+ accumulator.release();
+ source.fail(ise);
+ return;
+ }
+
+ if (chunk.isLast())
+ {
+ promise.succeeded(accumulator);
+ accumulator.release();
+ return;
+ }
+ }
+ }
+}
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
index 39f74d7dde11..7c9c8acb59a4 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
@@ -342,7 +342,7 @@ private void acquireEncryptedOutput()
public void onUpgradeTo(ByteBuffer buffer)
{
acquireEncryptedInput();
- BufferUtil.append(_encryptedInput.getByteBuffer(), buffer);
+ _encryptedInput.append(buffer);
}
@Override
diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
index 0731eff5d32b..0f4108f6343d 100644
--- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
+++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/BufferedContentSinkTest.java
@@ -13,9 +13,13 @@
package org.eclipse.jetty.io;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -34,9 +38,11 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -270,6 +276,12 @@ public void testFlush(BiConsumer flusher) throws
assertThat(BufferUtil.toString(chunk.getByteBuffer()), is("Hello World!"));
chunk.release();
callback.get(5, TimeUnit.SECONDS);
+
+ buffered.write(true, BufferUtil.EMPTY_BUFFER, Callback.NOOP);
+ chunk = async.read();
+ assertThat(chunk.isLast(), is(true));
+ assertThat(chunk.remaining(), is(0));
+ chunk.release();
}
}
@@ -509,10 +521,11 @@ public void succeeded()
@Test
public void testByteByByteAsync() throws Exception
{
+ // This test relies on selecting a size that will not be over allocated by the buffer pool
try (AsyncContent async = new AsyncContent())
{
- BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 1024, 1024);
- AtomicInteger count = new AtomicInteger(2048);
+ BufferedContentSink buffered = new BufferedContentSink(async, _bufferPool, true, 8 * 1024, 8 * 1024);
+ AtomicInteger count = new AtomicInteger(16 * 1024);
CountDownLatch complete = new CountDownLatch(1);
Callback callback = new Callback()
{
@@ -540,12 +553,12 @@ public void succeeded()
Content.Chunk read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
assertThat(read.isLast(), is(false));
- assertThat(read.remaining(), is(1024));
+ assertThat(read.remaining(), is(8 * 1024));
assertThat(read.release(), is(true));
read = await().atMost(5, TimeUnit.SECONDS).until(async::read, Objects::nonNull);
assertThat(read.isLast(), is(true));
- assertThat(read.remaining(), is(1024));
+ assertThat(read.remaining(), is(8 * 1024));
assertThat(read.release(), is(true));
assertTrue(complete.await(5, TimeUnit.SECONDS));
@@ -594,4 +607,45 @@ public void succeeded()
assertThat(count.get(), is(-1));
}
}
+
+ @Test
+ public void testFromOutputStream()
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Content.Sink sink = Content.Sink.from(baos);
+
+ AccountingCallback accountingCallback = new AccountingCallback();
+
+ sink.write(false, ByteBuffer.wrap("hello ".getBytes(US_ASCII)), accountingCallback);
+ assertThat(accountingCallback.reports, equalTo(List.of("succeeded")));
+ accountingCallback.reports.clear();
+
+ sink.write(true, ByteBuffer.wrap("world".getBytes(US_ASCII)), accountingCallback);
+ assertThat(accountingCallback.reports, equalTo(List.of("succeeded")));
+ accountingCallback.reports.clear();
+
+ sink.write(true, ByteBuffer.wrap(" again".getBytes(US_ASCII)), accountingCallback);
+ assertThat(accountingCallback.reports.size(), is(1));
+ assertThat(accountingCallback.reports.get(0), instanceOf(EOFException.class));
+ accountingCallback.reports.clear();
+
+ assertThat(baos.toString(US_ASCII), is("hello world"));
+ }
+
+ private static class AccountingCallback implements Callback
+ {
+ private final List