diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java index 75c85f19b..9b8abb459 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java @@ -35,6 +35,7 @@ public class PayloadAccessFaultingMap extends AbstractMap { private boolean payloadWasAccessed; public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) { + disableThrowingPayloadNotLoaded = true; underlyingMap = new TreeMap<>(); isJson = Optional.ofNullable(headers.get("content-type")) .map(list -> list.stream().anyMatch(s -> s.startsWith("application/json"))) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java index f1b4b5b58..89acd028f 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java @@ -67,7 +67,9 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) httpJsonMessage ); HttpJsonRequestWithFaultingPayload transformedMessage = null; + final var payloadMap = (PayloadAccessFaultingMap) httpJsonMessage.payload(); try { + payloadMap.setDisableThrowingPayloadNotLoaded(false); transformedMessage = transform(transformer, httpJsonMessage); } catch (Exception e) { var payload = (PayloadAccessFaultingMap) httpJsonMessage.payload(); @@ -85,6 +87,8 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) } else{ throw new TransformationException(e); } + } finally { + payloadMap.setDisableThrowingPayloadNotLoaded(true); } if (transformedMessage != null) { @@ -139,22 +143,28 @@ private void handlePayloadNeutralTransformationOrThrow( var pipeline = ctx.pipeline(); if (streamingAuthTransformer != null) { - log.atInfo().setMessage("{} An Authorization Transformation is required for this message. " - + "The headers and payload will be parsed and reformatted.") - .addArgument(diagnosticLabel).log(); + log.info( + diagnosticLabel + + "An Authorization Transformation is required for this message. " + + "The headers and payload will be parsed and reformatted." + ); requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer); ctx.fireChannelRead(httpJsonMessage); } else if (headerFieldsAreIdentical(originalRequest, httpJsonMessage)) { - log.atInfo().setMessage("{} Transformation isn't necessary. " - + "Resetting the processing pipeline to let the caller send the original network bytes as-is.") - .addArgument(diagnosticLabel) - .log(); + log.info( + diagnosticLabel + + "Transformation isn't necessary. " + + "Resetting the processing pipeline to let the caller send the original network bytes as-is." + ); RequestPipelineOrchestrator.removeAllHandlers(pipeline); + } else if (headerFieldIsIdentical("content-encoding", originalRequest, httpJsonMessage) && headerFieldIsIdentical("transfer-encoding", originalRequest, httpJsonMessage)) { - log.atInfo().setMessage("{} There were changes to the headers that require the message to be reformatted " - + "but the payload doesn't need to be transformed.") - .addArgument(diagnosticLabel).log(); + log.info( + diagnosticLabel + + "There were changes to the headers that require the message to be reformatted " + + "but the payload doesn't need to be transformed." + ); // By adding the baseline handlers and removing this and previous handlers in reverse order, // we will cause the upstream handlers to flush their in-progress accumulated ByteBufs downstream // to be processed accordingly @@ -162,9 +172,11 @@ && headerFieldIsIdentical("transfer-encoding", originalRequest, httpJsonMessage) ctx.fireChannelRead(httpJsonMessage); RequestPipelineOrchestrator.removeThisAndPreviousHandlers(pipeline, this); } else { - log.atInfo().setMessage("{} New headers have been specified that require the payload stream to be " - + "reformatted. Setting up the processing pipeline to parse and reformat the request payload.") - .addArgument(diagnosticLabel).log(); + log.info( + diagnosticLabel + + "New headers have been specified that require the payload stream to be " + + "reformatted. Setting up the processing pipeline to parse and reformat the request payload." + ); requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer); ctx.fireChannelRead(httpJsonMessage); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java index 754ac2a4e..35ed26b68 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.Predicate; import org.opensearch.migrations.replay.datahandlers.JsonAccumulator; import org.opensearch.migrations.replay.tracing.IReplayContexts; @@ -24,6 +25,7 @@ import io.netty.util.ReferenceCountUtil; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.slf4j.event.Level; /** * This accumulates HttpContent messages through a JsonAccumulator and eventually fires off a @@ -93,7 +95,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } } catch (JacksonException e) { - log.atInfo().setCause(e).setMessage("Error parsing json body. " + + log.atLevel(hasRequestContentTypeMatching(capturedHttpJsonMessage, + // a JacksonException for non-json data doesn't need to be surfaced to a user + v -> v.startsWith("application/json")) ? Level.INFO : Level.TRACE) + .setCause(e).setMessage("Error parsing json body. " + "Will pass all payload bytes directly as a ByteBuf within the payload map").log(); jsonWasInvalid = true; parsedJsonObjects.clear(); @@ -123,7 +128,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception var leftoverBody = accumulatedBody.slice(jsonBodyByteLength, accumulatedBody.readableBytes() - jsonBodyByteLength); - if (jsonBodyByteLength == 0 && isRequestContentTypeNotText(capturedHttpJsonMessage)) { + if (jsonBodyByteLength == 0 && + hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> !v.startsWith("text/"))) + { context.onPayloadSetBinary(); capturedHttpJsonMessage.payload() .put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, @@ -157,12 +164,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } - private boolean isRequestContentTypeNotText(HttpJsonMessageWithFaultingPayload message) { + private boolean hasRequestContentTypeMatching(HttpJsonMessageWithFaultingPayload message, + Predicate contentTypeFilter) { // ContentType not text if specified and has a value with / and that value does not start with text/ return Optional.ofNullable(capturedHttpJsonMessage.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString())) .map(s -> s.stream() .filter(v -> v.contains("/")) - .filter(v -> !v.startsWith("text/")) + .filter(contentTypeFilter) .count() > 1 ) .orElse(false); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PayloadNotFoundTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PayloadNotFoundTest.java index 0308c9b8b..f2e1e7467 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PayloadNotFoundTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/PayloadNotFoundTest.java @@ -19,7 +19,9 @@ public void testTransformsPropagateExceptionProperly() throws JsonProcessingExce FAULTING_MAP.setPath("/_bulk"); FAULTING_MAP.setHeaders(new ListKeyAdaptingCaseInsensitiveHeadersMap(new StrictCaseInsensitiveHttpHeadersMap())); FAULTING_MAP.headers().put("Content-Type", "application/json"); - FAULTING_MAP.setPayloadFaultMap(new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap())); + var payloadMap = new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap()); + FAULTING_MAP.setPayloadFaultMap(payloadMap); + payloadMap.setDisableThrowingPayloadNotLoaded(false); final String EXPECTED = "{\n" + " \"method\": \"PUT\",\n" + " \"URI\": \"/_bulk\",\n"