diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index a258d033ba..00237da4a2 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -1,8 +1,6 @@ package org.opensearch.migrations.replay; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; -import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -11,9 +9,6 @@ import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; - import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -24,6 +19,10 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.replay.util.ByteBufUtils; +import org.opensearch.migrations.replay.util.RefSafeHolder; @Slf4j public class HttpByteBufFormatter { @@ -64,38 +63,29 @@ public static String httpPacketBytesToString(HttpMessageType msgType, List byteArrStream) { - return httpPacketBytesToString(msgType, byteArrStream, DEFAULT_LINE_DELIMITER); - } - - public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, - boolean releaseByteBufs) { - return httpPacketBufsToString(msgType, byteBufStream, releaseByteBufs, DEFAULT_LINE_DELIMITER); - } - - public static String httpPacketBytesToString(HttpMessageType msgType, List byteArrStream, String lineDelimiter) { - return httpPacketBytesToString(msgType, - Optional.ofNullable(byteArrStream).map(p -> p.stream()).orElse(Stream.of()), lineDelimiter); + public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream) { + return httpPacketBufsToString(msgType, byteBufStream, DEFAULT_LINE_DELIMITER); } - public static String httpPacketBytesToString(HttpMessageType msgType, Stream byteArrStream, String lineDelimiter) { + public static String httpPacketBytesToString(HttpMessageType msgType, List byteArrs, String lineDelimiter) { // This isn't memory efficient, // but stringifying byte bufs through a full parse and reserializing them was already really slow! - return httpPacketBufsToString(msgType, byteArrStream.map(Unpooled::wrappedBuffer), true, lineDelimiter); + try (var stream = ByteBufUtils.createCloseableByteBufStream(byteArrs)) { + return httpPacketBufsToString(msgType, stream, lineDelimiter); + } } - public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, - boolean releaseByteBufs, String lineDelimiter) { + public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, String lineDelimiter) { switch (printStyle.get().orElse(PacketPrintFormat.TRUNCATED)) { case TRUNCATED: - return httpPacketBufsToString(byteBufStream, Utils.MAX_BYTES_SHOWN_FOR_TO_STRING, releaseByteBufs); + return httpPacketBufsToString(byteBufStream, Utils.MAX_BYTES_SHOWN_FOR_TO_STRING); case FULL_BYTES: - return httpPacketBufsToString(byteBufStream, Long.MAX_VALUE, releaseByteBufs); + return httpPacketBufsToString(byteBufStream, Long.MAX_VALUE); case PARSED_HTTP: - return httpPacketsToPrettyPrintedString(msgType, byteBufStream, false, releaseByteBufs, + return httpPacketsToPrettyPrintedString(msgType, byteBufStream, false, lineDelimiter); case PARSED_HTTP_SORTED_HEADERS: - return httpPacketsToPrettyPrintedString(msgType, byteBufStream, true, releaseByteBufs, + return httpPacketsToPrettyPrintedString(msgType, byteBufStream, true, lineDelimiter); default: throw new IllegalStateException("Unknown PacketPrintFormat: " + printStyle.get()); @@ -103,22 +93,22 @@ public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, - boolean sortHeaders, boolean releaseByteBufs, String lineDelimiter) { - HttpMessage httpMessage = parseHttpMessageFromBufs(msgType, byteBufStream, releaseByteBufs); - var holderOp = Optional.ofNullable((httpMessage instanceof ByteBufHolder) ? (ByteBufHolder) httpMessage : null); - try { - if (httpMessage instanceof FullHttpRequest) { - return prettyPrintNettyRequest((FullHttpRequest) httpMessage, sortHeaders, lineDelimiter); - } else if (httpMessage instanceof FullHttpResponse) { - return prettyPrintNettyResponse((FullHttpResponse) httpMessage, sortHeaders, lineDelimiter); - } else if (httpMessage == null) { - return "[NULL]"; + boolean sortHeaders, String lineDelimiter) { + try(var messageHolder = RefSafeHolder.create(parseHttpMessageFromBufs(msgType, byteBufStream))) { + final Optional httpMessageOp = messageHolder.get(); + if (httpMessageOp.isPresent()) { + var httpMessage = httpMessageOp.get(); + if (httpMessage instanceof FullHttpRequest) { + return prettyPrintNettyRequest((FullHttpRequest) httpMessage, sortHeaders, lineDelimiter); + } else if (httpMessage instanceof FullHttpResponse) { + return prettyPrintNettyResponse((FullHttpResponse) httpMessage, sortHeaders, lineDelimiter); + } else { + throw new IllegalStateException("Embedded channel with an HttpObjectAggregator returned an " + + "unexpected object of type " + httpMessage.getClass() + ": " + httpMessage); + } } else { - throw new IllegalStateException("Embedded channel with an HttpObjectAggregator returned an " + - "unexpected object of type " + httpMessage.getClass() + ": " + httpMessage); + return "[NULL]"; } - } finally { - holderOp.ifPresent(bbh->bbh.content().release()); } } @@ -153,58 +143,40 @@ private static String prettyPrintNettyMessage(StringJoiner sj, boolean sorted, H * @param byteBufStream * @return */ - public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stream byteBufStream, - boolean releaseByteBufs) { + public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stream byteBufStream) { EmbeddedChannel channel = new EmbeddedChannel( msgType == HttpMessageType.REQUEST ? new HttpServerCodec() : new HttpClientCodec(), new HttpContentDecompressor(), new HttpObjectAggregator(Utils.MAX_PAYLOAD_SIZE_TO_PRINT) // Set max content length if needed ); - - byteBufStream.forEach(b -> { - try { - channel.writeInbound(b.retainedDuplicate()); - } finally { - if (releaseByteBufs) { - b.release(); - } - } - }); - try { + byteBufStream.forEachOrdered(b -> channel.writeInbound(b.retainedDuplicate())); return channel.readInbound(); } finally { channel.finishAndReleaseAll(); } } - public static FullHttpRequest parseHttpRequestFromBufs(Stream byteBufStream, boolean releaseByteBufs) { - return (FullHttpRequest) parseHttpMessageFromBufs(HttpMessageType.REQUEST, byteBufStream, releaseByteBufs); + public static FullHttpRequest parseHttpRequestFromBufs(Stream byteBufStream) { + return (FullHttpRequest) parseHttpMessageFromBufs(HttpMessageType.REQUEST, byteBufStream); } - public static FullHttpResponse parseHttpResponseFromBufs(Stream byteBufStream, boolean releaseByteBufs) { - return (FullHttpResponse) parseHttpMessageFromBufs(HttpMessageType.RESPONSE, byteBufStream, releaseByteBufs); + public static FullHttpResponse parseHttpResponseFromBufs(Stream byteBufStream) { + return (FullHttpResponse) parseHttpMessageFromBufs(HttpMessageType.RESPONSE, byteBufStream); } - public static String httpPacketBufsToString(Stream byteBufStream, long maxBytesToShow, - boolean releaseByteBufs) { + public static String httpPacketBufsToString(Stream byteBufStream, long maxBytesToShow) { if (byteBufStream == null) { return "null"; } return byteBufStream.map(originalByteBuf -> { - try { - var bb = originalByteBuf.duplicate(); - var length = bb.readableBytes(); - var str = IntStream.range(0, length).map(idx -> bb.readByte()) - .limit(maxBytesToShow) - .mapToObj(b -> "" + (char) b) - .collect(Collectors.joining()); - return "[" + (length > maxBytesToShow ? str + "..." : str) + "]"; - } finally { - if (releaseByteBufs) { - originalByteBuf.release(); - } - }}) - .collect(Collectors.joining(",")); + var bb = originalByteBuf.duplicate(); + var length = bb.readableBytes(); + var str = IntStream.range(0, length).map(idx -> bb.readByte()) + .limit(maxBytesToShow) + .mapToObj(b -> "" + (char) b) + .collect(Collectors.joining()); + return "[" + (length > maxBytesToShow ? str + "..." : str) + "]"; + }).collect(Collectors.joining(",")); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java index cb9089c3df..92f9315714 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.replay; -import io.netty.buffer.Unpooled; +import static org.opensearch.migrations.replay.util.ByteBufUtils.createCloseableByteBufStream; + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Lombok; @@ -65,17 +66,17 @@ public Stream stream() { } public String format(Optional messageTypeOp) { - var packetBytesAsStr = messageTypeOp.map(mt-> HttpByteBufFormatter.httpPacketBytesToString(mt, packetBytes, - HttpByteBufFormatter.LF_LINE_DELIMITER)) - .orElseGet(()-> HttpByteBufFormatter.httpPacketBufsToString( - packetBytes.stream().map(Unpooled::wrappedBuffer), - Utils.MAX_PAYLOAD_SIZE_TO_PRINT, true)); - final StringBuilder sb = new StringBuilder("HttpMessageAndTimestamp{"); - sb.append("firstPacketTimestamp=").append(firstPacketTimestamp); - sb.append(", lastPacketTimestamp=").append(lastPacketTimestamp); - sb.append(", message=[").append(packetBytesAsStr); - sb.append("]}"); - return sb.toString(); + try (var bufStream = createCloseableByteBufStream(packetBytes)) { + var packetBytesAsStr = messageTypeOp.map(mt-> HttpByteBufFormatter.httpPacketBytesToString(mt, packetBytes, + HttpByteBufFormatter.LF_LINE_DELIMITER)) + .orElseGet(()-> HttpByteBufFormatter.httpPacketBufsToString(bufStream, Utils.MAX_PAYLOAD_SIZE_TO_PRINT)); + final StringBuilder sb = new StringBuilder("HttpMessageAndTimestamp{"); + sb.append("firstPacketTimestamp=").append(firstPacketTimestamp); + sb.append(", lastPacketTimestamp=").append(lastPacketTimestamp); + sb.append(", message=[").append(packetBytesAsStr); + sb.append("]}"); + return sb.toString(); + } } public void addSegment(byte[] data) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 0c78c5a07e..a520a54ed3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -1,14 +1,9 @@ package org.opensearch.migrations.replay; +import static org.opensearch.migrations.replay.util.ByteBufUtils.createCloseableByteBufStream; + import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpHeaders; -import io.netty.util.ReferenceCounted; -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; -import org.opensearch.migrations.replay.datatypes.TransformedPackets; -import org.opensearch.migrations.replay.tracing.IReplayContexts; - import java.time.Duration; import java.util.Base64; import java.util.LinkedHashMap; @@ -17,7 +12,12 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.stream.Collectors; -import java.util.stream.Stream; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.replay.datatypes.TransformedPackets; +import org.opensearch.migrations.replay.tracing.IReplayContexts; +import org.opensearch.migrations.replay.tracing.IReplayContexts.ITupleHandlingContext; +import org.opensearch.migrations.replay.util.RefSafeHolder; /** * TODO - This class will pull all bodies in as a byte[], even if that byte[] isn't @@ -37,7 +37,7 @@ public class ParsedHttpMessagesAsDicts { public final Optional> sourceResponseOp; public final Optional> targetRequestOp; public final Optional> targetResponseOp; - public final IReplayContexts.ITupleHandlingContext context; + public final ITupleHandlingContext context; public ParsedHttpMessagesAsDicts(@NonNull SourceTargetCaptureTuple tuple) { this(tuple, Optional.ofNullable(tuple.sourcePair)); @@ -82,7 +82,7 @@ private static Optional> getSourceResponseOp(SourceTargetCap .map(d -> convertRequest(context, d))); } - public ParsedHttpMessagesAsDicts(IReplayContexts.ITupleHandlingContext context, + public ParsedHttpMessagesAsDicts(ITupleHandlingContext context, Optional> sourceRequestOp1, Optional> sourceResponseOp2, Optional> targetRequestOp3, @@ -102,11 +102,6 @@ public static void fillStatusCodeMetrics(@NonNull IReplayContexts.ITupleHandling targetResponseOp.ifPresent(r -> context.setTargetStatus((Integer) r.get(STATUS_CODE_KEY))); } - - private static Stream byteToByteBufStream(List incoming) { - return incoming.stream().map(Unpooled::wrappedBuffer); - } - private static byte[] getBytesFromByteBuf(ByteBuf buf) { var bytes = new byte[buf.readableBytes()]; buf.getBytes(buf.readerIndex(), bytes); @@ -138,17 +133,21 @@ private static Map convertRequest(@NonNull IReplayContexts.ITupl @NonNull List data) { return makeSafeMap(context, () -> { var map = new LinkedHashMap(); - var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true); - try { - map.put("Request-URI", message.uri()); - map.put("Method", message.method().toString()); - map.put("HTTP-Version", message.protocolVersion().toString()); - context.setMethod(message.method().toString()); - context.setEndpoint(message.uri()); - context.setHttpVersion(message.protocolVersion().toString()); - return fillMap(map, message.headers(), message.content()); - } finally { - Optional.ofNullable(message).ifPresent(ReferenceCounted::release); + try (var bufStream = createCloseableByteBufStream(data); + var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpRequestFromBufs(bufStream))) { + var messageOp = messageHolder.get(); + if (messageOp.isPresent()) { + var message = messageOp.get(); + map.put("Request-URI", message.uri()); + map.put("Method", message.method().toString()); + map.put("HTTP-Version", message.protocolVersion().toString()); + context.setMethod(message.method().toString()); + context.setEndpoint(message.uri()); + context.setHttpVersion(message.protocolVersion().toString()); + return fillMap(map, message.headers(), message.content()); + } else { + return Map.of("Exception", "Message couldn't be parsed as a full http message"); + } } }); } @@ -157,18 +156,19 @@ private static Map convertResponse(@NonNull IReplayContexts.ITup @NonNull List data, Duration latency) { return makeSafeMap(context, () -> { var map = new LinkedHashMap(); - var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true); - if (message == null) { - return Map.of("Exception", "Message couldn't be parsed as a full http message"); - } - try { - map.put("HTTP-Version", message.protocolVersion()); - map.put(STATUS_CODE_KEY, message.status().code()); - map.put("Reason-Phrase", message.status().reasonPhrase()); - map.put(RESPONSE_TIME_MS_KEY, latency.toMillis()); - return fillMap(map, message.headers(), message.content()); - } finally { - Optional.ofNullable(message).ifPresent(ReferenceCounted::release); + try (var bufStream = createCloseableByteBufStream(data); + var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpResponseFromBufs(bufStream))) { + var messageOp = messageHolder.get(); + if (messageOp.isPresent()) { + var message = messageOp.get(); + map.put("HTTP-Version", message.protocolVersion()); + map.put(STATUS_CODE_KEY, message.status().code()); + map.put("Reason-Phrase", message.status().reasonPhrase()); + map.put(RESPONSE_TIME_MS_KEY, latency.toMillis()); + return fillMap(map, message.headers(), message.content()); + } else { + return Map.of("Exception", "Message couldn't be parsed as a full http message"); + } } }); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java index b63f1aa150..0067ed78c7 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java @@ -54,7 +54,7 @@ public String toString() { if (targetResponseDuration != null) { sj.add("targetResponseDuration=").add(targetResponseDuration+""); } Optional.ofNullable(targetRequestData).ifPresent(d-> sj.add("targetRequestData=") .add(d.isClosed() ? "CLOSED" : HttpByteBufFormatter.httpPacketBufsToString( - HttpByteBufFormatter.HttpMessageType.REQUEST, d.streamUnretained(), false, LF_LINE_DELIMITER))); + HttpByteBufFormatter.HttpMessageType.REQUEST, d.streamUnretained(), LF_LINE_DELIMITER))); Optional.ofNullable(targetResponseData).filter(d->!d.isEmpty()).ifPresent(d -> sj.add("targetResponseData=") .add(HttpByteBufFormatter.httpPacketBytesToString(HttpByteBufFormatter.HttpMessageType.RESPONSE, d, LF_LINE_DELIMITER))); sj.add("transformStatus=").add(transformationStatus+""); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java new file mode 100644 index 0000000000..5081341316 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.Collection; + +import java.util.stream.Stream; + +public interface ByteBufUtils { + static Stream createCloseableByteBufStream(Collection byteArrCollection) { + return RefSafeStreamUtils.refSafeMap(byteArrCollection.stream(), Unpooled::wrappedBuffer); + } +} \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java new file mode 100644 index 0000000000..c4356ad276 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java @@ -0,0 +1,26 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.util.ReferenceCountUtil; +import java.util.Optional; +import javax.annotation.Nullable; + +public class RefSafeHolder implements AutoCloseable { + private final T resource; + + private RefSafeHolder(@Nullable T resource) { + this.resource = resource; + } + + static public RefSafeHolder create(@Nullable T resource) { + return new RefSafeHolder<>(resource); + } + + public Optional get() { + return Optional.ofNullable(resource); + } + + @Override + public void close() { + ReferenceCountUtil.release(resource); + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java new file mode 100644 index 0000000000..43190a9906 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java @@ -0,0 +1,19 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.util.ReferenceCounted; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.function.Function; +import java.util.stream.Stream; + +public interface RefSafeStreamUtils { + static Stream refSafeMap(Stream inputStream, + Function referenceTrackedMappingFunction) { + final Deque refCountedTracker = new ArrayDeque<>(); + return inputStream.map(t -> { + var resource = referenceTrackedMappingFunction.apply(t); + refCountedTracker.add(resource); + return resource; + }).onClose(() -> refCountedTracker.forEach(ReferenceCounted::release)); + } +} diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index c895bc3676..edfaba59f9 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -146,7 +146,7 @@ private static String prettyPrintByteBufs(List byteArrays, HttpByteBufFormatter.HttpMessageType messageType, boolean usePooled) { var bbList = byteArrays.stream().map(b->TestUtilities.getByteBuf(b,usePooled)).collect(Collectors.toList()); - var formattedString = HttpByteBufFormatter.httpPacketBufsToString(messageType, bbList.stream(), false); + var formattedString = HttpByteBufFormatter.httpPacketBufsToString(messageType, bbList.stream()); bbList.forEach(bb->bb.release()); return formattedString; } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 661d76ba31..912be16ae9 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -1,28 +1,28 @@ package org.opensearch.migrations.replay; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.FullHttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import org.opensearch.migrations.replay.util.ByteBufUtils; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; +import org.opensearch.migrations.replay.util.RefSafeHolder; import org.opensearch.migrations.testutils.SimpleHttpServer; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.tracing.InstrumentationTest; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - @Slf4j @WrapWithNettyLeakDetection(repetitions = 1) class RequestSenderOrchestratorTest extends InstrumentationTest { @@ -67,18 +67,18 @@ public void testThatSchedulingWorks() throws Exception { var arr = cf.get(); Assertions.assertNull(arr.error); Assertions.assertTrue(arr.responseSizeInBytes > 0); - var httpMessage = HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, - arr.responsePackets.stream().map(kvp -> Unpooled.wrappedBuffer(kvp.getValue())), false); - try { - var response = (FullHttpResponse) httpMessage; + var packetBytesArr = arr.responsePackets.stream().map(SimpleEntry::getValue).collect(Collectors.toList()); + try (var bufStream = ByteBufUtils.createCloseableByteBufStream(packetBytesArr); + var messageHolder = RefSafeHolder.create( + HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, + bufStream))) { + Assertions.assertTrue(messageHolder.get().isPresent()); + var response = (FullHttpResponse) messageHolder.get().get(); Assertions.assertEquals(200, response.status().code()); var body = response.content(); Assertions.assertEquals(TestHttpServerContext.SERVER_RESPONSE_BODY_PREFIX + TestHttpServerContext.getUriForIthRequest(i / NUM_REPEATS), - new String(body.duplicate().toString(StandardCharsets.UTF_8))); - } finally { - Optional.ofNullable((httpMessage instanceof ByteBufHolder) ? (ByteBufHolder) httpMessage : null) - .ifPresent(bbh -> bbh.content().release()); + body.duplicate().toString(StandardCharsets.UTF_8)); } } closeFuture.get();