Skip to content

Commit

Permalink
Large responses written slowly due to allocating too small buffers fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Nov 13, 2023
1 parent db14990 commit b36db1e
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 138 deletions.
38 changes: 38 additions & 0 deletions docs/modules/ROOT/pages/includes/quarkus-cxf.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,44 @@ endif::add-copy-button-to-env-var[]
|


a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.min-chunk-size]]`link:#quarkus-cxf_quarkus.cxf.min-chunk-size[quarkus.cxf.min-chunk-size]`


[.description]
--
The size in bytes of the chunks of memory allocated when writing data.

This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations of the application.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_MIN_CHUNK_SIZE+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_CXF_MIN_CHUNK_SIZE+++`
endif::add-copy-button-to-env-var[]
--|int
|`128`


a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.output-buffer-size]]`link:#quarkus-cxf_quarkus.cxf.output-buffer-size[quarkus.cxf.output-buffer-size]`


[.description]
--
The size of the output stream response buffer in bytes. If a response is larger than this and no content-length is provided then the response will be chunked.

Larger values may give slight performance increases for large responses, at the expense of more memory usage.

ifdef::add-copy-button-to-env-var[]
Environment variable: env_var_with_copy_button:+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++[]
endif::add-copy-button-to-env-var[]
ifndef::add-copy-button-to-env-var[]
Environment variable: `+++QUARKUS_CXF_OUTPUT_BUFFER_SIZE+++`
endif::add-copy-button-to-env-var[]
--|int
|`8191`


a|icon:lock[title=Fixed at build time] [[quarkus-cxf_quarkus.cxf.codegen.wsdl2java.-named-parameter-sets-.includes]]`link:#quarkus-cxf_quarkus.cxf.codegen.wsdl2java.-named-parameter-sets-.includes[quarkus.cxf.codegen.wsdl2java."named-parameter-sets".includes]`


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkiverse.cxf.CXFRecorder;
import io.quarkiverse.cxf.CXFServletInfos;
import io.quarkiverse.cxf.CxfConfig;
import io.quarkiverse.cxf.CxfFixedConfig;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.BeanContainerBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -123,6 +124,7 @@ void startRoute(CXFRecorder recorder,
HttpBuildTimeConfig httpBuildTimeConfig,
HttpConfiguration httpConfiguration,
CxfBuildTimeConfig cxfBuildTimeConfig,
CxfFixedConfig fixedConfig,
CxfConfig cxfConfig) {
final RuntimeValue<CXFServletInfos> infos = recorder.createInfos(cxfBuildTimeConfig.path(),
httpBuildTimeConfig.rootPath);
Expand All @@ -140,7 +142,7 @@ void startRoute(CXFRecorder recorder,
}
if (!requestors.isEmpty()) {
final Handler<RoutingContext> handler = recorder.initServer(infos, beanContainer.getValue(),
httpConfiguration);
httpConfiguration, fixedConfig);
final String mappingPath = getMappingPath(cxfBuildTimeConfig.path());
LOGGER.infof("Mapping a Vert.x handler for CXF to %s as requested by %s", mappingPath, requestors);
routes.produce(RouteBuildItem.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,15 @@ public RuntimeValue<CXFServletInfos> createInfos(String path, String contextPath
return new RuntimeValue<>(infos);
}

public Handler<RoutingContext> initServer(RuntimeValue<CXFServletInfos> infos, BeanContainer beanContainer,
HttpConfiguration httpConfiguration) {
public Handler<RoutingContext> initServer(
RuntimeValue<CXFServletInfos> infos,
BeanContainer beanContainer,
HttpConfiguration httpConfiguration,
CxfFixedConfig fixedConfig) {
LOGGER.trace("init server");
// There may be a better way to handle this
CxfJsonRPCService.setServletInfos(infos.getValue());
return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration);
return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration, fixedConfig);
}

public void resetDestinationRegistry(ShutdownContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@
@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED)
public interface CxfFixedConfig {

/**
* The size in bytes of the chunks of memory allocated when writing data.
* <p>
* This is a very advanced setting that should only be set if you understand exactly how it affects the output IO operations
* of the application.
*/
@WithDefault("128")
int minChunkSize();

/**
* The size of the output stream response buffer in bytes. If a response is larger than this and no content-length
* is provided then the response will be chunked.
* <p>
* Larger values may give slight performance increases for large responses, at the expense of more memory usage.
*/
@WithDefault("8191")
int outputBufferSize();

/**
* The build time part of the client configuration.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package io.quarkiverse.cxf.transport;

import java.util.ArrayDeque;
import java.util.Objects;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

/**
* It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.<br>
* In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}.
*/
final class AppendBuffer {
private final ByteBufAllocator allocator;

private final int minChunkSize;
private final int capacity;
private ByteBuf buffer;
private ArrayDeque<ByteBuf> otherBuffers;
private int size;

private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) {
this.allocator = allocator;
this.minChunkSize = Math.min(minChunkSize, capacity);
this.capacity = capacity;
}

/**
* This buffer append data in a single eagerly allocated {@link ByteBuf}.
*/
public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, capacity, capacity);
}

