From b36db1e18ef4e2beaa372a2cd05503c7a8678b3e Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Mon, 13 Nov 2023 17:09:30 +0100 Subject: [PATCH] Large responses written slowly due to allocating too small buffers fix #1066 --- .../ROOT/pages/includes/quarkus-cxf.adoc | 38 +++ .../CxfEndpointImplementationProcessor.java | 4 +- .../java/io/quarkiverse/cxf/CXFRecorder.java | 9 +- .../io/quarkiverse/cxf/CxfFixedConfig.java | 18 ++ .../cxf/transport/AppendBuffer.java | 192 +++++++++++++ .../quarkiverse/cxf/transport/CxfHandler.java | 10 +- .../transport/VertxHttpServletResponse.java | 10 +- .../transport/VertxServletOutputStream.java | 268 +++++++++--------- 8 files changed, 411 insertions(+), 138 deletions(-) create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java diff --git a/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc b/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc index 1381dc1fa..cb0718194 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc @@ -517,6 +517,44 @@ endif::add-copy-button-to-env-var[] | +a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.min-chunk-size]]`link:#quarkus-cxf_quarkus.cxf.min-chunk-size[quarkus.cxf.min-chunk-size]` + + +[.description] +-- +The size in bytes of the chunks of memory allocated when writing data. + +This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations of the application. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_MIN_CHUNK_SIZE+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++QUARKUS_CXF_MIN_CHUNK_SIZE+++` +endif::add-copy-button-to-env-var[] +--|int +|`128` + + +a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.output-buffer-size]]`link:#quarkus-cxf_quarkus.cxf.output-buffer-size[quarkus.cxf.output-buffer-size]` + + +[.description] +-- +The size of the output stream response buffer in bytes. If a response is larger than this and no content-length is provided then the response will be chunked. + +Larger values may give slight performance increases for large responses, at the expense of more memory usage. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++` +endif::add-copy-button-to-env-var[] +--|int +|`8191` + + a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.codegen.wsdl2java.-named-parameter-sets-.includes]]`link:#quarkus-cxf_quarkus.cxf.codegen.wsdl2java.-named-parameter-sets-.includes[quarkus.cxf.codegen.wsdl2java."named-parameter-sets".includes]` diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java index 6e7a65663..c69bc92b5 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java @@ -18,6 +18,7 @@ import io.quarkiverse.cxf.CXFRecorder; import io.quarkiverse.cxf.CXFServletInfos; import io.quarkiverse.cxf.CxfConfig; +import io.quarkiverse.cxf.CxfFixedConfig; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.arc.deployment.BeanContainerBuildItem; import io.quarkus.deployment.annotations.BuildProducer; @@ -123,6 +124,7 @@ void startRoute(CXFRecorder recorder, HttpBuildTimeConfig httpBuildTimeConfig, HttpConfiguration httpConfiguration, CxfBuildTimeConfig cxfBuildTimeConfig, + CxfFixedConfig fixedConfig, CxfConfig cxfConfig) { final RuntimeValue infos = recorder.createInfos(cxfBuildTimeConfig.path(), httpBuildTimeConfig.rootPath); @@ -140,7 +142,7 @@ void startRoute(CXFRecorder recorder, } if (!requestors.isEmpty()) { final Handler handler = recorder.initServer(infos, beanContainer.getValue(), - httpConfiguration); + httpConfiguration, fixedConfig); final String mappingPath = getMappingPath(cxfBuildTimeConfig.path()); LOGGER.infof("Mapping a Vert.x handler for CXF to %s as requested by %s", mappingPath, requestors); routes.produce(RouteBuildItem.builder() diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java index 668828c5c..a9e254e7c 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java @@ -128,12 +128,15 @@ public RuntimeValue createInfos(String path, String contextPath return new RuntimeValue<>(infos); } - public Handler initServer(RuntimeValue infos, BeanContainer beanContainer, - HttpConfiguration httpConfiguration) { + public Handler initServer( + RuntimeValue infos, + BeanContainer beanContainer, + HttpConfiguration httpConfiguration, + CxfFixedConfig fixedConfig) { LOGGER.trace("init server"); // There may be a better way to handle this CxfJsonRPCService.setServletInfos(infos.getValue()); - return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration); + return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration, fixedConfig); } public void resetDestinationRegistry(ShutdownContext context) { diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java index c746fd4be..a17b27dce 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java @@ -17,6 +17,24 @@ @ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) public interface CxfFixedConfig { + /** + * The size in bytes of the chunks of memory allocated when writing data. + *

