diff --git a/LICENSE b/LICENSE index cdd6c6d..919ca29 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2021 Galaxy Project +Copyright (c) 2024 Galaxy Project Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/Makefile b/Makefile index 10fc6f2..2a6432c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,15 @@ .PHONY: dist +help: + @echo + @echo "GOALS" + @echo " clean - deletes the dist directory and egg-info" + @echo " dist - creates the distribution package (wheel)" + @echo " format - runs Black and isort" + @echo " test-deploy - deploys to test.pypi.org" + @echo " deploy - deploys to pypi.org" + @echo " release - creates a GitHub release package" + @echo + dist: python3 setup.py sdist bdist_wheel diff --git a/README.md b/README.md index 81e023e..c52b5b2 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ The `kubectl` program is only required when bootstrapping a new Galaxy instance, ### Credentials -You will need an [API key](https://training.galaxyproject.org/training-material/faqs/galaxy/preferences_admin_api_key.html) for every Galaxy instance you would like to intereact with. You will also need the *kubeconfig* file for each Kubernetes cluster. The `abm` script loads the Galaxy server URLs, API keys, and the location of the *kubeconfig* files from a Yaml configuration file that it expects to find in `$HOME/.abm/profile.yml` or `.abm-profile.yml` in the current directory. You can use the `profile-sample.yml` file as a starting point and it includes the URLs for all Galaxy instances we have used to date (December 22, 2021 as of this writing). +You will need an [API key](https://training.galaxyproject.org/training-material/faqs/galaxy/preferences_admin_api_key.html) for every Galaxy instance you would like to intereact with. You will also need the *kubeconfig* file for each Kubernetes cluster. The `abm` script loads the Galaxy server URLs, API keys, and the location of the *kubeconfig* files from a Yaml configuration file that it expects to find in `$HOME/.abm/profile.yml` or `.abm-profile.yml` in the current directory. You can use the `samples/profile.yml` file as a starting point and it includes the URLs for all Galaxy instances we have used to date (December 22, 2021 as of this writing). :bulb: It is now possible (>=2.0.0) to create Galaxy users and their API keys directly with `abm`. diff --git a/abm/__main__.py b/abm/__main__.py index 014bada..58f5877 100644 --- a/abm/__main__.py +++ b/abm/__main__.py @@ -3,7 +3,7 @@ """ The Automated Benchmarking Tool -Copyright 2023 The Galaxy Project. All rights reserved. +Copyright 2024 The Galaxy Project. All rights reserved. """ @@ -64,7 +64,7 @@ def command_list(commands: list): def copyright(): - print(f" Copyright 2023 The Galaxy Project. All Rights Reserved.\n") + print(f" Copyright 2024 The Galaxy Project. All Rights Reserved.\n") def print_main_help(menu_data): diff --git a/abm/lib/__init__.py b/abm/lib/__init__.py index 2cdb864..a17780b 100644 --- a/abm/lib/__init__.py +++ b/abm/lib/__init__.py @@ -4,14 +4,16 @@ sys.path.append(os.path.dirname(os.path.realpath(__file__))) -# from common import parse_profile - +# Where the workflow invocation data returned by Galaxy will be saved. INVOCATIONS_DIR = "invocations" +# Where workflow runtime metrics will be saved. METRICS_DIR = "metrics" +# Global instance of a YAML parser so we can reuse it if needed. 
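# Illustrative example only (not part of the patch): a profile entry of the kind the
# README's Credentials section describes. The profile name, URL, and key values are
# placeholders; the field names (url, key, kube, master) follow parse_profile() later
# in this diff.
#
#   cloud1:
#     url: https://galaxy.example.org
#     key: <your Galaxy API key>
#     kube: ~/.kube/cloud1.config
#     master: <optional master API key>
#
# Reading the file by hand would look roughly like this:
import os
import yaml

with open(os.path.expanduser('~/.abm/profile.yml')) as f:
    profiles = yaml.safe_load(f)
print(profiles['cloud1']['url'])  # 'cloud1' is a made-up profile name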
parser = None +# Keys used in various dictionaries. class Keys: NAME = 'name' RUNS = 'runs' @@ -22,3 +24,20 @@ class Keys: COLLECTION = 'collection' HISTORY_BASE_NAME = 'output_history_base_name' HISTORY_NAME = 'history_name' + + +# def get_master_api_key(): +# ''' +# Get the master API key from the environment or configuration file. +# ''' +# if 'GALAXY_MASTER_API_KEY' in os.environ: +# return os.environ['GALAXY_MASTER_API_KEY'] +# config_path = os.path.expanduser("~/.abm/config.yml") +# if not os.path.exists(config_path): +# raise RuntimeError(f"ERROR: Configuration file not found: {config_path}") +# with open(config_path, 'r') as f: +# config = yaml.safe_load(f) +# key = config.get('GALAXY_MASTER_API_KEY', None) +# if key == None: +# raise RuntimeError("ERROR: GALAXY_MASTER_API_KEY not found in config.yml") +# return key diff --git a/abm/lib/benchmark.py b/abm/lib/benchmark.py index b7cb745..a1a1b69 100644 --- a/abm/lib/benchmark.py +++ b/abm/lib/benchmark.py @@ -8,7 +8,7 @@ from bioblend.galaxy import GalaxyInstance, dataset_collections from lib import INVOCATIONS_DIR, METRICS_DIR, Keys from lib.common import (Context, _get_dataset_data, _make_dataset_element, - connect, print_json) + connect, print_json, try_for) from lib.history import wait_for log = logging.getLogger('abm') @@ -16,13 +16,10 @@ def run_cli(context: Context, args: list): """ - Runs a single workflow defined by *args[0]* + Command line handler to run a single benchmark. - :param args: a list that contains: - args[0] - the path to the benchmark configuration file - args[1] - the prefix to use when creating the new history in Galaxy - args[2] - the name of the experiment, if part of one. This is used to - generate output folder names. + :param context: a context object the defines how to connect to the Galaxy server. + :param args: parameters from the command line :return: True if the workflows completed sucessfully. False otherwise. """ @@ -43,11 +40,15 @@ def run_cli(context: Context, args: list): def run(context: Context, workflow_path, history_prefix: str, experiment: str): - # if len(args) > 1: - # history_prefix = args[1] - # if len(args) > 2: - # experiment = args[2].replace(' ', '_').lower() + """ + Does the actual work of running a benchmark. + :param context: a context object the defines how to connect to the Galaxy server. + :param workflow_path: path to the ABM workflow file. (benchmark really). NOTE this is NOT the Galaxy .ga file. + :param history_prefix: a prefix value used when generating new history names. + :param experiment: the name of the experiment (arbitrary string). Used to generate new history names. + :return: True if the workflow run completed successfully. False otherwise. + """ if os.path.exists(INVOCATIONS_DIR): if not os.path.isdir(INVOCATIONS_DIR): print('ERROR: Can not save invocation status, directory name in use.') @@ -76,7 +77,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): workflows = parse_workflow(workflow_path) if not workflows: print(f"Unable to load any workflow definitions from {workflow_path}") - return + return False print(f"Found {len(workflows)} workflow definitions") for workflow in workflows: @@ -144,11 +145,13 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): dsid = find_collection_id(gi, dsname) dsdata = _get_dataset_data(gi, dsid) if dsdata is None: - raise Exception( - f"ERROR: unable to resolve {dsname} to a dataset." 
- ) - dsid = dsdata['id'] - dssize = dsdata['size'] + # raise Exception( + # f"ERROR: unable to resolve {dsname} to a dataset." + # ) + dssize = 0 + else: + dsid = dsdata['id'] + dssize = dsdata['size'] input_data_size.append(dssize) print(f"Input collection ID: {dsname} [{dsid}] {dssize}") inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize} @@ -173,7 +176,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): histories = gi.histories.get_histories(name=spec['history']) if len(histories) == 0: print(f"ERROR: History {spec['history']} not found") - return + return False hid = histories[0]['id'] pairs = 0 paired_list = spec['paired'] @@ -183,7 +186,13 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): for key in item.keys(): # print(f"Getting dataset for {key} = {item[key]}") value = _get_dataset_data(gi, item[key]) - size += value['size'] + if value is None: + print( + f"ERROR: Unable to find dataset {item[key]}" + ) + return False + if 'size' in value: + size += value['size'] elements.append( _make_dataset_element(key, value['id']) ) @@ -224,16 +233,20 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): else: raise Exception(f'Invalid input value') print(f"Running workflow {wfid} in history {new_history_name}") - invocation = gi.workflows.invoke_workflow( + f = lambda: gi.workflows.invoke_workflow( wfid, inputs=inputs, history_name=new_history_name ) + invocation = try_for(f, 3) id = invocation['id'] # invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False) + f = lambda: gi.invocations.wait_for_invocation(id, 86400, 10, False) try: - invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False) - except: + invocations = try_for(f, 2) + except Exception as e: + print("Exception waiting for invocations") pprint(invocation) sys.exc_info() + raise e print("Waiting for jobs") if history_prefix is not None: parts = history_prefix.split() @@ -265,6 +278,14 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str): def translate(context: Context, args: list): + """ + Translates the human-readable names of datasets and workflows into the Galaxy + IDs that are unique to each server. + + :param context: the context object used to connect to the Galaxy server + :param args: [0] the path to the benchmarking YAML file to translate + :return: Nothing. Prints the translated workflow file to stdout. + """ if len(args) == 0: print('ERROR: no workflow configuration specified') return @@ -307,6 +328,14 @@ def translate(context: Context, args: list): def validate(context: Context, args: list): + """ + Checks whether the workflow and all datasets defined in the benchmark can + be found on the server. + + :param context: the context object used to connect to the Galaxy instance + :param args: [0] the benchmark YAML file to be validated. + :return: + """ if len(args) == 0: print('ERROR: no workflow configuration specified') return @@ -412,10 +441,10 @@ def validate(context: Context, args: list): def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): - """Blocks until all jobs defined in the *invocations* to complete. + """Blocks until all jobs defined in *invocations* are complete (in a terminal state).
:param gi: The *GalaxyInstance** running the jobs - :param invocations: + :param invocations: a dictionary containing information about the jobs invoked :return: """ wfid = invocations['workflow_id'] @@ -429,6 +458,7 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): jobs = gi.jobs.get_jobs(history_id=hid) for job in jobs: data = gi.jobs.show_job(job['id'], full_details=True) + data['job_metrics'] = gi.jobs.get_job_metrics(job['id']) metrics = { 'run': run, 'cloud': cloud, @@ -485,6 +515,11 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict): def parse_workflow(workflow_path: str): + """ + Loads the benchmark YAML file. + :param workflow_path: the path to the file to be loaded. + :return: a dictionary containing the benchmark. + """ if not os.path.exists(workflow_path): print(f'ERROR: could not find workflow file {workflow_path}') return None @@ -503,6 +538,14 @@ def parse_workflow(workflow_path: str): def find_workflow_id(gi, name_or_id): + """ + Resolves the human-readable name for a workflow into the unique ID on the + Galaxy instance. + + :param gi: the connection object to the Galaxy instance + :param name_or_id: the name of the workflow + :return: The Galaxy workflow ID or None if the workflow could not be located + """ try: wf = gi.workflows.show_workflow(name_or_id) return wf['id'] @@ -519,7 +562,14 @@ def find_workflow_id(gi, name_or_id): def find_dataset_id(gi, name_or_id): - # print(f"Finding dataset {name_or_id}") + """ + Resolves the human-readable name if a dataset into the unique ID on the + Galaxy instance + + :param gi: the connection object to the Galaxy instance + :param name_or_id: the name of the dataset. + :return: the Galaxy dataset ID or None if the dataset could not be located. + """ try: ds = gi.datasets.show_dataset(name_or_id) return ds['id'] @@ -544,6 +594,14 @@ def find_dataset_id(gi, name_or_id): def find_collection_id(gi, name): + """ + Resolves a human-readable collection name into the unique Galaxy ID. + + :param gi: the connection object to the Galaxy instance + :param name: the name of the collection to resolve + :return: The unique Galaxy ID of the collection or None if the collection + can not be located. + """ kwargs = {'limit': 10000, 'offset': 0} datasets = gi.datasets.get_datasets(**kwargs) if len(datasets) == 0: @@ -565,7 +623,22 @@ def find_collection_id(gi, name): def test(context: Context, args: list): - id = 'c90fffcf98b31cd3' + """ + Allows running testing code from the command line. + + :param context: a connection object to a Galaxy instance + :param args: varies + :return: varies, typically None. + """ + # id = 'c90fffcf98b31cd3' + # gi = connect(context) + # inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input') + # pprint(inputs) + gi = connect(context) - inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input') - pprint(inputs) + print("Calling find_collection_id") + dsid = find_collection_id(gi, args[0]) + print(f"Collection ID: {dsid}") + print("Calling _get_dataset_data") + dsdata = _get_dataset_data(gi, dsid) + pprint(dsdata) diff --git a/abm/lib/cloudlaunch.py b/abm/lib/cloudlaunch.py index 0abd261..19a4fc6 100644 --- a/abm/lib/cloudlaunch.py +++ b/abm/lib/cloudlaunch.py @@ -8,6 +8,8 @@ from cloudlaunch_cli.main import create_api_client from common import Context +# DEPRECATED - Cloudlaunch is no longer used to manage Galaxy clusters. 
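# A short sketch of the name-or-ID resolution pattern the helper functions documented
# above use: try the value as a Galaxy ID first, then fall back to a search by name.
# This is illustrative only; resolve_workflow_id and the sample name below are not
# part of abm.
def resolve_workflow_id(gi, name_or_id):
    try:
        # Succeeds when name_or_id is already a valid workflow ID.
        return gi.workflows.show_workflow(name_or_id)['id']
    except Exception:
        pass
    # Otherwise search the published workflows by name.
    matches = gi.workflows.get_workflows(name=name_or_id, published=True)
    return matches[0]['id'] if matches else None

# Example usage: wfid = resolve_workflow_id(gi, 'Variant analysis on WGS PE data')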
+ BOLD = '\033[1m' CLEAR = '\033[0m' @@ -40,7 +42,7 @@ def h1(text): ''' -def list(context: Context, args: list): +def do_list(context: Context, args: list): archived = False filter = None status = lambda t: t.instance_status if t.instance_status else t.status diff --git a/abm/lib/common.py b/abm/lib/common.py index dbb2402..dd5ff2f 100644 --- a/abm/lib/common.py +++ b/abm/lib/common.py @@ -2,14 +2,18 @@ import os import subprocess import sys +from math import ceil +from pathlib import Path import bioblend.galaxy import lib from bioblend.galaxy import dataset_collections from ruamel.yaml import YAML +# Where we will look for our configuration file. PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml'] +# Deprecated. Do not use. datasets = { "dna": [ "ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR013/ERR013101/ERR013101_1.fastq.gz", @@ -23,25 +27,67 @@ } +def try_for(f, limit=3): + """ + Tries to invoke the function f. If the function f fails it will be retried + *limit* number of times. + + :param f: the function to invoke + :param limit: how many times the function will be retried + :return: the result of calling f() + """ + count = 0 + running = True + result = None + while running: + try: + count += 1 + result = f() + running = False + except Exception as e: + if count >= limit: + raise e + return result + + class Context: + """ + The context object that contains information to connect to a Galaxy instance. + + GALAXY_SERVER: the URL of the Galaxy server to connect to + API_KEY : a user's API key to make API calls on the Galaxy instance + KUBECONFIG: : the kubeconfig file needed to make changes via Helm + """ + def __init__(self, *args): if len(args) == 1: arg = args[0] if type(arg) == str: - self.GALAXY_SERVER, self.API_KEY, self.KUBECONFIG = parse_profile(arg) + ( + self.GALAXY_SERVER, + self.API_KEY, + self.KUBECONFIG, + self.MASTER_KEY, + ) = parse_profile(arg) elif type(arg) == dict: self.GALAXY_SERVER = arg['GALAXY_SERVER'] self.API_KEY = arg['API_KEY'] self.KUBECONFIG = arg['KUBECONFIG'] + if 'MASTER_KEY' in arg: + self.MASTER_KEY = arg['MASTER_KEY'] + else: + self.MASTER_KEY = None else: raise Exception(f'Invalid arg for Context: {type(arg)}') - elif len(args) == 3: + elif len(args) == 3 or len(args) == 4: self.GALAXY_SERVER = args[0] self.API_KEY = args[1] self.KUBECONFIG = args[2] + if len(args) == 4: + self.MASTER_KEY = args[3] else: raise Exception( - f'Invalid args for Context. Expected one or three, found {len(args)}' + f'Invalid args for Context. Expected one or four, found {len(args)}' ) @@ -53,7 +99,7 @@ def print_yaml(obj): get_yaml_parser().dump(obj, sys.stdout) -def connect(context: Context): +def connect(context: Context, use_master_key=False): """ Create a connection to the Galaxy instance @@ -67,15 +113,29 @@ def connect(context: Context): print('ERROR: The Galaxy API key has not been set. Please check your') print(' configuration in ~/.abm/profile.yml and try again.') sys.exit(1) - gi = bioblend.galaxy.GalaxyInstance(url=context.GALAXY_SERVER, key=context.API_KEY) + key = context.API_KEY + if use_master_key: + if context.MASTER_KEY is None: + print('ERROR: The Galaxy master key has not been set. 
Please check your') + print(' configuration in ~/.abm/profile.yml and try again.') + sys.exit(1) + key = context.MASTER_KEY + gi = bioblend.galaxy.GalaxyInstance(url=context.GALAXY_SERVER, key=key) gi.max_get_attempts = 3 gi.get_retry_delay = 1 return gi def _set_active_profile(profile_name: str): - # print(f"Parsing profile for {profile_name}") - lib.GALAXY_SERVER, lib.API_KEY, lib.KUBECONFIG = parse_profile(profile_name) + """ + Unused. + + :param profile_name: + :return: + """ + lib.GALAXY_SERVER, lib.API_KEY, lib.KUBECONFIG, lib.MASTER_KEY = parse_profile( + profile_name + ) return lib.GALAXY_SERVER != None @@ -84,6 +144,11 @@ def get_context(profile_name: str): def get_yaml_parser(): + """ + Returns a singleton instance of a YAML parser. + + :return: a YAML parser. + """ if lib.parser is None: lib.parser = YAML() return lib.parser @@ -108,6 +173,12 @@ def load_profiles(): def save_profiles(profiles: dict): + """ + Write the ABM configuration file. + + :param profiles: the configuration to be saved. + :return: None + """ yaml = get_yaml_parser() for profile_path in PROFILE_SEARCH_PATH: path = os.path.expanduser(profile_path) @@ -125,10 +196,11 @@ def parse_profile(profile_name: str): :param profile_name: path to the profile to parse :return: a tuple containing the Galaxy URL, API key, and path to the kubeconfig ''' + nones = (None, None, None, None) profiles = load_profiles() if profiles is None: print(f'ERROR: Could not locate an abm profile file in {PROFILE_SEARCH_PATH}') - return None, None, None + return nones if profile_name not in profiles: print(f'ERROR: {profile_name} is not the name of a valid profile.') keys = list(profiles.keys()) @@ -137,23 +209,28 @@ def parse_profile(profile_name: str): ', '.join([f"'{k}'" for k in keys[0:-2]]) + f", and '{keys[-1]}'" ) print(f'The defined profile names are: {quoted_keys}') - return None, None, None + return nones profile = profiles[profile_name] + kube = None + master = 'galaxypassword' if 'kube' in profile: - return (profile['url'], profile['key'], os.path.expanduser(profile['kube'])) - return (profile['url'], profile['key'], None) + kube = os.path.expanduser(profile['kube']) + if 'master' in profile: + master = profile['master'] + return (profile['url'], profile['key'], kube, master) def run(command, env: dict = None): + """ + Runs a command on the local machine. Used to invoke the helm and kubectl + executables. + + :param command: the command to be invoked + :param env: environment variables for the command. + :return: + """ if env is None: env = os.environ - # if env is not None: - # for name,value in env.items(): - # os.environ[name] = value - # if lib.KUBECONFIG is not None: - # os.environ['KUBECONFIG'] = lib.KUBECONFIG - # local_env = os.environ.copy() - # local_env.update(env) result = subprocess.run(command.split(), capture_output=True, env=env) if result.returncode != 0: raise RuntimeError(result.stderr.decode('utf-8').strip()) @@ -161,6 +238,11 @@ def run(command, env: dict = None): def get_env(context: Context): + """ + Creates a copy of the environment variables as returned by os.environ. + :param context: Ignored + :return: a dictionary of the environment variables + """ copy = os.environ.copy() for key, value in context.__dict__.items(): if value is not None: @@ -169,6 +251,13 @@ def get_env(context: Context): def find_executable(name): + """ + Used the which command on the local machine to find the full path to an + executable. + + :param name: the name of a command line executable or script. 
+ :return: the full path to the executable or an empty string if the executable is not found. + """ return run(f"which {name}") @@ -192,53 +281,107 @@ def find_executable(name): # "swaptotal", # "uname" +# Columns to be defined when generating CSV files. +table_header = [ + "id", + "history_id", + "history_name", + "state", + "tool_id", + "invocation_id", + "workflow_id", + "cpuacct.usage", + # "end_epoch", + "galaxy_memory_mb", + "galaxy_slots", + # "memory.failcnt", + "memory.limit_in_bytes", + "memory.peak", + # "memory.max_usage_in_bytes", + # "memory.memsw.limit_in_bytes", + # "memory.memsw.max_usage_in_bytes", + # "memory.oom_control.oom_kill_disable", + # "memory.oom_control.under_oom", + "memory.soft_limit_in_bytes", + "memtotal", + "processor_count", + "runtime_seconds", + # "start_epoch", + # "swaptotal", + # "uname" +] + + +def print_table_header(): + """ + Prints the table header suitable for inclusion in CSV files. + + :return: None. The table header is printed to stdout. + """ + print(','.join(table_header)) + + +history_name_cache = dict() + + +def get_history_name(gi, hid: str) -> str: + if hid in history_name_cache: + return history_name_cache[hid] + history = gi.histories.show_history(hid) + if history is None: + return 'unknown' + name = history['name'] + history_name_cache[hid] = name + return name + def summarize_metrics(gi, jobs: list): - header = [ - "id", - "history_id", - "history_name", - "state", - "tool_id", - "invocation_id", - "workflow_id", - "cpuacct.usage", - # "end_epoch", - "galaxy_memory_mb", - "galaxy_slots", - # "memory.failcnt", - "memory.limit_in_bytes", - "memory.max_usage_in_bytes", - # "memory.memsw.limit_in_bytes", - # "memory.memsw.max_usage_in_bytes", - # "memory.oom_control.oom_kill_disable", - # "memory.oom_control.under_oom", - "memory.soft_limit_in_bytes", - "memtotal", - "processor_count", - "runtime_seconds", - # "start_epoch", - # "swaptotal", - # "uname" - ] - - print(','.join(header)) + table = [] + # table.append(header) + # print(','.join(header)) for job in jobs: job_metrics = gi.jobs.get_metrics(job['id']) row = [] - metrics = metrics_to_dict(job_metrics, header) + toolid = job.get('tool_id', 'unknown') + if '/' in toolid: + parts = toolid.split('/') + toolid = f'{parts[-2]}/{parts[-1]}' + metrics = metrics_to_dict(job_metrics, table_header) metrics['id'] = job.get('id', 'unknown') - metrics['history_id'] = job.get('history_id', 'unknown') - metrics['history_name'] = job.get('history_name', 'unknown') + hid = job.get('history_id', 'unknown') + metrics['history_id'] = hid + metrics['history_name'] = get_history_name(gi, hid) metrics['state'] = job.get('state', 'unknown') - metrics['tool_id'] = job.get('tool_id', 'unknown') + metrics['tool_id'] = toolid metrics['invocation_id'] = job.get('invocation_id', 'unknown') - for key in header: + for key in table_header: if key in metrics: row.append(metrics[key]) else: row.append('') - print(','.join(row), end='\n') + # print(','.join(row), end='\n') + table.append(row) + return table + + +def print_markdown_table(table: list) -> None: + print('| Tool ID | History | State | Memory (GB) | Runtime (sec)|') + print('|---|---|---:|---:|---:|') + GB = 1024 * 1024 * 1024 + for row in table[1:]: + # memory = '' + # if row[11] != '': + # memory = float(row[11]) / GB + # if memory < 0.1: + # memory = 0.1 + # memory = f"{memory:3.1f}" + history = row[2] + state = row[3] + tool_id = row[4] + # cpu = '' if row[7] == '' else float(row[7]) / 10**9 + memory = '' if row[11] == '' else f"{max(0.1, 
float(row[11]) / GB):3.1f}" + runtime = '' if row[15] == '' else f"{max(1, float(row[15])):5.0f}" + print(f'| {tool_id} | {history} | {state} | {memory} | {runtime} |') def metrics_to_dict(metrics: list, accept: list): @@ -275,7 +418,43 @@ def find_history(gi, name_or_id): return history[0]['id'] +def find_dataset(gi, history_id, name_or_id): + try: + dataset = gi.datasets.show_dataset(name=name_or_id) + return dataset['id'] + except: + pass + + try: + dataset = gi.datasets.show_dataset(name_or_id) + return dataset['id'] + except: + pass + return None + # print("Calling get_datasets") + # datasets = gi.datasets.get_datasets(history_id=history_id, name=name_or_id) + # if datasets is None: + # print("Not found") + # return None + # if len(datasets) == 0: + # print("No datasets found (len == 0)") + # return None + # return datasets[0]['id'] + + +def find_config(name: str) -> str: + if os.path.exists(".abm"): + if os.path.exists(f".abm/{name}"): + return f".abm/{name}" + config = os.path.join(Path.home(), ".abm", name) + if os.path.exists(config): + return config + return None + + def _get_dataset_data(gi, name_or_id): + print(f"Getting dataset data for {name_or_id}") + def make_result(data): return { 'id': data['id'], @@ -285,14 +464,18 @@ def make_result(data): try: ds = gi.datasets.show_dataset(name_or_id) + print(f"Got dataset data for {name_or_id} {ds['id']}") return make_result(ds) except Exception as e: + print(f"Failed to get dataset data for {name_or_id}") pass try: + print("Getting all datasets") datasets = gi.datasets.get_datasets( name=name_or_id ) # , deleted=True, purged=True) + print(f"List of datasets for {name_or_id} is {len(datasets)}") for ds in datasets: # print_json(ds) state = True @@ -301,7 +484,11 @@ def make_result(data): if state and not ds['deleted'] and ds['visible']: # The dict returned by get_datasets does not include the input # file sizes so we need to make another call to show_datasets + print(f"Getting dataset data for {ds['id']}") return make_result(gi.datasets.show_dataset(ds['id'])) + else: + print(f"Skipping dataset {ds['id']}") + print_json(ds) except Exception as e: pass @@ -311,3 +498,21 @@ def make_result(data): def _make_dataset_element(name, value): # print(f"Making dataset element for {name} = {value}({type(value)})") return dataset_collections.HistoryDatasetElement(name=name, id=value) + + +def get_float_key(column: int): + def get_key(row: list): + if row[column] == '': + return -1 + return float(row[column]) + + return get_key + + +def get_str_key(column: int): + # print(f"Getting string key for column {column}") + def get_key(row: list): + # print(f"Sorting by column {column} key {row[column]}") + return row[column] + + return get_key diff --git a/abm/lib/config.py b/abm/lib/config.py index cbb71a5..24d8112 100644 --- a/abm/lib/config.py +++ b/abm/lib/config.py @@ -6,7 +6,7 @@ print_yaml, save_profiles) -def list(context: Context, args: list): +def do_list(context: Context, args: list): profiles = load_profiles() print(f"Loaded {len(profiles)} profiles") for profile in profiles: diff --git a/abm/lib/dataset.py b/abm/lib/dataset.py index 6ae32a8..9ac3811 100644 --- a/abm/lib/dataset.py +++ b/abm/lib/dataset.py @@ -1,3 +1,4 @@ +import argparse import json import os from pathlib import Path @@ -6,21 +7,37 @@ import yaml from bioblend.galaxy import dataset_collections from common import (Context, _get_dataset_data, _make_dataset_element, connect, - find_history, print_json) + find_config, find_dataset, find_history, print_json) -def list(context: 
Context, args: list): +def do_list(context: Context, argv: list): + parser = argparse.ArgumentParser() + parser.add_argument('-s', '--state', help='list datasets in this state') + parser.add_argument('--history', help='show datasets in the given history') + parser.add_argument('-t', '--tool', help='only show datasets generate by this tool') + args = parser.parse_args(argv) + kwargs = {'limit': 10000, 'offset': 0, 'deleted': False} gi = connect(context) - kwargs = {'limit': 10000, 'offset': 0} - if len(args) > 0: - if args[0] in ['-s', '--state']: - if len(args) != 2: - print("ERROR: Invalid command.") - return - kwargs['state'] = args[1] - else: - print(f"ERROR: Invalid parameter: {args[0]}") + if args.state: + kwargs['state'] = args.state + if args.history: + hid = find_history(gi, args.history) + if hid is None: + print("ERROR: No such history") return + kwargs['history_id'] = hid + if args.tool: + kwargs['tool_id'] = args.tool + + # if len(args) > 0: + # if args[0] in ['-s', '--state']: + # if len(args) != 2: + # print("ERROR: Invalid command.") + # return + # kwargs['state'] = args[1] + # else: + # print(f"ERROR: Invalid parameter: {args[0]}") + # return # datasets = gi.datasets.get_datasets(limit=10000, offset=0) # , deleted=True, purged=True) datasets = gi.datasets.get_datasets(**kwargs) if len(datasets) == 0: @@ -101,6 +118,7 @@ def upload(context: Context, args: list): return if gi is None: gi = connect(context) + history = find_history(gi, history) if name: _import_from_url(gi, history, url, file_name=name) else: @@ -145,45 +163,74 @@ def collection(context: Context, args: list): def import_from_config(context: Context, args: list): + parser = argparse.ArgumentParser() + parser.add_argument( + '-c', + '--create', + help='create a new history for the dataset', + required=False, + default=None, + ) + parser.add_argument( + '-f', + '--file', + help='use instead of the datasets.yml', + required=False, + default=None, + ) + parser.add_argument( + '--history', + help='add datasets to the given history', + required=False, + default=None, + ) + parser.add_argument( + '-n', '--name', help='set the name of the dataset', required=False, default=None + ) + parser.add_argument('keys', help='the key of the dataset to import', nargs='+') gi = None - key = None history = None kwargs = {} - while len(args) > 0: - arg = args.pop(0) - if arg in ['--hs', '--hist', '--history']: - history = args.pop(0) - elif arg in ['-c', '--create']: - gi = connect(context) - history = gi.histories.create_history(args.pop(0)).get('id') - elif arg in ['-n', '--name']: - kwargs['file_name'] = args.pop(0) - elif key is not None: - print(f"ERROR: key already set: {key}") + argv = parser.parse_args(args) + if argv.name is not None: + if len(argv.keys) > 1: + print("ERROR: cannot specify --name with multiple keys") return - else: - key = arg + kwargs['file_name'] = argv.name - configfile = os.path.join(Path.home(), '.abm', 'datasets.yml') - if not os.path.exists(configfile): - print("ERROR: ABM has not been configured to import datasets.") - print(f"Please create {configfile}") + if argv.create is not None and argv.history is not None: + print("ERROR: cannot specify both --create and --history") return + if argv.create is not None: + gi = connect(context) + history = gi.histories.create_history(argv.create).get('id') + if argv.history is not None: + gi = connect(context) + history = find_history(gi, argv.history) + if argv.file is not None: + configfile = argv.file + if not os.path.exists(configfile): + print(f"ERROR: the 
specified file {configfile} was not found") + return + else: + configfile = find_config("datasets.yml") + if configfile is None: + print("ERROR: ABM has not been configured to import datasets.") + print(f"Please create {configfile}") + return with open(configfile, 'r') as f: datasets = yaml.safe_load(f) - if not key in datasets: - print(f"ERROR: dataset {key} has not been defined.") - return - url = datasets[key] - if gi is None: gi = connect(context) - if history is not None: - history = find_history(gi, history) - - response = gi.tools.put_url(url, history, **kwargs) - print(json.dumps(response, indent=4)) + for key in argv.keys: + if not key in datasets: + print(f"ERROR: dataset {key} has not been defined.") + else: + url = datasets[key] + print(f"Importing {key} from {url}") + response = gi.tools.put_url(url, history, **kwargs) + print(json.dumps(response, indent=4)) def _import_from_url(gi, history, url, **kwargs): @@ -223,9 +270,17 @@ def rename(context: Context, args: list): print("ERROR: please provide the history ID, dataset ID, and new name.") return gi = connect(context) - response = gi.histories.update_dataset(args[0], args[1], name=args[2]) - result = {'state': response['state'], 'name': response['name']} - print(json.dumps(result, indent=4)) + hid = find_history(gi, args[0]) + if hid is None: + print("ERROR: no such history") + return + dsid = find_dataset(gi, hid, args[1]) + if dsid is None: + print("ERROR: no such dataset") + return + response = gi.histories.update_dataset(hid, dsid, name=args[2]) + # result = {'state': response['state'], 'name': response['name']} + print(json.dumps(response, indent=4)) def test(context: Context, args: list): diff --git a/abm/lib/experiment.py b/abm/lib/experiment.py index 9dead51..1e35d6e 100644 --- a/abm/lib/experiment.py +++ b/abm/lib/experiment.py @@ -1,15 +1,18 @@ +import argparse import json import logging import os import threading import traceback from datetime import timedelta +from pprint import pprint from time import perf_counter import benchmark import helm import yaml -from common import Context, load_profiles +from common import (Context, get_float_key, get_str_key, load_profiles, + print_markdown_table) INVOCATIONS_DIR = "invocations" METRICS_DIR = "metrics" @@ -26,21 +29,23 @@ def run(context: Context, args: list): :return: True if the benchmarks completed sucessfully. False otherwise. 
""" + parser = argparse.ArgumentParser() + parser.add_argument('benchmark_path') + parser.add_argument('-r', '--run-number', default=-1) + argv = parser.parse_args(args) - if len(args) == 0: - print("ERROR: No benchmarking configuration provided.") - return False + benchmark_path = argv.benchmark_path - benchmark_path = args[0] if not os.path.exists(benchmark_path): print(f"ERROR: Benchmarking configuration not found {benchmark_path}") return False with open(benchmark_path, 'r') as f: config = yaml.safe_load(f) + config['start_at'] = argv.run_number + print(f"Starting with run number {argv.run_number}") profiles = load_profiles() - # latch = CountdownLatch(len(config['cloud'])) threads = [] start = perf_counter() for cloud in config['cloud']: @@ -66,6 +71,12 @@ def run_on_cloud(cloud: str, config: dict): context = Context(cloud) namespace = 'galaxy' chart = 'anvil/galaxykubeman' + start = int(config['start_at']) + print(f"Staring run number {start}") + if start < 0: + start = 1 + print(f"Staring run number {start}") + end = start + config['runs'] if 'galaxy' in config: namespace = config['galaxy']['namespace'] chart = config['galaxy']['chart'] @@ -76,16 +87,16 @@ def run_on_cloud(cloud: str, config: dict): if not helm.update(context, [f"rules/{conf}.yml", namespace, chart]): log.warning(f"job configuration not found: rules/{conf}.yml") continue - for n in range(config['runs']): - history_name_prefix = f"{n+1} {cloud} {conf}" - for workflow_conf in config['benchmark_confs']: + for workflow_conf in config['benchmark_confs']: + for n in range(start, end): + history_name_prefix = f"{n} {cloud} {conf}" benchmark.run( context, workflow_conf, history_name_prefix, config['name'] ) else: - for n in range(config['runs']): - history_name_prefix = f"{n+1} {cloud}" - for workflow_conf in config['benchmark_confs']: + for workflow_conf in config['benchmark_confs']: + for n in range(start, end): + history_name_prefix = f"{n} {cloud}" benchmark.run( context, workflow_conf, history_name_prefix, config['name'] ) @@ -114,34 +125,43 @@ def summarize(context: Context, args: list): :param args[0]: The path to the directory containing metrics filees :return: None """ + markdown = False separator = None - input_dirs = [] make_row = make_table_row header_row = "Run,Cloud,Job Conf,Workflow,History,Inputs,Tool,Tool Version,State,Slots,Memory,Runtime (Sec),CPU,Memory Limit (Bytes),Memory Max usage (Bytes)" - for arg in args: - if arg in ['-t', '--tsv']: - if separator is not None: - print('ERROR: The output format is specified more than once') - return - print('tsv') - separator = '\t' - elif arg in ['-c', '--csv']: - if separator is not None: - print('ERROR: The output format is specified more than once') - return - separator = ',' - print('csv') - elif arg in ['-m', '--model']: - if separator is not None: - print('ERROR: The output format is specified more than once') - return - print('making a model') - separator = ',' - make_row = make_model_row - header_row = "job_id,tool_id,tool_version,state,memory.max_usage_in_bytes,cpuacct.usage,process_count,galaxy_slots,runtime_seconds,ref_data_size,input_data_size_1,input_data_size_2" - else: - # print(f"Input dir {arg}") - input_dirs.append(arg) + + parser = argparse.ArgumentParser() + parser.add_argument('dirs', nargs='*') + parser.add_argument('-c', '--csv', action='store_true') + parser.add_argument('-t', '--tsv', action='store_true') + parser.add_argument('-m', '--model', action='store_true') + parser.add_argument('--markdown', action='store_true') + 
parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) + + count = 0 + if argv.csv: + separator = ',' + count += 1 + if argv.tsv: + separator = '\t' + count += 1 + if argv.model: + separator = ',' + make_row = make_model_row + count += 1 + if argv.markdown: + markdown = True + count += 1 + + if count == 0: + print("ERROR: no output format selected") + return + if count > 1: + print("ERROR: multiple output formats selected") + return + + input_dirs = argv.dirs if len(input_dirs) == 0: input_dirs.append('metrics') @@ -149,7 +169,14 @@ def summarize(context: Context, args: list): if separator is None: separator = ',' - print(header_row) + if markdown: + print("|Run|Inputs|Job Conf|Tool|State|Runtime (Sec)|Max Memory (GB)|") + print("|---|---|---|---|---|---:|---:|") + else: + print(header_row) + + table = list() + GB = float(1073741824) for input_dir in input_dirs: for file in os.listdir(input_dir): input_path = os.path.join(input_dir, file) @@ -162,14 +189,46 @@ def summarize(context: Context, args: list): # print('Ignoring upload tool') continue row = make_row(data) - print(separator.join([str(x) for x in row])) + table.append(row) except Exception as e: - # Silently fail to allow the remainder of the table to be generated. print(f"Unable to process {input_path}") print(e) traceback.print_exc() + # Silently fail to allow the remainder of the table to be generated. # pass + reverse = True + if argv.sort_by: + comp = get_str_key(6) + if argv.sort_by == 'runtime': + # key = 10 + comp = get_float_key(10) + # elif argv.sort_by == 'cpu': + # comp = get_float_comparator(11) + # #key = 11 + elif argv.sort_by == 'memory': + comp = get_float_key(13) + # key = 13 + elif argv.sort_by == 'tool': + # print('Getting string key accessor.') + comp = get_str_key(6) + reverse = False + # table.sort(key=lambda row: -1 if row[key] == '' else float(row[key]), reverse=True) + table.sort(key=comp, reverse=reverse) + + if markdown: + for row in table: + runtime = '' if len(row[10]) == 0 else f"{float(row[10]):4.1f}" + # cpu = '' if len(row[11]) == 0 else f"{float(row[11])/10**9:4.1f}" + memory = '' if len(row[13]) == 0 else f"{float(row[13])/GB:4.3f}" + # memory = float(row[13]) / GB + print( + f"| {row[0]} | {row[5].split(' ')[0]} |{row[2]} | {row[6]} | {row[7]} | {runtime} | {memory} |" + ) + else: + for row in table: + print(separator.join([str(x) for x in row])) + accept_metrics = [ 'galaxy_slots', @@ -177,7 +236,8 @@ def summarize(context: Context, args: list): 'runtime_seconds', 'cpuacct.usage', 'memory.limit_in_bytes', - 'memory.max_usage_in_bytes', + 'memory.peak' + #'memory.max_usage_in_bytes', ] # ,'memory.soft_limit_in_bytes'] diff --git a/abm/lib/folder.py b/abm/lib/folder.py index 06760a5..4c4abb8 100644 --- a/abm/lib/folder.py +++ b/abm/lib/folder.py @@ -3,7 +3,7 @@ from .common import Context, connect -def list(context: Context, args: list): +def do_list(context: Context, args: list): if len(args) == 0: print("ERROR: no library ID was provided") return diff --git a/abm/lib/helm.py b/abm/lib/helm.py index 52a6a60..b1cfa20 100644 --- a/abm/lib/helm.py +++ b/abm/lib/helm.py @@ -59,7 +59,7 @@ def update(context: Context, args: list): print('Waiting for the new deployments to come online') # Give kubernetes a moment to start processing the update. 
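# A rough sketch of why the helm/kubectl helpers below thread an env dict through
# every call: kubectl has to see the same KUBECONFIG that was used for the helm
# upgrade. The path, namespace, and deployment handling here are illustrative, not
# the exact abm code.
import json
import os
import subprocess

env = os.environ.copy()
env['KUBECONFIG'] = os.path.expanduser('~/.kube/cloud1.config')  # placeholder path
out = subprocess.run('kubectl get deployment -n galaxy -o json'.split(),
                     capture_output=True, env=env, check=True)
for item in json.loads(out.stdout)['items']:
    name = item['metadata']['name']
    subprocess.run(f'kubectl rollout status deployment -n galaxy {name} --watch'.split(),
                   env=env, check=True)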
time.sleep(5) - wait_until_ready(namespace) + wait_until_ready(namespace, env) return True @@ -133,9 +133,9 @@ def wait_for(kubectl: str, namespace: str, name: str, env: dict): # wait_for(kubectl, namespace, 'galaxy-job', env) # wait_for(kubectl, namespace, 'galaxy-web', env) # wait_for(kubectl, namespace, 'galaxy-workflow', env) -def wait_until_ready(namespace: str): +def wait_until_ready(namespace: str, env: dict): kubectl = find_executable('kubectl') - data = run(f"{kubectl} get deployment -n {namespace} -o json") + data = run(f"{kubectl} get deployment -n {namespace} -o json", env) deployment_data = json.loads(data) deployments = list() for deployment in deployment_data['items']: @@ -146,7 +146,8 @@ def wait_until_ready(namespace: str): for deployment in deployments: print( run( - f"{kubectl} rollout status deployment -n {namespace} {deployment} --watch" + f"{kubectl} rollout status deployment -n {namespace} {deployment} --watch", + env, ) ) diff --git a/abm/lib/history.py b/abm/lib/history.py index 656fee0..bc78e95 100644 --- a/abm/lib/history.py +++ b/abm/lib/history.py @@ -1,3 +1,4 @@ +import argparse import json import os import sys @@ -7,13 +8,18 @@ import yaml from bioblend.galaxy.objects import GalaxyInstance -from lib.common import (Context, connect, find_history, parse_profile, - print_json, summarize_metrics) +from lib.common import (Context, connect, find_config, find_history, + get_float_key, get_str_key, parse_profile, print_json, + print_markdown_table, print_table_header, + summarize_metrics, try_for) # # History related functions # +# The number of times a failed job will be restarted. +RESTART_MAX = 3 + def longest_name(histories: list): longest = 0 @@ -190,55 +196,41 @@ def _import(context: Context, args: list): def himport(context: Context, args: list): - def error_message(msg='Invalid command'): - print(f"ERROR: {msg}") - print(f"USAGE: {sys.argv[0]} history import SERVER HISTORY_ID JEHA_ID") - print(f" {sys.argv[0]} history import http://GALAXY_SERVER_URL") - print(f" {sys.argv[0]} history import [dna|rna]") - - wait = True - if '-n' in args: - args.remove('-n') - wait = False - if '--no-wait' in args: - args.remove('--no-wait') - wait = False - - if len(args) == 1: - if 'http' in args[0]: - url = args[0] + parser = argparse.ArgumentParser() + parser.add_argument( + '-n', + '--no-wait', + action='store_true', + help='Do not wait for the import to complete', + default=False, + ) + parser.add_argument( + '-f', + '--file', + help='Use the specified histories.yml file', + required=False, + default=None, + ) + parser.add_argument('identifier', help='The history alias or URL to import') + argv = parser.parse_args(args) + + wait = not argv.no_wait + if argv.identifier.startswith('http'): + url = argv.identifier + else: + if argv.file is not None: + config = argv.file else: - datasets = None - config = f'{os.path.dirname(os.path.abspath(__file__))}/histories.yml' - # First load the histories.yml file that is pacakged with abm - if os.path.exists(config): - with open(config, 'r') as f: - datasets = yaml.safe_load(f) - # Then load the user histories.yml, if any - userfile = os.path.join(Path.home(), ".abm", "histories.yml") - if os.path.exists(userfile): - if datasets is None: - datasets = {} - with open(userfile, 'r') as f: - userdata = yaml.safe_load(f) - for key, item in userdata.items(): - datasets[key] = item - if datasets is None: - error_message("No history URLs have been configured.") - return - if not args[0] in datasets: - error_message('Please specify a URL or 
name of the history to import') - return - url = datasets[args[0]] - elif len(args) == 3: - server, key = parse_profile(args[0]) - if server is None: - error_message(f"Invalid server profile name: {args[0]}") + config = find_config("histories.yml") + if config is None: + print("ERROR: No histories.yml file found.") return - url = f"{server}history/export_archive?id={args[1]}&jeha_id={args[2]}" - else: - error_message() - return + with open(config, 'r') as f: + histories = yaml.safe_load(f) + if argv.identifier not in histories: + print(f"ERROR: No such history {argv.identifier}") + return + url = histories[argv.identifier] gi = connect(context) print(f"Importing history from {url}") @@ -339,13 +331,20 @@ def tag(context: Context, args: list): def summarize(context: Context, args: list): - if len(args) == 0: + parser = argparse.ArgumentParser() + parser.add_argument('id_list', nargs='+') + parser.add_argument('--markdown', action='store_true') + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) + + if len(argv.id_list) == 0: print("ERROR: Provide one or more history ID values.") return gi = connect(context) all_jobs = [] - while len(args) > 0: - hid = find_history(gi, args.pop(0)) + id_list = argv.id_list + while len(id_list) > 0: + hid = find_history(gi, id_list.pop(0)) history = gi.histories.show_history(history_id=hid) jobs = gi.jobs.get_jobs(history_id=hid) for job in jobs: @@ -353,22 +352,25 @@ def summarize(context: Context, args: list): job['history_id'] = hid job['history_name'] = history['name'] job['workflow_id'] = '' - # if 'workflow_id' in invocation: - # job['workflow_id'] = invocation['workflow_id'] all_jobs.append(job) - # invocations = gi.invocations.get_invocations(history_id=hid) - # for invocation in invocations: - # id = invocation['id'] - # #jobs = gi.jobs.get_jobs(history_id=hid, invocation_id=id) - # jobs = gi.jobs.get_jobs(history_id=hid) - # for job in jobs: - # job['invocation_id'] = id - # job['history_id'] = hid - # if 'workflow_id' in invocation: - # job['workflow_id'] = invocation['workflow_id'] - # all_jobs.append(job) - # summarize_metrics(gi, gi.jobs.get_jobs(history_id=args[0])) - summarize_metrics(gi, all_jobs) + table = summarize_metrics(gi, all_jobs) + if argv.sort_by: + reverse = True + get_key = None + if argv.sort_by == 'runtime': + get_key = get_float_key(15) + elif argv.sort_by == 'memory': + get_key = get_float_key(11) + elif argv.sort_by == 'tool': + get_key = get_str_key(4) + reverse = False + table.sort(key=get_key, reverse=reverse) + if argv.markdown: + print_markdown_table(table) + else: + print_table_header() + for row in table: + print(','.join(row)) def wait(context: Context, args: list): @@ -385,15 +387,28 @@ def wait(context: Context, args: list): wait_for(gi, history_id) +def kill_all_jobs(gi: GalaxyInstance, job_list: list): + cancel_states = ['new', 'running', 'paused'] + for job in job_list: + if job['state'] in cancel_states: + print(f"Cancelling job {job['tool_id']}") + gi.jobs.cancel_job(job['id']) + else: + print( + f"Job {job['id']} for tool {job['tool_id']} is in state {job['state']}" + ) + + def wait_for(gi: GalaxyInstance, history_id: str): errored = [] waiting = True job_states = JobStates() + restart_counts = dict() while waiting: restart = [] status_counts = dict() terminal = 0 - job_list = gi.jobs.get_jobs(history_id=history_id) + job_list = try_for(lambda: gi.jobs.get_jobs(history_id=history_id)) for job in job_list: job_states.update(job) state = job['state'] 
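# Illustration of how the --sort-by option used by the summarize commands orders the
# metrics table: get_float_key()/get_str_key() (defined in common.py in this diff)
# build key functions for list.sort(). The column index and sample rows are invented
# for the example.
def get_float_key(column):
    def key(row):
        return -1 if row[column] == '' else float(row[column])
    return key

sample = [
    ['job1', 'bwa_mem', '125.0'],      # [id, tool, runtime_seconds]
    ['job2', 'fastqc', ''],            # a missing value sorts last
    ['job3', 'samtools_sort', '3.0'],
]
sample.sort(key=get_float_key(2), reverse=True)  # longest runtime first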
@@ -409,9 +424,18 @@ def wait_for(gi: GalaxyInstance, history_id: str): elif state == 'error': terminal += 1 if id not in errored: - restart.append(id) + tool = job['tool_id'] + if tool in restart_counts: + restart_counts[tool] += 1 + else: + restart_counts[tool] = 1 + if restart_counts[tool] < RESTART_MAX: + restart.append(id) + else: + kill_all_jobs(gi, job_list) + waiting = False errored.append(id) - if len(restart) > 0: + if len(restart) > 0 and waiting: for job in restart: print(f"Restaring job {job}") try: @@ -427,9 +451,6 @@ def wait_for(gi: GalaxyInstance, history_id: str): waiting = False if waiting: time.sleep(30) - # elif state == 'paused': - # paused += 1 - # print(f"{job['id']}\t{job['state']}\t{job['update_time']}\t{job['tool_id']}") class JobStates: diff --git a/abm/lib/invocation.py b/abm/lib/invocation.py index 0e6807f..c76e200 100644 --- a/abm/lib/invocation.py +++ b/abm/lib/invocation.py @@ -1,4 +1,8 @@ -from common import Context, connect, print_json, summarize_metrics +import argparse + +from common import (Context, connect, get_float_key, get_str_key, print_json, + print_markdown_table, print_table_header, print_yaml, + summarize_metrics) def doList(context: Context, args: list): @@ -24,16 +28,44 @@ def doList(context: Context, args: list): print(f'{id}\t{state}\t{workflow}\t{history}') -def summarize(context: Context, args: list): +def show(context: Context, args: list): if len(args) == 0: - print("ERROR: Provide one or more invocation ID values.") + print("ERROR: no invocation ID was provided") return gi = connect(context) - id = args[0] + invocation = gi.invocations.show_invocation(args[0]) + print_yaml(invocation) + + +def summarize(context: Context, args: list): + parser = argparse.ArgumentParser() + parser.add_argument('id', nargs=1) + parser.add_argument('--markdown', action='store_true') + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) + gi = connect(context) + id = argv.id[0] all_jobs = [] jobs = gi.jobs.get_jobs(invocation_id=id) for job in jobs: job['invocation_id'] = id job['workflow_id'] = '' all_jobs.append(job) - summarize_metrics(gi, all_jobs) + table = summarize_metrics(gi, all_jobs) + if argv.sort_by: + reverse = True + get_key = None + if argv.sort_by == 'runtime': + get_key = get_float_key(15) + elif argv.sort_by == 'memory': + get_key = get_float_key(11) + elif argv.sort_by == 'tool': + get_key = get_str_key(4) + reverse = False + table.sort(key=get_key, reverse=reverse) + if argv.markdown: + print_markdown_table(table) + else: + print_table_header() + for row in table: + print(','.join(row)) diff --git a/abm/lib/job.py b/abm/lib/job.py index c0b8ff5..92ae693 100644 --- a/abm/lib/job.py +++ b/abm/lib/job.py @@ -1,14 +1,15 @@ +import argparse +import datetime import json import logging import time -from pprint import pprint from .common import Context, connect, find_history, print_json log = logging.getLogger('abm') -def list(context: Context, args: list): +def do_list(context: Context, args: list): state = '' history_id = None log.debug('Processing args') @@ -54,18 +55,27 @@ def show(context: Context, args: list): def wait(context: Context, args: list): - if len(args) != 1: - print("ERROR: Invalid parameters. 
Job ID is required") - return + parser = argparse.ArgumentParser() + parser.add_argument('job_id') + parser.add_argument('-t', '--timeout', type=int, default=-1) + params = parser.parse_args(args) + timeout = params.timeout + job_id = params.job_id gi = connect(context) - state = "Unknown" + start_time = time.time() # we are only interested in precision to the second waiting = True while waiting: - job = gi.jobs.show_job(args[0], full_details=False) + job = gi.jobs.show_job(job_id, full_details=False) + if job is None or len(job) == 0: + print(f"Job {job_id} not found.") + return state = job["state"] + if timeout > 0: + if time.time() - start_time > timeout: + waiting = False if state == "ok" or state == "error": waiting = False - else: + if waiting: time.sleep(15) print(json.dumps(job, indent=4)) diff --git a/abm/lib/library.py b/abm/lib/library.py index 8add917..4dbb610 100644 --- a/abm/lib/library.py +++ b/abm/lib/library.py @@ -3,7 +3,7 @@ from .common import Context, connect, datasets -def list(context: Context, args: list): +def do_list(context: Context, args: list): gi = connect(context) if len(args) == 0: for library in gi.libraries.get_libraries(): diff --git a/abm/lib/menu.yml b/abm/lib/menu.yml index 9aafa1f..eab2d00 100644 --- a/abm/lib/menu.yml +++ b/abm/lib/menu.yml @@ -26,18 +26,18 @@ menu: - name: ['upload', 'up'] handler: workflow.upload - params: PATH + params: PATH [-n|--no-tools] help: 'upload a workflow file to the server' - name: ['import', 'imp'] handler: workflow.import_from_config - params: NAME + params: NAME [-n|--no-tools] help: 'import a workflow defined in ~/.abm/workflows.yml' - name: ['download', 'dl'] handler: workflow.download help: 'download a workflow' params: ID PATH - name: ['list', 'ls'] - handler: workflow.list + handler: workflow.do_list help: 'list workflows available on the server' - name: [show] handler: workflow.show @@ -69,8 +69,8 @@ handler: workflow.inputs - name: [summary, summarize] handler: workflow.summarize - help: generate a CSV with job metrics for all workflow runs - params: ID [ID ...] + help: generate a CSV or markdown table with job metrics for all workflow runs + params: "ID [ID ...] [--markdown] [-s|--sort-by (tool,runtime,memory)]" - name: ['test'] handler: workflow.test help: run some test code @@ -81,7 +81,7 @@ menu: - name: ['upload', 'up'] handler: dataset.upload - params: PATH [-id HISTORY_ID | -c "History name"] + params: PATH [--history "History name_or_id" | -c|--create "History name"] [-m|--name "Dataset name"] help: upload a dataset to the server from the specified URL - name: ['download', 'dl'] handler: dataset.download params: ID PATH help: download a dataset from the server - name: ['import', 'imp'] handler: dataset.import_from_config - params: KEY [--hs|--hist|--history HISTORY_ID | -c|--create "History name"] - help: imports a dataset to the server from a URL specified in the datasets.yml config file. + params: '[--history HISTORY_ID | -c|--create "History name"] [-n|--name "Dataset name"] [-f|--file PATH] KEY [KEY...]' + help: imports one or more datasets to the server from a URL specified in the datasets.yml config file. - name: ['list', 'ls'] - handler: dataset.list + handler: dataset.do_list help: lists all the datasets on the server - name: ['find'] handler: dataset.find @@ -158,8 +158,8 @@ help: show detailed information about a history - name: [summarize, summary, table] handler: history.summarize - params: "ID [ID...]" - help: Generate a CSV table with runtime metrics for all jobs in the history. + params: "ID [ID...] [--markdown] [-s|--sort-by (tool,runtime,memory)]" + help: Generate a CSV or markdown table with runtime metrics for all jobs in the history. - name: [publish, pub] handler: history.publish help: publish the given history @@ -191,7 +191,7 @@ menu: - name: [ list, ls ] help: list all jobs, or jobs in a particular state. Can filter by a history. - handler: job.list + handler: job.do_list params: "[-s|--state ok|running|error|waiting] [-h|--history historyID]" - name: [ show ] help: show detailed information about a job @@ -208,7 +208,7 @@ - name: [wait] help: Wait for a job to finish running handler: job.wait - params: ID + params: "ID [-t|--timeout SECONDS]" - name: [ metrics, stats ] help: display runtime metrics for the job, or a list of jobs contained in a history handler: job.metrics @@ -222,7 +222,7 @@ menu: - name: [list, ls] help: list all users on the Galaxy instance - handler: users.list + handler: users.do_list - name: [api_key, apikey, key] help: obtain the API key for the specified user handler: users.api_key @@ -244,13 +244,13 @@ standalone: true menu: - name: [run] - help: run all benchmarks in an experiment + help: run all benchmarks in an experiment. Use --run-number to specify the starting run number. handler: experiment.run - params: PATH + params: "PATH [-r|--run-number N]" - name: [summarize, summary] - help: summarize metrics to a CSV or TSV file. + help: summarize metrics to a CSV, TSV, or markdown file. handler: experiment.summarize - params: "[-c, --csv, -t, --tsv]" + params: "[-c, --csv, -t, --tsv, --markdown] [-s|--sort-by (tool,runtime,memory)]" - name: [test] help: playground code handler: experiment.test @@ -262,9 +262,13 @@ help: list all invocations. handler: invocation.doList params: "[-w|--workflow ID] [-h|--history ID]" - - name: [summarize] - help: generate a CSV of job metrics for an invocation + - name: [show] + help: display information about the workflow invocation params: ID + handler: invocation.show + - name: [summarize] + help: generate a CSV or markdown table of job metrics for an invocation + params: "ID [--markdown] [-s|--sort-by (tool,runtime,memory)]" handler: invocation.summarize - name: [helm] help: execute a helm command @@ -293,13 +297,13 @@ - name: [url] help: derive the URL to access this Galaxy instance handler: kubectl.url -- name: [config, conf, cfg] +- name: [config, configuration, conf, cfg] help: manage configuration profiles standalone: true menu: - name: [list, ls] help: list configured servers - handler: config.list + handler: config.do_list - name: [show, sh] help: display URL, API key, and kube config for a specific cloud.
handler: config.show @@ -337,7 +341,7 @@ standalone: true menu: - name: [list, ls] - handler: cloudlaunch.list + handler: cloudlaunch.do_list help: list deployments on all cloud providers - name: [create, launch, new] handler: cloudlaunch.create @@ -352,7 +356,7 @@ menu: - name: [list, ls] help: list all libraries on the server - handler: library.list + handler: library.do_list - name: [show] help: show detailed information about a library handler: library.show @@ -369,7 +373,7 @@ help: manage folders in data libraries menu: - name: [list, ls] - handler: folder.list + handler: folder.do_list help: list the folders in a data library params: LIBRARY_ID - name: [create, new] diff --git a/abm/lib/threads/Latch.py b/abm/lib/threads/Latch.py deleted file mode 100644 index 8db238d..0000000 --- a/abm/lib/threads/Latch.py +++ /dev/null @@ -1,20 +0,0 @@ -import threading - - -class CountdownLatch: - def __init__(self, count=1): - self.count = count - self.lock = threading.Condition - - def count_down(self, count=1): - self.lock.acquire(True) - self.count -= count - if self.count <= 0: - self.lock.notifyAll() - self.lock.release() - - def wait(self): - self.lock.acquire(True) - while self.count > 0: - self.lock.wait() - self.lock.release() diff --git a/abm/lib/users.py b/abm/lib/users.py index 74353fb..ceacd82 100644 --- a/abm/lib/users.py +++ b/abm/lib/users.py @@ -5,10 +5,8 @@ from common import Context, connect -def list(context: Context, args: list): - # TODO the master API key needs to be parameterized or specified in a config file. - context.API_KEY = "galaxypassword" - gi = connect(context) +def do_list(context: Context, args: list): + gi = connect(context, use_master_key=True) user_list = gi.users.get_users() pprint(user_list) @@ -22,9 +20,7 @@ def get_api_key(context: Context, args: list): print("ERROR: no user email given") return - # TODO the master API key needs to be parameterized or specified in a config file. - context.API_KEY = "galaxypassword" - gi = connect(context) + gi = connect(context, use_master_key=True) user_list = gi.users.get_users(f_email=args[0]) if user_list is None or len(user_list) == 0: print("WARNING: no such user") @@ -52,9 +48,7 @@ def create(context: Context, args: list): print(f"ERROR: {email} does not look like a valid email address") return - # TODO the master API key needs to be parameterized or specified in a config file. - context.API_KEY = "galaxypassword" - gi = connect(context) + gi = connect(context, use_master_key=True) user_record = gi.users.create_local_user(name, email, password) id = user_record['id'] key = gi.users.create_user_apikey(id) @@ -67,9 +61,7 @@ def show(context: Context, args: list): print("ERROR: no user email given") return - # TODO the master API key needs to be parameterized or specified in a config file. 
- context.API_KEY = "galaxypassword" - gi = connect(context) + gi = connect(context, use_master_key=True) id = _get_user_id(gi, args[0]) if id is None: return diff --git a/abm/lib/workflow.py b/abm/lib/workflow.py index 0ba36f7..e14bbf9 100644 --- a/abm/lib/workflow.py +++ b/abm/lib/workflow.py @@ -1,20 +1,22 @@ +import argparse import json import logging import os from pathlib import Path from pprint import pprint -import planemo import requests import yaml -from common import Context, connect, summarize_metrics +from common import (Context, connect, find_config, get_float_key, get_str_key, + print_markdown_table, print_table_header, + summarize_metrics) from planemo.galaxy.workflows import install_shed_repos from planemo.runnable import for_path, for_uri log = logging.getLogger('abm') -def list(context: Context, args: list): +def do_list(context: Context, args: list): gi = connect(context) workflows = gi.workflows.get_workflows(published=True) if len(workflows) == 0: @@ -34,30 +36,48 @@ def delete(context: Context, args: list): def upload(context: Context, args: list): - if len(args) == 0: - print('ERROR: no workflow file given') + path = None + install = True + for arg in args: + if arg in ['-n', '--no-tools']: + print("Skipping tools") + install = False + else: + path = arg + if path is None: + print("ERROR: no workflow given") return - path = args[0] + if path.startswith('http'): import_from_url(context, args) return if not os.path.exists(path): print(f'ERROR: file not found: {path}') return + print("Uploading workflow") gi = connect(context) print("Importing the workflow") pprint(gi.workflows.import_workflow_from_local_path(path, publish=True)) runnable = for_path(path) - print("Installing tools") - result = install_shed_repos(runnable, gi, False) - pprint(result) + if install: + print("Installing tools") + result = install_shed_repos(runnable, gi, False) + pprint(result) def import_from_url(context: Context, args: list): - if len(args) == 0: - print("ERROR: no workflow URL given") + print("Importing workflow from URL") + url = None + install = True + for arg in args: + if arg in ['-n', '--no-tools']: + print("Skipping tools") + install = False + else: + url = arg + if url is None: + print("ERROR: no URL given") return - url = args[0] # There is a bug in ephemeris (for lack of a better term) that assumes all # Runnable objects can be found on the local file system @@ -81,12 +101,6 @@ def import_from_url(context: Context, args: list): input_text = response.text with open(cached_file, 'w') as f: f.write(input_text) - - # response = requests.get(url) - # if (response.status_code != 200): - # print(f"ERROR: There was a problem downloading the workflow: {response.status_code}") - # print(response.reason) - # return try: workflow = json.loads(input_text) except Exception as e: @@ -98,30 +112,45 @@ def import_from_url(context: Context, args: list): result = gi.workflows.import_workflow_dict(workflow, publish=True) print(json.dumps(result, indent=4)) runnable = for_path(cached_file) - # runnable = for_uri(url) - print("Installing tools") - result = install_shed_repos(runnable, gi, False, install_tool_dependencies=True) - pprint(result) + if install: + print("Installing tools") + result = install_shed_repos(runnable, gi, False, install_tool_dependencies=True) + pprint(result) def import_from_config(context: Context, args: list): - if len(args) == 0: + key = None + install = True + config = None + for arg in args: + if arg in ['-n', '--no-tools']: + print("Skipping tools") + install = False + 
elif arg in ['-f', '--file']: + # The next argument is the path to the workflows configuration file. + config = '' + elif config == '': + config = arg + else: + key = arg + if key is None: print("ERROR: no workflow ID given") return - key = args[0] - userfile = os.path.join(Path.home(), ".abm", "workflows.yml") - if not os.path.exists(userfile): + + if not config: + config = find_config("workflows.yml") + if config is None: print("ERROR: this instance has not been configured to import workflows.") - print(f"Please configure {userfile} to enable workflow imports") + print(f"Please configure a workflows.yml file to enable imports") return - with open(userfile, 'r') as f: + with open(config, 'r') as f: workflows = yaml.safe_load(f) if not key in workflows: print(f"ERROR: no such workflow: {key}") return url = workflows[key] - import_from_url(context, [url]) + argv = [url] + if not install: + argv.append('-n') + import_from_url(context, argv) def download(context: Context, args: list): @@ -164,30 +193,21 @@ def invocation(context: Context, args: list): print("ERROR: Invalid paramaeters. A workflow ID invocation ID are required") return workflow_id = None - invocation_id = None while len(args) > 0: arg = args.pop(0) if arg in ['-w', '--work', '--workflow']: print("Setting workflow id") workflow_id = args.pop(0) - # elif arg in ['-i', '--invoke', '--invocation']: - # invocation_id = args.pop(0) - # print("Setting invocation id") else: print(f'Invalid parameter: "{arg}') return if workflow_id is None: print("ERROR: No workflow ID provided") return - # if invocation_id is None: - # print("ERROR: No invocation ID provided") - # return gi = connect(context) - # result = gi.workflows.show_invocation(workflow_id, invocation_id) invocations = gi.invocations.get_invocations( workflow_id=workflow_id, view='element', step_details=True ) - # print(json.dumps(result, indent=4)) print('ID\tState\tWorkflow\tHistory') for invocation in invocations: id = invocation['id'] @@ -236,11 +256,13 @@ def rename(context: Context, args: list): def summarize(context: Context, args: list): - if len(args) == 0: - print("ERROR: Provide one or more workflow ID values.") - return + parser = argparse.ArgumentParser() + parser.add_argument('id', nargs=1) + parser.add_argument('--markdown', action='store_true') + parser.add_argument('-s', '--sort-by', choices=['runtime', 'memory', 'tool']) + argv = parser.parse_args(args) gi = connect(context) - wid = args[0] + wid = argv.id[0] # nargs=1 stores a single-element list all_jobs = [] invocations = gi.invocations.get_invocations(workflow_id=wid) for invocation in invocations: @@ -250,4 +272,21 @@ job['invocation_id'] = id job['workflow_id'] = wid all_jobs.append(job) - summarize_metrics(gi, all_jobs) + table = summarize_metrics(gi, all_jobs) + if argv.sort_by: + reverse = True + get_key = None + if argv.sort_by == 'runtime': + get_key = get_float_key(15) + elif argv.sort_by == 'memory': + get_key = get_float_key(11) + elif argv.sort_by == 'tool': + get_key = get_str_key(4) + reverse = False + table.sort(key=get_key, reverse=reverse) + if argv.markdown: + print_markdown_table(table) + else: + print_table_header() + for row in table: + print(','.join(row)) diff --git a/bootstrap-config/test.yaml b/bootstrap-config/test.yaml deleted file mode 100644 index bf7822a..0000000 --- a/bootstrap-config/test.yaml +++ /dev/null @@ -1,11 +0,0 @@ -name: Benchmarking DNA -runs: 3 -workflow_conf: - - config/test.yml - - config/dna-named.yml - - config/rna-named.yml -cloud: - - iu2 -job_configs: - - rules/4x8.yml - - rules/8x16.yml \ No newline at end of file diff --git a/bump b/bump index 
87bac5a..2da9275 100755 --- a/bump +++ b/bump @@ -19,23 +19,20 @@ def main(): with open(VERSION_FILE, 'r') as f: version_string = f.read().strip() - - parts = version_string.split('.') + + prefix = version_string + suffix = None + release = None + build = None + if '-' in version_string: + # This is a development build + prefix, suffix = version_string.split('-') + release,build = suffix.split('.') + build = int(build) + parts = prefix.split('.') major = int(parts[0]) minor = int(parts[1]) - release = None - if '-' in parts[2]: - revs = parts[2].split('-') - revision = int(revs[0]) - if 'dev' in revs[1]: - release = 'dev' - build = int(revs[1].replace('dev', '')) - elif 'rc' in revs[1]: - release = 'rc' - build = int(revs[1].replace('rc', '')) - else: - revision = int(parts[2]) - build = None + revision = int(parts[2]) if sys.argv[1] in ['major', 'minor', 'revision'] and release is not None: print(f"ERROR: Cannot bump the {sys.argv[1]} version for a development build") @@ -75,7 +72,7 @@ def main(): if build is None: version_string = f"{major}.{minor}.{revision}" else: - version_string = f"{major}.{minor}.{revision}-{release}{build}" + version_string = f"{major}.{minor}.{revision}-{release}.{build}" with open(VERSION_FILE, 'w') as f: f.write(version_string) diff --git a/requirements.txt b/requirements.txt index 7284f55..bc7653c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ bioblend pyyaml planemo -cloudlaunch-cli \ No newline at end of file +cloudlaunch-cli diff --git a/rules/default.yml b/rules/default.yml deleted file mode 100644 index c12ead7..0000000 --- a/rules/default.yml +++ /dev/null @@ -1,106 +0,0 @@ -mappings: - summary_stats: - tool_ids: - - Summary_Statistics1 - docker_container_id_override: cloudve/gsummary:latest - resource_set: small - sam_fasta_dm: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/devteam/data_manager_sam_fasta_index_builder/sam_fasta_index_builder/.* - docker_container_id_override: cloudve/sam-fasta-dm:latest - resource_set: small - bwa_dm: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/devteam/data_manager_bwa_mem_index_builder/bwa_mem_index_builder_data_manager/.* - docker_container_id_override: cloudve/bwa-dm:latest - resource_set: small - prokka: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/crs4/prokka/prokka/1.14.5 - docker_container_id_override: cloudve/prokka:1.14.5 - jbrowse: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/iuc/jbrowse/jbrowse/1.16.5+galaxy6 - docker_container_id_override: cloudve/jbrowse:1.16.5 - lib_galaxy: - tool_ids: - - sort1 - - Grouping1 - docker_container_id_override: galaxy/galaxy-min:21.05 - resource_set: small - set_medium: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/devteam/bowtie2/bowtie2/.* - - toolshed.g2.bx.psu.edu/repos/iuc/bwameth/bwameth/.* - - toolshed.g2.bx.psu.edu/repos/iuc/featurecounts/featurecounts/.* - - toolshed.g2.bx.psu.edu/repos/iuc/hisat2/hisat2/.* - - toolshed.g2.bx.psu.edu/repos/iuc/valet/valet/.* - - toolshed.g2.bx.psu.edu/repos/iuc/varscan_somatic/varscan_somatic/.* - - toolshed.g2.bx.psu.edu/repos/nilesh/rseqc/rseqc_bam2wig/.* - resource_set: medium - set_large: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/devteam/bwa/bwa_mem/.* - - toolshed.g2.bx.psu.edu/repos/devteam/bwa/bwa/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_bam_compare/deeptools_bam_compare/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_bam_coverage/deeptools_bam_coverage/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_bam_pe_fragmentsize/deeptools_bam_pe_fragmentsize/.* - - 
toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_bigwig_compare/deeptools_bigwig_compare/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_compute_gc_bias/deeptools_compute_gc_bias/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_compute_matrix/deeptools_compute_matrix/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_correct_gc_bias/deeptools_correct_gc_bias/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_multi_bam_summary/deeptools_multi_bam_summary/.* - - toolshed.g2.bx.psu.edu/repos/bgruening/deeptools_multi_bigwig_summary/deeptools_multi_bigwig_summary/.* - - toolshed.g2.bx.psu.edu/repos/devteam/freebayes/freebayes/.* - - toolshed.g2.bx.psu.edu/repos/iuc/rgrnastar/rna_star/.* - - toolshed.g2.bx.psu.edu/repos/iuc/rnaspades/rnaspades/.* - - toolshed.g2.bx.psu.edu/repos/iuc/sra_tools/fasterq_dump/.* - resource_set: large - set_2xlarge: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/iuc/unicycler/unicycler/.* - - toolshed.g2.bx.psu.edu/repos/nml/spades/spades/.* - resource_set: 2xlarge - set_mlarge: - tool_ids: - - toolshed.g2.bx.psu.edu/repos/iuc/minimap2/minimap2/.* - - toolshed.g2.bx.psu.edu/repos/iuc/plink/plink/.* - resource_set: mlarge -resources: - resource_sets: - small: - requests: - cpu: 1 - memory: 2G - limits: - cpu: 2 - memory: 5G - medium: - requests: - cpu: 2 - memory: 4G - limits: - cpu: 4 - memory: 10G - large: - requests: - cpu: 4 - memory: 8G - limits: - cpu: 8 - memory: 16G - 2xlarge: - requests: - cpu: 12 - memory: 20G - limits: - cpu: 12 - memory: 24G - mlarge: - requests: - cpu: 2 - memory: 16G - limits: - cpu: 4 - memory: 20G - default_resource_set: small diff --git a/benchmarks/dna-named-2.yml b/samples/benchmarks/dna-named.yml similarity index 100% rename from benchmarks/dna-named-2.yml rename to samples/benchmarks/dna-named.yml diff --git a/benchmarks/example.yml b/samples/benchmarks/example.yml similarity index 100% rename from benchmarks/example.yml rename to samples/benchmarks/example.yml diff --git a/samples/benchmarks/rna-named.yml b/samples/benchmarks/rna-named.yml new file mode 100644 index 0000000..b94c0e0 --- /dev/null +++ b/samples/benchmarks/rna-named.yml @@ -0,0 +1,14 @@ +- workflow_id: d6d3c2119c4849e4 + output_history_base_name: RNA-seq + reference_data: + - name: Reference Transcript (FASTA) + dataset_id: 50a269b7a99356aa + runs: + - history_name: 1 + inputs: + - name: FASTQ RNA Dataset + dataset_id: 28fa757e56346a34 + - history_name: 2 + inputs: + - name: FASTQ RNA Dataset + dataset_id: 1faa2d3b2ed5c436 diff --git a/samples/benchmarks/rules/4x8.yml b/samples/benchmarks/rules/4x8.yml new file mode 100644 index 0000000..e69de29 diff --git a/samples/benchmarks/rules/8x16.yml b/samples/benchmarks/rules/8x16.yml new file mode 100644 index 0000000..e69de29 diff --git a/samples/experiment.yaml b/samples/experiment.yaml new file mode 100644 index 0000000..b4874f0 --- /dev/null +++ b/samples/experiment.yaml @@ -0,0 +1,11 @@ +name: Benchmarking DNA +runs: 3 +workflow_conf: + - benchmarks/example.yml + - benchmarks/dna-named.yml + - benchmarks/rna-named.yml +cloud: + - iu2 +job_configs: + - rules/4x8.yml + - rules/8x16.yml