Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add script to transform tuples to human readable format #274

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions TrafficCapture/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,45 @@ will send requests to `capture-proxy-domain.com`, using the auth combo `admin`/`

Support for Sigv4 signing and other auth options may be a future option.

#### Understanding Data from the Replayer

The Migration Console can be used to access and help interpret the data from the replayer.

The data generated from the replayer is stored on an Elastic File System volume shared between the Replayer and Migration Console.
It is mounted to the Migration Console at the path `/shared_replayer_output`. The Replayer generates files named `output_tuples.log`.
These files are rolled over as they hit 10 MB to a series of `output_tuples-%d{yyyy-MM-dd-HH:mm}.log` files.

The data in these files is in the format of JSON lines, each of which is a log message containing a specific request-response-response tuple.
The body of the messages is sometimes gzipped which makes it difficult to represent as text in a JSON. Therefore, the body field of all requests
and responses is base64 encoded before it is logged. This makes the files stable, but not human-readable.

We have provided a utility script that can parse these files and output them to a human-readable format: the bodies are
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rephrase to remove "We". In the future, "we" won't be relevant for any readers.

base64 decoded, un-gzipped if applicable, and parsed as JSON if applicable. They're then saved back to JSON format on disk.

To use this utility from the Migration Console,
```sh
$ ./humanReadableLogs.py --help
usage: humanReadableLogs.py [-h] [--outfile OUTFILE] infile

positional arguments:
infile Path to input logged tuple file.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't this work on stdin/stdout? Customers will very likely be compressing old (or all) files. Expanding them first to files could be 100-1K times slower than piping them, not to mention much more difficult to manage the space for.


options:
-h, --help show this help message and exit
--outfile OUTFILE Path for output human readable tuple file.

# By default, the output file is the same path as the input file, but the file name is prefixed with `readable-`.
$ ./humanReadableLogs.py /shared_replayer_output/tuples.log
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: humanReadableLogs sounds like what I would make my output files from this program. I'd like to see some sort of verb - "makeLogsHumanReadable", "decodeResultJson", "prettyPrintReplayResults" (my preferred, since this isn't for logs, but effectively, for the primary output, right?)

Input file: /shared_replayer_output/tuples.log; Output file: /shared_replayer_output/readable-tuples.log