/**
* This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.<br>
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, 0, capacity);
}

/**
* This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or
* as each {@code len}, if greater than it.<br>
* The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}.
*/
public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) {
return new AppendBuffer(allocator, minChunkSize, capacity);
}

private ByteBuf lastBuffer() {
if (otherBuffers == null || otherBuffers.isEmpty()) {
return buffer;
}
return otherBuffers.peekLast();
}

/**
* It returns how many bytes have been appended<br>
* If returns a value different from {@code len}, is it required to invoke {@link #clear}
* that would refill the available capacity till {@link #capacity()}
*/
public int append(byte[] bytes, int off, int len) {
Objects.requireNonNull(bytes);
if (len == 0) {
return 0;
}
int alreadyWritten = 0;
if (minChunkSize > 0) {
var lastBuffer = lastBuffer();
if (lastBuffer != null) {
int availableOnLast = lastBuffer.writableBytes();
if (availableOnLast > 0) {
int toWrite = Math.min(len, availableOnLast);
lastBuffer.writeBytes(bytes, off, toWrite);
size += toWrite;
len -= toWrite;
// we stop if there's no more to append
if (len == 0) {
return toWrite;
}
off += toWrite;
alreadyWritten = toWrite;
}
}
}
final int availableCapacity = capacity - size;
if (availableCapacity == 0) {
return alreadyWritten;
}
// we can still write some
int toWrite = Math.min(len, availableCapacity);
assert toWrite > 0;
final int chunkCapacity;
if (minChunkSize > 0) {
// Cannot allocate less than minChunkSize, till the limit of capacity left
chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity);
} else {
chunkCapacity = toWrite;
}
var tmpBuf = allocator.directBuffer(chunkCapacity);
try {
tmpBuf.writeBytes(bytes, off, toWrite);
} catch (Throwable t) {
tmpBuf.release();
throw t;
}
if (buffer == null) {
buffer = tmpBuf;
} else {
boolean resetOthers = false;
try {
if (otherBuffers == null) {
otherBuffers = new ArrayDeque<>();
resetOthers = true;
}
otherBuffers.add(tmpBuf);
} catch (Throwable t) {
rollback(alreadyWritten, tmpBuf, resetOthers);
throw t;
}
}
size += toWrite;
return toWrite + alreadyWritten;
}

private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) {
tmpBuf.release();
if (resetOthers) {
otherBuffers = null;
}
if (alreadyWritten > 0) {
var last = lastBuffer();
last.writerIndex(last.writerIndex() - alreadyWritten);
size -= alreadyWritten;
assert last.writerIndex() > 0;
}
}

public ByteBuf clear() {
var firstBuf = buffer;
if (firstBuf == null) {
return null;
}
var others = otherBuffers;
if (others == null || others.isEmpty()) {
size = 0;
buffer = null;
// super fast-path
return firstBuf;
}
return clearBuffers();
}

