Skip to content

Commit

Permalink
Harden NettyJsonContentAuthSigner
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Apr 26, 2024
1 parent 0483cd6 commit be72c5b
Showing 1 changed file with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,49 @@
public class NettyJsonContentAuthSigner extends ChannelInboundHandlerAdapter {
IAuthTransformer.StreamingFullMessageTransformer signer;
HttpJsonMessageWithFaultingPayload httpMessage;
List<HttpContent> receivedHttpContents;
List<HttpContent> httpContentsBuffer;

public NettyJsonContentAuthSigner(IAuthTransformer.StreamingFullMessageTransformer signer) {
this.signer = signer;
this.receivedHttpContents = new ArrayList<>();
this.httpContentsBuffer = new ArrayList<>();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpJsonMessageWithFaultingPayload) {
httpMessage = (HttpJsonMessageWithFaultingPayload) msg;
} else if (msg instanceof HttpContent) {
receivedHttpContents.add(((HttpContent) msg).retainedDuplicate());
var httpContent = (HttpContent) msg;
httpContentsBuffer.add(httpContent);
signer.consumeNextPayloadPart(httpContent.content().nioBuffer());
if (msg instanceof LastHttpContent) {
finalizeSignature(ctx);
signer.finalizeSignature(httpMessage);
flushDownstream(ctx);
}
} else {
super.channelRead(ctx, msg);
}
}

private void finalizeSignature(ChannelHandlerContext ctx) {
signer.finalizeSignature(httpMessage);
ctx.fireChannelRead(httpMessage);
receivedHttpContents.stream().forEach(content->{
ctx.fireChannelRead(content);
content.content().release();
});
private void flushDownstream(ChannelHandlerContext ctx) {
if(httpMessage != null) {
ctx.fireChannelRead(httpMessage);
httpMessage = null;
}
httpContentsBuffer.forEach(ctx::fireChannelRead);
httpContentsBuffer.clear();
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
flushDownstream(ctx);
super.handlerRemoved(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
flushDownstream(ctx);
super.channelUnregistered(ctx);
}

}

0 comments on commit be72c5b

Please sign in to comment.