# A specific output file can also be specified.
$ ./humanReadableLogs.py /shared_replayer_output/tuples.log --outfile local-tuples.log
Input file: /shared_replayer_output/tuples.log; Output file: local-tuples.log
```

### Capture Kafka Offloader

The Capture Kafka Offloader will act as a Kafka Producer for offloading captured traffic logs to the configured Kafka cluster.

Learn more about its functionality and setup here: [Capture Kafka Offloader](captureKafkaOffloader/README.md)

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ ENV DEBIAN_FRONTEND noninteractive

RUN apt-get update && \
apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev gcc libc-dev git curl && \
pip3 install opensearch-benchmark
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifying these versions resolves a versioning conflict that was happening in some installs (I think it was all new installs and my local just had a cached version that happened to work, but I'm not positive why it was mixed). The newest opensearch-benchmarks (1.1.0) has a dependency that does not work with the newest version of urllib3 (1.26.x) for this version of python, and pip wasn't detecting/resolving the issue itself. Locking to these versions fixes it.

pip3 install urllib3==1.25.11 opensearch-benchmark==1.1.0 tqdm

COPY runTestBenchmarks.sh /root/
RUN chmod ugo+x /root/runTestBenchmarks.sh
COPY humanReadableLogs.py /root/
Comment on lines 9 to +10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, these could be on one line. Same for the next pair. It shrinks the number of layers and makes it easier to read through docker reports.

RUN chmod ug+x /root/runTestBenchmarks.sh
RUN chmod ug+x /root/humanReadableLogs.py
WORKDIR /root

CMD tail -f /dev/null
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#!/usr/bin/env python3

import argparse
import base64
import gzip
import json
import pathlib
from typing import Optional
import logging

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

logger = logging.getLogger(__name__)

LOG_JSON_TUPLE_FIELD = "message"
BASE64_ENCODED_TUPLE_PATHS = ["request.body", "primaryResponse.body", "shadowResponse.body"]
# TODO: I'm not positive about the capitalization of the Content-Encoding and Content-Type headers.
# This version worked on my test cases, but not guaranteed to work in all cases.
Comment on lines +18 to +19
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP spec says that header names are case-insensitive. The replayer (& netty's HTTP classes) make special accommodations to support that, so you'll need to be prepared for any crazy kind of outPUT.

CONTENT_ENCODING_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "request.content-encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.content-encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.content-encoding"
}
CONTENT_TYPE_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "request.content-type",
BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.content-type",
BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.content-type"
}
CONTENT_TYPE_JSON = "application/json"
CONTENT_ENCODING_GZIP = "gzip"
URI_PATH = "request.Request-URI"
BULK_URI_PATH = "_bulk"


class DictionaryPathException(Exception):
pass


def get_element(element: str, dict_: dict, raise_on_error=False) -> Optional[any]:
keys = element.split('.')
rv = dict_
for key in keys:
try:
rv = rv[key]
except KeyError:
if raise_on_error:
raise DictionaryPathException(f"Key {key} was not present.")
else:
return None
return rv


def set_element(element: str, dict_: dict, value: any) -> None:
keys = element.split('.')
rv = dict_
for key in keys[:-1]:
rv = rv[key]
rv[keys[-1]] = value


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("infile", type=pathlib.Path, help="Path to input logged tuple file.")
parser.add_argument("--outfile", type=pathlib.Path, help="Path for output human readable tuple file.")
return parser.parse_args()


def parse_body_value(raw_value: str, content_encoding: Optional[str],
content_type: Optional[str], is_bulk: bool, line_no: int):
try:
b64decoded = base64.b64decode(raw_value)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd call this something like encodedValue or originalValue. To me, the raw value means that there's been no encoding & clearly, that isn't the case here

except Exception as e:
logger.error(f"Body value on line {line_no} could not be decoded: {e}. Skipping parsing body value.")
return None
is_gzipped = content_encoding is not None and content_encoding == CONTENT_ENCODING_GZIP
is_json = content_type is not None and CONTENT_TYPE_JSON in content_type
if is_gzipped:
try:
unzipped = gzip.decompress(b64decoded)
except Exception as e:
logger.error(f"Body value on line {line_no} should be gzipped but could not be unzipped: {e}. "
"Skipping parsing body value.")
return b64decoded
else:
unzipped = b64decoded
try:
decoded = unzipped.decode("utf-8")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth calling out somewhere that this utility won't work w/ connections that are using different character encodings. This should use the value from the header (which, IIRC, can be a separate header, or included in the content type). I'm thinking that you only need to document this because we're not sure what the lifetime of this tool will be.

except Exception as e:
logger.error(f"Body value on line {line_no} could not be decoded to utf-8: {e}. "
"Skipping parsing body value.")
return unzipped
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you prepend what you're returning with how far you got (for all return statements)? It could be very confusing to a user to see binary - and then not know if it was improperly gunzipped, not decompressed at all, or if the charset was wrong, or if it was a PNG, etc.

if is_json and len(decoded) > 0:
if is_bulk:
try:
return [json.loads(line) for line in decoded.splitlines()]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect that bulk json has one json doc on each line? Is that how ES/OS work? That seems pretty awful.
Aren't newlines normally considered just whitespace within JSON?

except Exception as e:
logger.error("Body value on line {line_no} should be a bulk json (list of json lines) but "
f"could not be parsed: {e}. Skipping parsing body value.")
return decoded
try:
return json.loads(decoded)
except Exception as e:
logger.error(f"Body value on line {line_no} should be a json but could not be parsed: {e}. "
"Skipping parsing body value.")
return decoded
return decoded


def parse_tuple(line: str, line_no: int) -> dict:
item = json.loads(line)
message = item[LOG_JSON_TUPLE_FIELD]
tuple = json.loads(message)
try:
is_bulk_path = BULK_URI_PATH in get_element(URI_PATH, tuple, raise_on_error=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for "_bulk" to be in another part of a path and this not be a bulk request? Are query params included in the path?

except DictionaryPathException as e:
logger.error(f"`{URI_PATH}` on line {line_no} could not be loaded: {e} "
f"Skipping parsing tuple.")
return tuple
for body_path in BASE64_ENCODED_TUPLE_PATHS:
base64value = get_element(body_path, tuple)
if base64value is None:
# This component has no body element, which is potentially valid.
continue
content_encoding = get_element(CONTENT_ENCODING_PATH[body_path], tuple)
content_type = get_element(CONTENT_TYPE_PATH[body_path], tuple)
value = parse_body_value(base64value, content_encoding, content_type, is_bulk_path, line_no)
if value:
set_element(body_path, tuple, value)
return tuple


if __name__ == "__main__":
args = parse_args()
if args.outfile:
outfile = args.outfile
else:
outfile = args.infile.parent / f"readable-{args.infile.name}"
print(f"Input file: {args.infile}; Output file: {outfile}")

logging.basicConfig(level=logging.INFO)
with logging_redirect_tqdm():
Comment on lines +141 to +142
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do these lines do?

with open(args.infile, 'r') as in_f:
with open(outfile, 'w') as out_f:
for i, line in tqdm(enumerate(in_f)):
print(json.dumps(parse_tuple(line, i + 1)), file=out_f)