+ * This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations + * of the application. + */ + @WithDefault("128") + int minChunkSize(); + + /** + * The size of the output stream response buffer in bytes. If a response is larger than this and no content-length + * is provided then the response will be chunked. + *

+ * Larger values may give slight performance increases for large responses, at the expense of more memory usage. + */ + @WithDefault("8191") + int outputBufferSize(); + /** * The build time part of the client configuration. */ diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java new file mode 100644 index 000000000..0e8f50cde --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java @@ -0,0 +1,192 @@ +package io.quarkiverse.cxf.transport; + +import java.util.ArrayDeque; +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; + +/** + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + */ +final class AppendBuffer { + private final ByteBufAllocator allocator; + + private final int minChunkSize; + private final int capacity; + private ByteBuf buffer; + private ArrayDeque otherBuffers; + private int size; + + private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { + this.allocator = allocator; + this.minChunkSize = Math.min(minChunkSize, capacity); + this.capacity = capacity; + } + + /** + * This buffer append data in a single eagerly allocated {@link ByteBuf}. + */ + public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, capacity, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, 0, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or + * as each {@code len}, if greater than it.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { + return new AppendBuffer(allocator, minChunkSize, capacity); + } + + private ByteBuf lastBuffer() { + if (otherBuffers == null || otherBuffers.isEmpty()) { + return buffer; + } + return otherBuffers.peekLast(); + } + + /** + * It returns how many bytes have been appended
+ * If returns a value different from {@code len}, is it required to invoke {@link #clear} + * that would refill the available capacity till {@link #capacity()} + */ + public int append(byte[] bytes, int off, int len) { + Objects.requireNonNull(bytes); + if (len == 0) { + return 0; + } + int alreadyWritten = 0; + if (minChunkSize > 0) { + var lastBuffer = lastBuffer(); + if (lastBuffer != null) { + int availableOnLast = lastBuffer.writableBytes(); + if (availableOnLast > 0) { + int toWrite = Math.min(len, availableOnLast); + lastBuffer.writeBytes(bytes, off, toWrite); + size += toWrite; + len -= toWrite; + // we stop if there's no more to append + if (len == 0) { + return toWrite; + } + off += toWrite; + alreadyWritten = toWrite; + } + } + } + final int availableCapacity = capacity - size; + if (availableCapacity == 0) { + return alreadyWritten; + } + // we can still write some + int toWrite = Math.min(len, availableCapacity); + assert toWrite > 0; + final int chunkCapacity; + if (minChunkSize > 0) { + // Cannot allocate less than minChunkSize, till the limit of capacity left + chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity); + } else { + chunkCapacity = toWrite; + } + var tmpBuf = allocator.directBuffer(chunkCapacity); + try { + tmpBuf.writeBytes(bytes, off, toWrite); + } catch (Throwable t) { + tmpBuf.release(); + throw t; + } + if (buffer == null) { + buffer = tmpBuf; + } else { + boolean resetOthers = false; + try { + if (otherBuffers == null) { + otherBuffers = new ArrayDeque<>(); + resetOthers = true; + } + otherBuffers.add(tmpBuf); + } catch (Throwable t) { + rollback(alreadyWritten, tmpBuf, resetOthers); + throw t; + } + } + size += toWrite; + return toWrite + alreadyWritten; + } + + private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) { + tmpBuf.release(); + if (resetOthers) { + otherBuffers = null; + } + if (alreadyWritten > 0) { + var last = lastBuffer(); + last.writerIndex(last.writerIndex() - alreadyWritten); + size -= alreadyWritten; + assert last.writerIndex() > 0; + } + } + + public ByteBuf clear() { + var firstBuf = buffer; + if (firstBuf == null) { + return null; + } + var others = otherBuffers; + if (others == null || others.isEmpty()) { + size = 0; + buffer = null; + // super fast-path + return firstBuf; + } + return clearBuffers(); + } + + private CompositeByteBuf clearBuffers() { + var firstBuf = buffer; + var others = otherBuffers; + var batch = allocator.compositeDirectBuffer(1 + others.size()); + try { + buffer = null; + size = 0; + batch.addComponent(true, 0, firstBuf); + for (int i = 0, othersCount = others.size(); i < othersCount; i++) { + // if addComponent fail, it takes care of releasing curr and throwing the exception: + batch.addComponent(true, 1 + i, others.poll()); + } + return batch; + } catch (Throwable anyError) { + batch.release(); + releaseOthers(others); + throw anyError; + } + } + + private static void releaseOthers(ArrayDeque others) { + ByteBuf buf; + while ((buf = others.poll()) != null) { + buf.release(); + } + } + + public int capacity() { + return capacity; + } + + public int availableCapacity() { + return capacity - size; + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java index fb3ab30d4..cb93742bb 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java @@ -25,6 +25,7 @@ import io.quarkiverse.cxf.CXFRuntimeUtils; import io.quarkiverse.cxf.CXFServletInfo; import io.quarkiverse.cxf.CXFServletInfos; +import io.quarkiverse.cxf.CxfFixedConfig; import io.quarkiverse.cxf.QuarkusRuntimeJaxWsServiceFactoryBean; import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; @@ -49,12 +50,15 @@ public class CxfHandler implements Handler { private final IdentityProviderManager identityProviderManager; private final CurrentVertxRequest currentVertxRequest; private final HttpConfiguration httpConfiguration; + private final int outputBufferSize; + private final int minChunkSize; private static final String X_FORWARDED_PROTO_HEADER = "X-Forwarded-Proto"; private static final String X_FORWARDED_FOR_HEADER = "X-Forwarded-For"; private static final String X_FORWARDED_PORT_HEADER = "X-Forwarded-Port"; - public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration) { + public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration, + CxfFixedConfig fixedConfig) { LOGGER.trace("CxfHandler created"); this.beanContainer = beanContainer; this.httpConfiguration = httpConfiguration; @@ -68,6 +72,8 @@ public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, this.bus = BusFactory.getDefaultBus(); this.loader = this.bus.getExtension(ClassLoader.class); + this.outputBufferSize = fixedConfig.outputBufferSize(); + this.minChunkSize = fixedConfig.minChunkSize(); LOGGER.trace("load destination"); DestinationFactoryManager dfm = this.bus.getExtension(DestinationFactoryManager.class); @@ -214,7 +220,7 @@ private void process(RoutingContext event) { currentVertxRequest.setCurrent(event); try { HttpServletRequest req = new VertxHttpServletRequest(event, contextPath, servletPath); - VertxHttpServletResponse resp = new VertxHttpServletResponse(event); + VertxHttpServletResponse resp = new VertxHttpServletResponse(event, outputBufferSize, minChunkSize); req = checkXForwardedHeaders(req); controller.invoke(req, resp); resp.end(); diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java index dd30756e6..bc5a56e60 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java @@ -17,14 +17,18 @@ public class VertxHttpServletResponse implements HttpServletResponse { protected final RoutingContext context; private final HttpServerRequest request; protected final HttpServerResponse response; + private final int outputBufferSize; + private final int minChunkSize; private VertxServletOutputStream os; private PrintWriter printWriter; - public VertxHttpServletResponse(RoutingContext context) { + public VertxHttpServletResponse(RoutingContext context, int outputBufferSize, int minChunkSize) { this.request = context.request(); this.response = context.response(); this.context = context; - this.os = new VertxServletOutputStream(request, response); + this.outputBufferSize = outputBufferSize; + this.minChunkSize = minChunkSize; + this.os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize); } @Override @@ -187,7 +191,7 @@ public void resetBuffer() { } catch (IOException e) { } } - os = new VertxServletOutputStream(request, response); + os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize); } @Override diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java index fa078fe7e..6ce080993 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java @@ -13,139 +13,86 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderNames; import io.quarkus.vertx.core.runtime.VertxBufferImpl; +import io.vertx.core.AsyncResult; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RoutingContext; public class VertxServletOutputStream extends ServletOutputStream { private static final Logger log = Logger.getLogger(VertxServletOutputStream.class); private final HttpServerRequest request; - protected HttpServerResponse response; - private ByteBuf pooledBuffer; + private final HttpServerResponse response; + + private final AppendBuffer appendBuffer; private boolean committed; + + private boolean closed; protected boolean waitingForDrain; protected boolean drainHandlerRegistered; - private boolean closed; protected boolean first = true; protected Throwable throwable; private ByteArrayOutputStream overflow; - private Object LOCK = new Object(); - - /** - * Construct a new instance.No write timeout is configured. - * - * @param request - * @param response - */ - public VertxServletOutputStream(HttpServerRequest request, HttpServerResponse response) { - this.response = response; + public VertxServletOutputStream( + HttpServerRequest request, + HttpServerResponse response, + RoutingContext context, + int outputBufferSize, + int minChunkSize) { this.request = request; - request.response().exceptionHandler(event -> { - throwable = event; - log.debugf(event, "IO Exception "); - request.connection().close(); - synchronized (LOCK) { - if (waitingForDrain) { - LOCK.notifyAll(); + this.response = response; + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, + minChunkSize, + outputBufferSize); + request.response().exceptionHandler(new Handler() { + @Override + public void handle(Throwable event) { + throwable = event; + log.debugf(event, "IO Exception "); + //TODO: do we need this? + terminateResponse(); + request.connection().close(); + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } } } }); - request.response().endHandler(unused -> { - synchronized (LOCK) { - if (waitingForDrain) { - LOCK.notifyAll(); + context.addEndHandler(new Handler>() { + @Override + public void handle(AsyncResult event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } } + terminateResponse(); } }); } - /** - * {@inheritDoc} - */ - @Override - public void write(final int b) throws IOException { - write(new byte[] { (byte) b }, 0, 1); - } + public void terminateResponse() { - /** - * {@inheritDoc} - */ - @Override - public void write(final byte[] b) throws IOException { - write(b, 0, b.length); } - /** - * {@inheritDoc} - */ - @Override - public void write(final byte[] b, final int off, final int len) throws IOException { - if (len < 1) { - return; - } - if (closed) { - throw new IOException("Stream is closed"); - } - - int rem = len; - int idx = off; - ByteBuf buffer = pooledBuffer; - try { - if (buffer == null) { - pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); - } - while (rem > 0) { - int toWrite = Math.min(rem, buffer.writableBytes()); - buffer.writeBytes(b, idx, toWrite); - rem -= toWrite; - idx += toWrite; - if (!buffer.isWritable()) { - ByteBuf tmpBuf = buffer; - this.pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); - writeBlocking(tmpBuf, false); - } - } - } catch (IOException | RuntimeException e) { - if (buffer != null && buffer.refCnt() > 0) { - buffer.release(); - } - throw new IOException(e); - } - } - - public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { - prepareWrite(buffer, finished); - write(buffer, finished); - } - - private void prepareWrite(ByteBuf buffer, boolean finished) { - if (!committed) { - committed = true; - if (finished) { - if (buffer == null) { - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); - } else { - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "" + buffer.readableBytes()); - } - } else if (!request.response().headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { - request.response().setChunked(true); - } - } + Buffer createBuffer(ByteBuf data) { + return new VertxBufferImpl(data); } public void write(ByteBuf data, boolean last) throws IOException { if (last && data == null) { - request.response().end(); + request.response().end((Handler>) null); return; } //do all this in the same lock - synchronized (LOCK) { + synchronized (request.connection()) { try { boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0); if (bufferRequired) { @@ -154,25 +101,23 @@ public void write(ByteBuf data, boolean last) throws IOException { if (overflow == null) { overflow = new ByteArrayOutputStream(); } - if (data != null && data.hasArray()) { + if (data.hasArray()) { overflow.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes()); - } else if (data != null) { + } else { data.getBytes(data.readerIndex(), overflow, data.readableBytes()); } if (last) { closed = true; } - if (data != null) { - data.release(); - } + data.release(); } else { if (last) { - request.response().end(createBuffer(data)); + request.response().end(createBuffer(data), null); } else { - request.response().write(createBuffer(data)); + request.response().write(createBuffer(data), null); } } - } catch (IOException | RuntimeException e) { + } catch (Exception e) { if (data != null && data.refCnt() > 0) { data.release(); } @@ -189,7 +134,7 @@ private boolean awaitWriteable() throws IOException { first = false; return false; } - assert Thread.holdsLock(LOCK); + assert Thread.holdsLock(request.connection()); while (request.response().writeQueueFull()) { if (throwable != null) { throw new IOException(throwable); @@ -200,7 +145,7 @@ private boolean awaitWriteable() throws IOException { registerDrainHandler(); try { waitingForDrain = true; - LOCK.wait(); + request.connection().wait(); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } finally { @@ -213,18 +158,23 @@ private boolean awaitWriteable() throws IOException { private void registerDrainHandler() { if (!drainHandlerRegistered) { drainHandlerRegistered = true; - Handler handler = event -> { - synchronized (LOCK) { - if (waitingForDrain) { - LOCK.notifyAll(); - } - if (overflow != null && overflow.size() > 0) { - if (closed) { - request.response().end(Buffer.buffer(overflow.toByteArray())); - } else { - request.response().write(Buffer.buffer(overflow.toByteArray())); + Handler handler = new Handler() { + @Override + public void handle(Void event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + if (overflow != null) { + if (overflow.size() > 0) { + if (closed) { + request.response().end(Buffer.buffer(overflow.toByteArray()), null); + } else { + request.response().write(Buffer.buffer(overflow.toByteArray()), null); + } + overflow.reset(); + } } - overflow.reset(); } } }; @@ -233,8 +183,70 @@ private void registerDrainHandler() { } } - Buffer createBuffer(ByteBuf data) { - return new VertxBufferImpl(data); + /** + * {@inheritDoc} + */ + @Override + public void write(final int b) throws IOException { + write(new byte[] { (byte) b }, 0, 1); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 1) { + return; + } + if (closed) { + throw new IOException("Stream is closed"); + } + + int rem = len; + int idx = off; + try { + while (rem > 0) { + final int written = appendBuffer.append(b, idx, rem); + if (written < rem) { + writeBlocking(appendBuffer.clear(), false); + } + rem -= written; + idx += written; + } + } catch (Exception e) { + throw new IOException(e); + } + } + + public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { + prepareWrite(buffer, finished); + write(buffer, finished); + } + + private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { + if (!committed) { + committed = true; + if (finished) { + if (!response.headWritten()) { + if (buffer == null) { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); + } else { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "" + buffer.readableBytes()); + } + } + } else { + request.response().setChunked(true); + } + } } /** @@ -245,15 +257,13 @@ public void flush() throws IOException { if (closed) { throw new IOException("Stream is closed"); } - if (pooledBuffer != null) { - try { - writeBlocking(pooledBuffer, false); - pooledBuffer = null; - } catch (IOException | RuntimeException e) { - pooledBuffer.release(); - pooledBuffer = null; - throw new IOException(e); + try { + var toFlush = appendBuffer.clear(); + if (toFlush != null) { + writeBlocking(toFlush, false); } + } catch (Exception e) { + throw new IOException(e); } } @@ -265,12 +275,11 @@ public void close() throws IOException { if (closed) return; try { - writeBlocking(pooledBuffer, true); - } catch (IOException | RuntimeException e) { + writeBlocking(appendBuffer.clear(), true); + } catch (Exception e) { throw new IOException(e); } finally { closed = true; - pooledBuffer = null; } } @@ -283,4 +292,5 @@ public boolean isReady() { public void setWriteListener(WriteListener writeListener) { throw new UnsupportedOperationException(); } + }