Skip to content

Commit

Permalink
Large responses written slowly due to allocating too small buffers fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Nov 16, 2023
1 parent 530e5a4 commit 2c0ff7e
Show file tree
Hide file tree
Showing 13 changed files with 978 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ quarkus.cxf.endpoint."/greeting-soap-handler".handlers = io.quarkiverse.cxf.it.s

quarkus.cxf.endpoint."/faulty-hello".implementor = io.quarkiverse.cxf.it.server.FaultyHelloServiceImpl

quarkus.cxf.endpoint."/large-entity".implementor = io.quarkiverse.cxf.it.server.LargeEntityServiceImpl

#
# Providers
#
Expand Down
38 changes: 38 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-cxf.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,44 @@ endif::add-copy-button-to-env-var[]
|


a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.min-chunk-size]]`link:#quarkus-cxf_quarkus.cxf.min-chunk-size[quarkus.cxf.min-chunk-size]`


[.description]
--
The size in bytes of the chunks of memory allocated when writing data.

This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations of the application.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_MIN_CHUNK_SIZE+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_CXF_MIN_CHUNK_SIZE+++`
endif::add-copy-button-to-env-var[]
--|int
|`128`


a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.output-buffer-size]]`link:#quarkus-cxf_quarkus.cxf.output-buffer-size[quarkus.cxf.output-buffer-size]`


[.description]
--
The size of the output stream response buffer in bytes. If a response is larger than this and no content-length is provided then the response will be chunked.

