diff --git a/Makefile b/Makefile index 4cf4735..10fc6f2 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,10 @@ clean: @if [ -e dist/ ] ; then rm -rf dist/ ; fi @if [ -e gxabm.egg-info ] ; then rm -rf gxabm.egg-info ; fi +format: + black -S abm/ + isort abm/ + test-deploy: twine upload -r pypitest dist/* diff --git a/abm/VERSION b/abm/VERSION index 49cdd66..4985c3c 100644 --- a/abm/VERSION +++ b/abm/VERSION @@ -1 +1 @@ -2.7.6 +2.8.0-dev.6 diff --git a/abm/__init__.py b/abm/__init__.py index 2fae8ce..fb8a5b7 100644 --- a/abm/__init__.py +++ b/abm/__init__.py @@ -1,6 +1,9 @@ -import os, sys +import os +import sys + sys.path.append(os.path.dirname(os.path.realpath(__file__))) + def getVersion(): dir = os.path.dirname(os.path.realpath(__file__)) version_file = os.path.join(dir, 'VERSION') diff --git a/abm/__main__.py b/abm/__main__.py index ee48e48..014bada 100644 --- a/abm/__main__.py +++ b/abm/__main__.py @@ -7,17 +7,20 @@ """ -import yaml -import sys -import os import logging -from lib.common import Context +import os +import sys from pprint import pprint -from abm import getVersion +import yaml # These imports are required because we need Python to be load them to the # symbol table so the parse_menu method can find them in globals() -from lib import job, dataset, workflow, history, library, folder, benchmark, helm, kubectl, config, experiment, users, cloudlaunch, invocation +from lib import (benchmark, cloudlaunch, config, dataset, experiment, folder, + helm, history, invocation, job, kubectl, library, users, + workflow) +from lib.common import Context + +from abm import getVersion log = logging.getLogger('abm') handler = logging.StreamHandler() @@ -29,11 +32,12 @@ log.addHandler(handler) -#VERSION = '2.0.0-dev' +# VERSION = '2.0.0-dev' BOLD = '\033[1m' CLEAR = '\033[0m' + def bold(text: str): """ Wraps the text in ANSI control sequences to generate bold text in the terminal. @@ -50,17 +54,19 @@ def bold(text: str): # Commands that do not depend on a cloud instance stand_alone_commands = [] + def head(text): print(bold(text)) -def command_list(commands:list): +def command_list(commands: list): return '|'.join(bold(c) for c in commands) def copyright(): print(f" Copyright 2023 The Galaxy Project. All Rights Reserved.\n") + def print_main_help(menu_data): print() head(" DESCRIPTION") @@ -79,7 +85,9 @@ def print_main_help(menu_data): print(" print this help screen and exit") print() head(" NOTES") - print(f" Available SUBCOMMANDS and OPTIONS depend on the command. Use the {bold('help')} subcommand") + print( + f" Available SUBCOMMANDS and OPTIONS depend on the command. Use the {bold('help')} subcommand" + ) print(f" to learn more about each of the commands. 
For example:\n") print(f" $> abm workflow help\n") copyright() @@ -92,7 +100,7 @@ def print_help(menu_data, command): submenu = menu_item break if submenu is None: - #print_main_help(menu_data) + # print_main_help(menu_data) print(f"No help for {command} is available") return @@ -106,7 +114,9 @@ def print_help(menu_data, command): print(f" {submenu['help']}\n") head(" SUBCOMMANDS") for menu_item in submenu['menu']: - print(f" {'|'.join(bold(x) for x in menu_item['name'])} {menu_item['params'] if 'params' in menu_item else ''}") + print( + f" {'|'.join(bold(x) for x in menu_item['name'])} {menu_item['params'] if 'params' in menu_item else ''}" + ) print(f" {menu_item['help']}") print(f" {bold('help')}") print(" print this help screen and exit") @@ -181,6 +191,7 @@ def version(): print(f" Galaxy Automated Benchmarking v{version}") copyright() + def _get_logopt(args: list): OPTS = ['-log', '--log', '-logging', '--logging'] for i in range(len(args)): @@ -193,12 +204,20 @@ def entrypoint(): # Check if log level is being set logopt = _get_logopt(sys.argv) if logopt >= 0: - if logopt+1 >= len(sys.argv): + if logopt + 1 >= len(sys.argv): print("ERROR: no log level provided") return level = sys.argv[logopt + 1].upper() - if level not in ['DEBUG', 'INFO', 'WARN', 'WARNING', 'ERROR', 'FATAL', 'CRITICAL']: + if level not in [ + 'DEBUG', + 'INFO', + 'WARN', + 'WARNING', + 'ERROR', + 'FATAL', + 'CRITICAL', + ]: print(f"ERROR: Invalid logging level {sys.argv[logopt + 1]}") return print(f"Setting the log level to {level}") diff --git a/abm/lib/__init__.py b/abm/lib/__init__.py index 6fd7426..2cdb864 100644 --- a/abm/lib/__init__.py +++ b/abm/lib/__init__.py @@ -1,13 +1,17 @@ -import os, sys, json +import json +import os +import sys + sys.path.append(os.path.dirname(os.path.realpath(__file__))) -#from common import parse_profile +# from common import parse_profile INVOCATIONS_DIR = "invocations" METRICS_DIR = "metrics" parser = None + class Keys: NAME = 'name' RUNS = 'runs' @@ -15,7 +19,6 @@ class Keys: REFERENCE_DATA = 'reference_data' WORKFLOW_ID = 'workflow_id' DATASET_ID = 'dataset_id' + COLLECTION = 'collection' HISTORY_BASE_NAME = 'output_history_base_name' HISTORY_NAME = 'history_name' - - diff --git a/abm/lib/benchmark.py b/abm/lib/benchmark.py index deb6f03..b7cb745 100644 --- a/abm/lib/benchmark.py +++ b/abm/lib/benchmark.py @@ -1,15 +1,19 @@ +import argparse +import json +import logging import os import sys -import json + import yaml -import logging -import argparse -from lib import Keys, INVOCATIONS_DIR, METRICS_DIR -from lib.common import connect, Context, _get_dataset_data, _make_dataset_element, print_json from bioblend.galaxy import GalaxyInstance, dataset_collections +from lib import INVOCATIONS_DIR, METRICS_DIR, Keys +from lib.common import (Context, _get_dataset_data, _make_dataset_element, + connect, print_json) +from lib.history import wait_for log = logging.getLogger('abm') + def run_cli(context: Context, args: list): """ Runs a single workflow defined by *args[0]* @@ -31,7 +35,7 @@ def run_cli(context: Context, args: list): parser.add_argument('-p', '--prefix') parser.add_argument('-e', '--experiment') a = parser.parse_args(args) - #workflow_path = args[0] + # workflow_path = args[0] if not os.path.exists(a.workflow_path): print(f'ERROR: can not find workflow configuration {a.workflow_path}') return @@ -44,7 +48,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): # if len(args) > 2: # experiment = args[2].replace(' ', '_').lower() - if 
os.path.exists(INVOCATIONS_DIR): if not os.path.isdir(INVOCATIONS_DIR): print('ERROR: Can not save invocation status, directory name in use.') @@ -52,7 +55,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): else: os.mkdir(INVOCATIONS_DIR) - if os.path.exists(METRICS_DIR): if not os.path.isdir(METRICS_DIR): print('ERROR: Can not save metrics, directory name in use.') @@ -98,12 +100,12 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): print(f'ERROR: Invalid input specification for {spec[Keys.NAME]}') return False dsname = spec[Keys.NAME] - #dsid = find_dataset_id(gi, dsname) + # dsid = find_dataset_id(gi, dsname) dsdata = _get_dataset_data(gi, dsname) dsid = dsdata['id'] ref_data_size.append(dsdata['size']) print(f"Reference input dataset {dsid}") - inputs[input[0]] = {'id': dsid, 'src': 'hda', 'size':dsdata['size']} + inputs[input[0]] = {'id': dsid, 'src': 'hda', 'size': dsdata['size']} input_names.append(dsname) count = 0 @@ -126,7 +128,9 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): for spec in run[Keys.INPUTS]: input = gi.workflows.get_workflow_inputs(wfid, spec[Keys.NAME]) if input is None or len(input) == 0: - print(f'ERROR: Invalid input specification for {spec[Keys.NAME]}') + print( + f'ERROR: Invalid input specification for {spec[Keys.NAME]}' + ) return False if 'value' in spec: @@ -135,15 +139,18 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): elif 'collection' in spec: dsname = spec['collection'] input_names.append(dsname) - #inputs.append(dsname) + # inputs.append(dsname) # dsid = find_dataset_id(gi, dsname) - dsdata = _get_dataset_data(gi, dsname) + dsid = find_collection_id(gi, dsname) + dsdata = _get_dataset_data(gi, dsid) if dsdata is None: - raise Exception(f"ERROR: unable to resolve {dsname} to a dataset.") + raise Exception( + f"ERROR: unable to resolve {dsname} to a dataset." 
+ ) dsid = dsdata['id'] dssize = dsdata['size'] input_data_size.append(dssize) - print(f"Input dataset ID: {dsname} [{dsid}] {dssize}") + print(f"Input collection ID: {dsname} [{dsid}] {dssize}") inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize} elif 'paired' in spec: name = spec['name'] @@ -157,7 +164,11 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): dssize = dsdata['size'] input_data_size.append(dssize) print(f"Input dataset ID: {name} [{dsid}] {dssize}") - inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize} + inputs[input[0]] = { + 'id': dsid, + 'src': 'hdca', + 'size': dssize, + } else: histories = gi.histories.get_histories(name=spec['history']) if len(histories) == 0: @@ -170,32 +181,41 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): elements = [] size = 0 for key in item.keys(): - #print(f"Getting dataset for {key} = {item[key]}") + # print(f"Getting dataset for {key} = {item[key]}") value = _get_dataset_data(gi, item[key]) size += value['size'] - elements.append(_make_dataset_element(key, value['id'])) + elements.append( + _make_dataset_element(key, value['id']) + ) description = dataset_collections.CollectionDescription( name=name, # type='paired', - elements=elements + elements=elements, ) pairs += 1 # print(json.dumps(description.__dict__, indent=4)) # pprint(description) collection = gi.histories.create_dataset_collection( - history_id=hid, - collection_description=description + history_id=hid, collection_description=description ) - print(f"Input dataset paired list: {collection['id']} {size}") - inputs[input[0]] = {'id': collection['id'], 'src':'hdca', 'size':size} + print( + f"Input dataset paired list: {collection['id']} {size}" + ) + inputs[input[0]] = { + 'id': collection['id'], + 'src': 'hdca', + 'size': size, + } elif Keys.DATASET_ID in spec: dsname = spec[Keys.DATASET_ID] input_names.append(dsname) - #inputs.append(dsname) + # inputs.append(dsname) # dsid = find_dataset_id(gi, dsname) dsdata = _get_dataset_data(gi, dsname) if dsdata is None: - raise Exception(f"ERROR: unable to resolve {dsname} to a dataset.") + raise Exception( + f"ERROR: unable to resolve {dsname} to a dataset." 
+ ) dsid = dsdata['id'] dssize = dsdata['size'] input_data_size.append(dssize) @@ -204,15 +224,24 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): else: raise Exception(f'Invalid input value') print(f"Running workflow {wfid} in history {new_history_name}") - invocation = gi.workflows.invoke_workflow(wfid, inputs=inputs, history_name=new_history_name) + invocation = gi.workflows.invoke_workflow( + wfid, inputs=inputs, history_name=new_history_name + ) id = invocation['id'] - #invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False) - invocations = gi.invocations.wait_for_invocation(id, 86400, 10, True) + # invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False) + try: + invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False) + except: + pprint(invocation) + sys.exc_info() print("Waiting for jobs") if history_prefix is not None: parts = history_prefix.split() invocations['run'] = parts[0] - invocations['cloud'] = parts[1] + if len(parts) > 1: + invocations['cloud'] = parts[1] + else: + invocations['cloud'] = 'Unknown' if len(parts) > 2: invocations['job_conf'] = parts[2] else: @@ -225,7 +254,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): invocations['inputs'] = ' '.join(input_names) invocations['ref_data_size'] = ref_data_size invocations['input_data_size'] = input_data_size - #TODO Change this output path. (Change it to what? KS) + # TODO Change this output path. (Change it to what? KS) output_path = os.path.join(invocations_dir, id + '.json') with open(output_path, 'w') as f: json.dump(invocations, f, indent=4) @@ -235,7 +264,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): return True - def translate(context: Context, args: list): if len(args) == 0: print('ERROR: no workflow configuration specified') @@ -299,7 +327,9 @@ def validate(context: Context, args: list): wfid = None if wfid is None: - print(f"The workflow '{workflow[Keys.WORKFLOW_ID]}' does not exist on this server.") + print( + f"The workflow '{workflow[Keys.WORKFLOW_ID]}' does not exist on this server." 
+ ) return else: print(f"Workflow: {workflow[Keys.WORKFLOW_ID]} -> {wfid}") @@ -315,40 +345,66 @@ def validate(context: Context, args: list): if input is None or len(input) == 0: print(f'ERROR: Invalid input specification for {spec[Keys.NAME]}') errors += 1 - #sys.exit(1) + # sys.exit(1) else: dsid = find_dataset_id(gi, spec[Keys.DATASET_ID]) if dsid is None: - print(f"ERROR: Reference dataset not found {spec[Keys.DATASET_ID]}") + print( + f"ERROR: Reference dataset not found {spec[Keys.DATASET_ID]}" + ) errors += 1 else: - print(f"Reference input dataset {spec[Keys.DATASET_ID]} -> {dsid}") + print( + f"Reference input dataset {spec[Keys.DATASET_ID]} -> {dsid}" + ) inputs[input[0]] = {'id': dsid, 'src': 'hda'} - #count = 0 + # count = 0 for run in workflow[Keys.RUNS]: - #count += 1 + # count += 1 if Keys.INPUTS in run and run[Keys.INPUTS] is not None: for spec in run[Keys.INPUTS]: input = gi.workflows.get_workflow_inputs(wfid, spec[Keys.NAME]) if input is None or len(input) == 0: - print(f'ERROR: Invalid input specification for {spec[Keys.NAME]}') + print( + f'ERROR: Invalid input specification for {spec[Keys.NAME]}' + ) errors += 1 else: - dsid = find_dataset_id(gi, spec[Keys.DATASET_ID]) + key = None + if Keys.DATASET_ID in spec: + dsid = find_dataset_id(gi, spec[Keys.DATASET_ID]) + key = Keys.DATASET_ID + elif Keys.COLLECTION in spec: + print(f"Trying to find collection {spec[Keys.COLLECTION]}") + dsid = find_collection_id(gi, spec[Keys.COLLECTION]) + key = Keys.COLLECTION + elif 'name' in spec and 'value' in spec: + key = 'name' + dsid = spec['value'] + else: + print("dataset_id nor collection found in spec") + pprint(input) + pprint(spec) + errors += 1 + break if dsid is None: - print(f"ERROR: Dataset not found {spec[Keys.DATASET_ID]}") + print(f"ERROR: Dataset not found {spec[key]}") errors += 1 else: - print(f"Input dataset: {spec[Keys.DATASET_ID]} -> {dsid}") + print(f"Input dataset: {spec[key]} -> {dsid}") inputs[input[0]] = {'id': dsid, 'src': 'hda'} if errors == 0: - print("This workflow configuration is valid and can be executed on this server.") + print( + "This workflow configuration is valid and can be executed on this server." + ) else: print("---------------------------------") print("WARNING") - print("The above problems need to be corrected before this workflow configuration can be used.") + print( + "The above problems need to be corrected before this workflow configuration can be used." + ) print("---------------------------------") total_errors += errors @@ -356,7 +412,7 @@ def validate(context: Context, args: list): def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): - """ Blocks until all jobs defined in the *invocations* to complete. + """Blocks until all jobs defined in the *invocations* to complete. :param gi: The *GalaxyInstance** running the jobs :param invocations: @@ -369,41 +425,63 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): conf = invocations['job_conf'] inputs = invocations['inputs'] output_dir = invocations['output_dir'] - for step in invocations['steps']: - job_id = step['job_id'] - if job_id is not None: - retries = 3 - done = False - while not done and retries >= 0: - print(f"Waiting for job {job_id} on {context.GALAXY_SERVER}") - try: - # TDOD Should retry if anything throws an exception. 
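# Illustrative sketch, not part of the patch: the replacement logic in this hunk drops the
# old per-step polling being removed here in favour of waiting on the whole history and
# then recording full job details for every job it contains. Assuming a connected bioblend
# GalaxyInstance `gi`, a history id `hid`, and an existing directory `metrics_dir`
# (all placeholder names for this sketch), the core of the new approach is:
import json
import os

from lib.history import wait_for


def dump_history_job_metrics(gi, hid: str, metrics_dir: str):
    # Block until every job in the history reaches a terminal state (ok or error).
    wait_for(gi, hid)
    for job in gi.jobs.get_jobs(history_id=hid):
        details = gi.jobs.show_job(job['id'], full_details=True)
        with open(os.path.join(metrics_dir, f"{job['id']}.json"), 'w') as f:
            json.dump(details, f, indent=4)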
- status = gi.jobs.wait_for_job(job_id, 86400, 10, False) - data = gi.jobs.show_job(job_id, full_details=True) - metrics = { - 'run': run, - 'cloud': cloud, - 'job_conf': conf, - 'workflow_id': wfid, - 'history_id': hid, - 'inputs': inputs, - 'metrics': data, - 'status': status, - 'server': context.GALAXY_SERVER, - 'ref_data_size': invocations['ref_data_size'], - 'input_data_size': invocations['input_data_size'] - } - output_path = os.path.join(output_dir, f"{job_id}.json") - with open(output_path, "w") as f: - json.dump(metrics, f, indent=4) - print(f"Wrote metrics to {output_path}") - done = True - except ConnectionError as e: - print(f"ERROR: connection dropped while waiting for {job_id}") - retries -= 1 - except Exception as e: - print(f"ERROR: {e}") - retries -= 1 + wait_for(gi, hid) + jobs = gi.jobs.get_jobs(history_id=hid) + for job in jobs: + data = gi.jobs.show_job(job['id'], full_details=True) + metrics = { + 'run': run, + 'cloud': cloud, + 'job_conf': conf, + 'workflow_id': wfid, + 'history_id': hid, + 'inputs': inputs, + 'metrics': data, + 'status': job['state'], + 'server': context.GALAXY_SERVER, + 'ref_data_size': invocations['ref_data_size'], + 'input_data_size': invocations['input_data_size'], + } + output_path = os.path.join(output_dir, f"{job['id']}.json") + with open(output_path, "w") as f: + json.dump(metrics, f, indent=4) + print(f"Wrote metrics to {output_path}") + + # for step in invocations['steps']: + # job_id = step['job_id'] + # if job_id is not None: + # retries = 3 + # done = False + # while not done and retries >= 0: + # print(f"Waiting for job {job_id} on {context.GALAXY_SERVER}") + # try: + # # TDOD Should retry if anything throws an exception. + # status = gi.jobs.wait_for_job(job_id, 86400, 10, False) + # data = gi.jobs.show_job(job_id, full_details=True) + # metrics = { + # 'run': run, + # 'cloud': cloud, + # 'job_conf': conf, + # 'workflow_id': wfid, + # 'history_id': hid, + # 'inputs': inputs, + # 'metrics': data, + # 'status': status, + # 'server': context.GALAXY_SERVER, + # 'ref_data_size': invocations['ref_data_size'], + # 'input_data_size': invocations['input_data_size'] + # } + # output_path = os.path.join(output_dir, f"{job_id}.json") + # with open(output_path, "w") as f: + # json.dump(metrics, f, indent=4) + # print(f"Wrote metrics to {output_path}") + # done = True + # except ConnectionError as e: + # print(f"ERROR: connection dropped while waiting for {job_id}") + # retries -= 1 + # except Exception as e: + # print(f"ERROR: {e}") + # retries -= 1 def parse_workflow(workflow_path: str): @@ -436,7 +514,7 @@ def find_workflow_id(gi, name_or_id): return wf[0]['id'] except: pass - #print(f"Warning: unable to find workflow {name_or_id}") + # print(f"Warning: unable to find workflow {name_or_id}") return None @@ -449,7 +527,9 @@ def find_dataset_id(gi, name_or_id): pass try: - datasets = gi.datasets.get_datasets(name=name_or_id) # , deleted=True, purged=True) + datasets = gi.datasets.get_datasets( + name=name_or_id + ) # , deleted=True, purged=True) for ds in datasets: if ds['state'] == 'ok' and not ds['deleted'] and ds['visible']: return ds['id'] @@ -459,13 +539,33 @@ def find_dataset_id(gi, name_or_id): except: print('Caught an exception') print(sys.exc_info()) - #print(f"Warning: unable to find dataset {name_or_id}") + # print(f"Warning: unable to find dataset {name_or_id}") + return None + + +def find_collection_id(gi, name): + kwargs = {'limit': 10000, 'offset': 0} + datasets = gi.datasets.get_datasets(**kwargs) + if len(datasets) == 0: + 
print('No datasets found') + return None + for dataset in datasets: + # print(f"Checking if dataset {dataset['name']} == {name}") + if dataset['type'] == 'collection' and dataset['name'].strip() == name: + if ( + dataset['populated_state'] == 'ok' + and not dataset['deleted'] + and dataset['visible'] + ): + return dataset['id'] return None from pprint import pprint -def test(context:Context, args:list): + + +def test(context: Context, args: list): id = 'c90fffcf98b31cd3' gi = connect(context) inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input') - pprint(inputs) \ No newline at end of file + pprint(inputs) diff --git a/abm/lib/cloudlaunch.py b/abm/lib/cloudlaunch.py index ef25fe7..0abd261 100644 --- a/abm/lib/cloudlaunch.py +++ b/abm/lib/cloudlaunch.py @@ -1,20 +1,21 @@ -import os -import json -import arrow -import requests import configparser +import json +import os import traceback -from common import Context +import arrow +import requests from cloudlaunch_cli.main import create_api_client - +from common import Context BOLD = '\033[1m' CLEAR = '\033[0m' + def h1(text): return f"{BOLD}{text}{CLEAR}" + list_help = f''' {h1('NAME')} list @@ -38,6 +39,7 @@ def h1(text): ''' + def list(context: Context, args: list): archived = False filter = None @@ -48,7 +50,9 @@ def list(context: Context, args: list): if arg in ['-a', '--archived', 'archived']: archived = True elif arg in ['-r', '--running', 'running']: - filter = lambda d: 'running' in status(d.latest_task) or ('LAUNCH' == d.latest_task.action and 'SUCCESS' == status(d.latest_task)) + filter = lambda d: 'running' in status(d.latest_task) or ( + 'LAUNCH' == d.latest_task.action and 'SUCCESS' == status(d.latest_task) + ) elif arg in ['-d', '--deleted', 'deleted']: filter = lambda d: 'DELETE' in d.latest_task.action elif arg in ['-l', '--launch', 'launch']: @@ -64,46 +68,35 @@ def list(context: Context, args: list): deployments = create_api_client().deployments.list(archived=archived) if filter is not None: - deployments = [ d for d in deployments if filter(d) ] + deployments = [d for d in deployments if filter(d)] if n is not None and len(deployments) > n: deployments = deployments[:n] _print_deployments(deployments) - def create(context: Context, args: list): cloud = None region = None - params = { - 'application': 'cloudman-20', - 'application_version': 'dev' - } + params = {'application': 'cloudman-20', 'application_version': 'dev'} config = { "config_cloudlaunch": { "rootStorageType": "volume", "rootStorageSize": 42, - "keyPair": "" + "keyPair": "", }, - "config_cloudman2": { - "clusterPassword": "gvl_letmein" - } - } - targets = { - 'aws': 11, - 'gcp': 16 + "config_cloudman2": {"clusterPassword": "gvl_letmein"}, } + targets = {'aws': 11, 'gcp': 16} regions = { 'aws': { 'us-east-1': 11, 'us-east-2': 12, 'us-west-1': 13, 'us-west-2': 14, - 'us-east-1b': 36 + 'us-east-1b': 36, }, - 'gcp': { - 'us-central1': 16 - } + 'gcp': {'us-central1': 16}, } while len(args) > 0: arg = args.pop(0) @@ -112,7 +105,7 @@ def create(context: Context, args: list): print(f"ERROR: the cloud provider has already been specified: {cloud}") return cloud = arg - #params['deployment_target_id'] = targets[cloud] + # params['deployment_target_id'] = targets[cloud] elif arg in ['-c', '--config']: filepath = args.pop(0) with open(filepath, 'r') as f: @@ -126,7 +119,9 @@ def create(context: Context, args: list): elif arg in ['-r', '--region']: region = args.pop(0) elif 'name' in params: - print(f"ERROR: the cluster name has already been specified: 
{params['name']}") + print( + f"ERROR: the cluster name has already been specified: {params['name']}" + ) return else: params['name'] = arg @@ -162,7 +157,7 @@ def create(context: Context, args: list): _print_deployments([new_deployment]) except Exception as e: print("Unable to launch the cluster") - #traceback.print_tb(e.__traceback__) + # traceback.print_tb(e.__traceback__) print(e) @@ -182,20 +177,22 @@ def delete(context: Context, args: list): config = configparser.ConfigParser() config.read(configfile) - #cloudlaunch_client = create_api_client(args[0]) - #cloudlaunch_client.deployments.delete(args[1]) + # cloudlaunch_client = create_api_client(args[0]) + # cloudlaunch_client.deployments.delete(args[1]) url = config['cloudlaunch-cli']['url'] token = config['cloudlaunch-cli']['token'] headers = { 'Accept': 'application/json', 'Content-type': 'application/json', - 'Authorization': f"Token {token}" + 'Authorization': f"Token {token}", } data = dict(action='DELETE') for id in args: print(f"URL is: {url}/deployments/{id}/tasks/") - response = requests.post(f"{url}/deployments/{id}/tasks/", json=data, headers=headers) + response = requests.post( + f"{url}/deployments/{id}/tasks/", json=data, headers=headers + ) if response.status_code < 300: print(f"Deleted deployment {id}") else: @@ -206,25 +203,36 @@ def delete(context: Context, args: list): def _print_deployments(deployments): if len(deployments) > 0: - print("{:6s} {:24s} {:6s} {:15s} {:15s} {:s}".format( - "ID", "Name", "Cloud", "Created", "Address", "Status")) + print( + "{:6s} {:24s} {:6s} {:15s} {:15s} {:s}".format( + "ID", "Name", "Cloud", "Created", "Address", "Status" + ) + ) else: print("No deployments.") for deployment in deployments: created_date = arrow.get(deployment.added) latest_task = deployment.latest_task - latest_task_status = latest_task.instance_status \ - if latest_task.instance_status else latest_task.status + latest_task_status = ( + latest_task.instance_status + if latest_task.instance_status + else latest_task.status + ) latest_task_display = "{action}:{latest_task_status}".format( - action=latest_task.action, - latest_task_status=latest_task_status) + action=latest_task.action, latest_task_status=latest_task_status + ) ip_address = deployment.public_ip if deployment.public_ip else 'N/A' cloud = deployment._data['deployment_target']['target_zone']['cloud']['id'] - print("{identifier:6d} {name:24.24s} {cloud:6.6s} {created_date:15.15s} " - "{ip_address:15.15s} {latest_task_display}".format( - identifier=deployment._id, cloud=cloud, - created_date=created_date.humanize(), - latest_task_display=latest_task_display, - ip_address=ip_address, **deployment._data)) - #pprint(deployment._data) - #print() \ No newline at end of file + print( + "{identifier:6d} {name:24.24s} {cloud:6.6s} {created_date:15.15s} " + "{ip_address:15.15s} {latest_task_display}".format( + identifier=deployment._id, + cloud=cloud, + created_date=created_date.humanize(), + latest_task_display=latest_task_display, + ip_address=ip_address, + **deployment._data, + ) + ) + # pprint(deployment._data) + # print() diff --git a/abm/lib/common.py b/abm/lib/common.py index 267fc1b..dbb2402 100644 --- a/abm/lib/common.py +++ b/abm/lib/common.py @@ -1,12 +1,12 @@ +import json import os -import sys import subprocess -from ruamel.yaml import YAML -import json -import bioblend.galaxy -from bioblend.galaxy import dataset_collections +import sys +import bioblend.galaxy import lib +from bioblend.galaxy import dataset_collections +from ruamel.yaml import YAML 
PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml'] @@ -17,11 +17,12 @@ "ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR592/SRR592109/SRR592109_2.fastq.gz", "ftp://ftp.sra.ebi.ac.uk/vol1/fastq/SRR233/SRR233167/SRR233167_2.fastq.gz", "ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR047/ERR047678/ERR047678_1.fastq.gz", - "ftp://ftp-trace.ncbi.nlm.nih.gov/ReferenceSamples/giab/data/AshkenazimTrio/HG003_NA24149_father/NIST_Illumina_2x250bps/reads/D2_S1_L002_R1_002.fastq.gz" + "ftp://ftp-trace.ncbi.nlm.nih.gov/ReferenceSamples/giab/data/AshkenazimTrio/HG003_NA24149_father/NIST_Illumina_2x250bps/reads/D2_S1_L002_R1_002.fastq.gz", ], - "rna": [] + "rna": [], } + class Context: def __init__(self, *args): if len(args) == 1: @@ -39,8 +40,9 @@ def __init__(self, *args): self.API_KEY = args[1] self.KUBECONFIG = args[2] else: - raise Exception(f'Invalid args for Context. Expected one or three, found {len(args)}') - + raise Exception( + f'Invalid args for Context. Expected one or three, found {len(args)}' + ) def print_json(obj, indent=2): @@ -51,7 +53,7 @@ def print_yaml(obj): get_yaml_parser().dump(obj, sys.stdout) -def connect(context:Context): +def connect(context: Context): """ Create a connection to the Galaxy instance @@ -71,7 +73,6 @@ def connect(context:Context): return gi - def _set_active_profile(profile_name: str): # print(f"Parsing profile for {profile_name}") lib.GALAXY_SERVER, lib.API_KEY, lib.KUBECONFIG = parse_profile(profile_name) @@ -132,7 +133,9 @@ def parse_profile(profile_name: str): print(f'ERROR: {profile_name} is not the name of a valid profile.') keys = list(profiles.keys()) if len(keys) > 0: - quoted_keys = ', '.join([f"'{k}'" for k in keys[0:-2]]) + f", and '{keys[-1]}'" + quoted_keys = ( + ', '.join([f"'{k}'" for k in keys[0:-2]]) + f", and '{keys[-1]}'" + ) print(f'The defined profile names are: {quoted_keys}') return None, None, None profile = profiles[profile_name] @@ -141,7 +144,7 @@ def parse_profile(profile_name: str): return (profile['url'], profile['key'], None) -def run(command, env:dict= None): +def run(command, env: dict = None): if env is None: env = os.environ # if env is not None: @@ -159,7 +162,7 @@ def run(command, env:dict= None): def get_env(context: Context): copy = os.environ.copy() - for key,value in context.__dict__.items(): + for key, value in context.__dict__.items(): if value is not None: copy[key] = value return copy @@ -189,33 +192,34 @@ def find_executable(name): # "swaptotal", # "uname" + def summarize_metrics(gi, jobs: list): - header= [ - "id", - "history_id", - "history_name", - "state", - "tool_id", - "invocation_id", - "workflow_id", - "cpuacct.usage", - # "end_epoch", - "galaxy_memory_mb", - "galaxy_slots", - # "memory.failcnt", - "memory.limit_in_bytes", - "memory.max_usage_in_bytes", - # "memory.memsw.limit_in_bytes", - # "memory.memsw.max_usage_in_bytes", - # "memory.oom_control.oom_kill_disable", - # "memory.oom_control.under_oom", - "memory.soft_limit_in_bytes", - "memtotal", - "processor_count", - "runtime_seconds", - # "start_epoch", - # "swaptotal", - # "uname" + header = [ + "id", + "history_id", + "history_name", + "state", + "tool_id", + "invocation_id", + "workflow_id", + "cpuacct.usage", + # "end_epoch", + "galaxy_memory_mb", + "galaxy_slots", + # "memory.failcnt", + "memory.limit_in_bytes", + "memory.max_usage_in_bytes", + # "memory.memsw.limit_in_bytes", + # "memory.memsw.max_usage_in_bytes", + # "memory.oom_control.oom_kill_disable", + # "memory.oom_control.under_oom", + "memory.soft_limit_in_bytes", + "memtotal", + 
"processor_count", + "runtime_seconds", + # "start_epoch", + # "swaptotal", + # "uname" ] print(','.join(header)) @@ -223,12 +227,12 @@ def summarize_metrics(gi, jobs: list): job_metrics = gi.jobs.get_metrics(job['id']) row = [] metrics = metrics_to_dict(job_metrics, header) - metrics['id'] = job['id'] - metrics['history_id'] = job['history_id'] - metrics['history_name'] = job['history_name'] - metrics['state'] = job['state'] - metrics['tool_id'] = job['tool_id'] - metrics['invocation_id'] = job['invocation_id'] + metrics['id'] = job.get('id', 'unknown') + metrics['history_id'] = job.get('history_id', 'unknown') + metrics['history_name'] = job.get('history_name', 'unknown') + metrics['state'] = job.get('state', 'unknown') + metrics['tool_id'] = job.get('tool_id', 'unknown') + metrics['invocation_id'] = job.get('invocation_id', 'unknown') for key in header: if key in metrics: row.append(metrics[key]) @@ -237,7 +241,7 @@ def summarize_metrics(gi, jobs: list): print(','.join(row), end='\n') -def metrics_to_dict(metrics: list, accept:list): +def metrics_to_dict(metrics: list, accept: list): result = dict() for m in metrics: key = m['name'] @@ -276,7 +280,7 @@ def make_result(data): return { 'id': data['id'], 'size': data['file_size'], - 'history': data['history_id'] + 'history': data['history_id'], } try: @@ -286,7 +290,9 @@ def make_result(data): pass try: - datasets = gi.datasets.get_datasets(name=name_or_id) # , deleted=True, purged=True) + datasets = gi.datasets.get_datasets( + name=name_or_id + ) # , deleted=True, purged=True) for ds in datasets: # print_json(ds) state = True @@ -305,5 +311,3 @@ def make_result(data): def _make_dataset_element(name, value): # print(f"Making dataset element for {name} = {value}({type(value)})") return dataset_collections.HistoryDatasetElement(name=name, id=value) - - diff --git a/abm/lib/config.py b/abm/lib/config.py index 34da927..cbb71a5 100644 --- a/abm/lib/config.py +++ b/abm/lib/config.py @@ -1,9 +1,10 @@ -from common import load_profiles, save_profiles, get_yaml_parser, Context - import os -import yaml from pathlib import Path -from common import print_json, print_yaml + +import yaml +from common import (Context, get_yaml_parser, load_profiles, print_json, + print_yaml, save_profiles) + def list(context: Context, args: list): profiles = load_profiles() @@ -21,7 +22,7 @@ def create(context: Context, args: list): if profile_name in profiles: print("ERROR: a cloud configuration with that name already exists.") return - profile = { 'url': "", 'key': '', 'kube': args[1]} + profile = {'url': "", 'key': '', 'kube': args[1]} profiles[profile_name] = profile save_profiles(profiles) print_json(profile) @@ -83,6 +84,7 @@ def show(context: Context, args: list): return print_json(profiles[args[0]]) + def workflows(context: Context, args: list): userfile = os.path.join(Path.home(), ".abm", "workflows.yml") if len(args) == 0 or args[0] in ['list', 'ls']: @@ -90,12 +92,16 @@ def workflows(context: Context, args: list): if workflows is None: return print(f"Workflows defined in {userfile}") - for key,url in workflows.items(): + for key, url in workflows.items(): print(f"{key:10} {url}") - elif args[0] in ['delete', 'del','rm']: - print(f"Deleting workflows is not supported at this time. Please edit {userfile} directly.") + elif args[0] in ['delete', 'del', 'rm']: + print( + f"Deleting workflows is not supported at this time. Please edit {userfile} directly." + ) elif args[0] in ['add', 'new']: - print(f"Adding workflows is not supported at this time. 
Please edit {userfile} directly.") + print( + f"Adding workflows is not supported at this time. Please edit {userfile} directly." + ) else: print(f"ERROR: Unrecognized command {args[0]}") @@ -107,17 +113,20 @@ def datasets(context: Context, args: list): if datasets is None: return print(f"Datasets defined in {userfile}") - for key,url in datasets.items(): + for key, url in datasets.items(): print(f"{key:10} {url}") - elif args[0] in ['delete', 'del','rm']: - print(f"Deleting datasets is not supported at this time. Please edit {userfile} directly.") + elif args[0] in ['delete', 'del', 'rm']: + print( + f"Deleting datasets is not supported at this time. Please edit {userfile} directly." + ) elif args[0] in ['add', 'new']: - print(f"Adding datasets is not supported at this time. Please edit {userfile} directly.") + print( + f"Adding datasets is not supported at this time. Please edit {userfile} directly." + ) else: print(f"ERROR: Unrecognized command {args[0]}") - def histories(context: Context, args: list): userfile = os.path.join(Path.home(), ".abm", "histories.yml") if len(args) == 0 or args[0] in ['list', 'ls']: @@ -125,12 +134,16 @@ def histories(context: Context, args: list): if histories is None: return print(f"Datasets defined in {userfile}") - for key,url in histories.items(): + for key, url in histories.items(): print(f"{key:10} {url}") - elif args[0] in ['delete', 'del','rm']: - print(f"Deleting history entries is not supported at this time. Please edit {userfile} directly.") + elif args[0] in ['delete', 'del', 'rm']: + print( + f"Deleting history entries is not supported at this time. Please edit {userfile} directly." + ) elif args[0] in ['add', 'new']: - print(f"Adding dataset entries is not supported at this time. Please edit {userfile} directly.") + print( + f"Adding dataset entries is not supported at this time. Please edit {userfile} directly." 
+ ) else: print(f"ERROR: Unrecognized command {args[0]}") @@ -150,6 +163,7 @@ def histories(context: Context, args: list): # return None # return _load_config(userfile) + def _load_config(filepath): if not os.path.exists(filepath): print(f"ERROR: configuration file not found: {filepath}") diff --git a/abm/lib/dataset.py b/abm/lib/dataset.py index 103d78f..6ae32a8 100644 --- a/abm/lib/dataset.py +++ b/abm/lib/dataset.py @@ -1,19 +1,17 @@ import json - -from bioblend.galaxy import dataset_collections -from common import connect, Context, print_json, _get_dataset_data, _make_dataset_element, find_history -from pprint import pprint +import os from pathlib import Path +from pprint import pprint -import os import yaml +from bioblend.galaxy import dataset_collections +from common import (Context, _get_dataset_data, _make_dataset_element, connect, + find_history, print_json) + def list(context: Context, args: list): gi = connect(context) - kwargs = { - 'limit': 10000, - 'offset': 0 - } + kwargs = {'limit': 10000, 'offset': 0} if len(args) > 0: if args[0] in ['-s', '--state']: if len(args) != 2: @@ -23,7 +21,7 @@ def list(context: Context, args: list): else: print(f"ERROR: Invalid parameter: {args[0]}") return - #datasets = gi.datasets.get_datasets(limit=10000, offset=0) # , deleted=True, purged=True) + # datasets = gi.datasets.get_datasets(limit=10000, offset=0) # , deleted=True, purged=True) datasets = gi.datasets.get_datasets(**kwargs) if len(datasets) == 0: print('No datasets found') @@ -32,7 +30,9 @@ def list(context: Context, args: list): print('ID\tHistory\tType\tDeleted\tState\tName') for dataset in datasets: state = dataset['state'] if 'state' in dataset else 'unknown' - print(f"{dataset['id']}\t{dataset['history_id']}\t{dataset['history_content_type']}\t{dataset['deleted']}\t{state}\t{dataset['name']}") + print( + f"{dataset['id']}\t{dataset['history_id']}\t{dataset['history_content_type']}\t{dataset['deleted']}\t{state}\t{dataset['name']}" + ) def clean(context: Context, args: list): @@ -41,7 +41,9 @@ def clean(context: Context, args: list): else: invalid_states = args gi = connect(context) - datasets = gi.datasets.get_datasets(limit=10000, offset=0) # , deleted=True, purged=True) + datasets = gi.datasets.get_datasets( + limit=10000, offset=0 + ) # , deleted=True, purged=True) if len(datasets) == 0: print('No datasets found') return @@ -118,7 +120,7 @@ def collection(context: Context, args: list): elif arg == '-n' or arg == '--name': collection_name = args.pop(0) elif '=' in arg: - name,value = arg.split('=') + name, value = arg.split('=') dataset = _get_dataset_data(gi, value) if dataset is None: print(f"ERROR: dataset not found {value}") @@ -136,10 +138,8 @@ def collection(context: Context, args: list): result = gi.histories.create_dataset_collection( history_id=hid, collection_description=dataset_collections.CollectionDescription( - name=collection_name, - type=type, - elements=elements - ) + name=collection_name, type=type, elements=elements + ), ) print(json.dumps(result, indent=4)) @@ -181,7 +181,7 @@ def import_from_config(context: Context, args: list): gi = connect(context) if history is not None: history = find_history(gi, history) - + response = gi.tools.put_url(url, history, **kwargs) print(json.dumps(response, indent=4)) @@ -224,10 +224,7 @@ def rename(context: Context, args: list): return gi = connect(context) response = gi.histories.update_dataset(args[0], args[1], name=args[2]) - result = { - 'state': response['state'], - 'name': response['name'] - } + result = {'state': 
response['state'], 'name': response['name']} print(json.dumps(result, indent=4)) diff --git a/abm/lib/experiment.py b/abm/lib/experiment.py index 178f82e..9dead51 100644 --- a/abm/lib/experiment.py +++ b/abm/lib/experiment.py @@ -1,21 +1,22 @@ -import os -import threading - -import yaml import json -import helm -import benchmark import logging +import os +import threading import traceback -from common import load_profiles, Context -from time import perf_counter from datetime import timedelta +from time import perf_counter + +import benchmark +import helm +import yaml +from common import Context, load_profiles INVOCATIONS_DIR = "invocations" METRICS_DIR = "metrics" log = logging.getLogger('abm') + def run(context: Context, args: list): """ Runs a single benchmark defined by *args[0]* @@ -78,12 +79,16 @@ def run_on_cloud(cloud: str, config: dict): for n in range(config['runs']): history_name_prefix = f"{n+1} {cloud} {conf}" for workflow_conf in config['benchmark_confs']: - benchmark.run(context, workflow_conf, history_name_prefix, config['name']) + benchmark.run( + context, workflow_conf, history_name_prefix, config['name'] + ) else: for n in range(config['runs']): history_name_prefix = f"{n+1} {cloud}" for workflow_conf in config['benchmark_confs']: - benchmark.run(context, workflow_conf, history_name_prefix, config['name']) + benchmark.run( + context, workflow_conf, history_name_prefix, config['name'] + ) def test(context: Context, args: list): @@ -94,7 +99,7 @@ def test(context: Context, args: list): print(data) -def parse_toolid(id:str) -> str: +def parse_toolid(id: str) -> str: parts = id.split('/') if len(parts) < 2: return f"{id},unknown" @@ -135,7 +140,7 @@ def summarize(context: Context, args: list): make_row = make_model_row header_row = "job_id,tool_id,tool_version,state,memory.max_usage_in_bytes,cpuacct.usage,process_count,galaxy_slots,runtime_seconds,ref_data_size,input_data_size_1,input_data_size_2" else: - print(f"Input dir {arg}") + # print(f"Input dir {arg}") input_dirs.append(arg) if len(input_dirs) == 0: @@ -154,22 +159,33 @@ def summarize(context: Context, args: list): with open(input_path, 'r') as f: data = json.load(f) if data['metrics']['tool_id'] == 'upload1': - #print('Ignoring upload tool') + # print('Ignoring upload tool') continue row = make_row(data) - print(separator.join([ str(x) for x in row])) + print(separator.join([str(x) for x in row])) except Exception as e: # Silently fail to allow the remainder of the table to be generated. 
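# Illustrative sketch, not part of the patch: each metrics JSON written by
# benchmark.wait_for_jobs becomes one CSV row. For a single file (placeholder path),
# the transformation performed by the loop above is essentially:
import json

with open('metrics/2d9035b3fc152403.json') as f:
    record = json.load(f)
if record['metrics']['tool_id'] != 'upload1':      # upload jobs are skipped
    # make_table_row() is defined later in this module.
    print(','.join(str(x) for x in make_table_row(record)))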
print(f"Unable to process {input_path}") print(e) - traceback.print_exc( ) - #pass + traceback.print_exc() + # pass + +accept_metrics = [ + 'galaxy_slots', + 'galaxy_memory_mb', + 'runtime_seconds', + 'cpuacct.usage', + 'memory.limit_in_bytes', + 'memory.max_usage_in_bytes', +] # ,'memory.soft_limit_in_bytes'] -accept_metrics = ['galaxy_slots', 'galaxy_memory_mb', 'runtime_seconds', 'cpuacct.usage','memory.limit_in_bytes', 'memory.max_usage_in_bytes'] #,'memory.soft_limit_in_bytes'] def make_table_row(data: dict): - row = [ str(data[key]) for key in ['run', 'cloud', 'job_conf', 'workflow_id', 'history_id', 'inputs']] + row = [ + str(data[key]) + for key in ['run', 'cloud', 'job_conf', 'workflow_id', 'history_id', 'inputs'] + ] row.append(parse_toolid(data['metrics']['tool_id'])) row.append(data['metrics']['state']) for e in _get_metrics(data['metrics']['job_metrics']): @@ -186,7 +202,7 @@ def make_model_row(data: dict): row.append(tool_id.split('/')[-1]) row.append(metrics['state']) job_metrics = parse_job_metrics(metrics['job_metrics']) - row.append(job_metrics.get('memory.max_usage_in_bytes',0)) + row.append(job_metrics.get('memory.max_usage_in_bytes', 0)) row.append(job_metrics.get('cpuacct.usage', 0)) row.append(job_metrics.get('processor_count', 0)) row.append(job_metrics.get('galaxy_slots', 0)) @@ -199,6 +215,7 @@ def make_model_row(data: dict): row.append(size) return row + def _get_metrics(metrics: list): row = [''] * len(accept_metrics) for job_metrics in metrics: @@ -210,6 +227,7 @@ def _get_metrics(metrics: list): pass return row + def add_metrics_to_row(metrics_list: list, row: list): for job_metrics in metrics_list: if job_metrics['name'] in accept_metrics: @@ -224,4 +242,4 @@ def parse_job_metrics(metrics_list: list): metrics = {} for job_metrics in metrics_list: metrics[job_metrics['name']] = job_metrics['raw_value'] - return metrics \ No newline at end of file + return metrics diff --git a/abm/lib/folder.py b/abm/lib/folder.py index a8ceb4f..06760a5 100644 --- a/abm/lib/folder.py +++ b/abm/lib/folder.py @@ -1,5 +1,7 @@ from pprint import pprint -from .common import connect, Context + +from .common import Context, connect + def list(context: Context, args: list): if len(args) == 0: @@ -12,7 +14,9 @@ def list(context: Context, args: list): def create(context: Context, args: list): if len(args) < 2: - print("ERROR: Invalid parameters. Required the library ID, folder name and folder description (optional") + print( + "ERROR: Invalid parameters. 
Required the library ID, folder name and folder description (optional" + ) return library_id = args[0] folder_name = args[1] @@ -26,5 +30,3 @@ def create(context: Context, args: list): def delete(context: Context, args: list): print("This functionality has not been implemented yet.") - - diff --git a/abm/lib/helm.py b/abm/lib/helm.py index ca0a6b4..52a6a60 100644 --- a/abm/lib/helm.py +++ b/abm/lib/helm.py @@ -1,17 +1,21 @@ +import argparse +import json import os import time -import argparse -from common import run, find_executable, get_env, Context + +from common import Context, find_executable, get_env, run def rollback(context: Context, args: list): helm = find_executable('helm') - #helm = 'helm' + # helm = 'helm' if helm is None: print('ERROR: helm is not available on the $PATH') return - print(f"Rolling back deployment on {context.GALAXY_SERVER} KUBECONFIG: {context.KUBECONFIG}") + print( + f"Rolling back deployment on {context.GALAXY_SERVER} KUBECONFIG: {context.KUBECONFIG}" + ) if len(args) > 0: command = f"{helm} rollback " + ' '.join(args) else: @@ -39,7 +43,7 @@ def update(context: Context, args: list): print(f"Applying rules {values} to {context.GALAXY_SERVER}") print(f"Helm update namespace: {namespace}") print(f"Helm update chart: {chart}") - #command = f'{helm} upgrade galaxy {chart} -n {namespace} --reuse-values --set-file jobs.rules."container_mapper_rules\.yml".content={rules}' + # command = f'{helm} upgrade galaxy {chart} -n {namespace} --reuse-values --set-file jobs.rules."container_mapper_rules\.yml".content={rules}' command = f'{helm} upgrade galaxy {chart} -n {namespace} --reuse-values -f {values}' env = get_env(context) try: @@ -54,8 +58,8 @@ def update(context: Context, args: list): print('Waiting for the new deployments to come online') # Give kubernetes a moment to start processing the update. 
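# Illustrative sketch, not part of the patch: the reworked wait_until_ready further down
# in this file delegates readiness checks to kubectl itself, effectively running
# `kubectl rollout status deployment -n <namespace> <name> --watch` for each of the
# job/web/workflow handler deployments. A trimmed equivalent, with 'galaxy' as a
# placeholder namespace and a hypothetical function name:
import json

from common import find_executable, run


def wait_until_rolled_out(namespace: str = 'galaxy'):
    kubectl = find_executable('kubectl')
    listing = json.loads(run(f"{kubectl} get deployment -n {namespace} -o json"))
    for item in listing['items']:
        name = item['metadata']['name']
        if any(part in name for part in ('job', 'web', 'workflow')):
            print(run(f"{kubectl} rollout status deployment -n {namespace} {name} --watch"))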
- time.sleep(30) - wait_until_ready(namespace, env) + time.sleep(5) + wait_until_ready(namespace) return True @@ -87,11 +91,10 @@ def update_cli(context: Context, args: list): def wait(context: Context, args: list): namespace = args[0] if len(args) > 0 else 'galaxy' - env = get_env(context) - wait_until_ready(namespace, get_env(context)) + wait_until_ready(namespace) # , get_env(context)) -def filter(lines:list, item:str): +def filter(lines: list, item: str): result = [] for line in lines: if item in line: @@ -99,7 +102,7 @@ def filter(lines:list, item:str): return result -def wait_for(kubectl:str, namespace:str, name: str, env: dict): +def wait_for(kubectl: str, namespace: str, name: str, env: dict): print(f"Waiting for {name} on {env['GALAXY_SERVER']} to be in the Running state") waiting = True while waiting: @@ -122,16 +125,31 @@ def wait_for(kubectl:str, namespace:str, name: str, env: dict): print(f"{name} is running") -def wait_until_ready(namespace: str, env: dict): +# def wait_until_ready(namespace: str, env: dict): +# kubectl = find_executable('kubectl') +# if kubectl is None: +# print('ERROR: kubectl is not available on the $PATH') +# return +# wait_for(kubectl, namespace, 'galaxy-job', env) +# wait_for(kubectl, namespace, 'galaxy-web', env) +# wait_for(kubectl, namespace, 'galaxy-workflow', env) +def wait_until_ready(namespace: str): kubectl = find_executable('kubectl') - if kubectl is None: - print('ERROR: kubectl is not available on the $PATH') - return - wait_for(kubectl, namespace, 'galaxy-job', env) - wait_for(kubectl, namespace, 'galaxy-web', env) - wait_for(kubectl, namespace, 'galaxy-workflow', env) - - -def list(context: Context, args: list): + data = run(f"{kubectl} get deployment -n {namespace} -o json") + deployment_data = json.loads(data) + deployments = list() + for deployment in deployment_data['items']: + metadata = deployment['metadata'] + name = metadata['name'] + if 'job' in name or 'web' in name or 'workflow' in name: + deployments.append(name) + for deployment in deployments: + print( + run( + f"{kubectl} rollout status deployment -n {namespace} {deployment} --watch" + ) + ) + + +def _list(context: Context, args: list): print("Not implemented") - diff --git a/abm/lib/history.py b/abm/lib/history.py index 884b452..656fee0 100644 --- a/abm/lib/history.py +++ b/abm/lib/history.py @@ -1,16 +1,20 @@ import json import os import sys -import yaml - -from lib.common import connect, parse_profile, Context, summarize_metrics, find_history, print_json -from pprint import pprint +import time from pathlib import Path +from pprint import pprint + +import yaml +from bioblend.galaxy.objects import GalaxyInstance +from lib.common import (Context, connect, find_history, parse_profile, + print_json, summarize_metrics) # # History related functions # + def longest_name(histories: list): longest = 0 for history in histories: @@ -29,21 +33,23 @@ def print_histories(histories: list): if len(histories) == 0: print("There are no available histories.") return - + id_width = len(histories[0]['id']) name_width = longest_name(histories) - print(f"{'ID':<{id_width}} {'Name':<{name_width}} Deleted Public Tags" ) + print(f"{'ID':<{id_width}} {'Name':<{name_width}} Deleted Public Tags") for history in histories: - print(f"{history['id']:<{id_width}} {history['name']:<{name_width}} {pad(history['deleted'])} {pad(history['published'])} {', '.join(history['tags'])}") + print( + f"{history['id']:<{id_width}} {history['name']:<{name_width}} {pad(history['deleted'])} {pad(history['published'])} 
{', '.join(history['tags'])}" + ) -def list(context: Context, args: list): +def _list(context: Context, args: list): gi = connect(context) print_histories(gi.histories.get_histories()) if len(args) > 0: - if args[0] in [ 'all', '-a', '--all' ]: + if args[0] in ['all', '-a', '--all']: print('Histories Published by all users') print_histories(gi.histories.get_published_histories()) @@ -125,7 +131,7 @@ def export(context: Context, args: list): if '--no-wait' in args: wait = False args.remove('--no-wait') - if '-n' in args: + if '-n' in args: wait = False args.remove('-w') if len(args) == 0: @@ -184,7 +190,7 @@ def _import(context: Context, args: list): def himport(context: Context, args: list): - def error_message(msg = 'Invalid command'): + def error_message(msg='Invalid command'): print(f"ERROR: {msg}") print(f"USAGE: {sys.argv[0]} history import SERVER HISTORY_ID JEHA_ID") print(f" {sys.argv[0]} history import http://GALAXY_SERVER_URL") @@ -209,7 +215,7 @@ def error_message(msg = 'Invalid command'): with open(config, 'r') as f: datasets = yaml.safe_load(f) # Then load the user histories.yml, if any - userfile = os.path.join(Path.home(),".abm", "histories.yml") + userfile = os.path.join(Path.home(), ".abm", "histories.yml") if os.path.exists(userfile): if datasets is None: datasets = {} @@ -260,7 +266,7 @@ def create(context: Context, args: list): print(json.dumps(id, indent=4)) -def delete(context: Context, args:list): +def delete(context: Context, args: list): if len(args) != 1: print('ERROR: please provide the history ID') return @@ -272,7 +278,7 @@ def delete(context: Context, args:list): print(f"Deleted history {args[0]}") -def copy(context:Context, args:list): +def copy(context: Context, args: list): if len(args) != 2: print("ERROR: Invalid parameters. Provide a history ID and new history name.") return @@ -287,7 +293,7 @@ def copy(context:Context, args:list): print(json.dumps(new_history, indent=4)) -def purge(context: Context, args:list): +def purge(context: Context, args: list): if len(args) != 1: print("ERROR: Please pass a string used to filter histories to be deleted.") print("Use 'abm history purge *' to remove ALL histories.") @@ -318,7 +324,9 @@ def tag(context: Context, args: list): replace = True args.remove('-r') if len(args) < 2: - print("ERROR: Invalid command. Please provide the history ID and one or more tags.") + print( + "ERROR: Invalid command. Please provide the history ID and one or more tags." 
+        )
         return
     gi = connect(context)
@@ -329,6 +337,7 @@ def tag(context: Context, args: list):
     gi.histories.update_history(hid, tags=args)
     print(f"Set history tags to: {', '.join(args)}")
 
+
 def summarize(context: Context, args: list):
     if len(args) == 0:
         print("ERROR: Provide one or more history ID values.")
@@ -360,3 +369,81 @@ def summarize(context: Context, args: list):
     #         all_jobs.append(job)
     # summarize_metrics(gi, gi.jobs.get_jobs(history_id=args[0]))
     summarize_metrics(gi, all_jobs)
+
+
+def wait(context: Context, args: list):
+    state = ''
+    if len(args) == 0:
+        print("ERROR: No history ID provided")
+        return
+
+    gi = connect(context)
+    history_id = find_history(gi, args[0])
+    if history_id is None:
+        print("ERROR: No such history")
+        return
+    wait_for(gi, history_id)
+
+
+def wait_for(gi: GalaxyInstance, history_id: str):
+    errored = []
+    waiting = True
+    job_states = JobStates()
+    while waiting:
+        restart = []
+        status_counts = dict()
+        terminal = 0
+        job_list = gi.jobs.get_jobs(history_id=history_id)
+        for job in job_list:
+            job_states.update(job)
+            state = job['state']
+            id = job['id']
+            # Count how many jobs are in each state.
+            if state not in status_counts:
+                status_counts[state] = 1
+            else:
+                status_counts[state] += 1
+            # Count jobs in a terminal state and mark failed jobs for a restart
+            if state == 'ok':
+                terminal += 1
+            elif state == 'error':
+                terminal += 1
+                if id not in errored:
+                    restart.append(id)
+                    errored.append(id)
+        if len(restart) > 0:
+            for job in restart:
+                print(f"Restarting job {job}")
+                try:
+                    gi.jobs.rerun_job(job, remap=True)
+                except:
+                    try:
+                        gi.jobs.rerun_job(job, remap=False)
+                    except:
+                        print(f"Failed to restart job {job}")
+                        waiting = False
+        elif len(job_list) == terminal:
+            print("All jobs are in a terminal state")
+            waiting = False
+        if waiting:
+            time.sleep(30)
+        # elif state == 'paused':
+        #     paused += 1
+        # print(f"{job['id']}\t{job['state']}\t{job['update_time']}\t{job['tool_id']}")
+
+
+class JobStates:
+    def __init__(self):
+        self._jobs = dict()
+
+    def update(self, job):
+        id = job['id']
+        state = job['state']
+        tool = job['tool_id']
+        if '/' in tool:
+            tool = tool.split('/')[-2]
+        if id not in self._jobs:
+            print(f"Job {id} {tool} state {state}")
+        elif state != self._jobs[id]:
+            print(f"Job {id} {tool} {self._jobs[id]} -> {state}")
+        self._jobs[id] = state
diff --git a/abm/lib/invocation.py b/abm/lib/invocation.py
index 09cf94b..0e6807f 100644
--- a/abm/lib/invocation.py
+++ b/abm/lib/invocation.py
@@ -1,4 +1,5 @@
-from common import Context, connect, summarize_metrics, print_json
+from common import Context, connect, print_json, summarize_metrics
+
 
 def doList(context: Context, args: list):
     wid = None
diff --git a/abm/lib/job.py b/abm/lib/job.py
index 66bccf5..c0b8ff5 100644
--- a/abm/lib/job.py
+++ b/abm/lib/job.py
@@ -1,9 +1,9 @@
 import json
+import logging
 import time
-
-from .common import connect, Context, print_json, find_history
 from pprint import pprint
-import logging
+
+from .common import Context, connect, find_history, print_json
 
 log = logging.getLogger('abm')
 
@@ -53,7 +53,7 @@ def show(context: Context, args: list):
     print(json.dumps(job, indent=4))
 
 
-def wait(context:Context, args: list):
+def wait(context: Context, args: list):
     if len(args) != 1:
         print("ERROR: Invalid parameters. Job ID is required")
         return
@@ -89,22 +89,26 @@ def metrics(context: Context, args: list):
             job_list = gi.jobs.get_jobs(history_id=history_id)
             metrics = []
             for job in job_list:
-                metrics.append({
-                    'job_id': job['id'],
-                    'job_state': job['state'],
-                    'tool_id': job['tool_id'],
-                    'job_metrics': gi.jobs.get_metrics(job['id'])
-                })
+                metrics.append(
+                    {
+                        'job_id': job['id'],
+                        'job_state': job['state'],
+                        'tool_id': job['tool_id'],
+                        'job_metrics': gi.jobs.get_metrics(job['id']),
+                    }
+                )
         else:
             print(f"ERROR: Unrecognized argument {arg}")
     else:
         job = gi.jobs.show_job(args[0])
-        metrics = [{
-            'job_id': job['id'],
-            'job_state': job['state'],
-            'tool_id': job['tool_id'],
-            'job_metrics': gi.jobs.get_metrics(args[0])
-        }]
+        metrics = [
+            {
+                'job_id': job['id'],
+                'job_state': job['state'],
+                'tool_id': job['tool_id'],
+                'job_metrics': gi.jobs.get_metrics(args[0]),
+            }
+        ]
     print(json.dumps(metrics, indent=4))
     # metrics = {}
     # for m in gi.jobs.get_metrics(args[0]):
@@ -134,14 +138,18 @@ def cancel(context: Context, args: list):
             return
     if state or history:
         if len(jobs) > 0:
-            print("ERROR: To many parameters. Either filter by state or history, or list job IDs")
+            print(
+                "ERROR: Too many parameters. Either filter by state or history, or list job IDs"
+            )
             return
-    jobs = [ job['id'] for job in gi.jobs.get_jobs(state=state, history_id=history) ]
+    jobs = [job['id'] for job in gi.jobs.get_jobs(state=state, history_id=history)]
     for job in jobs:
         if gi.jobs.cancel_job(job):
             print(f"Job {job} canceled")
         else:
-            print(f"ERROR: Unable to cancel {job}, job was already in a terminal state.")
+            print(
+                f"ERROR: Unable to cancel {job}, job was already in a terminal state."
+            )
 
 
 def problems(context: Context, args: list):
@@ -153,9 +161,16 @@
 
 
 def rerun(context: Context, args: list):
+    remap = False
+    if '-r' in args:
+        remap = True
+        args.remove('-r')
+    if '--remap' in args:
+        remap = True
+        args.remove('--remap')
     if len(args) == 0:
         print("ERROR: no job ID provided")
         return
     gi = connect(context)
-    result = gi.jobs.rerun_job(args[0], remap=True)
-    print_json(result)
\ No newline at end of file
+    result = gi.jobs.rerun_job(args[0], remap=remap)
+    print_json(result)
diff --git a/abm/lib/kubectl.py b/abm/lib/kubectl.py
index 1e3e103..bde766d 100644
--- a/abm/lib/kubectl.py
+++ b/abm/lib/kubectl.py
@@ -1,7 +1,7 @@
 import json
-
 from pprint import pprint
-from common import get_env, run, find_executable, Context
+
+from common import Context, find_executable, get_env, run
 
 
 def pods(context: Context, args: list):
@@ -45,4 +45,4 @@ def get_url(context: Context, args: list):
     protocol = ports['name']
     port = ports['port']
     ip = data['status']['loadBalancer']['ingress'][0]['ip']
-    return f"{protocol}://{ip}:{port}/{name}/"
\ No newline at end of file
+    return f"{protocol}://{ip}:{port}/{name}/"
diff --git a/abm/lib/library.py b/abm/lib/library.py
index 73f5d01..8add917 100644
--- a/abm/lib/library.py
+++ b/abm/lib/library.py
@@ -1,5 +1,6 @@
 from pprint import pprint
-from .common import connect, datasets, Context
+
+from .common import Context, connect, datasets
 
 
 def list(context: Context, args: list):
@@ -32,7 +33,9 @@ def delete(context: Context, args: list):
 
 def upload(context: Context, args: list):
     if len(args) != 3:
-        print("ERROR: Invalid parameters. Specify the library and folder names and the dataset to upload")
+        print(
+            "ERROR: Invalid parameters. Specify the library and folder names and the dataset to upload"
+        )
         return
     gi = connect(context)
     libraries = gi.libraries.get_libraries(name=args[0])
@@ -51,7 +54,9 @@ def upload(context: Context, args: list):
         return
     folder_id = folders[0]['id']
     dataset_url = datasets['dna'][int(args[2])]
-    result = gi.libraries.upload_file_from_url(library_id, dataset_url, folder_id=folder_id)
+    result = gi.libraries.upload_file_from_url(
+        library_id, dataset_url, folder_id=folder_id
+    )
     pprint(result)
     return
 
@@ -72,4 +77,3 @@ def download(context: Context, args: list):
 
 def show(context: Context, args: list):
     print("library show not implemented")
-
diff --git a/abm/lib/menu.yml b/abm/lib/menu.yml
index 4c79b18..9aafa1f 100644
--- a/abm/lib/menu.yml
+++ b/abm/lib/menu.yml
@@ -125,7 +125,7 @@
     help: manage histories
     menu:
       - name: ['list', 'ls']
-        handler: history.list
+        handler: history._list
        help: list histories on the server.
        params: "[-a|--all]"
      - name: ['import', 'imp', 'im']
@@ -182,6 +182,10 @@
        params: STR
        help: delete all histories that contain STR in the name. Use * to purge all histories.
        handler: history.purge
+      - name: [ wait ]
+        handler: history.wait
+        help: Wait for all jobs in the history to enter a terminal state (ok or error)
+        params: ID
   - name: [ jobs, job ]
     help: manage jobs on the server
     menu:
@@ -278,7 +282,7 @@
        help: wait for the deployment to come online
      - name: [list, ls]
        help: list some things...
-        handler: helm.list
+        handler: helm._list
   - name: [kubectl, kube, k]
     help: execute a kubectl command
     menu:
diff --git a/abm/lib/threads/Latch.py b/abm/lib/threads/Latch.py
index ff7296c..8db238d 100644
--- a/abm/lib/threads/Latch.py
+++ b/abm/lib/threads/Latch.py
@@ -1,19 +1,18 @@
 import threading
 
+
 class CountdownLatch:
-    def __init__(self, count = 1):
+    def __init__(self, count=1):
         self.count = count
         self.lock = threading.Condition
 
-
-    def count_down(self, count = 1):
+    def count_down(self, count=1):
         self.lock.acquire(True)
         self.count -= count
         if self.count <= 0:
             self.lock.notifyAll()
         self.lock.release()
 
-
     def wait(self):
         self.lock.acquire(True)
         while self.count > 0:
diff --git a/abm/lib/users.py b/abm/lib/users.py
index ddf7763..74353fb 100644
--- a/abm/lib/users.py
+++ b/abm/lib/users.py
@@ -1,9 +1,7 @@
 import json
-
 from pprint import pprint
 
 from bioblend.galaxy import GalaxyInstance
-
 from common import Context, connect
 
 
@@ -42,7 +40,7 @@ def get_api_key(context: Context, args: list):
     return key
 
 
-def create(context: Context, args:list):
+def create(context: Context, args: list):
     if len(args) != 3:
         print("ERROR: Please specify the username, email, and password")
         return
@@ -60,16 +58,11 @@ def create(context: Context, args:list):
     user_record = gi.users.create_local_user(name, email, password)
     id = user_record['id']
     key = gi.users.create_user_apikey(id)
-    result = {
-        'name': name,
-        'email': email,
-        'id': id,
-        'key': key
-    }
+    result = {'name': name, 'email': email, 'id': id, 'key': key}
     print(json.dumps(result, indent=4))
 
 
-def show(context: Context, args:list):
+def show(context: Context, args: list):
     if len(args) == 0:
         print("ERROR: no user email given")
         return
@@ -84,7 +77,7 @@ def show(context: Context, args:list):
     print(json.dumps(result, indent=4))
 
 
-def usage(context:Context, args:list):
+def usage(context: Context, args: list):
     if len(args) == 0:
         print("ERROR: no user email given")
         return
@@ -117,5 +110,7 @@ def _get_user_id(gi: GalaxyInstance, name_or_email: str) -> str:
         print("WARNING: no such user")
         return None
     if len(user_list) > 1:
-        print("WARNING: more than one user with that email address. Returning the first")
+        print(
+            "WARNING: more than one user with that email address. Returning the first"
+        )
     return user_list[0]['id']
diff --git a/abm/lib/workflow.py b/abm/lib/workflow.py
index 35a20f5..0ba36f7 100644
--- a/abm/lib/workflow.py
+++ b/abm/lib/workflow.py
@@ -1,19 +1,19 @@
-import os
 import json
 import logging
-
+import os
+from pathlib import Path
 from pprint import pprint
 
+import planemo
 import requests
 import yaml
-import planemo
-from planemo.runnable import for_path, for_uri
+from common import Context, connect, summarize_metrics
 from planemo.galaxy.workflows import install_shed_repos
-from common import connect, Context, summarize_metrics
-from pathlib import Path
+from planemo.runnable import for_path, for_uri
 
 log = logging.getLogger('abm')
 
+
 def list(context: Context, args: list):
     gi = connect(context)
     workflows = gi.workflows.get_workflows(published=True)
@@ -53,7 +53,7 @@ def upload(context: Context, args: list):
     pprint(result)
 
 
-def import_from_url(context: Context, args:list):
+def import_from_url(context: Context, args: list):
     if len(args) == 0:
         print("ERROR: no workflow URL given")
         return
@@ -72,8 +72,10 @@ def import_from_url(context: Context, args:list):
             input_text = f.read()
     else:
         response = requests.get(url)
-        if (response.status_code != 200):
-            print(f"ERROR: There was a problem downloading the workflow: {response.status_code}")
+        if response.status_code != 200:
+            print(
+                f"ERROR: There was a problem downloading the workflow: {response.status_code}"
+            )
             print(response.reason)
             return
         input_text = response.text
@@ -102,7 +104,7 @@ def import_from_url(context: Context, args:list):
     pprint(result)
 
 
-def import_from_config(context: Context, args:list):
+def import_from_config(context: Context, args: list):
     if len(args) == 0:
         print("ERROR: no workflow ID given")
         return
@@ -119,7 +121,7 @@ def import_from_config(context: Context, args:list):
         return
 
     url = workflows[key]
-    import_from_url(context, [ url ])
+    import_from_url(context, [url])
 
 
 def download(context: Context, args: list):
@@ -145,7 +147,7 @@ def show(context: Context, args: list):
     print(json.dumps(result, indent=4))
 
 
-def inputs(context:Context, args:list):
+def inputs(context: Context, args: list):
     if len(args) == 0:
         print('ERROR: no workflow ID given')
         return
@@ -157,7 +159,7 @@ def inputs(context:Context, args:list):
     print(json.dumps(input_dict, indent=4))
 
 
-def invocation(context:Context, args:list):
+def invocation(context: Context, args: list):
     if len(args) != 2:
         print("ERROR: Invalid parameters. A workflow ID and invocation ID are required")
         return
@@ -182,7 +184,9 @@ def invocation(context:Context, args:list):
     #     return
     gi = connect(context)
     # result = gi.workflows.show_invocation(workflow_id, invocation_id)
-    invocations = gi.invocations.get_invocations(workflow_id=workflow_id, view='element', step_details=True)
+    invocations = gi.invocations.get_invocations(
+        workflow_id=workflow_id, view='element', step_details=True
+    )
     # print(json.dumps(result, indent=4))
     print('ID\tState\tWorkflow\tHistory')
     for invocation in invocations:
@@ -215,7 +219,7 @@ def test(context: Context, args: list):
 
 def publish(context: Context, args: list):
     if len(args) != 1:
-        print("USAGE: publish ID" )
+        print("USAGE: publish ID")
         return
     gi = connect(context)
     result = gi.workflows.update_workflow(args[0], published=True)
@@ -230,6 +234,7 @@ def rename(context: Context, args: list):
     result = gi.workflows.update_workflow(args[0], name=args[1])
     print(f"Renamed workflow to {result['name']}")
 
+
 def summarize(context: Context, args: list):
     if len(args) == 0:
         print("ERROR: Provide one or more workflow ID values.")
diff --git a/setup.sh b/setup.sh
index a113158..d7c73b1 100644
--- a/setup.sh
+++ b/setup.sh
@@ -1,2 +1,2 @@
-alias abm="python3 -m abm"
+alias abm="python -m abm"
 source .venv/bin/activate
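The abm/lib/history.py additions in this patch poll a history until every job reaches a terminal state ('ok' or 'error'), restart each failed job at most once, and fall back to a plain rerun when remapping fails. Below is a minimal standalone sketch of that polling pattern, using only bioblend calls that already appear in the diff; the server URL, API key, and history ID are placeholders, not values taken from this repository.

    import time

    from bioblend.galaxy import GalaxyInstance

    # Hypothetical connection details -- substitute a real Galaxy URL and API key.
    gi = GalaxyInstance('https://galaxy.example.org', key='YOUR_API_KEY')
    history_id = 'HISTORY_ID'  # placeholder history ID

    restarted = set()
    while True:
        jobs = gi.jobs.get_jobs(history_id=history_id)
        terminal = 0
        for job in jobs:
            if job['state'] == 'ok':
                terminal += 1
            elif job['state'] == 'error':
                terminal += 1
                if job['id'] not in restarted:
                    restarted.add(job['id'])
                    # Prefer a rerun that remaps dependent jobs; fall back to a
                    # plain rerun if the server rejects the remap request.
                    try:
                        gi.jobs.rerun_job(job['id'], remap=True)
                    except Exception:
                        gi.jobs.rerun_job(job['id'], remap=False)
        if jobs and terminal == len(jobs):
            break
        time.sleep(30)  # same 30-second polling interval as the new wait_for()

This is essentially the loop that the new menu entry drives when a user runs "abm history wait ID".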
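The reworked rerun() in abm/lib/job.py now requests job remapping only when the caller passes -r or --remap. For reference, the same bioblend call can be made directly: remap=True asks Galaxy to re-map the failed job's outputs so that jobs waiting on them can resume against the rerun's results, while remap=False simply resubmits the job. The connection details and job ID below are placeholders for illustration only.

    from bioblend.galaxy import GalaxyInstance

    # Hypothetical connection details and job ID.
    gi = GalaxyInstance('https://galaxy.example.org', key='YOUR_API_KEY')
    failed_job_id = 'JOB_ID'

    # Roughly what `abm job rerun --remap JOB_ID` does after this change.
    new_job = gi.jobs.rerun_job(failed_job_id, remap=True)
    print(new_job)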