From a13ec3be2b241353c16d99cafdf4eee57bc5377d Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 19 Apr 2024 10:51:41 -0500 Subject: [PATCH 1/9] Refactor HttpByteBufFormatter without refCount side effects and add in helpers for refSafe stream mappings Signed-off-by: Andre Kurait --- .../replay/HttpByteBufFormatter.java | 116 +++++++----------- .../replay/HttpMessageAndTimestamp.java | 25 ++-- .../replay/ParsedHttpMessagesAsDicts.java | 74 +++++------ .../replay/SourceTargetCaptureTuple.java | 2 +- .../migrations/replay/util/ByteBufUtils.java | 17 +++ .../migrations/replay/util/RefSafeHolder.java | 26 ++++ .../replay/util/RefSafeStreamUtils.java | 19 +++ .../replay/HttpByteBufFormatterTest.java | 2 +- .../replay/RequestSenderOrchestratorTest.java | 34 ++--- 9 files changed, 175 insertions(+), 140 deletions(-) create mode 100644 TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java create mode 100644 TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java create mode 100644 TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index a258d033b..00237da4a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -1,8 +1,6 @@ package org.opensearch.migrations.replay; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; -import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -11,9 +9,6 @@ import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; - import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -24,6 +19,10 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.replay.util.ByteBufUtils; +import org.opensearch.migrations.replay.util.RefSafeHolder; @Slf4j public class HttpByteBufFormatter { @@ -64,38 +63,29 @@ public static String httpPacketBytesToString(HttpMessageType msgType, List byteArrStream) { - return httpPacketBytesToString(msgType, byteArrStream, DEFAULT_LINE_DELIMITER); - } - - public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, - boolean releaseByteBufs) { - return httpPacketBufsToString(msgType, byteBufStream, releaseByteBufs, DEFAULT_LINE_DELIMITER); - } - - public static String httpPacketBytesToString(HttpMessageType msgType, List byteArrStream, String lineDelimiter) { - return httpPacketBytesToString(msgType, - Optional.ofNullable(byteArrStream).map(p -> p.stream()).orElse(Stream.of()), lineDelimiter); + public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream) { + return httpPacketBufsToString(msgType, byteBufStream, DEFAULT_LINE_DELIMITER); } - public static String httpPacketBytesToString(HttpMessageType msgType, Stream byteArrStream, String lineDelimiter) { + public static String httpPacketBytesToString(HttpMessageType msgType, List byteArrs, String lineDelimiter) { // This isn't memory efficient, // but stringifying byte bufs through a full parse and reserializing them was already really slow! - return httpPacketBufsToString(msgType, byteArrStream.map(Unpooled::wrappedBuffer), true, lineDelimiter); + try (var stream = ByteBufUtils.createCloseableByteBufStream(byteArrs)) { + return httpPacketBufsToString(msgType, stream, lineDelimiter); + } } - public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, - boolean releaseByteBufs, String lineDelimiter) { + public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, String lineDelimiter) { switch (printStyle.get().orElse(PacketPrintFormat.TRUNCATED)) { case TRUNCATED: - return httpPacketBufsToString(byteBufStream, Utils.MAX_BYTES_SHOWN_FOR_TO_STRING, releaseByteBufs); + return httpPacketBufsToString(byteBufStream, Utils.MAX_BYTES_SHOWN_FOR_TO_STRING); case FULL_BYTES: - return httpPacketBufsToString(byteBufStream, Long.MAX_VALUE, releaseByteBufs); + return httpPacketBufsToString(byteBufStream, Long.MAX_VALUE); case PARSED_HTTP: - return httpPacketsToPrettyPrintedString(msgType, byteBufStream, false, releaseByteBufs, + return httpPacketsToPrettyPrintedString(msgType, byteBufStream, false, lineDelimiter); case PARSED_HTTP_SORTED_HEADERS: - return httpPacketsToPrettyPrintedString(msgType, byteBufStream, true, releaseByteBufs, + return httpPacketsToPrettyPrintedString(msgType, byteBufStream, true, lineDelimiter); default: throw new IllegalStateException("Unknown PacketPrintFormat: " + printStyle.get()); @@ -103,22 +93,22 @@ public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, - boolean sortHeaders, boolean releaseByteBufs, String lineDelimiter) { - HttpMessage httpMessage = parseHttpMessageFromBufs(msgType, byteBufStream, releaseByteBufs); - var holderOp = Optional.ofNullable((httpMessage instanceof ByteBufHolder) ? (ByteBufHolder) httpMessage : null); - try { - if (httpMessage instanceof FullHttpRequest) { - return prettyPrintNettyRequest((FullHttpRequest) httpMessage, sortHeaders, lineDelimiter); - } else if (httpMessage instanceof FullHttpResponse) { - return prettyPrintNettyResponse((FullHttpResponse) httpMessage, sortHeaders, lineDelimiter); - } else if (httpMessage == null) { - return "[NULL]"; + boolean sortHeaders, String lineDelimiter) { + try(var messageHolder = RefSafeHolder.create(parseHttpMessageFromBufs(msgType, byteBufStream))) { + final Optional httpMessageOp = messageHolder.get(); + if (httpMessageOp.isPresent()) { + var httpMessage = httpMessageOp.get(); + if (httpMessage instanceof FullHttpRequest) { + return prettyPrintNettyRequest((FullHttpRequest) httpMessage, sortHeaders, lineDelimiter); + } else if (httpMessage instanceof FullHttpResponse) { + return prettyPrintNettyResponse((FullHttpResponse) httpMessage, sortHeaders, lineDelimiter); + } else { + throw new IllegalStateException("Embedded channel with an HttpObjectAggregator returned an " + + "unexpected object of type " + httpMessage.getClass() + ": " + httpMessage); + } } else { - throw new IllegalStateException("Embedded channel with an HttpObjectAggregator returned an " + - "unexpected object of type " + httpMessage.getClass() + ": " + httpMessage); + return "[NULL]"; } - } finally { - holderOp.ifPresent(bbh->bbh.content().release()); } } @@ -153,58 +143,40 @@ private static String prettyPrintNettyMessage(StringJoiner sj, boolean sorted, H * @param byteBufStream * @return */ - public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stream byteBufStream, - boolean releaseByteBufs) { + public static HttpMessage parseHttpMessageFromBufs(HttpMessageType msgType, Stream byteBufStream) { EmbeddedChannel channel = new EmbeddedChannel( msgType == HttpMessageType.REQUEST ? new HttpServerCodec() : new HttpClientCodec(), new HttpContentDecompressor(), new HttpObjectAggregator(Utils.MAX_PAYLOAD_SIZE_TO_PRINT) // Set max content length if needed ); - - byteBufStream.forEach(b -> { - try { - channel.writeInbound(b.retainedDuplicate()); - } finally { - if (releaseByteBufs) { - b.release(); - } - } - }); - try { + byteBufStream.forEachOrdered(b -> channel.writeInbound(b.retainedDuplicate())); return channel.readInbound(); } finally { channel.finishAndReleaseAll(); } } - public static FullHttpRequest parseHttpRequestFromBufs(Stream byteBufStream, boolean releaseByteBufs) { - return (FullHttpRequest) parseHttpMessageFromBufs(HttpMessageType.REQUEST, byteBufStream, releaseByteBufs); + public static FullHttpRequest parseHttpRequestFromBufs(Stream byteBufStream) { + return (FullHttpRequest) parseHttpMessageFromBufs(HttpMessageType.REQUEST, byteBufStream); } - public static FullHttpResponse parseHttpResponseFromBufs(Stream byteBufStream, boolean releaseByteBufs) { - return (FullHttpResponse) parseHttpMessageFromBufs(HttpMessageType.RESPONSE, byteBufStream, releaseByteBufs); + public static FullHttpResponse parseHttpResponseFromBufs(Stream byteBufStream) { + return (FullHttpResponse) parseHttpMessageFromBufs(HttpMessageType.RESPONSE, byteBufStream); } - public static String httpPacketBufsToString(Stream byteBufStream, long maxBytesToShow, - boolean releaseByteBufs) { + public static String httpPacketBufsToString(Stream byteBufStream, long maxBytesToShow) { if (byteBufStream == null) { return "null"; } return byteBufStream.map(originalByteBuf -> { - try { - var bb = originalByteBuf.duplicate(); - var length = bb.readableBytes(); - var str = IntStream.range(0, length).map(idx -> bb.readByte()) - .limit(maxBytesToShow) - .mapToObj(b -> "" + (char) b) - .collect(Collectors.joining()); - return "[" + (length > maxBytesToShow ? str + "..." : str) + "]"; - } finally { - if (releaseByteBufs) { - originalByteBuf.release(); - } - }}) - .collect(Collectors.joining(",")); + var bb = originalByteBuf.duplicate(); + var length = bb.readableBytes(); + var str = IntStream.range(0, length).map(idx -> bb.readByte()) + .limit(maxBytesToShow) + .mapToObj(b -> "" + (char) b) + .collect(Collectors.joining()); + return "[" + (length > maxBytesToShow ? str + "..." : str) + "]"; + }).collect(Collectors.joining(",")); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java index cb9089c3d..92f931571 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.replay; -import io.netty.buffer.Unpooled; +import static org.opensearch.migrations.replay.util.ByteBufUtils.createCloseableByteBufStream; + import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Lombok; @@ -65,17 +66,17 @@ public Stream stream() { } public String format(Optional messageTypeOp) { - var packetBytesAsStr = messageTypeOp.map(mt-> HttpByteBufFormatter.httpPacketBytesToString(mt, packetBytes, - HttpByteBufFormatter.LF_LINE_DELIMITER)) - .orElseGet(()-> HttpByteBufFormatter.httpPacketBufsToString( - packetBytes.stream().map(Unpooled::wrappedBuffer), - Utils.MAX_PAYLOAD_SIZE_TO_PRINT, true)); - final StringBuilder sb = new StringBuilder("HttpMessageAndTimestamp{"); - sb.append("firstPacketTimestamp=").append(firstPacketTimestamp); - sb.append(", lastPacketTimestamp=").append(lastPacketTimestamp); - sb.append(", message=[").append(packetBytesAsStr); - sb.append("]}"); - return sb.toString(); + try (var bufStream = createCloseableByteBufStream(packetBytes)) { + var packetBytesAsStr = messageTypeOp.map(mt-> HttpByteBufFormatter.httpPacketBytesToString(mt, packetBytes, + HttpByteBufFormatter.LF_LINE_DELIMITER)) + .orElseGet(()-> HttpByteBufFormatter.httpPacketBufsToString(bufStream, Utils.MAX_PAYLOAD_SIZE_TO_PRINT)); + final StringBuilder sb = new StringBuilder("HttpMessageAndTimestamp{"); + sb.append("firstPacketTimestamp=").append(firstPacketTimestamp); + sb.append(", lastPacketTimestamp=").append(lastPacketTimestamp); + sb.append(", message=[").append(packetBytesAsStr); + sb.append("]}"); + return sb.toString(); + } } public void addSegment(byte[] data) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 0c78c5a07..7aa7ccc2e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -1,14 +1,9 @@ package org.opensearch.migrations.replay; +import static org.opensearch.migrations.replay.util.ByteBufUtils.createCloseableByteBufStream; + import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpHeaders; -import io.netty.util.ReferenceCounted; -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; -import org.opensearch.migrations.replay.datatypes.TransformedPackets; -import org.opensearch.migrations.replay.tracing.IReplayContexts; - import java.time.Duration; import java.util.Base64; import java.util.LinkedHashMap; @@ -17,7 +12,12 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.stream.Collectors; -import java.util.stream.Stream; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.opensearch.migrations.replay.datatypes.TransformedPackets; +import org.opensearch.migrations.replay.tracing.IReplayContexts; +import org.opensearch.migrations.replay.tracing.IReplayContexts.ITupleHandlingContext; +import org.opensearch.migrations.replay.util.RefSafeHolder; /** * TODO - This class will pull all bodies in as a byte[], even if that byte[] isn't @@ -82,7 +82,7 @@ private static Optional> getSourceResponseOp(SourceTargetCap .map(d -> convertRequest(context, d))); } - public ParsedHttpMessagesAsDicts(IReplayContexts.ITupleHandlingContext context, + public ParsedHttpMessagesAsDicts(ITupleHandlingContext context, Optional> sourceRequestOp1, Optional> sourceResponseOp2, Optional> targetRequestOp3, @@ -102,11 +102,6 @@ public static void fillStatusCodeMetrics(@NonNull IReplayContexts.ITupleHandling targetResponseOp.ifPresent(r -> context.setTargetStatus((Integer) r.get(STATUS_CODE_KEY))); } - - private static Stream byteToByteBufStream(List incoming) { - return incoming.stream().map(Unpooled::wrappedBuffer); - } - private static byte[] getBytesFromByteBuf(ByteBuf buf) { var bytes = new byte[buf.readableBytes()]; buf.getBytes(buf.readerIndex(), bytes); @@ -138,17 +133,21 @@ private static Map convertRequest(@NonNull IReplayContexts.ITupl @NonNull List data) { return makeSafeMap(context, () -> { var map = new LinkedHashMap(); - var message = HttpByteBufFormatter.parseHttpRequestFromBufs(byteToByteBufStream(data), true); - try { - map.put("Request-URI", message.uri()); - map.put("Method", message.method().toString()); - map.put("HTTP-Version", message.protocolVersion().toString()); - context.setMethod(message.method().toString()); - context.setEndpoint(message.uri()); - context.setHttpVersion(message.protocolVersion().toString()); - return fillMap(map, message.headers(), message.content()); - } finally { - Optional.ofNullable(message).ifPresent(ReferenceCounted::release); + try (var bufStream = createCloseableByteBufStream(data); + var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpRequestFromBufs(bufStream))) { + var messageOp = messageHolder.get(); + if (messageOp.isPresent()) { + var message = messageOp.get(); + map.put("Request-URI", message.uri()); + map.put("Method", message.method().toString()); + map.put("HTTP-Version", message.protocolVersion().toString()); + context.setMethod(message.method().toString()); + context.setEndpoint(message.uri()); + context.setHttpVersion(message.protocolVersion().toString()); + return fillMap(map, message.headers(), message.content()); + } else { + return Map.of("Exception", "Message couldn't be parsed as a full http message"); + } } }); } @@ -157,18 +156,19 @@ private static Map convertResponse(@NonNull IReplayContexts.ITup @NonNull List data, Duration latency) { return makeSafeMap(context, () -> { var map = new LinkedHashMap(); - var message = HttpByteBufFormatter.parseHttpResponseFromBufs(byteToByteBufStream(data), true); - if (message == null) { - return Map.of("Exception", "Message couldn't be parsed as a full http message"); - } - try { - map.put("HTTP-Version", message.protocolVersion()); - map.put(STATUS_CODE_KEY, message.status().code()); - map.put("Reason-Phrase", message.status().reasonPhrase()); - map.put(RESPONSE_TIME_MS_KEY, latency.toMillis()); - return fillMap(map, message.headers(), message.content()); - } finally { - Optional.ofNullable(message).ifPresent(ReferenceCounted::release); + try (var bufStream = createCloseableByteBufStream(data); + var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpResponseFromBufs(bufStream))) { + var messageOp = messageHolder.get(); + if (messageOp.isPresent()) { + var message = messageOp.get(); + map.put("HTTP-Version", message.protocolVersion()); + map.put(STATUS_CODE_KEY, message.status().code()); + map.put("Reason-Phrase", message.status().reasonPhrase()); + map.put(RESPONSE_TIME_MS_KEY, latency.toMillis()); + return fillMap(map, message.headers(), message.content()); + } else { + return Map.of("Exception", "Message couldn't be parsed as a full http message"); + } } }); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java index b63f1aa15..0067ed78c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java @@ -54,7 +54,7 @@ public String toString() { if (targetResponseDuration != null) { sj.add("targetResponseDuration=").add(targetResponseDuration+""); } Optional.ofNullable(targetRequestData).ifPresent(d-> sj.add("targetRequestData=") .add(d.isClosed() ? "CLOSED" : HttpByteBufFormatter.httpPacketBufsToString( - HttpByteBufFormatter.HttpMessageType.REQUEST, d.streamUnretained(), false, LF_LINE_DELIMITER))); + HttpByteBufFormatter.HttpMessageType.REQUEST, d.streamUnretained(), LF_LINE_DELIMITER))); Optional.ofNullable(targetResponseData).filter(d->!d.isEmpty()).ifPresent(d -> sj.add("targetResponseData=") .add(HttpByteBufFormatter.httpPacketBytesToString(HttpByteBufFormatter.HttpMessageType.RESPONSE, d, LF_LINE_DELIMITER))); sj.add("transformStatus=").add(transformationStatus+""); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java new file mode 100644 index 000000000..b18995ebd --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java @@ -0,0 +1,17 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.Collection; + +import java.util.stream.Stream; + +public interface ByteBufUtils { + static Stream createCloseableByteBufStream(Stream byteArrStream) { + return RefSafeStreamUtils.refSafeMap(byteArrStream, Unpooled::wrappedBuffer); + } + + static Stream createCloseableByteBufStream(Collection byteArrCollection) { + return createCloseableByteBufStream(byteArrCollection.stream()); + } +} \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java new file mode 100644 index 000000000..c4356ad27 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java @@ -0,0 +1,26 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.util.ReferenceCountUtil; +import java.util.Optional; +import javax.annotation.Nullable; + +public class RefSafeHolder implements AutoCloseable { + private final T resource; + + private RefSafeHolder(@Nullable T resource) { + this.resource = resource; + } + + static public RefSafeHolder create(@Nullable T resource) { + return new RefSafeHolder<>(resource); + } + + public Optional get() { + return Optional.ofNullable(resource); + } + + @Override + public void close() { + ReferenceCountUtil.release(resource); + } +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java new file mode 100644 index 000000000..43190a990 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java @@ -0,0 +1,19 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.util.ReferenceCounted; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.function.Function; +import java.util.stream.Stream; + +public interface RefSafeStreamUtils { + static Stream refSafeMap(Stream inputStream, + Function referenceTrackedMappingFunction) { + final Deque refCountedTracker = new ArrayDeque<>(); + return inputStream.map(t -> { + var resource = referenceTrackedMappingFunction.apply(t); + refCountedTracker.add(resource); + return resource; + }).onClose(() -> refCountedTracker.forEach(ReferenceCounted::release)); + } +} diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index c895bc367..edfaba59f 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -146,7 +146,7 @@ private static String prettyPrintByteBufs(List byteArrays, HttpByteBufFormatter.HttpMessageType messageType, boolean usePooled) { var bbList = byteArrays.stream().map(b->TestUtilities.getByteBuf(b,usePooled)).collect(Collectors.toList()); - var formattedString = HttpByteBufFormatter.httpPacketBufsToString(messageType, bbList.stream(), false); + var formattedString = HttpByteBufFormatter.httpPacketBufsToString(messageType, bbList.stream()); bbList.forEach(bb->bb.release()); return formattedString; } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 661d76ba3..912be16ae 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -1,28 +1,28 @@ package org.opensearch.migrations.replay; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufHolder; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.FullHttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; +import org.opensearch.migrations.replay.util.ByteBufUtils; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; +import org.opensearch.migrations.replay.util.RefSafeHolder; import org.opensearch.migrations.testutils.SimpleHttpServer; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.tracing.InstrumentationTest; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - @Slf4j @WrapWithNettyLeakDetection(repetitions = 1) class RequestSenderOrchestratorTest extends InstrumentationTest { @@ -67,18 +67,18 @@ public void testThatSchedulingWorks() throws Exception { var arr = cf.get(); Assertions.assertNull(arr.error); Assertions.assertTrue(arr.responseSizeInBytes > 0); - var httpMessage = HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, - arr.responsePackets.stream().map(kvp -> Unpooled.wrappedBuffer(kvp.getValue())), false); - try { - var response = (FullHttpResponse) httpMessage; + var packetBytesArr = arr.responsePackets.stream().map(SimpleEntry::getValue).collect(Collectors.toList()); + try (var bufStream = ByteBufUtils.createCloseableByteBufStream(packetBytesArr); + var messageHolder = RefSafeHolder.create( + HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, + bufStream))) { + Assertions.assertTrue(messageHolder.get().isPresent()); + var response = (FullHttpResponse) messageHolder.get().get(); Assertions.assertEquals(200, response.status().code()); var body = response.content(); Assertions.assertEquals(TestHttpServerContext.SERVER_RESPONSE_BODY_PREFIX + TestHttpServerContext.getUriForIthRequest(i / NUM_REPEATS), - new String(body.duplicate().toString(StandardCharsets.UTF_8))); - } finally { - Optional.ofNullable((httpMessage instanceof ByteBufHolder) ? (ByteBufHolder) httpMessage : null) - .ifPresent(bbh -> bbh.content().release()); + body.duplicate().toString(StandardCharsets.UTF_8)); } } closeFuture.get(); From c226cf492e12c5c1d75a25c5988aea133847a035 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 19 Apr 2024 11:51:32 -0500 Subject: [PATCH 2/9] Add back in IReplayContexts class referencing Signed-off-by: Andre Kurait --- .../migrations/replay/ParsedHttpMessagesAsDicts.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 7aa7ccc2e..76360e089 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -16,7 +16,6 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.replay.datatypes.TransformedPackets; import org.opensearch.migrations.replay.tracing.IReplayContexts; -import org.opensearch.migrations.replay.tracing.IReplayContexts.ITupleHandlingContext; import org.opensearch.migrations.replay.util.RefSafeHolder; /** @@ -82,7 +81,7 @@ private static Optional> getSourceResponseOp(SourceTargetCap .map(d -> convertRequest(context, d))); } - public ParsedHttpMessagesAsDicts(ITupleHandlingContext context, + public ParsedHttpMessagesAsDicts(IReplayContexts.ITupleHandlingContext context, Optional> sourceRequestOp1, Optional> sourceResponseOp2, Optional> targetRequestOp3, From 1c81fbfb00400c35ccc416024673e09d0e8b6216 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 19 Apr 2024 12:02:26 -0500 Subject: [PATCH 3/9] Rename ByteBufUtils to NettyUtils and convert util libraries to final class Signed-off-by: Andre Kurait --- .../migrations/replay/HttpByteBufFormatter.java | 4 ++-- .../migrations/replay/HttpMessageAndTimestamp.java | 2 +- .../migrations/replay/ParsedHttpMessagesAsDicts.java | 2 +- .../replay/util/{ByteBufUtils.java => NettyUtils.java} | 8 +++++--- .../migrations/replay/util/RefSafeStreamUtils.java | 6 ++++-- .../migrations/replay/RequestSenderOrchestratorTest.java | 5 +++-- 6 files changed, 16 insertions(+), 11 deletions(-) rename TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/{ByteBufUtils.java => NettyUtils.java} (57%) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index 00237da4a..16439d48d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -21,7 +21,7 @@ import java.util.stream.Stream; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.opensearch.migrations.replay.util.ByteBufUtils; +import org.opensearch.migrations.replay.util.NettyUtils; import org.opensearch.migrations.replay.util.RefSafeHolder; @Slf4j @@ -70,7 +70,7 @@ public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteArrs, String lineDelimiter) { // This isn't memory efficient, // but stringifying byte bufs through a full parse and reserializing them was already really slow! - try (var stream = ByteBufUtils.createCloseableByteBufStream(byteArrs)) { + try (var stream = NettyUtils.createCloseableByteBufStream(byteArrs)) { return httpPacketBufsToString(msgType, stream, lineDelimiter); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java index 92f931571..d32b9d076 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java @@ -1,6 +1,6 @@ package org.opensearch.migrations.replay; -import static org.opensearch.migrations.replay.util.ByteBufUtils.createCloseableByteBufStream; +import static org.opensearch.migrations.replay.util.NettyUtils.createCloseableByteBufStream; import lombok.EqualsAndHashCode; import lombok.Getter; diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 76360e089..a85e058c4 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -1,6 +1,6 @@ package org.opensearch.migrations.replay; -import static org.opensearch.migrations.replay.util.ByteBufUtils.createCloseableByteBufStream; +import static org.opensearch.migrations.replay.util.NettyUtils.createCloseableByteBufStream; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpHeaders; diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java similarity index 57% rename from TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java rename to TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java index b18995ebd..0597ea791 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ByteBufUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java @@ -6,12 +6,14 @@ import java.util.stream.Stream; -public interface ByteBufUtils { - static Stream createCloseableByteBufStream(Stream byteArrStream) { +public final class NettyUtils { + public static Stream createCloseableByteBufStream(Stream byteArrStream) { return RefSafeStreamUtils.refSafeMap(byteArrStream, Unpooled::wrappedBuffer); } - static Stream createCloseableByteBufStream(Collection byteArrCollection) { + public static Stream createCloseableByteBufStream(Collection byteArrCollection) { return createCloseableByteBufStream(byteArrCollection.stream()); } + + private NettyUtils() {} } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java index 43190a990..17dce21e5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java @@ -6,8 +6,8 @@ import java.util.function.Function; import java.util.stream.Stream; -public interface RefSafeStreamUtils { - static Stream refSafeMap(Stream inputStream, +public final class RefSafeStreamUtils { + public static Stream refSafeMap(Stream inputStream, Function referenceTrackedMappingFunction) { final Deque refCountedTracker = new ArrayDeque<>(); return inputStream.map(t -> { @@ -16,4 +16,6 @@ static Stream refSafeMap(Stream inputStrea return resource; }).onClose(() -> refCountedTracker.forEach(ReferenceCounted::release)); } + + private RefSafeStreamUtils() {} } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 912be16ae..8552af95a 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.replay; +import static org.opensearch.migrations.replay.util.NettyUtils.createCloseableByteBufStream; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.FullHttpResponse; @@ -16,7 +18,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; -import org.opensearch.migrations.replay.util.ByteBufUtils; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; import org.opensearch.migrations.replay.util.RefSafeHolder; import org.opensearch.migrations.testutils.SimpleHttpServer; @@ -68,7 +69,7 @@ public void testThatSchedulingWorks() throws Exception { Assertions.assertNull(arr.error); Assertions.assertTrue(arr.responseSizeInBytes > 0); var packetBytesArr = arr.responsePackets.stream().map(SimpleEntry::getValue).collect(Collectors.toList()); - try (var bufStream = ByteBufUtils.createCloseableByteBufStream(packetBytesArr); + try (var bufStream = createCloseableByteBufStream(packetBytesArr); var messageHolder = RefSafeHolder.create( HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, bufStream))) { From c0dafe3aba4ad85029b22769414e1401f9c912ac Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 19 Apr 2024 12:07:16 -0500 Subject: [PATCH 4/9] Convert RefSafeHolder get to return object instead of optional Signed-off-by: Andre Kurait --- .../migrations/replay/HttpByteBufFormatter.java | 5 ++--- .../migrations/replay/ParsedHttpMessagesAsDicts.java | 10 ++++------ .../migrations/replay/util/RefSafeHolder.java | 5 ++--- .../replay/RequestSenderOrchestratorTest.java | 5 +++-- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index 16439d48d..e8eba96a4 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -95,9 +95,8 @@ public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteBufStream, boolean sortHeaders, String lineDelimiter) { try(var messageHolder = RefSafeHolder.create(parseHttpMessageFromBufs(msgType, byteBufStream))) { - final Optional httpMessageOp = messageHolder.get(); - if (httpMessageOp.isPresent()) { - var httpMessage = httpMessageOp.get(); + final HttpMessage httpMessage = messageHolder.get(); + if (httpMessage != null) { if (httpMessage instanceof FullHttpRequest) { return prettyPrintNettyRequest((FullHttpRequest) httpMessage, sortHeaders, lineDelimiter); } else if (httpMessage instanceof FullHttpResponse) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index a85e058c4..26143c0e4 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -134,9 +134,8 @@ private static Map convertRequest(@NonNull IReplayContexts.ITupl var map = new LinkedHashMap(); try (var bufStream = createCloseableByteBufStream(data); var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpRequestFromBufs(bufStream))) { - var messageOp = messageHolder.get(); - if (messageOp.isPresent()) { - var message = messageOp.get(); + var message = messageHolder.get(); + if (message != null) { map.put("Request-URI", message.uri()); map.put("Method", message.method().toString()); map.put("HTTP-Version", message.protocolVersion().toString()); @@ -157,9 +156,8 @@ private static Map convertResponse(@NonNull IReplayContexts.ITup var map = new LinkedHashMap(); try (var bufStream = createCloseableByteBufStream(data); var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpResponseFromBufs(bufStream))) { - var messageOp = messageHolder.get(); - if (messageOp.isPresent()) { - var message = messageOp.get(); + var message = messageHolder.get(); + if (message != null) { map.put("HTTP-Version", message.protocolVersion()); map.put(STATUS_CODE_KEY, message.status().code()); map.put("Reason-Phrase", message.status().reasonPhrase()); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java index c4356ad27..3e586e74a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java @@ -1,7 +1,6 @@ package org.opensearch.migrations.replay.util; import io.netty.util.ReferenceCountUtil; -import java.util.Optional; import javax.annotation.Nullable; public class RefSafeHolder implements AutoCloseable { @@ -15,8 +14,8 @@ static public RefSafeHolder create(@Nullable T resource) { return new RefSafeHolder<>(resource); } - public Optional get() { - return Optional.ofNullable(resource); + public @Nullable T get() { + return resource; } @Override diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 8552af95a..9ca6fc0e5 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -73,8 +73,9 @@ public void testThatSchedulingWorks() throws Exception { var messageHolder = RefSafeHolder.create( HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, bufStream))) { - Assertions.assertTrue(messageHolder.get().isPresent()); - var response = (FullHttpResponse) messageHolder.get().get(); + var message = messageHolder.get(); + Assertions.assertNotNull(message); + var response = (FullHttpResponse) message; Assertions.assertEquals(200, response.status().code()); var body = response.content(); Assertions.assertEquals(TestHttpServerContext.SERVER_RESPONSE_BODY_PREFIX + From 103c6a0e33499fd1ddb70de5bc1ced48f75f238a Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 19 Apr 2024 15:28:00 -0500 Subject: [PATCH 5/9] Add refSafeTransform to RefSafeStreamUtils Signed-off-by: Andre Kurait --- .../migrations/replay/util/RefSafeStreamUtils.java | 8 ++++++++ .../migrations/replay/HttpByteBufFormatterTest.java | 11 ++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java index 17dce21e5..0e0a0953d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java @@ -17,5 +17,13 @@ public static Stream refSafeMap(Stream inp }).onClose(() -> refCountedTracker.forEach(ReferenceCounted::release)); } + public static U refSafeTransform(Stream inputStream, + Function transformCreatingReferenceTrackedObjects, + Function, U> streamApplication) { + try (var mappedStream = refSafeMap(inputStream, transformCreatingReferenceTrackedObjects)) { + return streamApplication.apply(mappedStream); + } + } + private RefSafeStreamUtils() {} } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index edfaba59f..2f9bf9731 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.replay; +import static org.opensearch.migrations.replay.util.RefSafeStreamUtils.refSafeTransform; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -14,7 +16,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; @WrapWithNettyLeakDetection @@ -145,10 +146,10 @@ private static String prettyPrint(List byteArrays, private static String prettyPrintByteBufs(List byteArrays, HttpByteBufFormatter.HttpMessageType messageType, boolean usePooled) { - var bbList = byteArrays.stream().map(b->TestUtilities.getByteBuf(b,usePooled)).collect(Collectors.toList()); - var formattedString = HttpByteBufFormatter.httpPacketBufsToString(messageType, bbList.stream()); - bbList.forEach(bb->bb.release()); - return formattedString; + return refSafeTransform(byteArrays.stream(), + b->TestUtilities.getByteBuf(b,usePooled), + bbs -> HttpByteBufFormatter.httpPacketBufsToString(messageType, bbs)); + } static String getExpectedResult(HttpByteBufFormatter.PacketPrintFormat format, BufferContent content) { From 06d04171bf664622e347b41c107bdbfbba082d3f Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 19 Apr 2024 14:34:09 -0500 Subject: [PATCH 6/9] Enable semantic checking for @MustBeClosed with RefCounted holders and streams Signed-off-by: Andre Kurait --- TrafficCapture/build.gradle | 16 ++++++++++++++++ .../migrations/replay/util/NettyUtils.java | 3 +++ .../migrations/replay/util/RefSafeHolder.java | 2 ++ .../replay/util/RefSafeStreamUtils.java | 2 ++ 4 files changed, 23 insertions(+) diff --git a/TrafficCapture/build.gradle b/TrafficCapture/build.gradle index 1caf48447..743308baa 100644 --- a/TrafficCapture/build.gradle +++ b/TrafficCapture/build.gradle @@ -12,6 +12,22 @@ allprojects { subprojects { apply plugin: 'java' apply plugin: 'maven-publish' + + // TODO: Expand to do more static checking in more projects + if (project.name == "trafficReplayer" || project.name == "trafficCaptureProxyServer") { + dependencies { + annotationProcessor group: 'com.google.errorprone', name: 'error_prone_core', version: '2.26.1' + } + tasks.named('compileJava', JavaCompile) { + if (project.name == "trafficReplayer" || project.name == "trafficCaptureProxyServer") { + options.compilerArgs += [ + "-XDcompilePolicy=simple", + "-Xplugin:ErrorProne -XepDisableAllChecks -Xep:MustBeClosed:ERROR -XepDisableWarningsInGeneratedCode", + ] + } + } + } + task javadocJar(type: Jar, dependsOn: javadoc) { archiveClassifier.set('javadoc') from javadoc.destinationDir diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java index 0597ea791..92bea1f31 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java @@ -1,5 +1,6 @@ package org.opensearch.migrations.replay.util; +import com.google.errorprone.annotations.MustBeClosed; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.Collection; @@ -7,10 +8,12 @@ import java.util.stream.Stream; public final class NettyUtils { + @MustBeClosed public static Stream createCloseableByteBufStream(Stream byteArrStream) { return RefSafeStreamUtils.refSafeMap(byteArrStream, Unpooled::wrappedBuffer); } + @MustBeClosed public static Stream createCloseableByteBufStream(Collection byteArrCollection) { return createCloseableByteBufStream(byteArrCollection.stream()); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java index 3e586e74a..af943bfc5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java @@ -1,5 +1,6 @@ package org.opensearch.migrations.replay.util; +import com.google.errorprone.annotations.MustBeClosed; import io.netty.util.ReferenceCountUtil; import javax.annotation.Nullable; @@ -10,6 +11,7 @@ private RefSafeHolder(@Nullable T resource) { this.resource = resource; } + @MustBeClosed static public RefSafeHolder create(@Nullable T resource) { return new RefSafeHolder<>(resource); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java index 0e0a0953d..2254f3c38 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeStreamUtils.java @@ -1,5 +1,6 @@ package org.opensearch.migrations.replay.util; +import com.google.errorprone.annotations.MustBeClosed; import io.netty.util.ReferenceCounted; import java.util.ArrayDeque; import java.util.Deque; @@ -7,6 +8,7 @@ import java.util.stream.Stream; public final class RefSafeStreamUtils { + @MustBeClosed public static Stream refSafeMap(Stream inputStream, Function referenceTrackedMappingFunction) { final Deque refCountedTracker = new ArrayDeque<>(); From 1c728340f2829e77d253882dc6882a69575a3a90 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Tue, 23 Apr 2024 16:21:41 -0500 Subject: [PATCH 7/9] Rename to createRefCntNeutralCloseableByteBufStream and prefer non-static imports Signed-off-by: Andre Kurait --- .../opensearch/migrations/replay/HttpByteBufFormatter.java | 2 +- .../migrations/replay/HttpMessageAndTimestamp.java | 5 ++--- .../migrations/replay/ParsedHttpMessagesAsDicts.java | 7 +++---- .../org/opensearch/migrations/replay/util/NettyUtils.java | 6 +++--- .../migrations/replay/RequestSenderOrchestratorTest.java | 5 ++--- 5 files changed, 11 insertions(+), 14 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java index e8eba96a4..272ea38e6 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpByteBufFormatter.java @@ -70,7 +70,7 @@ public static String httpPacketBufsToString(HttpMessageType msgType, Stream byteArrs, String lineDelimiter) { // This isn't memory efficient, // but stringifying byte bufs through a full parse and reserializing them was already really slow! - try (var stream = NettyUtils.createCloseableByteBufStream(byteArrs)) { + try (var stream = NettyUtils.createRefCntNeutralCloseableByteBufStream(byteArrs)) { return httpPacketBufsToString(msgType, stream, lineDelimiter); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java index d32b9d076..0529b2b27 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java @@ -1,7 +1,5 @@ package org.opensearch.migrations.replay; -import static org.opensearch.migrations.replay.util.NettyUtils.createCloseableByteBufStream; - import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Lombok; @@ -14,6 +12,7 @@ import java.time.Instant; import java.util.Optional; import java.util.stream.Stream; +import org.opensearch.migrations.replay.util.NettyUtils; @Slf4j @EqualsAndHashCode(exclude = "currentSegmentBytes") @@ -66,7 +65,7 @@ public Stream stream() { } public String format(Optional messageTypeOp) { - try (var bufStream = createCloseableByteBufStream(packetBytes)) { + try (var bufStream = NettyUtils.createRefCntNeutralCloseableByteBufStream(packetBytes)) { var packetBytesAsStr = messageTypeOp.map(mt-> HttpByteBufFormatter.httpPacketBytesToString(mt, packetBytes, HttpByteBufFormatter.LF_LINE_DELIMITER)) .orElseGet(()-> HttpByteBufFormatter.httpPacketBufsToString(bufStream, Utils.MAX_PAYLOAD_SIZE_TO_PRINT)); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 26143c0e4..66567a7dc 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -1,7 +1,5 @@ package org.opensearch.migrations.replay; -import static org.opensearch.migrations.replay.util.NettyUtils.createCloseableByteBufStream; - import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpHeaders; import java.time.Duration; @@ -16,6 +14,7 @@ import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.replay.datatypes.TransformedPackets; import org.opensearch.migrations.replay.tracing.IReplayContexts; +import org.opensearch.migrations.replay.util.NettyUtils; import org.opensearch.migrations.replay.util.RefSafeHolder; /** @@ -132,7 +131,7 @@ private static Map convertRequest(@NonNull IReplayContexts.ITupl @NonNull List data) { return makeSafeMap(context, () -> { var map = new LinkedHashMap(); - try (var bufStream = createCloseableByteBufStream(data); + try (var bufStream = NettyUtils.createRefCntNeutralCloseableByteBufStream(data); var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpRequestFromBufs(bufStream))) { var message = messageHolder.get(); if (message != null) { @@ -154,7 +153,7 @@ private static Map convertResponse(@NonNull IReplayContexts.ITup @NonNull List data, Duration latency) { return makeSafeMap(context, () -> { var map = new LinkedHashMap(); - try (var bufStream = createCloseableByteBufStream(data); + try (var bufStream = NettyUtils.createRefCntNeutralCloseableByteBufStream(data); var messageHolder = RefSafeHolder.create(HttpByteBufFormatter.parseHttpResponseFromBufs(bufStream))) { var message = messageHolder.get(); if (message != null) { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java index 92bea1f31..60b97df6f 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/NettyUtils.java @@ -9,13 +9,13 @@ public final class NettyUtils { @MustBeClosed - public static Stream createCloseableByteBufStream(Stream byteArrStream) { + public static Stream createRefCntNeutralCloseableByteBufStream(Stream byteArrStream) { return RefSafeStreamUtils.refSafeMap(byteArrStream, Unpooled::wrappedBuffer); } @MustBeClosed - public static Stream createCloseableByteBufStream(Collection byteArrCollection) { - return createCloseableByteBufStream(byteArrCollection.stream()); + public static Stream createRefCntNeutralCloseableByteBufStream(Collection byteArrCollection) { + return createRefCntNeutralCloseableByteBufStream(byteArrCollection.stream()); } private NettyUtils() {} diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 9ca6fc0e5..892155527 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -1,7 +1,5 @@ package org.opensearch.migrations.replay; -import static org.opensearch.migrations.replay.util.NettyUtils.createCloseableByteBufStream; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.FullHttpResponse; @@ -19,6 +17,7 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; +import org.opensearch.migrations.replay.util.NettyUtils; import org.opensearch.migrations.replay.util.RefSafeHolder; import org.opensearch.migrations.testutils.SimpleHttpServer; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; @@ -69,7 +68,7 @@ public void testThatSchedulingWorks() throws Exception { Assertions.assertNull(arr.error); Assertions.assertTrue(arr.responseSizeInBytes > 0); var packetBytesArr = arr.responsePackets.stream().map(SimpleEntry::getValue).collect(Collectors.toList()); - try (var bufStream = createCloseableByteBufStream(packetBytesArr); + try (var bufStream = NettyUtils.createRefCntNeutralCloseableByteBufStream(packetBytesArr); var messageHolder = RefSafeHolder.create( HttpByteBufFormatter.parseHttpMessageFromBufs(HttpByteBufFormatter.HttpMessageType.RESPONSE, bufStream))) { From 0b81a2cc6b5af6116fadacbe528e2c46d019503c Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Tue, 23 Apr 2024 16:42:19 -0500 Subject: [PATCH 8/9] Add RefSafeStreamUtilsTest Signed-off-by: Andre Kurait --- .../replay/util/RefSafeStreamUtilsTest.java | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/RefSafeStreamUtilsTest.java diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/RefSafeStreamUtilsTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/RefSafeStreamUtilsTest.java new file mode 100644 index 000000000..2dd3ca729 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/util/RefSafeStreamUtilsTest.java @@ -0,0 +1,119 @@ +package org.opensearch.migrations.replay.util; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; + +import java.util.function.Predicate; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; + +class RefSafeStreamUtilsTest { + + @Test + void refSafeMap_shouldMapAndReleaseResources() { + Stream inputStream = Stream.of("a", "b", "c"); + + List result; + try (Stream mappedStream = RefSafeStreamUtils.refSafeMap(inputStream, TestReferenceCounted::new)) { + result = mappedStream.collect(Collectors.toList()); + } + + assertEquals(3, result.size()); + assertTrue(result.stream().allMatch(TestReferenceCounted::isReleased)); + } + + @Test + void refSafeTransform_shouldTransformAndReleaseResources() { + Stream inputStream = Stream.of("a", "b", "c"); + + List refCountedObjects = new ArrayList<>(); + List result = RefSafeStreamUtils.refSafeTransform( + inputStream, + value -> { + TestReferenceCounted refCounted = new TestReferenceCounted(value); + refCountedObjects.add(refCounted); + return refCounted; + }, + stream -> stream.map(TestReferenceCounted::getValue).collect(Collectors.toList()) + ); + + assertEquals(List.of("a", "b", "c"), result); + assertTrue(refCountedObjects.stream().allMatch(TestReferenceCounted::isReleased)); + } + + @Test + void refSafeMap_shouldHandleExceptionDuringMapping() { + List inputStreamConsumedObjects = new ArrayList<>(); + Stream inputStream = Stream.of("a", "b", "c", "d", "e") + .peek(inputStreamConsumedObjects::add); + + List refCountedObjects = new ArrayList<>(); + assertThrows(RuntimeException.class, () -> { + try (Stream mappedStream = RefSafeStreamUtils.refSafeMap(inputStream, + value -> { + if (value.equals("d")) { + throw new RuntimeException("Simulated exception"); + } + TestReferenceCounted refCounted = new TestReferenceCounted(value); + refCountedObjects.add(refCounted); + return refCounted; + })) { + try { + mappedStream.collect(Collectors.toList()); + } finally { + // Expect no release until try-with-resources close + assertEquals(3, refCountedObjects.size()); + assertTrue(refCountedObjects.stream().allMatch(Predicate.not(TestReferenceCounted::isReleased))); + } + } + }); + assertEquals(4, inputStreamConsumedObjects.size()); + assertEquals(3, refCountedObjects.size()); + assertTrue(refCountedObjects.stream().allMatch(TestReferenceCounted::isReleased)); + } + + private static class TestReferenceCounted extends AbstractReferenceCounted { + private final String value; + private boolean released; + + TestReferenceCounted(String value) { + this.value = value; + } + + String getValue() { + return value; + } + + boolean isReleased() { + return released; + } + + @Override + public boolean release() { + if (released) { + throw new AssertionError("TestReferenceCounted object released twice"); + } + try { + return super.release(); + } finally { + released = true; + } + } + + @Override + protected void deallocate() { + // No-op + } + + @Override + public ReferenceCounted touch(Object hint) { + return this; + } + } +} \ No newline at end of file From ae807ff2ab6f68e5d3a9a07b94b76eeb6d4a0363 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Tue, 23 Apr 2024 16:50:58 -0500 Subject: [PATCH 9/9] Remove static import Signed-off-by: Andre Kurait --- .../migrations/replay/HttpByteBufFormatterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java index 2f9bf9731..822c9d8ee 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/HttpByteBufFormatterTest.java @@ -1,6 +1,5 @@ package org.opensearch.migrations.replay; -import static org.opensearch.migrations.replay.util.RefSafeStreamUtils.refSafeTransform; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -8,6 +7,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.opensearch.migrations.replay.util.RefSafeStreamUtils; import org.opensearch.migrations.testutils.CountingNettyResourceLeakDetector; import org.opensearch.migrations.testutils.TestUtilities; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; @@ -146,7 +146,7 @@ private static String prettyPrint(List byteArrays, private static String prettyPrintByteBufs(List byteArrays, HttpByteBufFormatter.HttpMessageType messageType, boolean usePooled) { - return refSafeTransform(byteArrays.stream(), + return RefSafeStreamUtils.refSafeTransform(byteArrays.stream(), b->TestUtilities.getByteBuf(b,usePooled), bbs -> HttpByteBufFormatter.httpPacketBufsToString(messageType, bbs));