From 63b30a6e0114ab4ba3a3d81591489965ceb49fda Mon Sep 17 00:00:00 2001 From: franz1981 Date: Tue, 18 Apr 2023 15:06:06 +0200 Subject: [PATCH] Batching buffer with minChunkSize --- .../JaxrsClientReactiveProcessor.java | 1 + .../runtime/ResteasyReactiveConfig.java | 11 +- .../deployment/ResteasyReactiveProcessor.java | 1 + .../common/ResteasyReactiveConfig.java | 21 +- .../ResteasyReactiveDeploymentManager.java | 7 +- .../reactive/server/vertx/AppendBuffer.java | 192 ++++++++++++++++++ .../vertx/ResteasyReactiveOutputStream.java | 40 ++-- 7 files changed, 241 insertions(+), 32 deletions(-) create mode 100644 independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java diff --git a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java index ac8e95ad13d78..eb2d502468585 100644 --- a/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/jaxrs-client-reactive/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java @@ -649,6 +649,7 @@ private org.jboss.resteasy.reactive.common.ResteasyReactiveConfig createRestReac return new org.jboss.resteasy.reactive.common.ResteasyReactiveConfig( getEffectivePropertyValue("input-buffer-size", config.inputBufferSize().asLongValue(), Long.class, mpConfig), + getEffectivePropertyValue("min-chunk-size", config.minChunkSize(), Integer.class, mpConfig), getEffectivePropertyValue("output-buffer-size", config.outputBufferSize(), Integer.class, mpConfig), getEffectivePropertyValue("single-default-produces", config.singleDefaultProduces(), Boolean.class, mpConfig), getEffectivePropertyValue("default-produces", config.defaultProduces(), Boolean.class, mpConfig)); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ResteasyReactiveConfig.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ResteasyReactiveConfig.java index 14b75b983ef60..611d0ccd64fb7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ResteasyReactiveConfig.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-common/runtime/src/main/java/io/quarkus/resteasy/reactive/common/runtime/ResteasyReactiveConfig.java @@ -18,10 +18,19 @@ public interface ResteasyReactiveConfig { @WithDefault("10k") MemorySize inputBufferSize(); + /** + * The size of the chunks of memory allocated when writing data. + *

+ * This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations + * of the application. + */ + @WithDefault("128") + int minChunkSize(); + /** * The size of the output stream response buffer. If a response is larger than this and no content-length * is provided then the request will be chunked. - * + *

* Larger values may give slight performance increases for large responses, at the expense of more memory usage. */ @WithDefault("8191") diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java index 439bceb1405ff..a26c79b2e50aa 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/ResteasyReactiveProcessor.java @@ -1381,6 +1381,7 @@ private org.jboss.resteasy.reactive.common.ResteasyReactiveConfig createRestReac return new org.jboss.resteasy.reactive.common.ResteasyReactiveConfig( getEffectivePropertyValue("input-buffer-size", config.inputBufferSize().asLongValue(), Long.class, mpConfig), + getEffectivePropertyValue("min-chunk-size", config.outputBufferSize(), Integer.class, mpConfig), getEffectivePropertyValue("output-buffer-size", config.outputBufferSize(), Integer.class, mpConfig), getEffectivePropertyValue("single-default-produces", config.singleDefaultProduces(), Boolean.class, mpConfig), getEffectivePropertyValue("default-produces", config.defaultProduces(), Boolean.class, mpConfig)); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/ResteasyReactiveConfig.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/ResteasyReactiveConfig.java index edee68197af95..4b140b20b7b06 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/ResteasyReactiveConfig.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/ResteasyReactiveConfig.java @@ -8,10 +8,18 @@ public class ResteasyReactiveConfig { */ private long inputBufferSize; + /** + * The size of the chunks of memory allocated when writing data. + *

+ * This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations + * of the application. + */ + private int minChunkSize = 128; + /** * The size of the output stream response buffer. If a response is larger than this and no content-length * is provided then the request will be chunked. - * + *

* Larger values may give slight performance increases for large responses, at the expense of more memory usage. */ private int outputBufferSize = 8192; @@ -35,9 +43,10 @@ public class ResteasyReactiveConfig { public ResteasyReactiveConfig() { } - public ResteasyReactiveConfig(long inputBufferSize, int outputBufferSize, boolean singleDefaultProduces, + public ResteasyReactiveConfig(long inputBufferSize, int minChunkSize, int outputBufferSize, boolean singleDefaultProduces, boolean defaultProduces) { this.inputBufferSize = inputBufferSize; + this.minChunkSize = minChunkSize; this.outputBufferSize = outputBufferSize; this.singleDefaultProduces = singleDefaultProduces; this.defaultProduces = defaultProduces; @@ -55,6 +64,14 @@ public int getOutputBufferSize() { return outputBufferSize; } + public int getMinChunkSize() { + return minChunkSize; + } + + public void setMinChunkSize(int minChunkSize) { + this.minChunkSize = minChunkSize; + } + public void setOutputBufferSize(int outputBufferSize) { this.outputBufferSize = outputBufferSize; } diff --git a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java index cae40343c58aa..77d1917a5bf8b 100644 --- a/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java +++ b/independent-projects/resteasy-reactive/server/processor/src/main/java/org/jboss/resteasy/reactive/server/processor/ResteasyReactiveDeploymentManager.java @@ -82,6 +82,8 @@ public static ScanStep start(IndexView nonCalculatingIndex) { public static class ScanStep { final IndexView index; int inputBufferSize = 10000; + + int minChunkSize = 128; int outputBufferSize = 8192; /** * By default, we assume a default produced media type of "text/plain" @@ -215,8 +217,9 @@ public ScanResult scan() { .setAdditionalReaders(readers) .setAdditionalWriters(writers) .setInjectableBeans(new HashMap<>()) - .setConfig(new ResteasyReactiveConfig(inputBufferSize, outputBufferSize, singleDefaultProduces, - defaultProduces)) + .setConfig( + new ResteasyReactiveConfig(inputBufferSize, minChunkSize, outputBufferSize, singleDefaultProduces, + defaultProduces)) .setHttpAnnotationToMethod(resources.getHttpAnnotationToMethod()) .setApplicationScanningResult(applicationScanningResult); for (MethodScanner scanner : methodScanners) { diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java new file mode 100644 index 0000000000000..4f2fb4a7e499e --- /dev/null +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java @@ -0,0 +1,192 @@ +package org.jboss.resteasy.reactive.server.vertx; + +import java.util.ArrayDeque; +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; + +/** + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + */ +final class AppendBuffer { + private final ByteBufAllocator allocator; + + private final int minChunkSize; + private final int capacity; + private ByteBuf buffer; + private ArrayDeque otherBuffers; + private int size; + + private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { + this.allocator = allocator; + this.minChunkSize = Math.min(minChunkSize, capacity); + this.capacity = capacity; + } + + /** + * This buffer append data in a single eagerly allocated {@link ByteBuf}. + */ + public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, capacity, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, 0, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or + * as each {@code len}, if greater than it.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { + return new AppendBuffer(allocator, minChunkSize, capacity); + } + + private ByteBuf lastBuffer() { + if (otherBuffers == null || otherBuffers.isEmpty()) { + return buffer; + } + return otherBuffers.peekLast(); + } + + /** + * It returns how many bytes have been appended
+ * If returns a value different from {@code len}, is it required to invoke {@link #clear} + * that would refill the available capacity till {@link #capacity()} + */ + public int append(byte[] bytes, int off, int len) { + Objects.requireNonNull(bytes); + if (len == 0) { + return 0; + } + int alreadyWritten = 0; + if (minChunkSize > 0) { + var lastBuffer = lastBuffer(); + if (lastBuffer != null) { + int availableOnLast = lastBuffer.writableBytes(); + if (availableOnLast > 0) { + int toWrite = Math.min(len, availableOnLast); + lastBuffer.writeBytes(bytes, off, toWrite); + size += toWrite; + len -= toWrite; + // we stop if there's no more to append + if (len == 0) { + return toWrite; + } + off += toWrite; + alreadyWritten = toWrite; + } + } + } + final int availableCapacity = capacity - size; + if (availableCapacity == 0) { + return alreadyWritten; + } + // we can still write some + int toWrite = Math.min(len, availableCapacity); + assert toWrite > 0; + final int chunkCapacity; + if (minChunkSize > 0) { + // Cannot allocate less than minChunkSize, till the limit of capacity left + chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity); + } else { + chunkCapacity = toWrite; + } + var tmpBuf = allocator.directBuffer(chunkCapacity); + try { + tmpBuf.writeBytes(bytes, off, toWrite); + } catch (Throwable t) { + tmpBuf.release(); + throw t; + } + if (buffer == null) { + buffer = tmpBuf; + } else { + boolean resetOthers = false; + try { + if (otherBuffers == null) { + otherBuffers = new ArrayDeque<>(); + resetOthers = true; + } + otherBuffers.add(tmpBuf); + } catch (Throwable t) { + rollback(alreadyWritten, tmpBuf, resetOthers); + throw t; + } + } + size += toWrite; + return toWrite + alreadyWritten; + } + + private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) { + tmpBuf.release(); + if (resetOthers) { + otherBuffers = null; + } + if (alreadyWritten > 0) { + var last = lastBuffer(); + last.writerIndex(last.writerIndex() - alreadyWritten); + size -= alreadyWritten; + assert last.writerIndex() > 0; + } + } + + public ByteBuf clear() { + var firstBuf = buffer; + if (firstBuf == null) { + return null; + } + var others = otherBuffers; + if (others == null || others.isEmpty()) { + size = 0; + buffer = null; + // super fast-path + return firstBuf; + } + return clearBuffers(); + } + + private CompositeByteBuf clearBuffers() { + var firstBuf = buffer; + var others = otherBuffers; + var batch = allocator.compositeDirectBuffer(1 + others.size()); + try { + buffer = null; + size = 0; + batch.addComponent(true, 0, firstBuf); + for (int i = 0, othersCount = others.size(); i < othersCount; i++) { + // if addComponent fail, it takes care of releasing curr and throwing the exception: + batch.addComponent(true, 1 + i, others.poll()); + } + return batch; + } catch (Throwable anyError) { + batch.release(); + releaseOthers(others); + throw anyError; + } + } + + private static void releaseOthers(ArrayDeque others) { + ByteBuf buf; + while ((buf = others.poll()) != null) { + buf.release(); + } + } + + public int capacity() { + return capacity; + } + + public int availableCapacity() { + return capacity - size; + } + +} diff --git a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java index f1d7f01ddff7d..701d64586253f 100644 --- a/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java +++ b/independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/ResteasyReactiveOutputStream.java @@ -26,8 +26,7 @@ public class ResteasyReactiveOutputStream extends OutputStream { private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream"); private final ResteasyReactiveRequestContext context; protected final HttpServerRequest request; - private final int outputBufferSize; - private ByteBuf pooledBuffer; + private final AppendBuffer appendBuffer; private boolean committed; private boolean closed; @@ -40,7 +39,9 @@ public class ResteasyReactiveOutputStream extends OutputStream { public ResteasyReactiveOutputStream(VertxResteasyReactiveRequestContext context) { this.context = context; this.request = context.getContext().request(); - this.outputBufferSize = context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize(); + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, + context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), + context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); request.response().exceptionHandler(new Handler() { @Override public void handle(Throwable event) { @@ -202,26 +203,16 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti int rem = len; int idx = off; - ByteBuf buffer = pooledBuffer; try { - if (buffer == null) { - pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(outputBufferSize); - } while (rem > 0) { - int toWrite = Math.min(rem, buffer.writableBytes()); - buffer.writeBytes(b, idx, toWrite); - rem -= toWrite; - idx += toWrite; - if (!buffer.isWritable()) { - ByteBuf tmpBuf = buffer; - this.pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(outputBufferSize); - writeBlocking(tmpBuf, false); + final int written = appendBuffer.append(b, idx, rem); + if (written < rem) { + writeBlocking(appendBuffer.clear(), false); } + rem -= written; + idx += written; } } catch (Exception e) { - if (buffer != null && buffer.refCnt() > 0) { - buffer.release(); - } throw new IOException(e); } } @@ -282,15 +273,11 @@ public void flush() throws IOException { throw new IOException("Stream is closed"); } try { - if (pooledBuffer != null) { - writeBlocking(pooledBuffer, false); - pooledBuffer = null; + var toFlush = appendBuffer.clear(); + if (toFlush != null) { + writeBlocking(toFlush, false); } } catch (Exception e) { - if (pooledBuffer != null) { - pooledBuffer.release(); - pooledBuffer = null; - } throw new IOException(e); } } @@ -302,12 +289,11 @@ public void close() throws IOException { if (closed) return; try { - writeBlocking(pooledBuffer, true); + writeBlocking(appendBuffer.clear(), true); } catch (Exception e) { throw new IOException(e); } finally { closed = true; - pooledBuffer = null; } }