Larger values may give slight performance increases for large responses, at the expense of more memory usage.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++`
endif::add-copy-button-to-env-var[]
--|int
|`8191`


a| [[quarkus-cxf_quarkus.cxf.logging.enabled-for]]`link:#quarkus-cxf_quarkus.cxf.logging.enabled-for[quarkus.cxf.logging.enabled-for]`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkiverse.cxf.CXFRecorder;
import io.quarkiverse.cxf.CXFServletInfos;
import io.quarkiverse.cxf.CxfConfig;
import io.quarkiverse.cxf.CxfFixedConfig;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.BeanContainerBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -123,6 +124,7 @@ void startRoute(CXFRecorder recorder,
HttpBuildTimeConfig httpBuildTimeConfig,
HttpConfiguration httpConfiguration,
CxfBuildTimeConfig cxfBuildTimeConfig,
CxfFixedConfig fixedConfig,
CxfConfig cxfConfig) {
final RuntimeValue<CXFServletInfos> infos = recorder.createInfos(cxfBuildTimeConfig.path(),
httpBuildTimeConfig.rootPath);
Expand All @@ -140,7 +142,7 @@ void startRoute(CXFRecorder recorder,
}
if (!requestors.isEmpty()) {
final Handler<RoutingContext> handler = recorder.initServer(infos, beanContainer.getValue(),
httpConfiguration);
httpConfiguration, fixedConfig);
final String mappingPath = getMappingPath(cxfBuildTimeConfig.path());
LOGGER.infof("Mapping a Vert.x handler for CXF to %s as requested by %s", mappingPath, requestors);
routes.produce(RouteBuildItem.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,15 @@ public RuntimeValue<CXFServletInfos> createInfos(String path, String contextPath
return new RuntimeValue<>(infos);
}

public Handler<RoutingContext> initServer(RuntimeValue<CXFServletInfos> infos, BeanContainer beanContainer,
HttpConfiguration httpConfiguration) {
public Handler<RoutingContext> initServer(
RuntimeValue<CXFServletInfos> infos,
BeanContainer beanContainer,
HttpConfiguration httpConfiguration,
CxfFixedConfig fixedConfig) {
LOGGER.trace("init server");
// There may be a better way to handle this
CxfJsonRPCService.setServletInfos(infos.getValue());
return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration);
return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration, fixedConfig);
}

public void resetDestinationRegistry(ShutdownContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@
@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
public interface CxfFixedConfig {

/**
* The size in bytes of the chunks of memory allocated when writing data.
* <p>
* This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations
* of the application.
*/
@WithDefault("128")
int minChunkSize();

/**
* The size of the output stream response buffer in bytes. If a response is larger than this and no content-length
* is provided then the response will be chunked.
* <p>
* Larger values may give slight performance increases for large responses, at the expense of more memory usage.
*/
@WithDefault("8191")
int outputBufferSize();

/**
* The build time part of the client configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package io.quarkiverse.cxf.transport;

import java.util.ArrayDeque;
import java.util.Objects;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

/**
* Adapted from
* <a href=
* "independent-projects/resteasy-reactive/server/vertx/src/main/java/org/jboss/resteasy/reactive/server/vertx/AppendBuffer.java"><code>AppendBuffer</code></a>
* from Quarkus.
*
* It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.<br>
* In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}.
*/
final class AppendBuffer {
private final ByteBufAllocator allocator;

private final int minChunkSize;
private final int capacity;
private ByteBuf buffer;
private ArrayDeque<ByteBuf> otherBuffers;
private int size;

private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) {
this.allocator = allocator;
this.minChunkSize = Math.min(minChunkSize, capacity);
this.capacity = capacity;
}

/**
* This buffer append data in a single eagerly allocated {@link ByteBuf}.
*/
public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, capacity, capacity);
}

/**
* This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.<br>
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, 0, capacity);
}

/**
* This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or
* as each {@code len}, if greater than it.<br>
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) {
return new AppendBuffer(allocator, minChunkSize, capacity);
}

private ByteBuf lastBuffer() {
if (otherBuffers == null || otherBuffers.isEmpty()) {
return buffer;
}
return otherBuffers.peekLast();
}

/**
* It returns how many bytes have been appended<br>
* If returns a value different from {@code len}, is it required to invoke {@link #clear}
* that would refill the available capacity till {@link #capacity()}
*/
public int append(byte[] bytes, int off, int len) {
Objects.requireNonNull(bytes);
if (len == 0) {
return 0;
}
int alreadyWritten = 0;
if (minChunkSize > 0) {
var lastBuffer = lastBuffer();
if (lastBuffer != null) {
int availableOnLast = lastBuffer.writableBytes();
if (availableOnLast > 0) {
int toWrite = Math.min(len, availableOnLast);
lastBuffer.writeBytes(bytes, off, toWrite);
size += toWrite;
len -= toWrite;
// we stop if there's no more to append
if (len == 0) {
return toWrite;
}
off += toWrite;
alreadyWritten = toWrite;
}
}
}
final int availableCapacity = capacity - size;
if (availableCapacity == 0) {
return alreadyWritten;
}
// we can still write some
int toWrite = Math.min(len, availableCapacity);
assert toWrite > 0;
final int chunkCapacity;
if (minChunkSize > 0) {
// Cannot allocate less than minChunkSize, till the limit of capacity left
chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity);
} else {
chunkCapacity = toWrite;
}
var tmpBuf = allocator.directBuffer(chunkCapacity);
try {
tmpBuf.writeBytes(bytes, off, toWrite);
} catch (Throwable t) {
tmpBuf.release();
throw t;
}
if (buffer == null) {
buffer = tmpBuf;
} else {
boolean resetOthers = false;
try {
if (otherBuffers == null) {
otherBuffers = new ArrayDeque<>();
resetOthers = true;
}
otherBuffers.add(tmpBuf);
} catch (Throwable t) {
rollback(alreadyWritten, tmpBuf, resetOthers);
throw t;
}
}
size += toWrite;
return toWrite + alreadyWritten;
}

private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) {
tmpBuf.release();
if (resetOthers) {
otherBuffers = null;
}
if (alreadyWritten > 0) {
var last = lastBuffer();
last.writerIndex(last.writerIndex() - alreadyWritten);
size -= alreadyWritten;
assert last.writerIndex() > 0;
}
}

public ByteBuf clear() {
var firstBuf = buffer;
if (firstBuf == null) {
return null;
}
var others = otherBuffers;
if (others == null || others.isEmpty()) {
size = 0;
buffer = null;
// super fast-path
return firstBuf;
}
return clearBuffers();
}

