Skip to content

Commit

Permalink
Fix ByteBuf Release in HttpByteBufFormatter when Exception
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Apr 16, 2024
1 parent 4ec659b commit 537bad1
Showing 1 changed file with 55 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import lombok.Lombok;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -161,17 +164,13 @@ public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stre
new HttpObjectAggregator(Utils.MAX_PAYLOAD_SIZE_TO_PRINT) // Set max content length if needed
);

byteBufStream.forEach(b -> {
try {
channel.writeInbound(b.retainedDuplicate());
} finally {
if (releaseByteBufs) {
b.release();
}
}
});

final AtomicReference<Throwable> throwable = new AtomicReference<>();
final Function<ByteBuf, Boolean> byteBufConsumer = (ByteBuf b) -> channel.writeInbound(b.retainedDuplicate());
byteBufStream.forEach((b) -> makeReferenceSafeFunction(byteBufConsumer, releaseByteBufs, throwable).apply(b));
try {
if (throwable.get() != null) {
throw Lombok.sneakyThrow(throwable.get());
}
return channel.readInbound();
} finally {
channel.finishAndReleaseAll();
Expand All @@ -191,20 +190,51 @@ public static String httpPacketBufsToString(Stream<ByteBuf> byteBufStream, long
if (byteBufStream == null) {
return "null";
}
return byteBufStream.map(originalByteBuf -> {
try {
var bb = originalByteBuf.duplicate();
var length = bb.readableBytes();
var str = IntStream.range(0, length).map(idx -> bb.readByte())
.limit(maxBytesToShow)
.mapToObj(b -> "" + (char) b)
.collect(Collectors.joining());
return "[" + (length > maxBytesToShow ? str + "..." : str) + "]";
} finally {
if (releaseByteBufs) {
originalByteBuf.release();
}
}})
.collect(Collectors.joining(","));

final AtomicReference<Throwable> throwable = new AtomicReference<>();
final Function<ByteBuf, String> byteBufFunction = (ByteBuf originalByteBuf) -> {
var bb = originalByteBuf.duplicate();
var length = bb.readableBytes();
var str = IntStream.range(0, length).map(idx -> bb.readByte())
.limit(maxBytesToShow)
.mapToObj(b -> "" + (char) b)
.collect(Collectors.joining());
return "[" + (length > maxBytesToShow ? str + "..." : str) + "]";
};
var packetBufsString = byteBufStream.map(makeReferenceSafeFunction(byteBufFunction, releaseByteBufs, throwable))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.joining(","));

if (throwable.get() != null) {
throw Lombok.sneakyThrow(throwable.get());
}
return packetBufsString;
}

public static <T extends ByteBuf, R> Function<T,Optional<R>> makeReferenceSafeFunction(Function<T, R> operation,
boolean releaseByteBufs, AtomicReference<Throwable> throwableRef) {
return (T buf) -> {
try {
if (throwableRef.get() != null) {
return Optional.ofNullable(operation.apply(buf));
}
} catch (Throwable t) {
boolean isSet = throwableRef.compareAndSet(null, t);
if (!releaseByteBufs && isSet) {
throw Lombok.sneakyThrow(throwableRef.get());
}
}
finally {
try {
if (releaseByteBufs) {
buf.release();
}
} catch (Throwable t) {
throwableRef.compareAndSet(null, t);
}
}
return Optional.empty();
};
}
}

0 comments on commit 537bad1

Please sign in to comment.