Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Summarize sort by #256

Merged
merged 1 commit into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions abm/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,37 +192,41 @@ def find_executable(name):
# "swaptotal",
# "uname"

# Column order shared by print_table_header() and summarize_metrics() when
# emitting the job-metrics table (CSV header and per-row field order).
# Entries that are commented out are metrics deliberately excluded from the
# report.
table_header = [
"id",
"history_id",
"history_name",
"state",
"tool_id",
"invocation_id",
"workflow_id",
"cpuacct.usage",
# "end_epoch",
"galaxy_memory_mb",
"galaxy_slots",
# "memory.failcnt",
"memory.limit_in_bytes",
"memory.max_usage_in_bytes",
# "memory.memsw.limit_in_bytes",
# "memory.memsw.max_usage_in_bytes",
# "memory.oom_control.oom_kill_disable",
# "memory.oom_control.under_oom",
"memory.soft_limit_in_bytes",
"memtotal",
"processor_count",
"runtime_seconds",
# "start_epoch",
# "swaptotal",
# "uname"
]

def print_table_header():
    """Print the shared job-metrics column names as one CSV header line."""
    header_line = ",".join(table_header)
    print(header_line)


def summarize_metrics(gi, jobs: list):
table = []
header = [
"id",
"history_id",
"history_name",
"state",
"tool_id",
"invocation_id",
"workflow_id",
"cpuacct.usage",
# "end_epoch",
"galaxy_memory_mb",
"galaxy_slots",
# "memory.failcnt",
"memory.limit_in_bytes",
"memory.max_usage_in_bytes",
# "memory.memsw.limit_in_bytes",
# "memory.memsw.max_usage_in_bytes",
# "memory.oom_control.oom_kill_disable",
# "memory.oom_control.under_oom",
"memory.soft_limit_in_bytes",
"memtotal",
"processor_count",
"runtime_seconds",
# "start_epoch",
# "swaptotal",
# "uname"
]
table.append(header)
# table.append(header)
# print(','.join(header))
for job in jobs:
job_metrics = gi.jobs.get_metrics(job['id'])
Expand All @@ -231,14 +235,14 @@ def summarize_metrics(gi, jobs: list):
if '/' in toolid:
parts = toolid.split('/')
toolid = f'{parts[-2]}/{parts[-1]}'
metrics = metrics_to_dict(job_metrics, header)
metrics = metrics_to_dict(job_metrics, table_header)
metrics['id'] = job.get('id', 'unknown')
metrics['history_id'] = job.get('history_id', 'unknown')
metrics['history_name'] = job.get('history_name', 'unknown')
metrics['state'] = job.get('state', 'unknown')
metrics['tool_id'] = toolid
metrics['invocation_id'] = job.get('invocation_id', 'unknown')
for key in header:
for key in table_header:
if key in metrics:
row.append(metrics[key])
else:
Expand Down Expand Up @@ -334,3 +338,18 @@ def make_result(data):
def _make_dataset_element(name, value):
    """Wrap a (name, dataset id) pair as a HistoryDatasetElement.

    :param name: element identifier within the collection
    :param value: the dataset's history id
    """
    element = dataset_collections.HistoryDatasetElement(name=name, id=value)
    return element

def get_float_key(column: int):
    """Return a sort-key function that reads ``row[column]`` as a float.

    Cells that cannot be parsed as a number (empty strings, or the
    ``'unknown'`` placeholder that summarize_metrics inserts for missing
    job fields) sort as -1 instead of raising ValueError, so one
    malformed cell cannot abort sorting the whole table.

    :param column: index of the column to extract from each row
    :return: a callable suitable for list.sort(key=...)
    """
    def get_key(row: list):
        try:
            return float(row[column])
        except (TypeError, ValueError):
            # Non-numeric or missing value: push it to the bottom of a
            # descending sort rather than crashing.
            return -1
    return get_key

def get_str_key(column: int):
    """Return a sort-key function that extracts ``row[column]`` unchanged.

    :param column: index of the column to extract from each row
    :return: a callable suitable for list.sort(key=...)
    """
    def key_for(row: list):
        return row[column]
    return key_for

70 changes: 22 additions & 48 deletions abm/lib/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import benchmark
import helm
import yaml
from common import Context, load_profiles, print_markdown_table
from common import Context, load_profiles, print_markdown_table, get_str_key, get_float_key

