diff --git a/abm/lib/__init__.py b/abm/lib/__init__.py
index 2cdb864..4efc7e1 100644
--- a/abm/lib/__init__.py
+++ b/abm/lib/__init__.py
@@ -4,14 +4,16 @@
 
 sys.path.append(os.path.dirname(os.path.realpath(__file__)))
 
-# from common import parse_profile
-
+# Where the workflow invocation data returned by Galaxy will be saved.
 INVOCATIONS_DIR = "invocations"
 
+# Where workflow runtime metrics will be saved.
 METRICS_DIR = "metrics"
 
+# Global instance of a YAML parser so we can reuse it if needed.
 parser = None
 
+# Keys used in various dictionaries.
 class Keys:
     NAME = 'name'
     RUNS = 'runs'
diff --git a/abm/lib/benchmark.py b/abm/lib/benchmark.py
index 3505158..af2ecf8 100644
--- a/abm/lib/benchmark.py
+++ b/abm/lib/benchmark.py
@@ -16,13 +16,10 @@
 
 def run_cli(context: Context, args: list):
     """
-    Runs a single workflow defined by *args[0]*
+    Command line handler to run a single benchmark.
 
-    :param args: a list that contains:
-        args[0] - the path to the benchmark configuration file
-        args[1] - the prefix to use when creating the new history in Galaxy
-        args[2] - the name of the experiment, if part of one. This is used to
-            generate output folder names.
+    :param context: a context object that defines how to connect to the Galaxy server.
+    :param args: parameters from the command line
-    :return: True if the workflows completed sucessfully. False otherwise.
+    :return: True if the workflows completed successfully. False otherwise.
     """
@@ -43,11 +40,15 @@
 
 
 def run(context: Context, workflow_path, history_prefix: str, experiment: str):
-    # if len(args) > 1:
-    #     history_prefix = args[1]
-    # if len(args) > 2:
-    #     experiment = args[2].replace(' ', '_').lower()
+    """
+    Does the actual work of running a benchmark.
+    :param context: a context object that defines how to connect to the Galaxy server.
+    :param workflow_path: the path to the ABM benchmark file. NOTE: this is NOT a Galaxy .ga workflow file.
+    :param history_prefix: a prefix value used when generating new history names.
+    :param experiment: the name of the experiment (arbitrary string). Used to generate new history names.
+    :return: True if the workflow run completed successfully. False otherwise.
+    """
 
     if os.path.exists(INVOCATIONS_DIR):
         if not os.path.isdir(INVOCATIONS_DIR):
             print('ERROR: Can not save invocation status, directory name in use.')
@@ -76,7 +77,7 @@
     workflows = parse_workflow(workflow_path)
     if not workflows:
         print(f"Unable to load any workflow definitions from {workflow_path}")
-        return
+        return False
 
     print(f"Found {len(workflows)} workflow definitions")
     for workflow in workflows:
@@ -173,7 +174,7 @@
         histories = gi.histories.get_histories(name=spec['history'])
         if len(histories) == 0:
-            print(f"ERROR: History {spec['history']} not foune")
+            print(f"ERROR: History {spec['history']} not found")
-            return
+            return False
         hid = histories[0]['id']
         pairs = 0
         paired_list = spec['paired']
@@ -273,6 +274,14 @@
 
 
 def translate(context: Context, args: list):
+    """
+    Translates the human readable names of datasets and workflows into the
+    Galaxy IDs that are unique to each server.
+
+    :param context: the context object used to connect to the Galaxy server
+    :param args: [0] the path to the benchmarking YAML file to translate
+    :return: Nothing. Prints the translated workflow file to stdout.
+ """ if len(args) == 0: print('ERROR: no workflow configuration specified') return @@ -315,6 +324,14 @@ def translate(context: Context, args: list): def validate(context: Context, args: list): + """ + Checks to see if the workflow and all datasets defined in the benchmark can + be found on the server. + + :param context: the context object used to connect to the Galaxy instance + :param args: [0] the benchmark YAML file to be validated. + :return: + """ if len(args) == 0: print('ERROR: no workflow configuration specified') return @@ -420,10 +437,10 @@ def validate(context: Context, args: list): def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): - """Blocks until all jobs defined in the *invocations* to complete. + """Blocks until all jobs defined in *invocations* are complete (in a terminal state). :param gi: The *GalaxyInstance** running the jobs - :param invocations: + :param invocations: a dictionary containing information about the jobs invoked :return: """ wfid = invocations['workflow_id'] @@ -493,6 +510,11 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): def parse_workflow(workflow_path: str): + """ + Loads the benchmark YAML file. + :param workflow_path: the path to the file to be loaded. + :return: a dictionary containing the benchmark. + """ if not os.path.exists(workflow_path): print(f'ERROR: could not find workflow file {workflow_path}') return None @@ -511,6 +533,14 @@ def parse_workflow(workflow_path: str): def find_workflow_id(gi, name_or_id): + """ + Resolves the human-readable name for a workflow into the unique ID on the + Galaxy instance. + + :param gi: the connection object to the Galaxy instance + :param name_or_id: the name of the workflow + :return: The Galaxy workflow ID or None if the workflow could not be located + """ try: wf = gi.workflows.show_workflow(name_or_id) return wf['id'] @@ -527,7 +557,14 @@ def find_workflow_id(gi, name_or_id): def find_dataset_id(gi, name_or_id): - # print(f"Finding dataset {name_or_id}") + """ + Resolves the human-readable name if a dataset into the unique ID on the + Galaxy instance + + :param gi: the connection object to the Galaxy instance + :param name_or_id: the name of the dataset. + :return: the Galaxy dataset ID or None if the dataset could not be located. + """ try: ds = gi.datasets.show_dataset(name_or_id) return ds['id'] @@ -552,6 +589,14 @@ def find_dataset_id(gi, name_or_id): def find_collection_id(gi, name): + """ + Resolves a human-readable collection name into the unique Galaxy ID. + + :param gi: the connection object to the Galaxy instance + :param name: the name of the collection to resolve + :return: The unique Galaxy ID of the collection or None if the collection + can not be located. + """ kwargs = {'limit': 10000, 'offset': 0} datasets = gi.datasets.get_datasets(**kwargs) if len(datasets) == 0: @@ -573,6 +618,13 @@ def find_collection_id(gi, name): def test(context: Context, args: list): + """ + Allows running testing code from the command line. + + :param context: a connection object to a Galaxy instance + :param args: varies + :return: varies, typically None. 
+ """ id = 'c90fffcf98b31cd3' gi = connect(context) inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input') diff --git a/abm/lib/cloudlaunch.py b/abm/lib/cloudlaunch.py index 3a2719d..19a4fc6 100644 --- a/abm/lib/cloudlaunch.py +++ b/abm/lib/cloudlaunch.py @@ -8,6 +8,8 @@ from cloudlaunch_cli.main import create_api_client from common import Context +# DEPRECATED - Cloudlaunch is no longer used to manage Galaxy clusters. + BOLD = '\033[1m' CLEAR = '\033[0m' diff --git a/abm/lib/common.py b/abm/lib/common.py index 96a21eb..cbc035f 100644 --- a/abm/lib/common.py +++ b/abm/lib/common.py @@ -9,8 +9,10 @@ from bioblend.galaxy import dataset_collections from ruamel.yaml import YAML +# Where we will look for our configuration file. PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml'] +# Deprecated. Do not use. datasets = { "dna": [ "ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR013/ERR013101/ERR013101_1.fastq.gz", @@ -25,6 +27,14 @@ def try_for(f, limit=3): + """ + Tries to invoke the function f. If the function f fails it will be retried + *limit* number of times. + + :param f: the function to invoke + :param limit: how many times the function will be retried + :return: the result of calling f() + """ count = 0 running = True result = None @@ -40,6 +50,13 @@ def try_for(f, limit=3): class Context: + """ + The context object that contains information to connect to a Galaxy instance. + + GALAXY_SERVER: the URL of the Galaxy server to connect to + API_KEY : a user's API key to make API calls on the Galaxy instance + KUBECONFIG: : the kubeconfig file needed to make changes via Helm + """ def __init__(self, *args): if len(args) == 1: arg = args[0] @@ -90,7 +107,12 @@ def connect(context: Context): def _set_active_profile(profile_name: str): - # print(f"Parsing profile for {profile_name}") + """ + Unused. + + :param profile_name: + :return: + """ lib.GALAXY_SERVER, lib.API_KEY, lib.KUBECONFIG = parse_profile(profile_name) return lib.GALAXY_SERVER != None @@ -100,6 +122,11 @@ def get_context(profile_name: str): def get_yaml_parser(): + """ + Returns a singleton instance of a YAML parser. + + :return: a YAML parser. + """ if lib.parser is None: lib.parser = YAML() return lib.parser @@ -124,6 +151,12 @@ def load_profiles(): def save_profiles(profiles: dict): + """ + Write the ABM configuration file. + + :param profiles: the configuration to be saved. + :return: None + """ yaml = get_yaml_parser() for profile_path in PROFILE_SEARCH_PATH: path = os.path.expanduser(profile_path) @@ -161,15 +194,16 @@ def parse_profile(profile_name: str): def run(command, env: dict = None): + """ + Runs a command on the local machine. Used to invoke the helm and kubectl + executables. + + :param command: the command to be invoked + :param env: environment variables for the command. + :return: + """ if env is None: env = os.environ - # if env is not None: - # for name,value in env.items(): - # os.environ[name] = value - # if lib.KUBECONFIG is not None: - # os.environ['KUBECONFIG'] = lib.KUBECONFIG - # local_env = os.environ.copy() - # local_env.update(env) result = subprocess.run(command.split(), capture_output=True, env=env) if result.returncode != 0: raise RuntimeError(result.stderr.decode('utf-8').strip()) @@ -177,6 +211,11 @@ def run(command, env: dict = None): def get_env(context: Context): + """ + Creates a copy of the environment variables as returned by os.environ. 
+    :param context: the Context whose non-None values (e.g. KUBECONFIG) are added to the environment
+    :return: a dictionary of the environment variables
+    """
     copy = os.environ.copy()
     for key, value in context.__dict__.items():
         if value is not None:
             copy[key] = value
@@ -185,6 +224,13 @@
 
 
 def find_executable(name):
+    """
+    Uses the which command on the local machine to find the full path to an
+    executable.
+
+    :param name: the name of a command line executable or script.
+    :return: the full path to the executable or an empty string if the executable is not found.
+    """
     return run(f"which {name}")
 
 
@@ -208,6 +254,7 @@
 # "swaptotal",
 # "uname"
 
+# Columns to include when generating CSV files.
 table_header = [
     "id",
     "history_id",
@@ -237,6 +284,11 @@
 ]
 
 def print_table_header():
+    """
+    Prints the table header suitable for inclusion in CSV files.
+
+    :return: None. The table header is printed to stdout.
+    """
     print(','.join(table_header))
 
 
diff --git a/abm/lib/dataset.py b/abm/lib/dataset.py
index 0fe507f..a4f022c 100644
--- a/abm/lib/dataset.py
+++ b/abm/lib/dataset.py
@@ -12,9 +12,9 @@
 
 def do_list(context: Context, argv: list):
     parser = argparse.ArgumentParser()
-    parser.add_argument('-s', '--state', help='list jobs in this state')
-    parser.add_argument('--history', help='show jobs in the given history')
-    parser.add_argument('-t', '--tool', help='only show jobs generate by this tool')
+    parser.add_argument('-s', '--state', help='list datasets in this state')
+    parser.add_argument('--history', help='show datasets in the given history')
+    parser.add_argument('-t', '--tool', help='only show datasets generated by this tool')
     args = parser.parse_args(argv)
     kwargs = {'limit': 10000, 'offset': 0, 'deleted': False}
     gi = connect(context)
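
The patch documents the connection and retry helpers in common.py; a short usage sketch makes the intended call pattern concrete. This is illustrative only and not part of the patch: the profile name 'local' is a hypothetical entry in ~/.abm/profile.yml, and it assumes try_for() returns the value of f() from the first successful attempt.

    # Sketch: retrying a flaky Galaxy API call with try_for().
    from common import Context, connect, try_for

    context = Context('local')   # loads GALAXY_SERVER, API_KEY and KUBECONFIG from the profile
    gi = connect(context)        # a bioblend GalaxyInstance

    # Invoke the API call up to 3 times before giving up.
    workflows = try_for(lambda: gi.workflows.get_workflows(), limit=3)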
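
The corrected get_env() docstring can be checked against the implementation shown in its hunk: the result starts as a copy of os.environ, and any non-None attribute set on the Context overrides the inherited value. A minimal sketch, again assuming a hypothetical 'local' profile:

    import os
    from common import Context, get_env

    context = Context('local')
    env = get_env(context)

    # Context attributes such as GALAXY_SERVER and KUBECONFIG win over os.environ.
    assert env['GALAXY_SERVER'] == context.GALAXY_SERVER
    # Everything else is inherited unchanged.
    assert env.get('HOME') == os.environ.get('HOME')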
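
Finally, the benchmark.py docstrings describe a pipeline: run() loads the benchmark YAML (parse_workflow), resolves human-readable names to server-specific IDs (the find_*_id helpers), invokes the workflow, and wait_for_jobs() blocks until every job reaches a terminal state. A sketch of driving it directly, with a hypothetical benchmark file and history prefix:

    from common import Context
    from benchmark import run

    context = Context('local')
    ok = run(context, 'benchmarks/rna-seq.yml', history_prefix='test', experiment='smoke-test')
    print('benchmark completed' if ok else 'benchmark failed')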