private CompositeByteBuf clearBuffers() {
var firstBuf = buffer;
var others = otherBuffers;
var batch = allocator.compositeDirectBuffer(1 + others.size());
try {
buffer = null;
size = 0;
batch.addComponent(true, 0, firstBuf);
for (int i = 0, othersCount = others.size(); i < othersCount; i++) {
// if addComponent fail, it takes care of releasing curr and throwing the exception:
batch.addComponent(true, 1 + i, others.poll());
}
return batch;
} catch (Throwable anyError) {
batch.release();
releaseOthers(others);
throw anyError;
}
}

private static void releaseOthers(ArrayDeque<ByteBuf> others) {
ByteBuf buf;
while ((buf = others.poll()) != null) {
buf.release();
}
}

public int capacity() {
return capacity;
}

public int availableCapacity() {
return capacity - size;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.quarkiverse.cxf.CXFRuntimeUtils;
import io.quarkiverse.cxf.CXFServletInfo;
import io.quarkiverse.cxf.CXFServletInfos;
import io.quarkiverse.cxf.CxfFixedConfig;
import io.quarkiverse.cxf.QuarkusRuntimeJaxWsServiceFactoryBean;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
Expand All @@ -49,12 +50,15 @@ public class CxfHandler implements Handler<RoutingContext> {
private final IdentityProviderManager identityProviderManager;
private final CurrentVertxRequest currentVertxRequest;
private final HttpConfiguration httpConfiguration;
private final int outputBufferSize;
private final int minChunkSize;

private static final String X_FORWARDED_PROTO_HEADER = "X-Forwarded-Proto";
private static final String X_FORWARDED_FOR_HEADER = "X-Forwarded-For";
private static final String X_FORWARDED_PORT_HEADER = "X-Forwarded-Port";

public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration) {
public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer, HttpConfiguration httpConfiguration,
CxfFixedConfig fixedConfig) {
LOGGER.trace("CxfHandler created");
this.beanContainer = beanContainer;
this.httpConfiguration = httpConfiguration;
Expand All @@ -68,6 +72,8 @@ public CxfHandler(CXFServletInfos cxfServletInfos, BeanContainer beanContainer,
this.bus = BusFactory.getDefaultBus();

this.loader = this.bus.getExtension(ClassLoader.class);
this.outputBufferSize = fixedConfig.outputBufferSize();
this.minChunkSize = fixedConfig.minChunkSize();

LOGGER.trace("load destination");
DestinationFactoryManager dfm = this.bus.getExtension(DestinationFactoryManager.class);
Expand Down Expand Up @@ -214,7 +220,7 @@ private void process(RoutingContext event) {
currentVertxRequest.setCurrent(event);
try {
HttpServletRequest req = new VertxHttpServletRequest(event, contextPath, servletPath);
VertxHttpServletResponse resp = new VertxHttpServletResponse(event);
VertxHttpServletResponse resp = new VertxHttpServletResponse(event, outputBufferSize, minChunkSize);
req = checkXForwardedHeaders(req);
controller.invoke(req, resp);
resp.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ public class VertxHttpServletResponse implements HttpServletResponse {
protected final RoutingContext context;
private final HttpServerRequest request;
protected final HttpServerResponse response;
private final int outputBufferSize;
private final int minChunkSize;
private VertxServletOutputStream os;
private PrintWriter printWriter;

public VertxHttpServletResponse(RoutingContext context) {
public VertxHttpServletResponse(RoutingContext context, int outputBufferSize, int minChunkSize) {
this.request = context.request();
this.response = context.response();
this.context = context;
this.os = new VertxServletOutputStream(request, response);
this.outputBufferSize = outputBufferSize;
this.minChunkSize = minChunkSize;
this.os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize);
}

@Override
Expand Down Expand Up @@ -187,7 +191,7 @@ public void resetBuffer() {
} catch (IOException e) {
}
}
os = new VertxServletOutputStream(request, response);
os = new VertxServletOutputStream(request, response, context, outputBufferSize, minChunkSize);
}

@Override
Expand Down
Loading

0 comments on commit b36db1e

Please sign in to comment.