diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 70f8f4623171..badd159aa5aa 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -625,15 +625,13 @@ public void closeStreamRemote(Http2Stream stream, ChannelFuture future) { @Override public void closeStream(final Http2Stream stream, ChannelFuture future) { - stream.close(); - if (future.isDone()) { - checkCloseConnection(future); + doCloseStream(stream, future); } else { future.addListener(new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture future) throws Exception { - checkCloseConnection(future); + public void operationComplete(ChannelFuture future) { + doCloseStream(stream, future); } }); } @@ -919,6 +917,11 @@ private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture fut } } + private void doCloseStream(final Http2Stream stream, ChannelFuture future) { + stream.close(); + checkCloseConnection(future); + } + /** * Returns the client preface string if this is a client connection, otherwise returns {@code null}. */ diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index e67af9b6587d..9d5a1c463cb7 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -709,7 +709,7 @@ public ChannelFuture answer(InvocationOnMock invocation) throws Throwable { @Test public void canCloseStreamWithVoidPromise() throws Exception { handler = newHandler(); - handler.closeStream(stream, ctx.voidPromise()); + handler.closeStream(stream, ctx.voidPromise().setSuccess()); verify(stream, times(1)).close(); verifyNoMoreInteractions(stream); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java index 13ae27b178ad..8e489755b6a9 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTransportTest.java @@ -17,6 +17,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -25,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -60,13 +62,19 @@ import java.security.cert.CertificateException; import java.security.cert.CertificateExpiredException; import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; public class Http2MultiplexTransportTest { @@ -93,6 +101,49 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { private Channel serverChannel; private Channel serverConnectedChannel; + private static final class MultiplexInboundStream extends ChannelInboundHandlerAdapter { + ChannelFuture responseFuture; + final AtomicInteger handlerInactivatedFlushed; + final AtomicInteger handleInactivatedNotFlushed; + final CountDownLatch latchHandlerInactive; + static final String LARGE_STRING = generateLargeString(10240); + + MultiplexInboundStream(AtomicInteger handleInactivatedFlushed, + AtomicInteger handleInactivatedNotFlushed, CountDownLatch latchHandlerInactive) { + this.handlerInactivatedFlushed = handleInactivatedFlushed; + this.handleInactivatedNotFlushed = handleInactivatedNotFlushed; + this.latchHandlerInactive = latchHandlerInactive; + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) { + ByteBuf response = Unpooled.copiedBuffer(LARGE_STRING, CharsetUtil.US_ASCII); + responseFuture = ctx.writeAndFlush(new DefaultHttp2DataFrame(response, true)); + } + ReferenceCountUtil.release(msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (responseFuture.isSuccess()) { + handlerInactivatedFlushed.incrementAndGet(); + } else { + handleInactivatedNotFlushed.incrementAndGet(); + } + latchHandlerInactive.countDown(); + ctx.fireChannelInactive(); + } + + private static String generateLargeString(int sizeInBytes) { + StringBuilder sb = new StringBuilder(sizeInBytes); + for (int i = 0; i < sizeInBytes; i++) { + sb.append('X'); + } + return sb.toString(); + } + } + @BeforeEach public void setup() { eventLoopGroup = new NioEventLoopGroup(); @@ -586,4 +637,109 @@ public void operationComplete(Future future) { } } } + + /** + * When an HTTP/2 server stream channel receives a frame with EOS flag, and when it responds with a EOS + * flag, then the server side stream will be closed, hence the stream handler will be inactivated. This test + * verifies that the ChannelFuture of the server response is successful at the time the server stream handler is + * inactivated. + */ + @Test + @Timeout(value = 120000L, unit = MILLISECONDS) + public void streamHandlerInactivatedResponseFlushed() throws InterruptedException { + EventLoopGroup serverEventLoopGroup = null; + EventLoopGroup clientEventLoopGroup = null; + + try { + serverEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "serverloop"); + } + }); + + clientEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "clientloop"); + } + }); + + final int streams = 10; + final CountDownLatch latchClientResponses = new CountDownLatch(streams); + final CountDownLatch latchHandlerInactive = new CountDownLatch(streams); + + final AtomicInteger handlerInactivatedFlushed = new AtomicInteger(); + final AtomicInteger handleInactivatedNotFlushed = new AtomicInteger(); + final ServerBootstrap sb = new ServerBootstrap(); + + sb.group(serverEventLoopGroup); + sb.channel(NioServerSocketChannel.class); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + // using a short sndbuf size will trigger writability events + ch.config().setOption(ChannelOption.SO_SNDBUF, 1); + ch.pipeline().addLast(new Http2FrameCodecBuilder(true).build()); + ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelInitializer() { + protected void initChannel(Channel ch) { + ch.pipeline().remove(this); + ch.pipeline().addLast(new MultiplexInboundStream(handlerInactivatedFlushed, + handleInactivatedNotFlushed, latchHandlerInactive)); + } + })); + } + }); + serverChannel = sb.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).syncUninterruptibly().channel(); + + final Bootstrap bs = new Bootstrap(); + + bs.group(clientEventLoopGroup); + bs.channel(NioSocketChannel.class); + bs.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new Http2FrameCodecBuilder(false).build()); + ch.pipeline().addLast(new Http2MultiplexHandler(DISCARD_HANDLER)); + } + }); + + clientChannel = bs.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); + final Http2StreamChannelBootstrap h2Bootstrap = new Http2StreamChannelBootstrap(clientChannel); + h2Bootstrap.handler(new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()) { + latchClientResponses.countDown(); + } + ReferenceCountUtil.release(msg); + } + @Override + public boolean isSharable() { + return true; + } + }); + + List streamFutures = new ArrayList(); + for (int i = 0; i < streams; i ++) { + Http2StreamChannel stream = h2Bootstrap.open().syncUninterruptibly().getNow(); + streamFutures.add(stream.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), true))); + } + for (int i = 0; i < streams; i ++) { + streamFutures.get(i).syncUninterruptibly(); + } + + assertTrue(latchHandlerInactive.await(120000, MILLISECONDS)); + assertTrue(latchClientResponses.await(120000, MILLISECONDS)); + assertEquals(0, handleInactivatedNotFlushed.get()); + assertEquals(streams, handlerInactivatedFlushed.get()); + } finally { + if (serverEventLoopGroup != null) { + serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS); + } + if (clientEventLoopGroup != null) { + clientEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS); + } + } + } }