From 6da774af75c5f4dfcc3b4f2dd2675087b82b835e Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Mon, 13 Nov 2023 17:09:30 +0100 Subject: [PATCH] Large responses written slowly due to allocating too small buffers fix #1066 --- .../soap-service/application.properties | 2 + .../ROOT/pages/includes/quarkus-cxf.adoc | 38 ++ .../CxfEndpointImplementationProcessor.java | 4 +- .../java/io/quarkiverse/cxf/CXFRecorder.java | 9 +- .../io/quarkiverse/cxf/CxfFixedConfig.java | 18 + .../cxf/transport/AppendBuffer.java | 197 +++++++ .../quarkiverse/cxf/transport/CxfHandler.java | 10 +- .../transport/VertxHttpServletResponse.java | 10 +- .../transport/VertxServletOutputStream.java | 274 +++++----- .../cxf/it/server/LargeEntityService.java | 18 + .../cxf/it/server/LargeEntityServiceImpl.java | 36 ++ .../src/main/resources/application.properties | 2 + .../cxf/it/server/ChunkedTest.java | 498 ++++++++++++++++++ 13 files changed, 978 insertions(+), 138 deletions(-) create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java create mode 100644 integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityService.java create mode 100644 integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityServiceImpl.java create mode 100644 integration-tests/server/src/test/java/io/quarkiverse/cxf/it/server/ChunkedTest.java diff --git a/docs/modules/ROOT/examples/soap-service/application.properties b/docs/modules/ROOT/examples/soap-service/application.properties index 0a7a774b1..43457d6c4 100644 --- a/docs/modules/ROOT/examples/soap-service/application.properties +++ b/docs/modules/ROOT/examples/soap-service/application.properties @@ -24,6 +24,8 @@ quarkus.cxf.endpoint."/greeting-soap-handler".handlers = io.quarkiverse.cxf.it.s quarkus.cxf.endpoint."/faulty-hello".implementor = io.quarkiverse.cxf.it.server.FaultyHelloServiceImpl +quarkus.cxf.endpoint."/large-entity".implementor = 
io.quarkiverse.cxf.it.server.LargeEntityServiceImpl + # # Providers # diff --git a/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc b/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc index f659677a4..12153bac0 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-cxf.adoc @@ -517,6 +517,44 @@ endif::add-copy-button-to-env-var[] | +a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.min-chunk-size]]`link:#quarkus-cxf_quarkus.cxf.min-chunk-size[quarkus.cxf.min-chunk-size]` + + +[.description] +-- +The size in bytes of the chunks of memory allocated when writing data. + +This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations of the application. + +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_MIN_CHUNK_SIZE+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++QUARKUS_CXF_MIN_CHUNK_SIZE+++` +endif::add-copy-button-to-env-var[] +--|int +|`128` + + +a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.output-buffer-size]]`link:#quarkus-cxf_quarkus.cxf.output-buffer-size[quarkus.cxf.output-buffer-size]` + + +[.description] +-- +The size of the output stream response buffer in bytes. If a response is larger than this and no content-length is provided then the response will be chunked. + +Larger values may give slight performance increases for large responses, at the expense of more memory usage. 
+ +ifdef::add-copy-button-to-env-var[] +Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++[] +endif::add-copy-button-to-env-var[] +ifndef::add-copy-button-to-env-var[] +Environment variable: `+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++` +endif::add-copy-button-to-env-var[] +--|int +|`8191` + + a| [[quarkus-cxf_quarkus.cxf.logging.enabled-for]]`link:#quarkus-cxf_quarkus.cxf.logging.enabled-for[quarkus.cxf.logging.enabled-for]` diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java index 6e7a65663..c69bc92b5 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java @@ -18,6 +18,7 @@ import io.quarkiverse.cxf.CXFRecorder; import io.quarkiverse.cxf.CXFServletInfos; import io.quarkiverse.cxf.CxfConfig; +import io.quarkiverse.cxf.CxfFixedConfig; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; import io.quarkus.arc.deployment.BeanContainerBuildItem; import io.quarkus.deployment.annotations.BuildProducer; @@ -123,6 +124,7 @@ void startRoute(CXFRecorder recorder, HttpBuildTimeConfig httpBuildTimeConfig, HttpConfiguration httpConfiguration, CxfBuildTimeConfig cxfBuildTimeConfig, + CxfFixedConfig fixedConfig, CxfConfig cxfConfig) { final RuntimeValue infos = recorder.createInfos(cxfBuildTimeConfig.path(), httpBuildTimeConfig.rootPath); @@ -140,7 +142,7 @@ void startRoute(CXFRecorder recorder, } if (!requestors.isEmpty()) { final Handler handler = recorder.initServer(infos, beanContainer.getValue(), - httpConfiguration); + httpConfiguration, fixedConfig); final String mappingPath = getMappingPath(cxfBuildTimeConfig.path()); LOGGER.infof("Mapping a Vert.x handler for CXF to %s as 
requested by %s", mappingPath, requestors); routes.produce(RouteBuildItem.builder() diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java index 668828c5c..a9e254e7c 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java @@ -128,12 +128,15 @@ public RuntimeValue createInfos(String path, String contextPath return new RuntimeValue<>(infos); } - public Handler initServer(RuntimeValue infos, BeanContainer beanContainer, - HttpConfiguration httpConfiguration) { + public Handler initServer( + RuntimeValue infos, + BeanContainer beanContainer, + HttpConfiguration httpConfiguration, + CxfFixedConfig fixedConfig) { LOGGER.trace("init server"); // There may be a better way to handle this CxfJsonRPCService.setServletInfos(infos.getValue()); - return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration); + return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration, fixedConfig); } public void resetDestinationRegistry(ShutdownContext context) { diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java index c746fd4be..a17b27dce 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfFixedConfig.java @@ -17,6 +17,24 @@ @ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) public interface CxfFixedConfig { + /** + * The size in bytes of the chunks of memory allocated when writing data. + *

+ * This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations + * of the application. + */ + @WithDefault("128") + int minChunkSize(); + + /** + * The size of the output stream response buffer in bytes. If a response is larger than this and no content-length + * is provided then the response will be chunked. + *

+ * Larger values may give slight performance increases for large responses, at the expense of more memory usage. + */ + @WithDefault("8191") + int outputBufferSize(); + /** * The build time part of the client configuration. */ diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java new file mode 100644 index 000000000..81bae1e61 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/AppendBuffer.java @@ -0,0 +1,197 @@ +package io.quarkiverse.cxf.transport; + +import java.util.ArrayDeque; +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; + +/** + * Adapted from + * AppendBuffer + * from Quarkus. + * + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + */ +final class AppendBuffer { + private final ByteBufAllocator allocator; + + private final int minChunkSize; + private final int capacity; + private ByteBuf buffer; + private ArrayDeque otherBuffers; + private int size; + + private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { + this.allocator = allocator; + this.minChunkSize = Math.min(minChunkSize, capacity); + this.capacity = capacity; + } + + /** + * This buffer append data in a single eagerly allocated {@link ByteBuf}. + */ + public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, capacity, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, 0, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or + * as each {@code len}, if greater than it.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { + return new AppendBuffer(allocator, minChunkSize, capacity); + } + + private ByteBuf lastBuffer() { + if (otherBuffers == null || otherBuffers.isEmpty()) { + return buffer; + } + return otherBuffers.peekLast(); + } + + /** + * It returns how many bytes have been appended
+ * If returns a value different from {@code len}, is it required to invoke {@link #clear} + * that would refill the available capacity till {@link #capacity()} + */ + public int append(byte[] bytes, int off, int len) { + Objects.requireNonNull(bytes); + if (len == 0) { + return 0; + } + int alreadyWritten = 0; + if (minChunkSize > 0) { + var lastBuffer = lastBuffer(); + if (lastBuffer != null) { + int availableOnLast = lastBuffer.writableBytes(); + if (availableOnLast > 0) { + int toWrite = Math.min(len, availableOnLast); + lastBuffer.writeBytes(bytes, off, toWrite); + size += toWrite; + len -= toWrite; + // we stop if there's no more to append + if (len == 0) { + return toWrite; + } + off += toWrite; + alreadyWritten = toWrite; + } + } + } + final int availableCapacity = capacity - size; + if (availableCapacity == 0) { + return alreadyWritten; + } + // we can still write some + int toWrite = Math.min(len, availableCapacity); + assert toWrite > 0; + final int chunkCapacity; + if (minChunkSize > 0) { + // Cannot allocate less than minChunkSize, till the limit of capacity left + chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity); + } else { + chunkCapacity = toWrite; + } + var tmpBuf = allocator.directBuffer(chunkCapacity); + try { + tmpBuf.writeBytes(bytes, off, toWrite); + } catch (Throwable t) { + tmpBuf.release(); + throw t; + } + if (buffer == null) { + buffer = tmpBuf; + } else { + boolean resetOthers = false; + try { + if (otherBuffers == null) { + otherBuffers = new ArrayDeque<>(); + resetOthers = true; + } + otherBuffers.add(tmpBuf); + } catch (Throwable t) { + rollback(alreadyWritten, tmpBuf, resetOthers); + throw t; + } + } + size += toWrite; + return toWrite + alreadyWritten; + } + + private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) { + tmpBuf.release(); + if (resetOthers) { + otherBuffers = null; + } + if (alreadyWritten > 0) { + var last = lastBuffer(); + last.writerIndex(last.writerIndex() - 
alreadyWritten); + size -= alreadyWritten; + assert last.writerIndex() > 0; + } + } + + public ByteBuf clear() { + var firstBuf = buffer; + if (firstBuf == null) { + return null; + } + var others = otherBuffers; + if (others == null || others.isEmpty()) { + size = 0; + buffer = null; + // super fast-path + return firstBuf; + } + return clearBuffers(); + } + + private CompositeByteBuf clearBuffers() { + var firstBuf = buffer; + var others = otherBuffers; + var batch = allocator.compositeDirectBuffer(1 + others.size()); + try { + buffer = null; + size = 0; + batch.addComponent(true, 0, firstBuf); + for (int i = 0, othersCount = others.size(); i < othersCount; i++) { + // if addComponent fail, it takes care of releasing curr and throwing the exception: + batch.addComponent(true, 1 + i, others.poll()); + } + return batch; + } catch (Throwable anyError) { + batch.release(); + releaseOthers(others); + throw anyError; + } + } + + private static void releaseOthers(ArrayDeque others) { + ByteBuf buf; + while ((buf = others.poll()) != null) { + buf.release(); + } + } + + public int capacity() { + return capacity; + } + + public int availableCapacity() { + return capacity - size; + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java index ef2c3f69e..882093726 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/CxfHandler.java @@ -26,6 +26,7 @@ import io.quarkiverse.cxf.CXFServletInfo; import io.quarkiverse.cxf.CXFServletInfos; import io.quarkiverse.cxf.CxfConfig; +import io.quarkiverse.cxf.CxfFixedConfig; import io.quarkiverse.cxf.QuarkusRuntimeJaxWsServiceFactoryBean; import io.quarkiverse.cxf.logging.LoggingFactoryCustomizer; import io.quarkus.arc.ManagedContext; @@ -51,12 +52,15 @@ public class CxfHandler implements Handler 
{ private final IdentityProviderManager identityProviderManager; private final CurrentVertxRequest currentVertxRequest; private final HttpConfiguration httpConfiguration; + private final int outputBufferSize; + private final int minChunkSize; private static final String X_FORWARDED_PROTO_HEADER = "X-Forwarded-Proto"; private static final String X_FORWARDED_FOR_HEADER = "X-Forwarded-For"; private static final String X_FORWARDED_PORT_HEADER = "X-Forwarded-Port"; - public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration) { + public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration, + CxfFixedConfig fixedConfig) { LOGGER.trace("CxfHandler created"); this.beanContainer = beanContainer; this.httpConfiguration = httpConfiguration; @@ -70,6 +74,8 @@ public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, this.bus = BusFactory.getDefaultBus(); this.loader = this.bus.getExtension(ClassLoader.class); + this.outputBufferSize = fixedConfig.outputBufferSize(); + this.minChunkSize = fixedConfig.minChunkSize(); LOGGER.trace("load destination"); DestinationFactoryManager dfm = this.bus.getExtension(DestinationFactoryManager.class); @@ -219,7 +225,7 @@ private void process(RoutingContext event) { currentVertxRequest.setCurrent(event); try { HttpServletRequest req = new VertxHttpServletRequest(event, contextPath, servletPath); - VertxHttpServletResponse resp = new VertxHttpServletResponse(event); + VertxHttpServletResponse resp = new VertxHttpServletResponse(event, outputBufferSize, minChunkSize); req = checkXForwardedHeaders(req); controller.invoke(req, resp); resp.end(); diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java index dd30756e6..bc5a56e60 100644 --- 
a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxHttpServletResponse.java @@ -17,14 +17,18 @@ public class VertxHttpServletResponse implements HttpServletResponse { protected final RoutingContext context; private final HttpServerRequest request; protected final HttpServerResponse response; + private final int outputBufferSize; + private final int minChunkSize; private VertxServletOutputStream os; private PrintWriter printWriter; - public VertxHttpServletResponse(RoutingContext context) { + public VertxHttpServletResponse(RoutingContext context, int outputBufferSize, int minChunkSize) { this.request = context.request(); this.response = context.response(); this.context = context; - this.os = new VertxServletOutputStream(request, response); + this.outputBufferSize = outputBufferSize; + this.minChunkSize = minChunkSize; + this.os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize); } @Override @@ -187,7 +191,7 @@ public void resetBuffer() { } catch (IOException e) { } } - os = new VertxServletOutputStream(request, response); + os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize); } @Override diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java index fa078fe7e..9380da4d5 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/transport/VertxServletOutputStream.java @@ -13,139 +13,92 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.handler.codec.http.HttpHeaderNames; import io.quarkus.vertx.core.runtime.VertxBufferImpl; +import io.vertx.core.AsyncResult; import 
io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RoutingContext; +/** + * Adapted from + * ResteasyReactiveOutputStream + * from Quarkus. + */ public class VertxServletOutputStream extends ServletOutputStream { private static final Logger log = Logger.getLogger(VertxServletOutputStream.class); private final HttpServerRequest request; - protected HttpServerResponse response; - private ByteBuf pooledBuffer; + private final HttpServerResponse response; + + private final AppendBuffer appendBuffer; private boolean committed; + + private boolean closed; protected boolean waitingForDrain; protected boolean drainHandlerRegistered; - private boolean closed; protected boolean first = true; protected Throwable throwable; private ByteArrayOutputStream overflow; - private Object LOCK = new Object(); - - /** - * Construct a new instance.No write timeout is configured. - * - * @param request - * @param response - */ - public VertxServletOutputStream(HttpServerRequest request, HttpServerResponse response) { - this.response = response; + public VertxServletOutputStream( + HttpServerRequest request, + HttpServerResponse response, + RoutingContext context, + int outputBufferSize, + int minChunkSize) { this.request = request; - request.response().exceptionHandler(event -> { - throwable = event; - log.debugf(event, "IO Exception "); - request.connection().close(); - synchronized (LOCK) { - if (waitingForDrain) { - LOCK.notifyAll(); + this.response = response; + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, + minChunkSize, + outputBufferSize); + request.response().exceptionHandler(new Handler() { + @Override + public void handle(Throwable event) { + throwable = event; + log.debugf(event, "IO Exception "); + //TODO: do we need this? 
+ terminateResponse(); + request.connection().close(); + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } } } }); - request.response().endHandler(unused -> { - synchronized (LOCK) { - if (waitingForDrain) { - LOCK.notifyAll(); + context.addEndHandler(new Handler>() { + @Override + public void handle(AsyncResult event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } } + terminateResponse(); } }); } - /** - * {@inheritDoc} - */ - @Override - public void write(final int b) throws IOException { - write(new byte[] { (byte) b }, 0, 1); - } + public void terminateResponse() { - /** - * {@inheritDoc} - */ - @Override - public void write(final byte[] b) throws IOException { - write(b, 0, b.length); } - /** - * {@inheritDoc} - */ - @Override - public void write(final byte[] b, final int off, final int len) throws IOException { - if (len < 1) { - return; - } - if (closed) { - throw new IOException("Stream is closed"); - } - - int rem = len; - int idx = off; - ByteBuf buffer = pooledBuffer; - try { - if (buffer == null) { - pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); - } - while (rem > 0) { - int toWrite = Math.min(rem, buffer.writableBytes()); - buffer.writeBytes(b, idx, toWrite); - rem -= toWrite; - idx += toWrite; - if (!buffer.isWritable()) { - ByteBuf tmpBuf = buffer; - this.pooledBuffer = buffer = PooledByteBufAllocator.DEFAULT.directBuffer(); - writeBlocking(tmpBuf, false); - } - } - } catch (IOException | RuntimeException e) { - if (buffer != null && buffer.refCnt() > 0) { - buffer.release(); - } - throw new IOException(e); - } - } - - public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { - prepareWrite(buffer, finished); - write(buffer, finished); - } - - private void prepareWrite(ByteBuf buffer, boolean finished) { - if (!committed) { - committed = true; - if (finished) { - if (buffer == null) { - 
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); - } else { - response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "" + buffer.readableBytes()); - } - } else if (!request.response().headers().contains(HttpHeaderNames.CONTENT_LENGTH)) { - request.response().setChunked(true); - } - } + Buffer createBuffer(ByteBuf data) { + return new VertxBufferImpl(data); } public void write(ByteBuf data, boolean last) throws IOException { if (last && data == null) { - request.response().end(); + request.response().end((Handler>) null); return; } //do all this in the same lock - synchronized (LOCK) { + synchronized (request.connection()) { try { boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0); if (bufferRequired) { @@ -154,25 +107,23 @@ public void write(ByteBuf data, boolean last) throws IOException { if (overflow == null) { overflow = new ByteArrayOutputStream(); } - if (data != null && data.hasArray()) { + if (data.hasArray()) { overflow.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes()); - } else if (data != null) { + } else { data.getBytes(data.readerIndex(), overflow, data.readableBytes()); } if (last) { closed = true; } - if (data != null) { - data.release(); - } + data.release(); } else { if (last) { - request.response().end(createBuffer(data)); + request.response().end(createBuffer(data), null); } else { - request.response().write(createBuffer(data)); + request.response().write(createBuffer(data), null); } } - } catch (IOException | RuntimeException e) { + } catch (Exception e) { if (data != null && data.refCnt() > 0) { data.release(); } @@ -189,7 +140,7 @@ private boolean awaitWriteable() throws IOException { first = false; return false; } - assert Thread.holdsLock(LOCK); + assert Thread.holdsLock(request.connection()); while (request.response().writeQueueFull()) { if (throwable != null) { throw new IOException(throwable); @@ -200,7 +151,7 @@ private boolean awaitWriteable() throws 
IOException { registerDrainHandler(); try { waitingForDrain = true; - LOCK.wait(); + request.connection().wait(); } catch (InterruptedException e) { throw new InterruptedIOException(e.getMessage()); } finally { @@ -213,18 +164,23 @@ private boolean awaitWriteable() throws IOException { private void registerDrainHandler() { if (!drainHandlerRegistered) { drainHandlerRegistered = true; - Handler handler = event -> { - synchronized (LOCK) { - if (waitingForDrain) { - LOCK.notifyAll(); - } - if (overflow != null && overflow.size() > 0) { - if (closed) { - request.response().end(Buffer.buffer(overflow.toByteArray())); - } else { - request.response().write(Buffer.buffer(overflow.toByteArray())); + Handler handler = new Handler() { + @Override + public void handle(Void event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + if (overflow != null) { + if (overflow.size() > 0) { + if (closed) { + request.response().end(Buffer.buffer(overflow.toByteArray()), null); + } else { + request.response().write(Buffer.buffer(overflow.toByteArray()), null); + } + overflow.reset(); + } } - overflow.reset(); } } }; @@ -233,8 +189,70 @@ private void registerDrainHandler() { } } - Buffer createBuffer(ByteBuf data) { - return new VertxBufferImpl(data); + /** + * {@inheritDoc} + */ + @Override + public void write(final int b) throws IOException { + write(new byte[] { (byte) b }, 0, 1); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 1) { + return; + } + if (closed) { + throw new IOException("Stream is closed"); + } + + int rem = len; + int idx = off; + try { + while (rem > 0) { + final int written = appendBuffer.append(b, idx, rem); + if (written < rem) { + writeBlocking(appendBuffer.clear(), false); + } + rem 
-= written; + idx += written; + } + } catch (Exception e) { + throw new IOException(e); + } + } + + public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { + prepareWrite(buffer, finished); + write(buffer, finished); + } + + private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { + if (!committed) { + committed = true; + if (finished) { + if (!response.headWritten()) { + if (buffer == null) { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); + } else { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "" + buffer.readableBytes()); + } + } + } else { + request.response().setChunked(true); + } + } } /** @@ -245,15 +263,13 @@ public void flush() throws IOException { if (closed) { throw new IOException("Stream is closed"); } - if (pooledBuffer != null) { - try { - writeBlocking(pooledBuffer, false); - pooledBuffer = null; - } catch (IOException | RuntimeException e) { - pooledBuffer.release(); - pooledBuffer = null; - throw new IOException(e); + try { + var toFlush = appendBuffer.clear(); + if (toFlush != null) { + writeBlocking(toFlush, false); } + } catch (Exception e) { + throw new IOException(e); } } @@ -265,12 +281,11 @@ public void close() throws IOException { if (closed) return; try { - writeBlocking(pooledBuffer, true); - } catch (IOException | RuntimeException e) { + writeBlocking(appendBuffer.clear(), true); + } catch (Exception e) { throw new IOException(e); } finally { closed = true; - pooledBuffer = null; } } @@ -283,4 +298,5 @@ public boolean isReady() { public void setWriteListener(WriteListener writeListener) { throw new UnsupportedOperationException(); } + } diff --git a/integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityService.java b/integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityService.java new file mode 100644 index 000000000..c5b2ca6a5 --- /dev/null +++ 
b/integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityService.java @@ -0,0 +1,18 @@ +package io.quarkiverse.cxf.it.server; + +import jakarta.jws.WebMethod; +import jakarta.jws.WebParam; +import jakarta.jws.WebService; + +@WebService(serviceName = "LargeEntityService", name = "LargeEntityService") +public interface LargeEntityService { + + @WebMethod + int outputBufferSize(); + + @WebMethod + String[] items( + @WebParam(name = "count") int count, + @WebParam(name = "itemLength") int itemLength); + +} diff --git a/integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityServiceImpl.java b/integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityServiceImpl.java new file mode 100644 index 000000000..a67b78708 --- /dev/null +++ b/integration-tests/server/src/main/java/io/quarkiverse/cxf/it/server/LargeEntityServiceImpl.java @@ -0,0 +1,36 @@ +package io.quarkiverse.cxf.it.server; + +import jakarta.jws.WebService; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +/** + * The simplest Hello service implementation. 
+ */ +@WebService(serviceName = "LargeEntityService", name = "LargeEntityService") +public class LargeEntityServiceImpl implements LargeEntityService { + + @ConfigProperty(name = "quarkus.cxf.output-buffer-size") + int outputBufferSize; + + @Override + public int outputBufferSize() { + return outputBufferSize; + } + + @Override + public String[] items(int count, int itemLength) { + final String[] items = new String[count]; + final StringBuilder sb = new StringBuilder(itemLength); + for (int i = 0; i < itemLength; i++) { + char ch = Character.forDigit(i % 10, 10); + sb.append(ch); + } + final String str = sb.toString(); + for (int i = 0; i < count; i++) { + items[i] = str; + } + return items; + } + +} diff --git a/integration-tests/server/src/main/resources/application.properties b/integration-tests/server/src/main/resources/application.properties index 0a7a774b1..43457d6c4 100644 --- a/integration-tests/server/src/main/resources/application.properties +++ b/integration-tests/server/src/main/resources/application.properties @@ -24,6 +24,8 @@ quarkus.cxf.endpoint."/greeting-soap-handler".handlers = io.quarkiverse.cxf.it.s quarkus.cxf.endpoint."/faulty-hello".implementor = io.quarkiverse.cxf.it.server.FaultyHelloServiceImpl +quarkus.cxf.endpoint."/large-entity".implementor = io.quarkiverse.cxf.it.server.LargeEntityServiceImpl + # # Providers # diff --git a/integration-tests/server/src/test/java/io/quarkiverse/cxf/it/server/ChunkedTest.java b/integration-tests/server/src/test/java/io/quarkiverse/cxf/it/server/ChunkedTest.java new file mode 100644 index 000000000..b6b8e56ef --- /dev/null +++ b/integration-tests/server/src/test/java/io/quarkiverse/cxf/it/server/ChunkedTest.java @@ -0,0 +1,498 @@ +package io.quarkiverse.cxf.it.server; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; +import 
java.nio.charset.StandardCharsets;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.MalformedChunkCodingException;
import org.apache.http.TruncatedChunkException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.MessageConstraints;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.HttpConnectionFactory;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentLengthStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.http.impl.conn.DefaultHttpResponseParserFactory;
import org.apache.http.impl.conn.DefaultManagedHttpClientConnection;
import org.apache.http.impl.conn.ManagedHttpClientConnectionFactory;
import org.apache.http.impl.entity.LaxContentLengthStrategy;
import org.apache.http.impl.entity.StrictContentLengthStrategy;
import org.apache.http.impl.io.AbstractMessageParser;
import org.apache.http.impl.io.ContentLengthInputStream;
import org.apache.http.impl.io.DefaultHttpRequestWriterFactory;
import org.apache.http.impl.io.EmptyInputStream;
import org.apache.http.impl.io.IdentityInputStream;
import org.apache.http.io.BufferInfo;
import org.apache.http.io.HttpMessageParserFactory;
import org.apache.http.io.HttpMessageWriterFactory;
import org.apache.http.io.SessionInputBuffer;
import org.apache.http.util.Args;
import org.apache.http.util.CharArrayBuffer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkiverse.cxf.test.QuarkusCxfClientTestUtil;
import io.quarkus.test.junit.QuarkusTest;

/**
 * Verifies that large chunked SOAP responses are written in chunks no smaller than the configured
 * {@code quarkus.cxf.output-buffer-size} (fix for quarkiverse-cxf issue #1066: large responses were
 * written slowly due to allocating too small buffers).
 *
 * <p>
 * The test performs a raw HTTP POST against the {@code /soap/large-entity} endpoint using an
 * Apache HttpClient 4.5 connection whose chunk decoder is instrumented to record the size of
 * every chunk received, and then asserts an upper bound on the number of chunks.
 */
@QuarkusTest
public class ChunkedTest {

    @Test
    public void chunked() throws IOException {

        /* The server advertises its configured output buffer size via this helper operation */
        final int outputBufferSize = QuarkusCxfClientTestUtil.getClient(LargeEntityService.class, "/soap/large-entity")
                .outputBufferSize();

        Assertions.assertThat(outputBufferSize).isEqualTo(8191);

        /* Thread-safe because the chunk sizes are reported from HttpClient's I/O path */
        final List<Long> chunks = new CopyOnWriteArrayList<>();

        final BasicHttpClientConnectionManager connManager = new BasicHttpClientConnectionManager(
                getDefaultRegistry(),
                new CustomManagedHttpClientConnectionFactory(chunks::add),
                null,
                null);

        try (CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connManager).build()) {
            final HttpPost httpPost = new HttpPost("http://localhost:8081/soap/large-entity");
            final byte[] body = largeRequest(1024, 1024);
            httpPost.setEntity(new ByteArrayEntity(body, org.apache.http.entity.ContentType.TEXT_XML));

            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
                final int statusCode = response.getStatusLine().getStatusCode();

                Assertions.assertThat(statusCode).isEqualTo(200);

                final HttpEntity entity = response.getEntity();
                final InputStream stream = entity.getContent();

                final byte[] payload = stream.readAllBytes();
                final String payloadStr = new String(payload, StandardCharsets.UTF_8);
                // NOTE(review): the expected prefix may have lost XML markup during patch
                // extraction — confirm against the actual service response
                Assertions.assertThat(payloadStr).startsWith(
                        "01234567890");
                Assertions.assertThat(payload.length).isEqualTo(1066181);

                /*
                 * If the server allocated buffers smaller than outputBufferSize, we would see many
                 * more chunks than this bound. We add one for the final zero length chunk.
                 */
                final int expectedChunksCount = 1 + (int) Math.ceil((double) payload.length / outputBufferSize);
                Assertions.assertThat(chunks.size()).isLessThanOrEqualTo(expectedChunksCount);
            }
        }
    }

    /**
     * @return the default plain/TLS socket factory registry for the connection manager
     */
    private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
        return RegistryBuilder.<ConnectionSocketFactory> create()
                .register("http", PlainConnectionSocketFactory.getSocketFactory())
                .register("https", SSLConnectionSocketFactory.getSocketFactory())
                .build();
    }

    /**
     * Builds a SOAP request asking the service to produce {@code count} items of {@code itemLength}
     * characters each.
     *
     * <p>
     * NOTE(review): the XML element tags of this literal were destroyed by the patch extraction;
     * the envelope below is a reconstruction — verify the operation and namespace against
     * {@code LargeEntityService}/{@code LargeEntityServiceImpl}.
     *
     * @param count      number of items the service should generate
     * @param itemLength length in characters of each generated item
     * @return the UTF-8 encoded SOAP envelope
     */
    private byte[] largeRequest(int count, int itemLength) {
        return ("<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/envelope/\""
                + " xmlns:ns=\"http://server.it.cxf.quarkiverse.io/\">\n"
                + "    <soapenv:Header/>\n"
                + "    <soapenv:Body>\n"
                + "        <ns:items>\n"
                + "            <count>" + count + "</count>\n"
                + "            <itemLength>" + itemLength + "</itemLength>\n"
                + "        </ns:items>\n"
                + "    </soapenv:Body>\n"
                + "</soapenv:Envelope>").getBytes(StandardCharsets.UTF_8);
    }

    /*
     * Below are some custom HTTP Client classes to be able to count chunks.
     * It is HTTP 4.5 because that one is brought by REST-assured
     */

    /**
     * A copy of {@link ManagedHttpClientConnectionFactory} whose connections decode chunked
     * responses through {@link CountingChunkedInputStream}, reporting each chunk size to the
     * given consumer.
     */
    static class CustomManagedHttpClientConnectionFactory
            implements HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> {

        private static final AtomicLong COUNTER = new AtomicLong();

        public static final ManagedHttpClientConnectionFactory INSTANCE = new ManagedHttpClientConnectionFactory();

        /* Retained from the copied upstream factory; unused by this instrumented variant */
        private final Log log = LogFactory.getLog(DefaultManagedHttpClientConnection.class);
        private final Log headerLog = LogFactory.getLog("org.apache.http.headers");
        private final Log wireLog = LogFactory.getLog("org.apache.http.wire");

        private final HttpMessageWriterFactory<HttpRequest> requestWriterFactory;
        private final HttpMessageParserFactory<HttpResponse> responseParserFactory;
        private final ContentLengthStrategy incomingContentStrategy;
        private final ContentLengthStrategy outgoingContentStrategy;
        /** Receives the size of every chunk decoded from a chunked response, including the final 0 chunk */
        private final Consumer<Long> chunkConsumer;

        /**
         * @param requestWriterFactory    may be {@code null}, then the default is used
         * @param responseParserFactory   may be {@code null}, then the default is used
         * @param incomingContentStrategy may be {@code null}, then the lax default is used
         * @param outgoingContentStrategy may be {@code null}, then the strict default is used
         * @param chunkConsumer           receives the size of each decoded response chunk
         *
         * @since 4.4
         */
        public CustomManagedHttpClientConnectionFactory(
                final HttpMessageWriterFactory<HttpRequest> requestWriterFactory,
                final HttpMessageParserFactory<HttpResponse> responseParserFactory,
                final ContentLengthStrategy incomingContentStrategy,
                final ContentLengthStrategy outgoingContentStrategy,
                Consumer<Long> chunkConsumer) {
            super();
            this.requestWriterFactory = requestWriterFactory != null ? requestWriterFactory
                    : DefaultHttpRequestWriterFactory.INSTANCE;
            this.responseParserFactory = responseParserFactory != null ? responseParserFactory
                    : DefaultHttpResponseParserFactory.INSTANCE;
            this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy
                    : LaxContentLengthStrategy.INSTANCE;
            this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy
                    : StrictContentLengthStrategy.INSTANCE;
            this.chunkConsumer = chunkConsumer;
        }

        public CustomManagedHttpClientConnectionFactory(Consumer<Long> chunkConsumer) {
            this(null, null, null, null, chunkConsumer);
        }

        /**
         * Creates a connection whose {@code createInputStream} returns a
         * {@link CountingChunkedInputStream} for chunk-coded entities; all other content modes
         * behave exactly as in the upstream factory.
         */
        @Override
        public ManagedHttpClientConnection create(HttpRoute route, ConnectionConfig config) {
            final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
            CharsetDecoder charDecoder = null;
            CharsetEncoder charEncoder = null;
            final Charset charset = cconfig.getCharset();
            final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null
                    ? cconfig.getMalformedInputAction()
                    : CodingErrorAction.REPORT;
            final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null
                    ? cconfig.getUnmappableInputAction()
                    : CodingErrorAction.REPORT;
            if (charset != null) {
                charDecoder = charset.newDecoder();
                charDecoder.onMalformedInput(malformedInputAction);
                charDecoder.onUnmappableCharacter(unmappableInputAction);
                charEncoder = charset.newEncoder();
                charEncoder.onMalformedInput(malformedInputAction);
                charEncoder.onUnmappableCharacter(unmappableInputAction);
            }
            final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
            return new DefaultManagedHttpClientConnection(
                    id,
                    cconfig.getBufferSize(),
                    cconfig.getFragmentSizeHint(),
                    charDecoder,
                    charEncoder,
                    cconfig.getMessageConstraints(),
                    incomingContentStrategy,
                    outgoingContentStrategy,
                    requestWriterFactory,
                    responseParserFactory) {

                @Override
                protected InputStream createInputStream(
                        final long len,
                        final SessionInputBuffer inBuffer) {
                    if (len == ContentLengthStrategy.CHUNKED) {
                        /* The only deviation from the upstream factory: count the chunks */
                        return new CountingChunkedInputStream(inBuffer, null, chunkConsumer);
                    } else if (len == ContentLengthStrategy.IDENTITY) {
                        return new IdentityInputStream(inBuffer);
                    } else if (len == 0L) {
                        return EmptyInputStream.INSTANCE;
                    } else {
                        return new ContentLengthInputStream(inBuffer, len);
                    }
                }
            };
        }

    }

    /**
     * A copy of Apache HttpClient's {@code ChunkedInputStream} that additionally reports the size
     * of every decoded chunk (including the terminating zero-length chunk) to a
     * {@link Consumer}, so the test can count how many chunks the server produced.
     */
    static class CountingChunkedInputStream extends InputStream {

        private static final int CHUNK_LEN = 1;
        private static final int CHUNK_DATA = 2;
        private static final int CHUNK_CRLF = 3;
        private static final int CHUNK_INVALID = Integer.MAX_VALUE;

        private static final int BUFFER_SIZE = 2048;

        /** The session input buffer */
        private final SessionInputBuffer in;
        private final CharArrayBuffer buffer;
        private final MessageConstraints constraints;

        /** Receives the size of every chunk, including the final zero-length one */
        private final Consumer<Long> chunkConsumer;

        private int state;

        /** The chunk size */
        private long chunkSize;

        /** The current position within the current chunk */
        private long pos;

        /** True if we've reached the end of stream */
        private boolean eof = false;

        /** True if this stream is closed */
        private boolean closed = false;

        private Header[] footers = new Header[] {};

        /**
         * Wraps session input stream and reads chunk coded input.
         *
         * @param in The session input buffer
         * @param constraints Message constraints. If {@code null}
         *        {@link MessageConstraints#DEFAULT} will be used.
         * @param chunkConsumer receives the size of every decoded chunk
         *
         * @since 4.4
         */
        public CountingChunkedInputStream(final SessionInputBuffer in, final MessageConstraints constraints,
                Consumer<Long> chunkConsumer) {
            super();
            this.in = Args.notNull(in, "Session input buffer");
            this.pos = 0L;
            this.buffer = new CharArrayBuffer(16);
            this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
            this.state = CHUNK_LEN;
            this.chunkConsumer = chunkConsumer;
        }

        @Override
        public int available() throws IOException {
            if (this.in instanceof BufferInfo) {
                final int len = ((BufferInfo) this.in).length();
                return (int) Math.min(len, this.chunkSize - this.pos);
            }
            return 0;
        }

        /**
         * <p>
         * Returns all the data in a chunked stream in coalesced form. A chunk
         * is followed by a CRLF. The method returns -1 as soon as a chunksize of 0
         * is detected.
         * </p>
         *
         * <p>
         * Trailer headers are read automatically at the end of the stream and
         * can be obtained with the getResponseFooters() method.
         * </p>
         *
         * @return -1 of the end of the stream has been reached or the next data
         *         byte
         * @throws IOException in case of an I/O error
         */
        @Override
        public int read() throws IOException {
            if (this.closed) {
                throw new IOException("Attempted read from closed stream.");
            }
            if (this.eof) {
                return -1;
            }
            if (state != CHUNK_DATA) {
                nextChunk();
                if (this.eof) {
                    return -1;
                }
            }
            final int b = in.read();
            if (b != -1) {
                pos++;
                if (pos >= chunkSize) {
                    state = CHUNK_CRLF;
                }
            }
            return b;
        }

        /**
         * Read some bytes from the stream.
         *
         * @param b The byte array that will hold the contents from the stream.
         * @param off The offset into the byte array at which bytes will start to be
         *        placed.
         * @param len the maximum number of bytes that can be returned.
         * @return The number of bytes returned or -1 if the end of stream has been
         *         reached.
         * @throws IOException in case of an I/O error
         */
        @Override
        public int read(final byte[] b, final int off, final int len) throws IOException {

            if (closed) {
                throw new IOException("Attempted read from closed stream.");
            }

            if (eof) {
                return -1;
            }
            if (state != CHUNK_DATA) {
                nextChunk();
                if (eof) {
                    return -1;
                }
            }
            /* Never read past the end of the current chunk */
            final int readLen = in.read(b, off, (int) Math.min(len, chunkSize - pos));
            if (readLen != -1) {
                pos += readLen;
                if (pos >= chunkSize) {
                    state = CHUNK_CRLF;
                }
                return readLen;
            }
            eof = true;
            throw new TruncatedChunkException("Truncated chunk (expected size: %,d; actual size: %,d)",
                    chunkSize, pos);
        }

        /**
         * Read some bytes from the stream.
         *
         * @param b The byte array that will hold the contents from the stream.
         * @return The number of bytes returned or -1 if the end of stream has been
         *         reached.
         * @throws IOException in case of an I/O error
         */
        @Override
        public int read(final byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /**
         * Read the next chunk.
         *
         * @throws IOException in case of an I/O error
         */
        private void nextChunk() throws IOException {
            if (state == CHUNK_INVALID) {
                throw new MalformedChunkCodingException("Corrupt data stream");
            }
            try {
                chunkSize = getChunkSize();
                /* The instrumentation hook: report every chunk size, including the final 0 */
                chunkConsumer.accept(chunkSize);
                if (chunkSize < 0L) {
                    throw new MalformedChunkCodingException("Negative chunk size");
                }
                state = CHUNK_DATA;
                pos = 0L;
                if (chunkSize == 0L) {
                    eof = true;
                    parseTrailerHeaders();
                }
            } catch (final MalformedChunkCodingException ex) {
                state = CHUNK_INVALID;
                throw ex;
            }
        }

        /**
         * Expects the stream to start with a chunksize in hex with optional
         * comments after a semicolon. The line must end with a CRLF: "a3; some
         * comment\r\n" Positions the stream at the start of the next line.
         */
        private long getChunkSize() throws IOException {
            final int st = this.state;
            switch (st) {
                case CHUNK_CRLF:
                    this.buffer.clear();
                    final int bytesRead1 = this.in.readLine(this.buffer);
                    if (bytesRead1 == -1) {
                        throw new MalformedChunkCodingException(
                                "CRLF expected at end of chunk");
                    }
                    if (!this.buffer.isEmpty()) {
                        throw new MalformedChunkCodingException(
                                "Unexpected content at the end of chunk");
                    }
                    state = CHUNK_LEN;
                    //$FALL-THROUGH$
                case CHUNK_LEN:
                    this.buffer.clear();
                    final int bytesRead2 = this.in.readLine(this.buffer);
                    if (bytesRead2 == -1) {
                        throw new ConnectionClosedException(
                                "Premature end of chunk coded message body: closing chunk expected");
                    }
                    int separator = this.buffer.indexOf(';');
                    if (separator < 0) {
                        separator = this.buffer.length();
                    }
                    final String s = this.buffer.substringTrimmed(0, separator);
                    try {
                        return Long.parseLong(s, 16);
                    } catch (final NumberFormatException e) {
                        throw new MalformedChunkCodingException("Bad chunk header: " + s);
                    }
                default:
                    throw new IllegalStateException("Inconsistent codec state");
            }
        }

        /**
         * Reads and stores the Trailer headers.
         *
         * @throws IOException in case of an I/O error
         */
        private void parseTrailerHeaders() throws IOException {
            try {
                this.footers = AbstractMessageParser.parseHeaders(in,
                        constraints.getMaxHeaderCount(),
                        constraints.getMaxLineLength(),
                        null);
            } catch (final HttpException ex) {
                final IOException ioe = new MalformedChunkCodingException("Invalid footer: "
                        + ex.getMessage());
                ioe.initCause(ex);
                throw ioe;
            }
        }

        /**
         * Upon close, this reads the remainder of the chunked message,
         * leaving the underlying socket at a position to start reading the
         * next response without scanning.
         *
         * @throws IOException in case of an I/O error
         */
        @Override
        public void close() throws IOException {
            if (!closed) {
                try {
                    if (!eof && state != CHUNK_INVALID) {
                        // read and discard the remainder of the message
                        final byte buff[] = new byte[BUFFER_SIZE];
                        while (read(buff) >= 0) {
                        }
                    }
                } finally {
                    eof = true;
                    closed = true;
                }
            }
        }

        public Header[] getFooters() {
            return this.footers.clone();
        }

    }
}