Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Python data compression algorithm #172

Merged
merged 14 commits into from
Feb 5, 2021
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,27 @@ The script allows the user to specify variables, their units, plotting
style (stacked vs overlaid), as well as the format of the output image.
Use `-h` for more information.


### Data Compression

The `prmon_compress_output.py` script (Python3) can be used to compress the output file
while keeping the most relevant information.
The compression algorithm works as follows, for each separate series:
* For rapidly changing metrics: for any three consecutive points A, B, and C, if the linear
interpolation between A and C passes within *threshold* * *range(metric)* of B, then B is deleted.
The threshold, a fraction of the metric's range strictly between 0 and 1, is set by the `--precision` parameter.
* For more steady metrics: only keep the changepoints.

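The time index of the final output will be the union of the time indexes kept for the
individual series. By default, where a series had its point deleted at a kept index, the value is
filled in: linearly interpolated for rapidly changing metrics and forward-filled for steady
metrics. With the `--skip-interpolate` parameter these points are left as NA values instead.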

Example:
```sh
prmon_compress_output.py --input prmon.txt --precision 0.3 --skip-interpolate
```


## Feedback and Contributions

We're very happy to get feedback on prmon as well as suggestions for future
Expand Down
159 changes: 159 additions & 0 deletions package/scripts/prmon_compress_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#! /usr/bin/env python3
"""prmon output smart compression script"""

import argparse
import os
import sys

try:
import pandas as pd
except ImportError:
print("{0: <8}:: This script needs pandas.".format("ERROR"))
sys.exit(-1)


# Columns treated as rapidly changing: compress_prmon_output reduces each of
# these with reduce_changing_metric (interpolation-based point dropping) and,
# unless interpolation is skipped, fills the gaps by index-based interpolation.
CHANGING_METRICS = [
"vmem",
"pss",
"rss",
"swap",
"rchar",
"wchar",
"read_bytes",
"write_bytes",
"rx_packets",
"tx_packets",
"rx_bytes",
"tx_bytes",
"gpufbmem",
"gpumempct",
"gpusmpct",
]

# Columns treated as steady: compress_prmon_output reduces each of these with
# reduce_steady_metric (keep only the changepoints) and, unless interpolation
# is skipped, fills the gaps by forward-filling the last kept value.
STEADY_METRICS = ["nprocs", "nthreads", "ngpus"]


def interp_drop(p1, p2, p3, eps):
    """Tell whether the middle point can be dropped without exceeding *eps*.

    The straight line through ``p1`` and ``p3`` is evaluated at the abscissa
    of ``p2``; the middle point is considered redundant when the interpolated
    value lies within ``eps`` of its real value.

    Args:
        p1: ``(x, y)`` pair of the left anchor point.
        p2: ``(x, y)`` pair of the candidate point to drop.
        p3: ``(x, y)`` pair of the right anchor point; its ``x`` must differ
            from the one of ``p1`` (the slope is undefined otherwise).
        eps: Maximum tolerated absolute deviation (non-negative).

    Returns:
        True when ``p2`` lies within ``eps`` of the interpolation, else False.
    """
    slope = (p3[1] - p1[1]) / (p3[0] - p1[0])
    interpolated = p1[1] + slope * (p2[0] - p1[0])
    # Inclusive comparison: with a strict "<" a tolerance of 0 (the case of a
    # perfectly flat series, whose range is 0) could never drop any point, so
    # constant metrics were left completely uncompressed.
    return abs(interpolated - p2[1]) <= eps


def reduce_changing_metric(df, metric, precision):
    """Compress a rapidly changing metric by dropping near-collinear points.

    Starting from the first sample (the anchor), each following sample is
    tested with ``interp_drop`` against the line joining the anchor and the
    sample right after it. A sample that passes the test is dropped and the
    anchor stays where it is; otherwise the sample is kept and becomes the
    new anchor. The first and the last samples are always kept.

    Args:
        df: DataFrame holding the metric as a column.
        metric: Name of the column to compress.
        precision: Fraction of the metric's range (max - min) used as the
            absolute tolerance handed to ``interp_drop``.

    Returns:
        A new Series containing only the kept samples, with their original
        index labels.
    """
    metric_series = df[metric]
    n_points = len(metric_series)
    if n_points < 3:
        # No interior point exists, so there is nothing that could be dropped
        # (this also covers empty and single-sample series).
        return metric_series.copy()

    dyn_range = metric_series.max() - metric_series.min()
    eps = dyn_range * precision

    # Work on plain arrays and collect the positions to keep: this is a single
    # linear pass instead of rebuilding the Series after every dropped point.
    times = metric_series.index.to_numpy()
    values = metric_series.to_numpy()

    kept = [0]
    anchor = 0
    for following in range(2, n_points):
        candidate = following - 1
        droppable = interp_drop(
            (times[anchor], values[anchor]),
            (times[candidate], values[candidate]),
            (times[following], values[following]),
            eps,
        )
        if not droppable:
            kept.append(candidate)
            anchor = candidate
    kept.append(n_points - 1)

    # Positional selection returns a new Series and keeps name and labels.
    return metric_series.iloc[kept]


def reduce_steady_metric(df, metric):
    """Keep only the changepoints of a slowly varying metric.

    A sample is retained when its value differs from the value of the sample
    right before it; the first sample has no predecessor and is always kept.
    """
    series = df[metric]
    previous = series.shift(1)
    is_changepoint = series.ne(previous)
    return series.loc[is_changepoint]


def compress_prmon_output(df, precision, skip_interpolate):
"""Compress full df. Final index is the union of the compressed series indexes.
Points without values for a series are either linearly interpolated,
for fast-changing metrics, or forward-filled, for steady metrics

Only columns named in CHANGING_METRICS or STEADY_METRICS are reduced and
concatenated into the result; each one is compressed independently with
reduce_changing_metric or reduce_steady_metric respectively.

Args:
    df: DataFrame to compress (the caller indexes it by the "Time" column).
    precision: Tolerance fraction forwarded to reduce_changing_metric.
    skip_interpolate: When true, the interpolation / forward-fill step is
        skipped.

Returns:
    The compressed DataFrame when df has more than two rows; the final
    ``return df`` appears to be the fall-through that hands shorter frames
    back unchanged.
"""
if len(df) > 2:
# Restrict both metric lists to the columns actually present in df.
present_changing_metrics = [
metric for metric in CHANGING_METRICS if metric in df.columns
]
present_steady_metrics = [
metric for metric in STEADY_METRICS if metric in df.columns
]
# Compress every column on its own; each result keeps its own subset of
# the index.
reduced_changing_metrics = [
reduce_changing_metric(df, metric, precision)
for metric in present_changing_metrics
]
reduced_steady_metrics = [
reduce_steady_metric(df, metric) for metric in present_steady_metrics
]
# Column-wise concatenation aligns the series on their index labels.
# NOTE(review): pd.concat raises on an empty list, i.e. when df contains
# none of the known metric columns — confirm whether that can happen.
final_df = pd.concat(reduced_changing_metrics + reduced_steady_metrics, axis=1)
if not skip_interpolate:
# Fill the gaps: index-weighted interpolation for the changing metrics,
# last kept value for the steady ones.
final_df[present_changing_metrics] = final_df[
present_changing_metrics
].interpolate(method="index")
# NOTE(review): the ``downcast`` keyword of ffill is deprecated in recent
# pandas releases — confirm against the supported pandas version.
final_df[present_steady_metrics] = final_df[present_steady_metrics].ffill(
downcast="infer"
)
# Round to whole numbers and try to store them as nullable integers;
# errors="ignore" leaves the frame as it is when the cast fails.
# NOTE(review): the original indentation is not visible here, so it is
# unclear whether these two statements belong to the
# ``if not skip_interpolate`` branch — confirm against the real file.
final_df = final_df.round(0)
final_df = final_df.astype("Int64", errors="ignore")
return final_df
return df


def main():
    """Command-line entry point: compress a prmon output file.

    Reads the tab-separated prmon file given by ``--input`` (indexed by its
    ``Time`` column), compresses it with ``compress_prmon_output``, re-attaches
    the ``wtime`` column for the kept rows and writes the result to
    ``--output``. With ``--delete-original`` the input file is removed
    afterwards, unless it is the very file the result was written to.

    Exits with status -1 when ``--precision`` is not strictly between 0 and 1.
    """
    parser = argparse.ArgumentParser(
        description="Configurable smart compression script"
    )

    def precision(text):
        """Convert *text* to a float and require 0 < value < 1."""
        # A ValueError raised by float() is reported by argparse itself.
        value = float(text)
        if not 0 < value < 1:
            # Trailing newline so the shell prompt does not end up glued to
            # the message.
            parser.exit(-1, "Precision must be strictly between 0 and 1\n")
        return value

    parser.add_argument(
        "--input",
        type=str,
        default="prmon.txt",
        help="PrMon TXT output that will be used as input",
    )

    parser.add_argument(
        "--output",
        type=str,
        default="prmon_compressed.txt",
        help="name of the output compressed text file",
    )

    parser.add_argument(
        "--precision",
        type=precision,
        default=0.1,
        help="precision value for interpolation threshold",
    )

    parser.add_argument(
        "--skip-interpolate",
        default=False,
        action="store_true",
        help="""Whether to skip interpolation of the final obtained df,
        and leave NAs for the different metrics""",
    )

    parser.add_argument(
        "--delete-original",
        default=False,
        action="store_true",
        help="""Add this to delete the original, uncompressed
        file""",
    )

    args = parser.parse_args()

    df = pd.read_csv(
        args.input, sep="\t", index_col="Time", engine="c", na_filter=False
    )
    compressed_df = compress_prmon_output(df, args.precision, args.skip_interpolate)
    # wtime is in neither metric list, so it is copied over from the original
    # rows whose time label survived the compression.
    kept_rows = df.index.isin(compressed_df.index)
    compressed_df["wtime"] = df[kept_rows]["wtime"]
    compressed_df.to_csv(args.output, sep="\t")

    # When input and output are the same file, the compressed data has just
    # replaced the original in place; removing it would destroy the result.
    if args.delete_original and not os.path.samefile(args.input, args.output):
        os.remove(args.input)


# Run only when executed as a script. The comparison is an equality test on
# purpose: a substring test would also fire for any module whose name merely
# contains "__main__".
if __name__ == "__main__":
    main()