private CompositeByteBuf clearBuffers() {
var firstBuf = buffer;
var others = otherBuffers;
var batch = allocator.compositeDirectBuffer(1 + others.size());
try {
buffer = null;
size = 0;
batch.addComponent(true, 0, firstBuf);
for (int i = 0, othersCount = others.size(); i < othersCount; i++) {
// if addComponent fail, it takes care of releasing curr and throwing the exception:
batch.addComponent(true, 1 + i, others.poll());
}
return batch;
} catch (Throwable anyError) {
batch.release();
releaseOthers(others);
throw anyError;
}
}

private static void releaseOthers(ArrayDeque<ByteBuf> others) {
ByteBuf buf;
while ((buf = others.poll()) != null) {
buf.release();
}
}

public int capacity() {
return capacity;
}

public int availableCapacity() {
return capacity - size;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.quarkiverse.cxf.CXFServletInfo;
import io.quarkiverse.cxf.CXFServletInfos;
import io.quarkiverse.cxf.CxfConfig;
import io.quarkiverse.cxf.CxfFixedConfig;
import io.quarkiverse.cxf.QuarkusRuntimeJaxWsServiceFactoryBean;
import io.quarkiverse.cxf.logging.LoggingFactoryCustomizer;
import io.quarkus.arc.ManagedContext;
Expand All @@ -51,12 +52,15 @@ public class CxfHandler implements Handler<RoutingContext> {
private final IdentityProviderManager identityProviderManager;
private final CurrentVertxRequest currentVertxRequest;
private final HttpConfiguration httpConfiguration;
private final int outputBufferSize;
private final int minChunkSize;

private static final String X_FORWARDED_PROTO_HEADER = "X-Forwarded-Proto";
private static final String X_FORWARDED_FOR_HEADER = "X-Forwarded-For";
private static final String X_FORWARDED_PORT_HEADER = "X-Forwarded-Port";

public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration) {
public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration,
CxfFixedConfig fixedConfig) {
LOGGER.trace("CxfHandler created");
this.beanContainer = beanContainer;
this.httpConfiguration = httpConfiguration;
Expand All @@ -70,6 +74,8 @@ public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer,
this.bus = BusFactory.getDefaultBus();

this.loader = this.bus.getExtension(ClassLoader.class);
this.outputBufferSize = fixedConfig.outputBufferSize();
this.minChunkSize = fixedConfig.minChunkSize();

LOGGER.trace("load destination");
DestinationFactoryManager dfm = this.bus.getExtension(DestinationFactoryManager.class);
Expand Down Expand Up @@ -219,7 +225,7 @@ private void process(RoutingContext event) {
currentVertxRequest.setCurrent(event);
try {
HttpServletRequest req = new VertxHttpServletRequest(event, contextPath, servletPath);
VertxHttpServletResponse resp = new VertxHttpServletResponse(event);
VertxHttpServletResponse resp = new VertxHttpServletResponse(event, outputBufferSize, minChunkSize);
req = checkXForwardedHeaders(req);
controller.invoke(req, resp);
resp.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ public class VertxHttpServletResponse implements HttpServletResponse {
protected final RoutingContext context;
private final HttpServerRequest request;
protected final HttpServerResponse response;
private final int outputBufferSize;
private final int minChunkSize;
private VertxServletOutputStream os;
private PrintWriter printWriter;

public VertxHttpServletResponse(RoutingContext context) {
public VertxHttpServletResponse(RoutingContext context, int outputBufferSize, int minChunkSize) {
this.request = context.request();
this.response = context.response();
this.context = context;
this.os = new VertxServletOutputStream(request, response);
this.outputBufferSize = outputBufferSize;
this.minChunkSize = minChunkSize;
this.os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize);
}

@Override
Expand Down Expand Up @@ -187,7 +191,7 @@ public void resetBuffer() {
} catch (IOException e) {
}
}
os = new VertxServletOutputStream(request, response);
os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize);
}

@Override
Expand Down
Loading

0 comments on commit 2c0ff7e

Please sign in to comment.