Skip to content

Commit

Permalink
H2/H2C server stream channels deactivated while write still in progre…
Browse files Browse the repository at this point in the history
…ss (netty#13388)

Motivation:

In the context of Reactor-Netty, we experience an issue during HTTP2 (or
H2C) load tests.
We have an HTTP2 (or H2C, it does not matter) server which is using in
the pipeline an Http2FrameCodec, followed by a Http2MultiplexHandler,
and when the client sends a new request, then the Http2MultiplexHandler
will create a child `Http2MultiplexHandlerStreamChannel` that will
contain in its pipeline an Http2StreamFrameToHttpObjectCodec followed by
some other specific reactor-netty handlers.

Now, the issue is the following: when the client sends the last request
frame (with EOS flag=true), and when the server sends the last response
frame (also with EOS flag = true), then in the server the stream will be
closed, and the stream channel handlers will be called in
`channelInactive` (that is ok).
But sometimes, under high load, when the last server response can't be
flushed when the parent channel is non-writtable, then we see that the
server stream channel handlers may be called in`channelInactive` while
the server last response frame is still writing and is not yet flushed.
In other words, the `ChannelFuture` of the last server response sent to
the client is not "success", but is "incomplete" at the time the child
stream channel handlers are called in `channelInactive`.

Normally, if I'm correct, when a channel handler is called in
`channelInactive`, it means that it is now inactive and has reached its
end of lifetime. So when our handlers are called in `channelInactive`
while the response is not yet flushed (because parent channel was
non-writable), then then we are getting into troubles, because we are
then trying to cleanup resources, like the buffer of the last server
response, but a bit later, when the last response is now flushed, then
the buffer will be freed again, and we end up with many
IllegalReferenceCountExceptions. So, we think that `channelInactive`
should only be invoked after the last response is fully written and
released.

To reproduce the issue and investigate it with the debugger, you can
first run the
`Http2MultiplexTransportTest.streamHandlerInactivatedResponseFlushed`
from this PR, but without applying the patch. Only pay attention to the
"serverloop" thread.

I tend to think that the problem may start from
`DefaultHttp2RemoteFlowController.FlowState.writeAllocatedBytes(int
allocated)`, line 368, where `frame.writeComplete()` is called: this
method will indirectly trigger
`AbstractHttp2StreamChannel.fireChannelInactiveAndDeregister(), line
742`, but without waiting for the frame promise to complete (the promise
is in `DefaultHttp2ConnectionEncoder.FlowControlledBase`).

Modification:


Added a reproducer test
in`Http2MultiplexTransportTest.streamHandlerInactivatedResponseFlushed`test.
This test simulates non-writable socket by configuring the SO_SNDBUF of
the server side connection to `1`. This will trigger the issue, because
when the server will respond, it will get many writability events and
the response won't be flushed immediately.

I have tried to apply the following simple patch which seems to resolve
the problem: In. `Http2ConnectionHandler.closeStream` method, ensures
that the stream is closed once the future is completed:

instead of doing:

```
    @OverRide
    public void closeStream(final Http2Stream stream, ChannelFuture future) {
        stream.close();

        if (future.isDone()) {
            checkCloseConnection(future);
        } else {
            future.addListener(new ChannelFutureListener() {
                @OverRide
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkCloseConnection(future);
                }
            });
        }
    }
```

the closing is then done in the future listener, like this:
```
Override
    public void closeStream(final Http2Stream stream, ChannelFuture future) {
        if (future.isDone()) {
            doCloseStream(stream, future);
        } else {
            future.addListener(f -> doCloseStream(stream, future));
        }
    }
    
    private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
        stream.close();
        checkCloseConnection(future);
    }
```

This seems to resolve the issue, because the stream will be closed only
once the last response frame has been fully flushed.
The Http2ConnectionHandlerTest.canCloseStreamWithVoidPromise has also
been modified in order to set the promise to success before doing the
test:

```
    @test
    public void canCloseStreamWithVoidPromise() throws Exception {
        handler = newHandler();
        handler.closeStream(stream, ctx.voidPromise().setSuccess());
        verify(stream, times(1)).close();
        verifyNoMoreInteractions(stream);
    }
```

Result:

Fixes reactor/reactor-netty#2760
  • Loading branch information
pderop authored May 22, 2023
1 parent 439e907 commit f76d646
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -625,15 +625,13 @@ public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {

@Override
public void closeStream(final Http2Stream stream, ChannelFuture future) {
stream.close();

if (future.isDone()) {
checkCloseConnection(future);
doCloseStream(stream, future);
} else {
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkCloseConnection(future);
public void operationComplete(ChannelFuture future) {
doCloseStream(stream, future);
}
});
}
Expand Down Expand Up @@ -919,6 +917,11 @@ private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture fut
}
}

private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
stream.close();
checkCloseConnection(future);
}

