From 762a4bd62b4fc6df8c299775488172470e9a8744 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 15 Nov 2024 09:23:02 -0800 Subject: [PATCH] fix trappy http stream tests (#116829) (cherry picked from commit 94897268b345319e2c559eb8c7c361389590c2d1) # Conflicts: # modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java # modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java # muted-tests.yml --- .../netty4/Netty4IncrementalRequestHandlingIT.java | 8 ++++---- .../http/netty4/Netty4HttpRequestBodyStream.java | 12 +++++++++--- .../netty4/Netty4HttpRequestBodyStreamTests.java | 4 ++-- muted-tests.yml | 3 --- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 26d31b941f356..a47e805ff63a2 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -173,7 +173,7 @@ public void testClientConnectionCloseMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotNull(handler.stream.buf())); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); // enable auto-read to receive channel close event handler.stream.channel().config().setAutoRead(true); @@ -182,7 +182,7 @@ public void testClientConnectionCloseMidStream() throws Exception { // terminate connection and wait resources are released ctx.clientChannel.close(); assertBusy(() -> { - assertNull(handler.stream.buf()); + assertEquals(0, handler.stream.bufSize()); assertTrue(handler.streamClosed); }); } @@ -199,13 +199,13 @@ public void testServerCloseConnectionMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotNull(handler.stream.buf())); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.streamClosed); // terminate connection on server and wait resources are released handler.channel.request().getHttpChannel().close(); assertBusy(() -> { - assertNull(handler.stream.buf()); + assertEquals(0, handler.stream.bufSize()); assertTrue(handler.streamClosed); }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 96f7deea978d9..309388c16aa71 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -35,11 +35,14 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final ChannelFutureListener closeListener = future -> doClose(); private final List tracingHandlers = new ArrayList<>(4); private ByteBuf buf; - private boolean hasLast = false; private boolean requested = false; private boolean closing = false; private HttpBody.ChunkHandler handler; + // used in tests + private volatile int bufSize = 0; + private volatile boolean hasLast = false; + public Netty4HttpRequestBodyStream(Channel channel) { this.channel = channel; Netty4Utils.addListener(channel.closeFuture(), closeListener); @@ -106,6 +109,7 @@ private void addChunk(ByteBuf chunk) { comp.addComponent(true, chunk); buf = comp; } + bufSize = buf.readableBytes(); } // visible for test @@ -114,8 +118,8 @@ Channel channel() { } // visible for test - ByteBuf buf() { - return buf; + int bufSize() { + return bufSize; } // visible for test @@ -129,6 +133,7 @@ private void send() { var bytesRef = Netty4Utils.toReleasableBytesReference(buf); requested = false; buf = null; + bufSize = 0; for (var tracer : tracingHandlers) { tracer.onNext(bytesRef, hasLast); } @@ -155,6 +160,7 @@ private void doClose() { if (buf != null) { buf.release(); buf = null; + bufSize = 0; } channel.config().setAutoRead(true); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index f268c3e6e744f..f737c2e481258 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -58,7 +58,7 @@ public void testEnqueueChunksBeforeRequest() { for (int i = 0; i < totalChunks; i++) { channel.writeInbound(randomContent(1024)); } - assertEquals(totalChunks * 1024, stream.buf().readableBytes()); + assertEquals(totalChunks * 1024, stream.bufSize()); } // ensures all received chunks can be flushed downstream @@ -101,7 +101,7 @@ public void testReadFromChannel() { channel.writeInbound(randomLastContent(chunkSize)); for (int i = 0; i < totalChunks; i++) { - assertNull("should not enqueue chunks", stream.buf()); + assertEquals("should not enqueue chunks", 0, stream.bufSize()); stream.next(); channel.runPendingTasks(); assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); diff --git a/muted-tests.yml b/muted-tests.yml index de04df8c00544..6ea09a48607e4 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -286,9 +286,6 @@ tests: - class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT method: testEnterpriseDownloaderTask issue: https://github.com/elastic/elasticsearch/issues/115163 -- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT - method: testServerCloseConnectionMidStream - issue: https://github.com/elastic/elasticsearch/issues/116774 # Examples: #