INVOCATIONS_DIR = "invocations"
METRICS_DIR = "metrics"
Expand Down Expand Up @@ -124,46 +124,16 @@ def summarize(context: Context, args: list):
"""
markdown = False
separator = None
input_dirs = []
make_row = make_table_row
header_row = "Run,Cloud,Job Conf,Workflow,History,Inputs,Tool,Tool Version,State,Slots,Memory,Runtime (Sec),CPU,Memory Limit (Bytes),Memory Max usage (Bytes)"
# for arg in args:
# if arg in ['-t', '--tsv']:
# if separator is not None or markdown:
# print('ERROR: The output format is specified more than once')
# return
# print('tsv')
# separator = '\t'
# elif arg in ['-c', '--csv']:
# if separator is not None or markdown:
# print('ERROR: The output format is specified more than once')
# return
# separator = ','
# print('csv')
# elif arg in ['-m', '--model']:
# if separator is not None or markdown:
# print('ERROR: The output format is specified more than once')
# return
# print('making a model')
# separator = ','
# make_row = make_model_row
# header_row = "job_id,tool_id,tool_version,state,memory.max_usage_in_bytes,cpuacct.usage,process_count,galaxy_slots,runtime_seconds,ref_data_size,input_data_size_1,input_data_size_2"
# elif arg == '--markdown':
# if separator is not None or markdown:
# print('ERROR: The output format is specified more than once')
# return
# markdown = True
# else:
# # print(f"Input dir {arg}")
# input_dirs.append(arg)

parser = argparse.ArgumentParser()
parser.add_argument('dirs', nargs='*')
parser.add_argument('-c', '--csv', action='store_true')
parser.add_argument('-t', '--tsv', action='store_true')
parser.add_argument('-m', '--model', action='store_true')
parser.add_argument('--markdown', action='store_true')
parser.add_argument('-s', '--sort-by', choices=['cpu', 'runtime', 'memory'])
parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool'])
argv = parser.parse_args(args)

count = 0
Expand Down Expand Up @@ -197,8 +167,8 @@ def summarize(context: Context, args: list):
separator = ','

if markdown:
print("|Run|Job Conf|Tool|State|Runtime (Sec)|CPU (Sec) |Max Memory (GB)|")
print("|---|---|---|---|---:|---:|---:|")
print("|Run|Inputs|Job Conf|Tool|State|Runtime (Sec)|Max Memory (GB)|")
print("|---|---|---|---|---|---:|---:|")
else:
print(header_row)

Expand All @@ -218,34 +188,38 @@ def summarize(context: Context, args: list):
row = make_row(data)
table.append(row)
except Exception as e:
# Silently fail to allow the remainder of the table to be generated.
print(f"Unable to process {input_path}")
print(e)
traceback.print_exc()
# Silently fail to allow the remainder of the table to be generated.
# pass

def comparator(row):
print('key', row[key])
print('type', type(row[key]))
return row[key]

reverse = True
if argv.sort_by:
key = 0
comp = get_str_key(6)
if argv.sort_by == 'runtime':
key = 10
elif argv.sort_by == 'cpu':
key = 11
# key = 10
comp = get_float_key(10)
# elif argv.sort_by == 'cpu':
# comp = get_float_comparator(11)
# #key = 11
elif argv.sort_by == 'memory':
key = 13
table.sort(key=lambda row: -1 if row[key] == '' else float(row[key]), reverse=True)
comp = get_float_key(13)
# key = 13
elif argv.sort_by == 'tool':
# print('Getting string key accessor.')
comp = get_str_key(6)
reverse = False
# table.sort(key=lambda row: -1 if row[key] == '' else float(row[key]), reverse=True)
table.sort(key=comp, reverse=reverse)

if markdown:
for row in table:
runtime = '' if len(row[10]) == 0 else f"{float(row[10]):4.1f}"
cpu = '' if len(row[11]) == 0 else f"{float(row[11])/10**9:4.1f}"
# cpu = '' if len(row[11]) == 0 else f"{float(row[11])/10**9:4.1f}"
memory = '' if len(row[13]) == 0 else f"{float(row[13])/GB:4.3f}"
# memory = float(row[13]) / GB
print(f"| {row[0]} | {row[2]} | {row[6]} | {row[7]} | {runtime} | {cpu} | {memory} |")
print(f"| {row[0]} | {row[5].split(' ')[0]} |{row[2]} | {row[6]} | {row[7]} | {runtime} | {memory} |")
else:
for row in table:
print(separator.join([str(x) for x in row]))
Expand Down
51 changes: 25 additions & 26 deletions abm/lib/history.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import json
import os
import sys
Expand All @@ -8,7 +9,8 @@
import yaml
from bioblend.galaxy.objects import GalaxyInstance
from lib.common import (Context, connect, find_history, parse_profile,
print_json, summarize_metrics, print_markdown_table)
print_json, summarize_metrics, print_markdown_table,
get_float_key, get_str_key, print_table_header)

#
# History related functions
Expand Down Expand Up @@ -339,44 +341,44 @@ def tag(context: Context, args: list):


def summarize(context: Context, args: list):
markdown = False
if '--markdown' in args:
markdown = True
args.remove('--markdown')
parser = argparse.ArgumentParser()
parser.add_argument('id_list', nargs='+')
parser.add_argument('--markdown', action='store_true')
parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool'])
argv = parser.parse_args(args)

if len(args) == 0:
if len(argv.id_list) == 0:
print("ERROR: Provide one or more history ID values.")
return
gi = connect(context)
all_jobs = []
while len(args) > 0:
hid = find_history(gi, args.pop(0))
id_list = argv.id_list
while len(id_list) > 0:
hid = find_history(gi, id_list.pop(0))
history = gi.histories.show_history(history_id=hid)
jobs = gi.jobs.get_jobs(history_id=hid)
for job in jobs:
job['invocation_id'] = ''
job['history_id'] = hid
job['history_name'] = history['name']
job['workflow_id'] = ''
# if 'workflow_id' in invocation:
# job['workflow_id'] = invocation['workflow_id']
all_jobs.append(job)
# invocations = gi.invocations.get_invocations(history_id=hid)
# for invocation in invocations:
# id = invocation['id']
# #jobs = gi.jobs.get_jobs(history_id=hid, invocation_id=id)
# jobs = gi.jobs.get_jobs(history_id=hid)
# for job in jobs:
# job['invocation_id'] = id
# job['history_id'] = hid
# if 'workflow_id' in invocation:
# job['workflow_id'] = invocation['workflow_id']
# all_jobs.append(job)
# summarize_metrics(gi, gi.jobs.get_jobs(history_id=args[0]))
table = summarize_metrics(gi, all_jobs)
if markdown:
if argv.sort_by:
reverse = True
get_key = None
if argv.sort_by == 'runtime':
get_key = get_float_key(15)
elif argv.sort_by == 'memory':
get_key = get_float_key(11)
elif argv.sort_by == 'tool':
get_key = get_str_key(4)
reverse = False
table.sort(key=get_key, reverse=reverse)
if argv.markdown:
print_markdown_table(table)
else:
print_table_header()
for row in table:
print(','.join(row))

Expand Down Expand Up @@ -437,9 +439,6 @@ def wait_for(gi: GalaxyInstance, history_id: str):
waiting = False
if waiting:
time.sleep(30)
# elif state == 'paused':
# paused += 1
# print(f"{job['id']}\t{job['state']}\t{job['update_time']}\t{job['tool_id']}")


class JobStates:
Expand Down
31 changes: 21 additions & 10 deletions abm/lib/invocation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from common import Context, connect, print_json, summarize_metrics, print_markdown_table
import argparse
from common import Context, connect, print_json, summarize_metrics, print_markdown_table, get_float_key, get_str_key, \
print_table_header


def doList(context: Context, args: list):
Expand All @@ -25,14 +27,11 @@ def doList(context: Context, args: list):


def summarize(context: Context, args: list):
markdown = False
if '--markdown' in args:
markdown = True
args.remove('--markdown')

if len(args) == 0:
print("ERROR: Provide one or more invocation ID values.")
return
parser = argparse.ArgumentParser()
parser.add_argument('id', nargs=1)
parser.add_argument('--markdown', action='store_true')
parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool'])
argv = parser.parse_args(args)
gi = connect(context)
id = args[0]
all_jobs = []
Expand All @@ -42,8 +41,20 @@ def summarize(context: Context, args: list):
job['workflow_id'] = ''
all_jobs.append(job)
table = summarize_metrics(gi, all_jobs)
if markdown:
if argv.sort_by:
reverse = True
get_key = None
if argv.sort_by == 'runtime':
get_key = get_float_key(15)
elif argv.sort_by == 'memory':
get_key = get_float_key(11)
elif argv.sort_by == 'tool':
get_key = get_str_key(4)
reverse = False
table.sort(key=get_key, reverse=reverse)
if argv.markdown:
print_markdown_table(table)
else:
print_table_header()
for row in table:
print(','.join(row))
Loading