From c0086e7490412e3b93fa112edc3ba93feebcd929 Mon Sep 17 00:00:00 2001 From: Keith Suderman Date: Thu, 14 Dec 2023 11:09:01 -0500 Subject: [PATCH] Add --sort-by to all summarize commands --- abm/lib/common.py | 79 +++++++++++++++++++++++++++---------------- abm/lib/experiment.py | 70 ++++++++++++-------------------------- abm/lib/history.py | 51 ++++++++++++++-------------- abm/lib/invocation.py | 31 +++++++++++------ abm/lib/workflow.py | 49 ++++++++++++--------------- 5 files changed, 139 insertions(+), 141 deletions(-) diff --git a/abm/lib/common.py b/abm/lib/common.py index 09b2c60..7814fc6 100644 --- a/abm/lib/common.py +++ b/abm/lib/common.py @@ -192,37 +192,41 @@ def find_executable(name): # "swaptotal", # "uname" +table_header = [ + "id", + "history_id", + "history_name", + "state", + "tool_id", + "invocation_id", + "workflow_id", + "cpuacct.usage", + # "end_epoch", + "galaxy_memory_mb", + "galaxy_slots", + # "memory.failcnt", + "memory.limit_in_bytes", + "memory.max_usage_in_bytes", + # "memory.memsw.limit_in_bytes", + # "memory.memsw.max_usage_in_bytes", + # "memory.oom_control.oom_kill_disable", + # "memory.oom_control.under_oom", + "memory.soft_limit_in_bytes", + "memtotal", + "processor_count", + "runtime_seconds", + # "start_epoch", + # "swaptotal", + # "uname" +] + +def print_table_header(): + print(','.join(table_header)) + def summarize_metrics(gi, jobs: list): table = [] - header = [ - "id", - "history_id", - "history_name", - "state", - "tool_id", - "invocation_id", - "workflow_id", - "cpuacct.usage", - # "end_epoch", - "galaxy_memory_mb", - "galaxy_slots", - # "memory.failcnt", - "memory.limit_in_bytes", - "memory.max_usage_in_bytes", - # "memory.memsw.limit_in_bytes", - # "memory.memsw.max_usage_in_bytes", - # "memory.oom_control.oom_kill_disable", - # "memory.oom_control.under_oom", - "memory.soft_limit_in_bytes", - "memtotal", - "processor_count", - "runtime_seconds", - # "start_epoch", - # "swaptotal", - # "uname" - ] - table.append(header) + # table.append(header) # print(','.join(header)) for job in jobs: job_metrics = gi.jobs.get_metrics(job['id']) @@ -231,14 +235,14 @@ def summarize_metrics(gi, jobs: list): if '/' in toolid: parts = toolid.split('/') toolid = f'{parts[-2]}/{parts[-1]}' - metrics = metrics_to_dict(job_metrics, header) + metrics = metrics_to_dict(job_metrics, table_header) metrics['id'] = job.get('id', 'unknown') metrics['history_id'] = job.get('history_id', 'unknown') metrics['history_name'] = job.get('history_name', 'unknown') metrics['state'] = job.get('state', 'unknown') metrics['tool_id'] = toolid metrics['invocation_id'] = job.get('invocation_id', 'unknown') - for key in header: + for key in table_header: if key in metrics: row.append(metrics[key]) else: @@ -334,3 +338,18 @@ def make_result(data): def _make_dataset_element(name, value): # print(f"Making dataset element for {name} = {value}({type(value)})") return dataset_collections.HistoryDatasetElement(name=name, id=value) + +def get_float_key(column: int): + def get_key(row: list): + if row[column] == '': + return -1 + return float(row[column]) + return get_key + +def get_str_key(column: int): + # print(f"Getting string key for column {column}") + def get_key(row: list): + # print(f"Sorting by column {column} key {row[column]}") + return row[column] + return get_key + diff --git a/abm/lib/experiment.py b/abm/lib/experiment.py index 81c0c5c..2c03563 100644 --- a/abm/lib/experiment.py +++ b/abm/lib/experiment.py @@ -11,7 +11,7 @@ import benchmark import helm import yaml -from common import Context, load_profiles, print_markdown_table +from common import Context, load_profiles, print_markdown_table, get_str_key, get_float_key INVOCATIONS_DIR = "invocations" METRICS_DIR = "metrics" @@ -124,38 +124,8 @@ def summarize(context: Context, args: list): """ markdown = False separator = None - input_dirs = [] make_row = make_table_row header_row = "Run,Cloud,Job Conf,Workflow,History,Inputs,Tool,Tool Version,State,Slots,Memory,Runtime (Sec),CPU,Memory Limit (Bytes),Memory Max usage (Bytes)" - # for arg in args: - # if arg in ['-t', '--tsv']: - # if separator is not None or markdown: - # print('ERROR: The output format is specified more than once') - # return - # print('tsv') - # separator = '\t' - # elif arg in ['-c', '--csv']: - # if separator is not None or markdown: - # print('ERROR: The output format is specified more than once') - # return - # separator = ',' - # print('csv') - # elif arg in ['-m', '--model']: - # if separator is not None or markdown: - # print('ERROR: The output format is specified more than once') - # return - # print('making a model') - # separator = ',' - # make_row = make_model_row - # header_row = "job_id,tool_id,tool_version,state,memory.max_usage_in_bytes,cpuacct.usage,process_count,galaxy_slots,runtime_seconds,ref_data_size,input_data_size_1,input_data_size_2" - # elif arg == '--markdown': - # if separator is not None or markdown: - # print('ERROR: The output format is specified more than once') - # return - # markdown = True - # else: - # # print(f"Input dir {arg}") - # input_dirs.append(arg) parser = argparse.ArgumentParser() parser.add_argument('dirs', nargs='*') @@ -163,7 +133,7 @@ def summarize(context: Context, args: list): parser.add_argument('-t', '--tsv', action='store_true') parser.add_argument('-m', '--model', action='store_true') parser.add_argument('--markdown', action='store_true') - parser.add_argument('-s', '--sort-by', choices=['cpu', 'runtime', 'memory']) + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) argv = parser.parse_args(args) count = 0 @@ -197,8 +167,8 @@ def summarize(context: Context, args: list): separator = ',' if markdown: - print("|Run|Job Conf|Tool|State|Runtime (Sec)|CPU (Sec) |Max Memory (GB)|") - print("|---|---|---|---|---:|---:|---:|") + print("|Run|Inputs|Job Conf|Tool|State|Runtime (Sec)|Max Memory (GB)|") + print("|---|---|---|---|---|---:|---:|") else: print(header_row) @@ -218,34 +188,38 @@ def summarize(context: Context, args: list): row = make_row(data) table.append(row) except Exception as e: - # Silently fail to allow the remainder of the table to be generated. print(f"Unable to process {input_path}") print(e) traceback.print_exc() + # Silently fail to allow the remainder of the table to be generated. # pass - def comparator(row): - print('key', row[key]) - print('type', type(row[key])) - return row[key] - + reverse = True if argv.sort_by: - key = 0 + comp = get_str_key(6) if argv.sort_by == 'runtime': - key = 10 - elif argv.sort_by == 'cpu': - key = 11 + # key = 10 + comp = get_float_key(10) + # elif argv.sort_by == 'cpu': + # comp = get_float_comparator(11) + # #key = 11 elif argv.sort_by == 'memory': - key = 13 - table.sort(key=lambda row: -1 if row[key] == '' else float(row[key]), reverse=True) + comp = get_float_key(13) + # key = 13 + elif argv.sort_by == 'tool': + # print('Getting string key accessor.') + comp = get_str_key(6) + reverse = False + # table.sort(key=lambda row: -1 if row[key] == '' else float(row[key]), reverse=True) + table.sort(key=comp, reverse=reverse) if markdown: for row in table: runtime = '' if len(row[10]) == 0 else f"{float(row[10]):4.1f}" - cpu = '' if len(row[11]) == 0 else f"{float(row[11])/10**9:4.1f}" + # cpu = '' if len(row[11]) == 0 else f"{float(row[11])/10**9:4.1f}" memory = '' if len(row[13]) == 0 else f"{float(row[13])/GB:4.3f}" # memory = float(row[13]) / GB - print(f"| {row[0]} | {row[2]} | {row[6]} | {row[7]} | {runtime} | {cpu} | {memory} |") + print(f"| {row[0]} | {row[5].split(' ')[0]} |{row[2]} | {row[6]} | {row[7]} | {runtime} | {memory} |") else: for row in table: print(separator.join([str(x) for x in row])) diff --git a/abm/lib/history.py b/abm/lib/history.py index 04e1fcb..1f6dda4 100644 --- a/abm/lib/history.py +++ b/abm/lib/history.py @@ -1,3 +1,4 @@ +import argparse import json import os import sys @@ -8,7 +9,8 @@ import yaml from bioblend.galaxy.objects import GalaxyInstance from lib.common import (Context, connect, find_history, parse_profile, - print_json, summarize_metrics, print_markdown_table) + print_json, summarize_metrics, print_markdown_table, + get_float_key, get_str_key, print_table_header) # # History related functions @@ -339,18 +341,20 @@ def tag(context: Context, args: list): def summarize(context: Context, args: list): - markdown = False - if '--markdown' in args: - markdown = True - args.remove('--markdown') + parser = argparse.ArgumentParser() + parser.add_argument('id_list', nargs='+') + parser.add_argument('--markdown', action='store_true') + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) - if len(args) == 0: + if len(argv.id_list) == 0: print("ERROR: Provide one or more history ID values.") return gi = connect(context) all_jobs = [] - while len(args) > 0: - hid = find_history(gi, args.pop(0)) + id_list = argv.id_list + while len(id_list) > 0: + hid = find_history(gi, id_list.pop(0)) history = gi.histories.show_history(history_id=hid) jobs = gi.jobs.get_jobs(history_id=hid) for job in jobs: @@ -358,25 +362,23 @@ def summarize(context: Context, args: list): job['history_id'] = hid job['history_name'] = history['name'] job['workflow_id'] = '' - # if 'workflow_id' in invocation: - # job['workflow_id'] = invocation['workflow_id'] all_jobs.append(job) - # invocations = gi.invocations.get_invocations(history_id=hid) - # for invocation in invocations: - # id = invocation['id'] - # #jobs = gi.jobs.get_jobs(history_id=hid, invocation_id=id) - # jobs = gi.jobs.get_jobs(history_id=hid) - # for job in jobs: - # job['invocation_id'] = id - # job['history_id'] = hid - # if 'workflow_id' in invocation: - # job['workflow_id'] = invocation['workflow_id'] - # all_jobs.append(job) - # summarize_metrics(gi, gi.jobs.get_jobs(history_id=args[0])) table = summarize_metrics(gi, all_jobs) - if markdown: + if argv.sort_by: + reverse = True + get_key = None + if argv.sort_by == 'runtime': + get_key = get_float_key(15) + elif argv.sort_by == 'memory': + get_key = get_float_key(11) + elif argv.sort_by == 'tool': + get_key = get_str_key(4) + reverse = False + table.sort(key=get_key, reverse=reverse) + if argv.markdown: print_markdown_table(table) else: + print_table_header() for row in table: print(','.join(row)) @@ -437,9 +439,6 @@ def wait_for(gi: GalaxyInstance, history_id: str): waiting = False if waiting: time.sleep(30) - # elif state == 'paused': - # paused += 1 - # print(f"{job['id']}\t{job['state']}\t{job['update_time']}\t{job['tool_id']}") class JobStates: diff --git a/abm/lib/invocation.py b/abm/lib/invocation.py index c531471..00baa9f 100644 --- a/abm/lib/invocation.py +++ b/abm/lib/invocation.py @@ -1,4 +1,6 @@ -from common import Context, connect, print_json, summarize_metrics, print_markdown_table +import argparse +from common import Context, connect, print_json, summarize_metrics, print_markdown_table, get_float_key, get_str_key, \ + print_table_header def doList(context: Context, args: list): @@ -25,14 +27,11 @@ def doList(context: Context, args: list): def summarize(context: Context, args: list): - markdown = False - if '--markdown' in args: - markdown = True - args.remove('--markdown') - - if len(args) == 0: - print("ERROR: Provide one or more invocation ID values.") - return + parser = argparse.ArgumentParser() + parser.add_argument('id', nargs=1) + parser.add_argument('--markdown', action='store_true') + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) gi = connect(context) id = args[0] all_jobs = [] @@ -42,8 +41,20 @@ def summarize(context: Context, args: list): job['workflow_id'] = '' all_jobs.append(job) table = summarize_metrics(gi, all_jobs) - if markdown: + if argv.sort_by: + reverse = True + get_key = None + if argv.sort_by == 'runtime': + get_key = get_float_key(15) + elif argv.sort_by == 'memory': + get_key = get_float_key(11) + elif argv.sort_by == 'tool': + get_key = get_str_key(4) + reverse = False + table.sort(key=get_key, reverse=reverse) + if argv.markdown: print_markdown_table(table) else: + print_table_header() for row in table: print(','.join(row)) diff --git a/abm/lib/workflow.py b/abm/lib/workflow.py index 54c79a3..1e21e77 100644 --- a/abm/lib/workflow.py +++ b/abm/lib/workflow.py @@ -1,13 +1,14 @@ +import argparse import json import logging import os from pathlib import Path from pprint import pprint -import planemo import requests import yaml -from common import Context, connect, summarize_metrics, print_markdown_table +from common import Context, connect, summarize_metrics, print_markdown_table, get_float_key, get_str_key, \ + print_table_header from planemo.galaxy.workflows import install_shed_repos from planemo.runnable import for_path, for_uri @@ -81,12 +82,6 @@ def import_from_url(context: Context, args: list): input_text = response.text with open(cached_file, 'w') as f: f.write(input_text) - - # response = requests.get(url) - # if (response.status_code != 200): - # print(f"ERROR: There was a problem downloading the workflow: {response.status_code}") - # print(response.reason) - # return try: workflow = json.loads(input_text) except Exception as e: @@ -164,30 +159,21 @@ def invocation(context: Context, args: list): print("ERROR: Invalid paramaeters. A workflow ID invocation ID are required") return workflow_id = None - invocation_id = None while len(args) > 0: arg = args.pop(0) if arg in ['-w', '--work', '--workflow']: print("Setting workflow id") workflow_id = args.pop(0) - # elif arg in ['-i', '--invoke', '--invocation']: - # invocation_id = args.pop(0) - # print("Setting invocation id") else: print(f'Invalid parameter: "{arg}') return if workflow_id is None: print("ERROR: No workflow ID provided") return - # if invocation_id is None: - # print("ERROR: No invocation ID provided") - # return gi = connect(context) - # result = gi.workflows.show_invocation(workflow_id, invocation_id) invocations = gi.invocations.get_invocations( workflow_id=workflow_id, view='element', step_details=True ) - # print(json.dumps(result, indent=4)) print('ID\tState\tWorkflow\tHistory') for invocation in invocations: id = invocation['id'] @@ -236,16 +222,13 @@ def rename(context: Context, args: list): def summarize(context: Context, args: list): - markdown = False - if '--markdown' in args: - markdown = True - args.remove('--markdown') - - if len(args) == 0: - print("ERROR: Provide one or more workflow ID values.") - return + parser = argparse.ArgumentParser() + parser.add_argument('id', nargs=1) + parser.add_argument('--markdown', action='store_true') + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) gi = connect(context) - wid = args[0] + wid = argv.id all_jobs = [] invocations = gi.invocations.get_invocations(workflow_id=wid) for invocation in invocations: @@ -256,8 +239,20 @@ def summarize(context: Context, args: list): job['workflow_id'] = wid all_jobs.append(job) table = summarize_metrics(gi, all_jobs) - if markdown: + if argv.sort_by: + reverse = True + get_key = None + if argv.sort_by == 'runtime': + get_key = get_float_key(15) + elif argv.sort_by == 'memory': + get_key = get_float_key(11) + elif argv.sort_by == 'tool': + get_key = get_str_key(4) + reverse = False + table.sort(key=get_key, reverse=reverse) + if argv.markdown: print_markdown_table(table) else: + print_table_header() for row in table: print(','.join(row))