Skip to content

Commit

Permalink
Add a test to show exception in channel does not work
Browse files Browse the repository at this point in the history
Relates: ES-9537
  • Loading branch information
ywangd committed Oct 18, 2024
1 parent 5bf446e commit f2dd6dc
Showing 1 changed file with 33 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collection;
Expand Down Expand Up @@ -211,6 +212,33 @@ public void testServerCloseConnectionMidStream() throws Exception {
}
}

public void testServerExceptionMidStream() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);

// write half of http request
ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024));
ctx.clientChannel.writeAndFlush(randomContent(1024, false));

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertFalse(handler.streamClosed);

if (randomBoolean() || true) {
handler.stream.channel().eventLoop().submit(() -> { throw new RuntimeException("simulated exception"); });
} else {
handler.shouldThrowInsideHandleChunk = true;
handler.stream.next();
}

assertBusy(() -> {
assertNull(handler.stream.buf());
assertTrue(handler.streamClosed);
});
}
}

// ensure that client's socket buffers data when server is not consuming data
public void testClientBackpressure() throws Exception {
try (var ctx = setupClientCtx()) {
Expand Down Expand Up @@ -594,6 +622,7 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon
RestChannel channel;
boolean recvLast = false;
volatile boolean streamClosed = false;
volatile boolean shouldThrowInsideHandleChunk = false;

ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
this.opaqueId = opaqueId;
Expand All @@ -602,6 +631,10 @@ static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkCon

@Override
public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) {
Transports.assertTransportThread();
if (shouldThrowInsideHandleChunk) {
throw new RuntimeException("simulated exception inside handleChunk");
}
recvChunks.add(new Chunk(chunk, isLast));
}

Expand Down

0 comments on commit f2dd6dc

Please sign in to comment.