diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/humanReadableLogs.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/humanReadableLogs.py index c90be11b5..c6427d018 100755 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/humanReadableLogs.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/humanReadableLogs.py @@ -14,29 +14,32 @@ logger = logging.getLogger(__name__) LOG_JSON_TUPLE_FIELD = "message" -BASE64_ENCODED_TUPLE_PATHS = ["request.body", "primaryResponse.body", "shadowResponse.body"] +BASE64_ENCODED_TUPLE_PATHS = ["sourceRequest.body", "targetRequest.body", "sourceResponse.body", "targetResponse.body"] # TODO: I'm not positive about the capitalization of the Content-Encoding and Content-Type headers. # This version worked on my test cases, but not guaranteed to work in all cases. CONTENT_ENCODING_PATH = { - BASE64_ENCODED_TUPLE_PATHS[0]: "request.Content-Encoding", - BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Content-Encoding", - BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Content-Encoding" + BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Encoding", + BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding", + BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Encoding", + BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Encoding" } CONTENT_TYPE_PATH = { - BASE64_ENCODED_TUPLE_PATHS[0]: "request.Content-Type", - BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Content-Type", - BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Content-Type" + BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Type", + BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding", + BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Type", + BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Type" } TRANSFER_ENCODING_PATH = { - BASE64_ENCODED_TUPLE_PATHS[0]: "request.Transfer-Encoding", - BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Transfer-Encoding", - BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Transfer-Encoding" + BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Transfer-Encoding", + BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding", + BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Transfer-Encoding", + BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Transfer-Encoding" } CONTENT_TYPE_JSON = "application/json" CONTENT_ENCODING_GZIP = "gzip" TRANSFER_ENCODING_CHUNKED = "chunked" -URI_PATH = "request.Request-URI" +URI_PATH = "sourceRequest.Request-URI" BULK_URI_PATH = "_bulk" diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java index 849c9e2e0..7baa15b2e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java @@ -19,24 +19,24 @@ @Slf4j public class SourceTargetCaptureTuple { private RequestResponsePacketPair sourcePair; - private final List shadowRequestData; - private final List shadowResponseData; + private final List targetRequestData; + private final List targetResponseData; private final AggregatedTransformedResponse.HttpRequestTransformationStatus transformationStatus; private final Throwable errorCause; - Duration shadowResponseDuration; + Duration targetResponseDuration; public SourceTargetCaptureTuple(RequestResponsePacketPair sourcePair, - List shadowRequestData, - List shadowResponseData, + List targetRequestData, + List targetResponseData, AggregatedTransformedResponse.HttpRequestTransformationStatus transformationStatus, Throwable errorCause, - Duration shadowResponseDuration) { + Duration targetResponseDuration) { this.sourcePair = sourcePair; - this.shadowRequestData = shadowRequestData; - this.shadowResponseData = shadowResponseData; + this.targetRequestData = targetRequestData; + this.targetResponseData = targetResponseData; this.transformationStatus = transformationStatus; this.errorCause = errorCause; - this.shadowResponseDuration = shadowResponseDuration; + this.targetResponseDuration = targetResponseDuration; } public static class TupleToFileWriter { @@ -48,7 +48,6 @@ public TupleToFileWriter(OutputStream outputStream){ } private JSONObject jsonFromHttpDataUnsafe(List data) throws IOException { - SequenceInputStream collatedStream = ReplayUtils.byteArraysToInputStream(data); Scanner scanner = new Scanner(collatedStream, StandardCharsets.UTF_8); scanner.useDelimiter("\r\n\r\n"); // The headers are seperated from the body with two newlines. @@ -88,23 +87,26 @@ private JSONObject jsonFromHttpData(List data, Duration latency) throws private JSONObject toJSONObject(SourceTargetCaptureTuple triple) throws IOException { JSONObject meta = new JSONObject(); - meta.put("request", jsonFromHttpData(triple.sourcePair.requestData.packetBytes)); + meta.put("sourceRequest", jsonFromHttpData(triple.sourcePair.requestData.packetBytes)); + meta.put("targetRequest", jsonFromHttpData(triple.targetRequestData)); //log.warn("TODO: These durations are not measuring the same values!"); - meta.put("primaryResponse", jsonFromHttpData(triple.sourcePair.responseData.packetBytes, + meta.put("sourceResponse", jsonFromHttpData(triple.sourcePair.responseData.packetBytes, Duration.between(triple.sourcePair.requestData.getLastPacketTimestamp(), triple.sourcePair.responseData.getLastPacketTimestamp()))); - meta.put("shadowResponse", jsonFromHttpData(triple.shadowResponseData, - triple.shadowResponseDuration)); + meta.put("targetResponse", jsonFromHttpData(triple.targetResponseData, + triple.targetResponseDuration)); + meta.put("connectionId", triple.sourcePair.connectionId); return meta; } /** - * Writes a "triple" object to an output stream as a JSON object. - * The JSON triple is output on one line, and has three objects: "request", "primaryResponse", - * and "shadowResponse". An example of the format is below. + * Writes a tuple object to an output stream as a JSON object. + * The JSON tuple is output on one line, and has several objects: "sourceRequest", "sourceResponse", + * "targetRequest", and "targetResponse". The "connectionId" is also included to aid in debugging. + * An example of the format is below. *

