Skip to content

Commit

Permalink
Fix codestyle
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Apr 20, 2023
1 parent 3acb4d5 commit f428f93
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,116 +6,116 @@
import io.netty.buffer.ByteBufAllocator;

final class AppendBuffer {
private final ByteBufAllocator allocator;
private final int capacity;
private ByteBuf buffer;
private ArrayDeque<ByteBuf> otherBuffers;
private int size;
private int minChunkSize;
private final ByteBufAllocator allocator;
private final int capacity;
private ByteBuf buffer;
private ArrayDeque<ByteBuf> otherBuffers;
private int size;
private int minChunkSize;

private AppendBuffer(ByteBufAllocator allocator, int capacity, int minChunkSize) {
this.allocator = allocator;
this.capacity = capacity;
this.minChunkSize = Math.min(minChunkSize, capacity);
}
private AppendBuffer(ByteBufAllocator allocator, int capacity, int minChunkSize) {
this.allocator = allocator;
this.capacity = capacity;
this.minChunkSize = Math.min(minChunkSize, capacity);
}

public static AppendBuffer withExactChunks(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, capacity, 0);
}
public static AppendBuffer withExactChunks(ByteBufAllocator allocator, int capacity) {
return new AppendBuffer(allocator, capacity, 0);
}

public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int capacity, int minChunkSize) {
return new AppendBuffer(allocator, capacity, minChunkSize);
}
public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int capacity, int minChunkSize) {
return new AppendBuffer(allocator, capacity, minChunkSize);
}

private ByteBuf lastBuffer() {
if (otherBuffers == null || otherBuffers.isEmpty()) {
return buffer;
}
return otherBuffers.peekLast();
}
private ByteBuf lastBuffer() {
if (otherBuffers == null || otherBuffers.isEmpty()) {
return buffer;
}
return otherBuffers.peekLast();
}

/**
* It returns how many bytes have been appended<br>
* If the return value is different from {@code len}, is it required to invoke {@link #flushBatch}
* that would refill the available capacity till {@link #capacity()}
*/
public int append(byte[] bytes, int off, int len) {
int alreadyWritten = 0;
if (minChunkSize > 0) {
var lastBuffer = lastBuffer();
if (lastBuffer != null) {
int availableOnLast = lastBuffer.writableBytes();
if (availableOnLast > 0) {
int toWrite = Math.min(len, availableOnLast);
lastBuffer.writeBytes(bytes, off, toWrite);
size += toWrite;
len -= toWrite;
// we stop if there's no more to append
if (len == 0) {
return toWrite;
}
off += toWrite;
alreadyWritten = toWrite;
/**
* It returns how many bytes have been appended<br>
* If the return value is different from {@code len}, is it required to invoke {@link #flushBatch}
* that would refill the available capacity till {@link #capacity()}
*/
public int append(byte[] bytes, int off, int len) {
int alreadyWritten = 0;
if (minChunkSize > 0) {
var lastBuffer = lastBuffer();
if (lastBuffer != null) {
int availableOnLast = lastBuffer.writableBytes();
if (availableOnLast > 0) {
int toWrite = Math.min(len, availableOnLast);
lastBuffer.writeBytes(bytes, off, toWrite);
size += toWrite;
len -= toWrite;
// we stop if there's no more to append
if (len == 0) {
return toWrite;
}
off += toWrite;
alreadyWritten = toWrite;
}
}
}
}
final int availableCapacity = capacity - size;
if (availableCapacity == 0) {
return alreadyWritten;
}
// we can still write some
int toWrite = Math.min(len, availableCapacity);
final int chunkCapacity;
if (minChunkSize > 0) {
// Cannot allocate less the minChunkSize, till the limit of capacity left
chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity);
} else {
chunkCapacity = toWrite;
}
var tmpBuf = allocator.directBuffer(chunkCapacity);
tmpBuf.writeBytes(bytes, off, toWrite);
if (buffer == null) {
buffer = tmpBuf;
} else {
if (otherBuffers == null) {
// TODO: 16 + 1 are A LOT; maybe we would like to check with JOL first!
otherBuffers = new ArrayDeque<>();
}
otherBuffers.add(tmpBuf);
}
size += toWrite;
return toWrite + alreadyWritten;
}
}
final int availableCapacity = capacity - size;
if (availableCapacity == 0) {
return alreadyWritten;
}
// we can still write some
int toWrite = Math.min(len, availableCapacity);
final int chunkCapacity;
if (minChunkSize > 0) {
// Cannot allocate less the minChunkSize, till the limit of capacity left
chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity);
} else {
chunkCapacity = toWrite;
}
var tmpBuf = allocator.directBuffer(chunkCapacity);
tmpBuf.writeBytes(bytes, off, toWrite);
if (buffer == null) {
buffer = tmpBuf;
} else {
if (otherBuffers == null) {
// TODO: 16 + 1 are A LOT; maybe we would like to check with JOL first!
otherBuffers = new ArrayDeque<>();
}
otherBuffers.add(tmpBuf);
}
size += toWrite;
return toWrite + alreadyWritten;
}

public ByteBuf flushBatch() {
var firstBuf = buffer;
if (firstBuf == null) {
return null;
}
var others = otherBuffers;
if (others == null || others.isEmpty()) {
size = 0;
buffer = null;
// super fast-path
return firstBuf;
}
var batch = allocator.compositeDirectBuffer(1 + others.size());
batch.addComponent(true, 0, firstBuf);
buffer = null;
for (int i = 0, size = others.size(); i < size; i++) {
var curr = others.poll();
batch.addComponent(true, 1 + i, curr);
}
size = 0;
return batch;
}
public ByteBuf flushBatch() {
var firstBuf = buffer;
if (firstBuf == null) {
return null;
}
var others = otherBuffers;
if (others == null || others.isEmpty()) {
size = 0;
buffer = null;
// super fast-path
return firstBuf;
}
var batch = allocator.compositeDirectBuffer(1 + others.size());
batch.addComponent(true, 0, firstBuf);
buffer = null;
for (int i = 0, size = others.size(); i < size; i++) {
var curr = others.poll();
batch.addComponent(true, 1 + i, curr);
}
size = 0;
return batch;
}

public int capacity() {
return capacity;
}
public int capacity() {
return capacity;
}

public int availableCapacity() {
return capacity - size;
}
public int availableCapacity() {
return capacity - size;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public ResteasyReactiveOutputStream(VertxResteasyReactiveRequestContext context)
this.context = context;
this.request = context.getContext().request();
this.writeBatch = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT,
context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize(),
128);
context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize(),
128);
request.response().exceptionHandler(new Handler<Throwable>() {
@Override
public void handle(Throwable event) {
Expand Down Expand Up @@ -209,8 +209,8 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
if (written < rem) {
writeBlocking(writeBatch.flushBatch(), false);
}
rem-=written;
idx+=written;
rem -= written;
idx += written;
}
} catch (Exception e) {
throw new IOException(e);
Expand Down

0 comments on commit f428f93

Please sign in to comment.