Skip to content

Commit

Permalink
Harden NettyJsonContentAuthSigner
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Apr 26, 2024
1 parent 0483cd6 commit 32a2485
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void fillStatusCodeMetrics(@NonNull IReplayContexts.ITupleHandling

private static Map<String, Object> fillMap(LinkedHashMap<String, Object> map,
HttpHeaders headers, ByteBuf content) {
try (var bufHolder = RefSafeHolder.create(Base64.encode(content, Base64Dialect.URL_SAFE))) {
try (var bufHolder = RefSafeHolder.create(Base64.encode(content, false, Base64Dialect.STANDARD))) {
var buf = bufHolder.get();
assert buf != null : "Base64.encode should not return null";
String base64body = buf.toString(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,49 @@
public class NettyJsonContentAuthSigner extends ChannelInboundHandlerAdapter {
IAuthTransformer.StreamingFullMessageTransformer signer;
HttpJsonMessageWithFaultingPayload httpMessage;
List<HttpContent> receivedHttpContents;
List<HttpContent> httpContentsBuffer;

public NettyJsonContentAuthSigner(IAuthTransformer.StreamingFullMessageTransformer signer) {
this.signer = signer;
this.receivedHttpContents = new ArrayList<>();
this.httpContentsBuffer = new ArrayList<>();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpJsonMessageWithFaultingPayload) {
httpMessage = (HttpJsonMessageWithFaultingPayload) msg;
} else if (msg instanceof HttpContent) {
receivedHttpContents.add(((HttpContent) msg).retainedDuplicate());
var httpContent = (HttpContent) msg;
httpContentsBuffer.add(httpContent);
signer.consumeNextPayloadPart(httpContent.content().nioBuffer());
if (msg instanceof LastHttpContent) {
finalizeSignature(ctx);
signer.finalizeSignature(httpMessage);
flushDownstream(ctx);
}
} else {
super.channelRead(ctx, msg);
}
}

private void finalizeSignature(ChannelHandlerContext ctx) {
signer.finalizeSignature(httpMessage);
ctx.fireChannelRead(httpMessage);
receivedHttpContents.stream().forEach(content->{
ctx.fireChannelRead(content);
content.content().release();
});
private void flushDownstream(ChannelHandlerContext ctx) {
if(httpMessage != null) {
ctx.fireChannelRead(httpMessage);
httpMessage = null;
}
httpContentsBuffer.forEach(ctx::fireChannelRead);
httpContentsBuffer.clear();
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
flushDownstream(ctx);
super.handlerRemoved(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
flushDownstream(ctx);
super.channelUnregistered(ctx);
}

}

0 comments on commit 32a2485

Please sign in to comment.