Skip to content

Commit

Permalink
fix trappy http stream tests (elastic#116829)
Browse files Browse the repository at this point in the history
(cherry picked from commit 9489726)

# Conflicts:
#	modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
#	modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java
#	muted-tests.yml
  • Loading branch information
mhl-b committed Nov 18, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 68337ff commit 762a4bd
Showing 4 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -173,7 +173,7 @@ public void testClientConnectionCloseMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));

// enable auto-read to receive channel close event
handler.stream.channel().config().setAutoRead(true);
@@ -182,7 +182,7 @@ public void testClientConnectionCloseMidStream() throws Exception {
// terminate connection and wait resources are released
ctx.clientChannel.close();
assertBusy(() -> {
assertNull(handler.stream.buf());
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
@@ -199,13 +199,13 @@ public void testServerCloseConnectionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

// terminate connection on server and wait resources are released
handler.channel.request().getHttpChannel().close();
assertBusy(() -> {
assertNull(handler.stream.buf());
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
Original file line number Diff line number Diff line change
@@ -35,11 +35,14 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final ChannelFutureListener closeListener = future -> doClose();
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private ByteBuf buf;
private boolean hasLast = false;
private boolean requested = false;
private boolean closing = false;
private HttpBody.ChunkHandler handler;

// used in tests
private volatile int bufSize = 0;
private volatile boolean hasLast = false;

public Netty4HttpRequestBodyStream(Channel channel) {
this.channel = channel;
Netty4Utils.addListener(channel.closeFuture(), closeListener);
@@ -106,6 +109,7 @@ private void addChunk(ByteBuf chunk) {
comp.addComponent(true, chunk);
buf = comp;
}
bufSize = buf.readableBytes();
}

// visible for test
@@ -114,8 +118,8 @@ Channel channel() {
}

// visible for test
ByteBuf buf() {
return buf;
int bufSize() {
return bufSize;
}

// visible for test
@@ -129,6 +133,7 @@ private void send() {
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
requested = false;
buf = null;
bufSize = 0;
for (var tracer : tracingHandlers) {
tracer.onNext(bytesRef, hasLast);
}
@@ -155,6 +160,7 @@ private void doClose() {
if (buf != null) {
buf.release();
buf = null;
bufSize = 0;
}
channel.config().setAutoRead(true);
}
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ public void testEnqueueChunksBeforeRequest() {
for (int i = 0; i < totalChunks; i++) {
channel.writeInbound(randomContent(1024));
}
assertEquals(totalChunks * 1024, stream.buf().readableBytes());
assertEquals(totalChunks * 1024, stream.bufSize());
}

// ensures all received chunks can be flushed downstream
@@ -101,7 +101,7 @@ public void testReadFromChannel() {
channel.writeInbound(randomLastContent(chunkSize));

for (int i = 0; i < totalChunks; i++) {
assertNull("should not enqueue chunks", stream.buf());
assertEquals("should not enqueue chunks", 0, stream.bufSize());
stream.next();
channel.runPendingTasks();
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
@@ -286,9 +286,6 @@ tests:
- class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT
method: testEnterpriseDownloaderTask
issue: https://github.com/elastic/elasticsearch/issues/115163
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
method: testServerCloseConnectionMidStream
issue: https://github.com/elastic/elasticsearch/issues/116774

# Examples:
#

0 comments on commit 762a4bd

Please sign in to comment.