Skip to content

Commit

Permalink
Merge branch 'SaferPayloadFaults' into TypeMappingsTransformation
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyDecodedHttpRequestPreliminaryTransformHandler.java
  • Loading branch information
gregschohn committed Dec 9, 2024
2 parents 39ead16 + 09377de commit 1df1642
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
private boolean payloadWasAccessed;

public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
disableThrowingPayloadNotLoaded = true;
underlyingMap = new TreeMap<>();
isJson = Optional.ofNullable(headers.get("content-type"))
.map(list -> list.stream().anyMatch(s -> s.startsWith("application/json")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
httpJsonMessage
);
HttpJsonRequestWithFaultingPayload transformedMessage = null;
final var payloadMap = (PayloadAccessFaultingMap) httpJsonMessage.payload();
try {
payloadMap.setDisableThrowingPayloadNotLoaded(false);
transformedMessage = transform(transformer, httpJsonMessage);
} catch (Exception e) {
var payload = (PayloadAccessFaultingMap) httpJsonMessage.payload();
Expand All @@ -85,6 +87,8 @@ public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg)
} else{
throw new TransformationException(e);
}
} finally {
payloadMap.setDisableThrowingPayloadNotLoaded(true);
}

if (transformedMessage != null) {
Expand Down Expand Up @@ -139,32 +143,40 @@ private void handlePayloadNeutralTransformationOrThrow(

var pipeline = ctx.pipeline();
if (streamingAuthTransformer != null) {
log.atInfo().setMessage("{} An Authorization Transformation is required for this message. "
+ "The headers and payload will be parsed and reformatted.")
.addArgument(diagnosticLabel).log();
log.info(
diagnosticLabel
+ "An Authorization Transformation is required for this message. "
+ "The headers and payload will be parsed and reformatted."
);
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
} else if (headerFieldsAreIdentical(originalRequest, httpJsonMessage)) {
log.atInfo().setMessage("{} Transformation isn't necessary. "
+ "Resetting the processing pipeline to let the caller send the original network bytes as-is.")
.addArgument(diagnosticLabel)
.log();
log.info(
diagnosticLabel
+ "Transformation isn't necessary. "
+ "Resetting the processing pipeline to let the caller send the original network bytes as-is."
);
RequestPipelineOrchestrator.removeAllHandlers(pipeline);

} else if (headerFieldIsIdentical("content-encoding", originalRequest, httpJsonMessage)
&& headerFieldIsIdentical("transfer-encoding", originalRequest, httpJsonMessage)) {
log.atInfo().setMessage("{} There were changes to the headers that require the message to be reformatted "
+ "but the payload doesn't need to be transformed.")
.addArgument(diagnosticLabel).log();
log.info(
diagnosticLabel
+ "There were changes to the headers that require the message to be reformatted "
+ "but the payload doesn't need to be transformed."
);
// By adding the baseline handlers and removing this and previous handlers in reverse order,
// we will cause the upstream handlers to flush their in-progress accumulated ByteBufs downstream
// to be processed accordingly
requestPipelineOrchestrator.addBaselineHandlers(pipeline);
ctx.fireChannelRead(httpJsonMessage);
RequestPipelineOrchestrator.removeThisAndPreviousHandlers(pipeline, this);
} else {
log.atInfo().setMessage("{} New headers have been specified that require the payload stream to be "
+ "reformatted. Setting up the processing pipeline to parse and reformat the request payload.")
.addArgument(diagnosticLabel).log();
log.info(
diagnosticLabel
+ "New headers have been specified that require the payload stream to be "
+ "reformatted. Setting up the processing pipeline to parse and reformat the request payload."
);
requestPipelineOrchestrator.addContentRepackingHandlers(ctx, streamingAuthTransformer);
ctx.fireChannelRead(httpJsonMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

import org.opensearch.migrations.replay.datahandlers.JsonAccumulator;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
Expand All @@ -24,6 +25,7 @@
import io.netty.util.ReferenceCountUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;

/**
* This accumulates HttpContent messages through a JsonAccumulator and eventually fires off a
Expand Down Expand Up @@ -93,7 +95,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
} catch (JacksonException e) {
log.atInfo().setCause(e).setMessage("Error parsing json body. " +
log.atLevel(hasRequestContentTypeMatching(capturedHttpJsonMessage,
// a JacksonException for non-json data doesn't need to be surfaced to a user
v -> v.startsWith("application/json")) ? Level.INFO : Level.TRACE)
.setCause(e).setMessage("Error parsing json body. " +
"Will pass all payload bytes directly as a ByteBuf within the payload map").log();
jsonWasInvalid = true;
parsedJsonObjects.clear();
Expand Down Expand Up @@ -123,7 +128,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

var leftoverBody = accumulatedBody.slice(jsonBodyByteLength,
accumulatedBody.readableBytes() - jsonBodyByteLength);
if (jsonBodyByteLength == 0 && isRequestContentTypeNotText(capturedHttpJsonMessage)) {
if (jsonBodyByteLength == 0 &&
hasRequestContentTypeMatching(capturedHttpJsonMessage, v -> !v.startsWith("text/")))
{
context.onPayloadSetBinary();
capturedHttpJsonMessage.payload()
.put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY,
Expand Down Expand Up @@ -157,12 +164,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}

private boolean isRequestContentTypeNotText(HttpJsonMessageWithFaultingPayload message) {
private boolean hasRequestContentTypeMatching(HttpJsonMessageWithFaultingPayload message,
Predicate<String> contentTypeFilter) {
// True when the request's content-type header has at least one value containing "/" that
// satisfies contentTypeFilter. NOTE(review): `.count() > 1` requires MORE than one matching
// value, so a single "application/json" value yields false — likely should be `>= 1`; confirm.
return Optional.ofNullable(capturedHttpJsonMessage.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString()))
.map(s -> s.stream()
.filter(v -> v.contains("/"))
.filter(v -> !v.startsWith("text/"))
.filter(contentTypeFilter)
.count() > 1
)
.orElse(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public void testTransformsPropagateExceptionProperly() throws JsonProcessingExce
FAULTING_MAP.setPath("/_bulk");
FAULTING_MAP.setHeaders(new ListKeyAdaptingCaseInsensitiveHeadersMap(new StrictCaseInsensitiveHttpHeadersMap()));
FAULTING_MAP.headers().put("Content-Type", "application/json");
FAULTING_MAP.setPayloadFaultMap(new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap()));
var payloadMap = new PayloadAccessFaultingMap(FAULTING_MAP.headers().asStrictMap());
FAULTING_MAP.setPayloadFaultMap(payloadMap);
payloadMap.setDisableThrowingPayloadNotLoaded(false);
final String EXPECTED = "{\n"
+ " \"method\": \"PUT\",\n"
+ " \"URI\": \"/_bulk\",\n"
Expand Down

0 comments on commit 1df1642

Please sign in to comment.