From 25cf86e028b2cfb4707c9ac7a43a758e4e997f82 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sat, 6 Aug 2022 20:08:07 +0800 Subject: [PATCH] HBASE-27273 Should stop autoRead and skip all the bytes when rpc request too big (#4679) Signed-off-by: Xiaolin Ha (cherry picked from commit 486d19e99ff4370bc60e0db235508198c84a00e3) --- .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 164934ac2477..dec00424e73d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -44,6 +44,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder { private final int maxFrameLength; private boolean requestTooBig; + private boolean requestTooBigSent; private String requestTooBigMessage; public NettyRpcFrameDecoder(int maxFrameLength) { @@ -58,8 +59,12 @@ void setConnection(NettyServerRpcConnection connection) { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (requestTooBigSent) { + in.skipBytes(in.readableBytes()); + return; + } if (requestTooBig) { - handleTooBigRequest(in); + handleTooBigRequest(ctx, in); return; } @@ -83,7 +88,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t NettyRpcServer.LOG.warn(requestTooBigMessage); if (connection.connectionHeaderRead) { - handleTooBigRequest(in); + handleTooBigRequest(ctx, in); return; } ctx.channel().close(); @@ -101,7 +106,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t out.add(in.readRetainedSlice(frameLengthInt)); } - private void handleTooBigRequest(ByteBuf in) throws IOException { + private void handleTooBigRequest(ChannelHandlerContext ctx, ByteBuf in) throws IOException { in.skipBytes(FRAME_LENGTH_FIELD_LENGTH); in.markReaderIndex(); int preIndex = in.readerIndex(); @@ -146,6 +151,10 @@ private void handleTooBigRequest(ByteBuf in) throws IOException { // instead of calling reqTooBig.sendResponseIfReady() reqTooBig.param = null; connection.channel.writeAndFlush(reqTooBig).addListener(ChannelFutureListener.CLOSE); + in.skipBytes(in.readableBytes()); + requestTooBigSent = true; + // disable auto read as we do not care newer data from this channel any more + ctx.channel().config().setAutoRead(false); } private RPCProtos.RequestHeader getHeader(ByteBuf in, int headerSize) throws IOException {