diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java index ff8c3f665..57abfe43c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java @@ -34,6 +34,7 @@ public class PayloadAccessFaultingMap extends AbstractMap { private boolean disableThrowingPayloadNotLoaded; public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) { + disableThrowingPayloadNotLoaded = true; underlyingMap = new TreeMap<>(); isJson = Optional.ofNullable(headers.get("content-type")) .map(list -> list.stream().anyMatch(s -> s.startsWith("application/json"))) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java index 3d74c42ca..3992d34a3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java @@ -67,7 +67,9 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) IAuthTransformer authTransformer = requestPipelineOrchestrator.authTransfomerFactory.getAuthTransformer( httpJsonMessage ); + final var payloadMap = (PayloadAccessFaultingMap) httpJsonMessage.payload(); try { + payloadMap.setDisableThrowingPayloadNotLoaded(false); handlePayloadNeutralTransformationOrThrow( ctx, originalHttpJsonMessage, @@ -86,6 +88,8 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) getAuthTransformerAsStreamingTransformer(authTransformer) ); ctx.fireChannelRead(handleAuthHeaders(httpJsonMessage, authTransformer)); + } finally { + payloadMap.setDisableThrowingPayloadNotLoaded(true); } } else if (msg instanceof HttpContent) { ctx.fireChannelRead(msg); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java index 754ac2a4e..89f8ebb49 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java @@ -8,6 +8,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.opensearch.migrations.replay.datahandlers.JsonAccumulator; import org.opensearch.migrations.replay.tracing.IReplayContexts; @@ -24,6 +27,7 @@ import io.netty.util.ReferenceCountUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.slf4j.event.Level; /** * This accumulates HttpContent messages through a JsonAccumulator and eventually fires off a @@ -93,7 +97,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } } catch (JacksonException e) { - log.atInfo().setCause(e).setMessage("Error parsing json body. " + + log.atLevel(hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> v.startsWith("application/json")) + ? Level.INFO : Level.TRACE) + .setCause(e).setMessage("Error parsing json body. " + "Will pass all payload bytes directly as a ByteBuf within the payload map").log(); jsonWasInvalid = true; parsedJsonObjects.clear(); @@ -123,7 +129,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception var leftoverBody = accumulatedBody.slice(jsonBodyByteLength, accumulatedBody.readableBytes() - jsonBodyByteLength); - if (jsonBodyByteLength == 0 && isRequestContentTypeNotText(capturedHttpJsonMessage)) { + if (jsonBodyByteLength == 0 && + hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> !v.startsWith("text/"))) + { context.onPayloadSetBinary(); capturedHttpJsonMessage.payload() .put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, @@ -157,12 +165,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } - private boolean isRequestContentTypeNotText(HttpJsonMessageWithFaultingPayload message) { + private boolean hasRequestContentTypeMatching(HttpJsonMessageWithFaultingPayload message, + Predicate cnotentTypeFilter) { // ContentType not text if specified and has a value with / and that value does not start with text/ return Optional.ofNullable(capturedHttpJsonMessage.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString())) .map(s -> s.stream() .filter(v -> v.contains("/")) - .filter(v -> !v.startsWith("text/")) + .filter(cnotentTypeFilter) .count() > 1 ) .orElse(false);