/**
* Returns the client preface string if this is a client connection, otherwise returns {@code null}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ public ChannelFuture answer(InvocationOnMock invocation) throws Throwable {
@Test
public void canCloseStreamWithVoidPromise() throws Exception {
handler = newHandler();
handler.closeStream(stream, ctx.voidPromise());
handler.closeStream(stream, ctx.voidPromise().setSuccess());
verify(stream, times(1)).close();
verifyNoMoreInteractions(stream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -25,6 +26,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
Expand Down Expand Up @@ -60,13 +62,19 @@
import java.security.cert.CertificateException;
import java.security.cert.CertificateExpiredException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class Http2MultiplexTransportTest {
Expand All @@ -93,6 +101,49 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
private Channel serverChannel;
private Channel serverConnectedChannel;

private static final class MultiplexInboundStream extends ChannelInboundHandlerAdapter {
ChannelFuture responseFuture;
final AtomicInteger handlerInactivatedFlushed;
final AtomicInteger handleInactivatedNotFlushed;
final CountDownLatch latchHandlerInactive;
static final String LARGE_STRING = generateLargeString(10240);

MultiplexInboundStream(AtomicInteger handleInactivatedFlushed,
AtomicInteger handleInactivatedNotFlushed, CountDownLatch latchHandlerInactive) {
this.handlerInactivatedFlushed = handleInactivatedFlushed;
this.handleInactivatedNotFlushed = handleInactivatedNotFlushed;
this.latchHandlerInactive = latchHandlerInactive;
}

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http2HeadersFrame && ((Http2HeadersFrame) msg).isEndStream()) {
ByteBuf response = Unpooled.copiedBuffer(LARGE_STRING, CharsetUtil.US_ASCII);
responseFuture = ctx.writeAndFlush(new DefaultHttp2DataFrame(response, true));
}
ReferenceCountUtil.release(msg);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (responseFuture.isSuccess()) {
handlerInactivatedFlushed.incrementAndGet();
} else {
handleInactivatedNotFlushed.incrementAndGet();
}
latchHandlerInactive.countDown();
ctx.fireChannelInactive();
}

private static String generateLargeString(int sizeInBytes) {
StringBuilder sb = new StringBuilder(sizeInBytes);
for (int i = 0; i < sizeInBytes; i++) {
sb.append('X');
}
return sb.toString();
}
}

@BeforeEach
public void setup() {
eventLoopGroup = new NioEventLoopGroup();
Expand Down Expand Up @@ -586,4 +637,109 @@ public void operationComplete(Future<Channel> future) {
}
}
}

/**
* When an HTTP/2 server stream channel receives a frame with EOS flag, and when it responds with a EOS
* flag, then the server side stream will be closed, hence the stream handler will be inactivated. This test
* verifies that the ChannelFuture of the server response is successful at the time the server stream handler is
* inactivated.
*/
@Test
@Timeout(value = 120000L, unit = MILLISECONDS)
public void streamHandlerInactivatedResponseFlushed() throws InterruptedException {
EventLoopGroup serverEventLoopGroup = null;
EventLoopGroup clientEventLoopGroup = null;

try {
serverEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "serverloop");
}
});

clientEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "clientloop");
}
});

final int streams = 10;
final CountDownLatch latchClientResponses = new CountDownLatch(streams);
final CountDownLatch latchHandlerInactive = new CountDownLatch(streams);

final AtomicInteger handlerInactivatedFlushed = new AtomicInteger();
final AtomicInteger handleInactivatedNotFlushed = new AtomicInteger();
final ServerBootstrap sb = new ServerBootstrap();

sb.group(serverEventLoopGroup);
sb.channel(NioServerSocketChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
// using a short sndbuf size will trigger writability events
ch.config().setOption(ChannelOption.SO_SNDBUF, 1);
ch.pipeline().addLast(new Http2FrameCodecBuilder(true).build());
ch.pipeline().addLast(new Http2MultiplexHandler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) {
ch.pipeline().remove(this);
ch.pipeline().addLast(new MultiplexInboundStream(handlerInactivatedFlushed,
handleInactivatedNotFlushed, latchHandlerInactive));
}
}));
}
});
serverChannel = sb.bind(new InetSocketAddress(NetUtil.LOCALHOST, 0)).syncUninterruptibly().channel();

final Bootstrap bs = new Bootstrap();

bs.group(clientEventLoopGroup);
bs.channel(NioSocketChannel.class);
bs.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new Http2FrameCodecBuilder(false).build());
ch.pipeline().addLast(new Http2MultiplexHandler(DISCARD_HANDLER));
}
});

clientChannel = bs.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
final Http2StreamChannelBootstrap h2Bootstrap = new Http2StreamChannelBootstrap(clientChannel);
h2Bootstrap.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http2DataFrame && ((Http2DataFrame) msg).isEndStream()) {
latchClientResponses.countDown();
}
ReferenceCountUtil.release(msg);
}
@Override
public boolean isSharable() {
return true;
}
});

List<ChannelFuture> streamFutures = new ArrayList<ChannelFuture>();
for (int i = 0; i < streams; i ++) {
Http2StreamChannel stream = h2Bootstrap.open().syncUninterruptibly().getNow();
streamFutures.add(stream.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers(), true)));
}
for (int i = 0; i < streams; i ++) {
streamFutures.get(i).syncUninterruptibly();
}

assertTrue(latchHandlerInactive.await(120000, MILLISECONDS));
assertTrue(latchClientResponses.await(120000, MILLISECONDS));
assertEquals(0, handleInactivatedNotFlushed.get());
assertEquals(streams, handlerInactivatedFlushed.get());
} finally {
if (serverEventLoopGroup != null) {
serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS);
}
if (clientEventLoopGroup != null) {
clientEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS);
}
}
}
}

0 comments on commit f76d646

Please sign in to comment.