From 1b661654b4bb1b53afe766f255e106c4df2dca5b Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 2 May 2024 16:50:26 -0500 Subject: [PATCH] Remove memory leaks in ConditionallyReliableLoggingHttpHandlerTest Signed-off-by: Andre Kurait --- ...ionallyReliableLoggingHttpHandlerTest.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java index ac35777b2..80b2aad91 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java @@ -30,6 +30,7 @@ import java.util.stream.Stream; @Slf4j +@WrapWithNettyLeakDetection public class ConditionallyReliableLoggingHttpHandlerTest { private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer channelWriter, @@ -46,10 +47,11 @@ private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer new ByteBufInputStream((ByteBuf) m, true)) - .collect(Collectors.toList()))) - .readAllBytes(); + var outputDataStream = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m -> new ByteBufInputStream((ByteBuf) m, false)) + .collect(Collectors.toList()))); + var outputData = outputDataStream.readAllBytes(); + outputDataStream.close(); Assertions.assertArrayEquals(fullTrafficBytes, outputData); Assertions.assertNotNull(streamManager.byteBufferAtomicReference.get(), @@ -70,6 +72,9 @@ private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer offloader, headerCapturePredicate, x -> true)); getWriter(false, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); + channel.finishAndReleaseAll(); channel.close(); var requestBytes = SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8); @@ -160,7 +166,7 @@ public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throw // we wrote the correct data to the downstream handler/channel var consumedData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() - .map(m -> new ByteBufInputStream((ByteBuf) m)) + .map(m -> new ByteBufInputStream((ByteBuf) m, false)) .collect(Collectors.toList()))) .readAllBytes(); log.info("captureddata = " + new String(consumedData, StandardCharsets.UTF_8)); @@ -198,6 +204,8 @@ public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throw new String(reconstitutedTrafficStreamWrites, StandardCharsets.UTF_8)); Assertions.assertArrayEquals(bytesForResponsePreserved, reconstitutedTrafficStreamWrites); } + + channel.finishAndReleaseAll(); } } @@ -220,7 +228,6 @@ private Consumer getWriter(boolean singleBytes, boolean usePool @ParameterizedTest @ValueSource(booleans = {true, false}) - @WrapWithNettyLeakDetection(repetitions = 16) public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInTinyPacketsBlocksFutureActivity(usePool, false); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true);