* { - * "request": { + * "sourceRequest": { * "Request-URI": XYZ, * "Method": XYZ, * "HTTP-Version": XYZ @@ -112,7 +114,15 @@ private JSONObject toJSONObject(SourceTargetCaptureTuple triple) throws IOExcept * "header-1": XYZ, * "header-2": XYZ * }, - * "primaryResponse": { + * "targetRequest": { + * "Request-URI": XYZ, + * "Method": XYZ, + * "HTTP-Version": XYZ + * "body": XYZ, + * "header-1": XYZ, + * "header-2": XYZ + * }, + * "sourceResponse": { * "HTTP-Version": ABC, * "Status-Code": ABC, * "Reason-Phrase": ABC, @@ -120,14 +130,15 @@ private JSONObject toJSONObject(SourceTargetCaptureTuple triple) throws IOExcept * "body": ABC, * "header-1": ABC * }, - * "shadowResponse": { + * "targetResponse": { * "HTTP-Version": ABC, * "Status-Code": ABC, * "Reason-Phrase": ABC, * "response_time_ms": 123, * "body": ABC, * "header-2": ABC - * } + * }, + * "connectionId": "0242acfffe1d0008-0000000c-00000003-0745a19f7c3c5fc9-121001ff.0" * } * * @param triple the RequestResponseResponseTriple object to be converted into json and written to the stream. @@ -146,9 +157,9 @@ public String toString() { final StringBuilder sb = new StringBuilder("SourceTargetCaptureTuple{"); sb.append("\n diagnosticLabel=").append(sourcePair.connectionId); sb.append("\n sourcePair=").append(sourcePair); - sb.append("\n shadowResponseDuration=").append(shadowResponseDuration); - sb.append("\n shadowRequestData=").append(Utils.packetsToStringTruncated(shadowRequestData)); - sb.append("\n shadowResponseData=").append(Utils.packetsToStringTruncated(shadowResponseData)); + sb.append("\n targetResponseDuration=").append(targetResponseDuration); + sb.append("\n targetRequestData=").append(Utils.packetsToStringTruncated(targetRequestData)); + sb.append("\n targetResponseData=").append(Utils.packetsToStringTruncated(targetResponseData)); sb.append("\n transformStatus=").append(transformationStatus); sb.append("\n errorCause=").append(errorCause==null ? "null" : errorCause.toString()); sb.append('}'); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 9781c71c6..363fc0305 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -488,7 +488,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture "GET / HTTP/1.1\n" + - "HoSt: " + SOURCE_CLUSTER_NAME + "\n" + - "content-length: " + contentLength + "\n"); + contentLength -> "GET / HTTP/1.1\r\n" + + "HoSt: " + SOURCE_CLUSTER_NAME + "\r\n" + + "content-length: " + contentLength + "\r\n"); } private void runRandomPayloadWithTransformer(HttpJsonTransformingConsumer transformingHandler, @@ -88,11 +88,11 @@ public void testMalformedPayloadIsPassedThrough() throws Exception { httpBasicAuthTransformer, testPacketCapture, "TEST"); runRandomPayloadWithTransformer(transformingHandler, dummyAggregatedResponse, testPacketCapture, - contentLength -> "GET / HTTP/1.1\n" + - "HoSt: " + SOURCE_CLUSTER_NAME + "\n" + - "content-type: application/json\n" + - "content-length: " + contentLength + "\n" + - "authorization: Basic YWRtaW46YWRtaW4=\n"); + contentLength -> "GET / HTTP/1.1\r\n" + + "HoSt: " + SOURCE_CLUSTER_NAME + "\r\n" + + "content-type: application/json\r\n" + + "content-length: " + contentLength + "\r\n" + + "authorization: Basic YWRtaW46YWRtaW4=\r\n"); } /** @@ -119,10 +119,10 @@ public void testMalformedPayload_andTypeMappingUri_IsPassedThrough() throws Exce TestUtils.chainedDualWriteHeaderAndPayloadParts(transformingHandler, stringParts, referenceStringBuilder, - contentLength -> "PUT /foo HTTP/1.1\n" + - "HoSt: " + SOURCE_CLUSTER_NAME + "\n" + - "content-type: application/json\n" + - "content-length: " + contentLength + "\n" + contentLength -> "PUT /foo HTTP/1.1\r\n" + + "HoSt: " + SOURCE_CLUSTER_NAME + "\r\n" + + "content-type: application/json\r\n" + + "content-length: " + contentLength + "\r\n" ); var finalizationFuture = allConsumesFuture.thenCompose(v->transformingHandler.finalizeRequest(), diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TestUtils.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TestUtils.java index 42692ab75..6b29859ca 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TestUtils.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TestUtils.java @@ -78,7 +78,7 @@ static DiagnosticTrackableCompletableFuture chainedWriteHeadersAndD StringBuilder referenceStringAccumulator, Function headersGenerator) { var contentLength = stringParts.stream().mapToInt(s->s.length()).sum(); - String headers = headersGenerator.apply(contentLength) + "\n"; + String headers = headersGenerator.apply(contentLength) + "\r\n"; referenceStringAccumulator.append(headers); return chainedWriteHeadersAndDualWritePayloadParts(packetConsumer, stringParts, referenceStringAccumulator, headers); } @@ -134,10 +134,10 @@ static void runPipelineAndValidate(IJsonTransformer transformer, "TEST"); var contentLength = stringParts.stream().mapToInt(s->s.length()).sum(); - var headerString = "GET / HTTP/1.1\n" + - "host: localhost\n" + + var headerString = "GET / HTTP/1.1\r\n" + + "host: localhost\r\n" + (extraHeaders == null ? "" : extraHeaders) + - "content-length: " + contentLength + "\n\n"; + "content-length: " + contentLength + "\r\n\r\n"; var referenceStringBuilder = new StringBuilder(); var allConsumesFuture = chainedWriteHeadersAndDualWritePayloadParts(transformingHandler, stringParts, referenceStringBuilder, headerString);