diff --git a/abm/lib/benchmark.py b/abm/lib/benchmark.py
index 7126101..ec9deeb 100644
--- a/abm/lib/benchmark.py
+++ b/abm/lib/benchmark.py
@@ -5,8 +5,8 @@ import logging
 import argparse
 
 from lib import Keys, INVOCATIONS_DIR, METRICS_DIR
-from lib.common import connect, Context, print_json
-from bioblend.galaxy import GalaxyInstance
+from lib.common import connect, Context, _get_dataset_data, _make_dataset_element, print_json
+from bioblend.galaxy import GalaxyInstance, dataset_collections
 
 log = logging.getLogger('abm')
 
@@ -56,7 +56,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
     if os.path.exists(METRICS_DIR):
         if not os.path.isdir(METRICS_DIR):
             print('ERROR: Can not save metrics, directory name in use.')
-            #sys.exit(1)
             return False
     else:
         os.mkdir(METRICS_DIR)
@@ -110,10 +109,18 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
     count = 0
     for run in workflow[Keys.RUNS]:
         count += 1
+
+        # Create a new history for this run
         if Keys.HISTORY_NAME in run:
             output_history_name = f"{history_base_name} {run[Keys.HISTORY_NAME]}"
         else:
             output_history_name = f"{history_base_name} run {count}"
+        new_history_name = output_history_name
+        if history_prefix is not None:
+            new_history_name = f"{history_prefix} {output_history_name}"
+        if experiment is not None:
+            new_history_name = f"{experiment} {new_history_name}"
+
         input_data_size = []
         if Keys.INPUTS in run and run[Keys.INPUTS] is not None:
             for spec in run[Keys.INPUTS]:
@@ -138,6 +145,49 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
                     input_data_size.append(dssize)
                     print(f"Input dataset ID: {dsname} [{dsid}] {dssize}")
                     inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize}
+                elif 'paired' in spec:
+                    name = spec['name']
+                    input_names.append(name)
+                    dsdata = _get_dataset_data(gi, name)
+                    if dsdata is not None:
+                        print(f"Found an existing dataset named {name}")
+                        print_json(dsdata)
+                        # Reuse the previously defined collection
+                        dsid = dsdata['id']
+                        dssize = dsdata['size']
+                        input_data_size.append(dssize)
+                        print(f"Input dataset ID: {name} [{dsid}] {dssize}")
+                        inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize}
+                    else:
+                        histories = gi.histories.get_histories(name=spec['history'])
+                        if len(histories) == 0:
+                            print(f"ERROR: History {spec['history']} not found")
+                            return
+                        hid = histories[0]['id']
+                        pairs = 0
+                        paired_list = spec['paired']
+                        for item in paired_list:
+                            elements = []
+                            size = 0
+                            for key in item.keys():
+                                #print(f"Getting dataset for {key} = {item[key]}")
+                                value = _get_dataset_data(gi, item[key])
+                                size += value['size']
+                                elements.append(_make_dataset_element(key, value['id']))
+                            description = dataset_collections.CollectionDescription(
+                                name=name,
+                                type='paired',
+                                elements=elements
+                            )
+                            pairs += 1
+                            # print(json.dumps(description.__dict__, indent=4))
+                            # pprint(description)
+                            collection = gi.histories.create_dataset_collection(
+                                history_id=hid,
+                                collection_description=description
+                            )
+                            print(f"Input dataset paired list: {collection['id']} {size}")
+                            inputs[input[0]] = {'id': collection['id'], 'src':'hdca', 'size':size}
                 elif Keys.DATASET_ID in spec:
                     dsname = spec[Keys.DATASET_ID]
                     input_names.append(dsname)
@@ -154,11 +204,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
                 else:
                     raise Exception(f'Invalid input value')
         print(f"Running workflow {wfid}")
-        new_history_name = output_history_name
-        if history_prefix is not None:
-            new_history_name = f"{history_prefix} {output_history_name}"
-        if experiment is not None:
-            new_history_name = f"{experiment} {new_history_name}"
         invocation = gi.workflows.invoke_workflow(wfid, inputs=inputs, history_name=new_history_name)
         id = invocation['id']
         invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False)
@@ -417,46 +462,6 @@ def find_dataset_id(gi, name_or_id):
     return None
 
 
-def _get_dataset_data(gi, name_or_id):
-    def make_result(data):
-        return {
-            'id': data['id'],
-            'size': data['file_size']
-        }
-
-    try:
-        ds = gi.datasets.show_dataset(name_or_id)
-        return make_result(ds)
-    except Exception as e:
-        pass
-
-    try:
-        datasets = gi.datasets.get_datasets(name=name_or_id)  # , deleted=True, purged=True)
-        for ds in datasets:
-            print_json(ds)
-            state = True
-            if 'state' in ds:
-                state = ds['state'] == 'ok'
-            if state and not ds['deleted'] and ds['visible']:
-                # The dict returned by get_datasets does not include the input
-                # file sizes so we need to make another call to show_datasets
-                return make_result(gi.datasets.show_dataset(ds['id']))
-            # if ds['state'] == 'ok':
-            #     print('state is ok')
-            # if ds['deleted']:
-            #     print('dataset deleted')
-            # else:
-            #     print('dataset not deleted')
-            # if ds['visible']:
-            #     print('dataset visible')
-            # else:
-            #     print('dataset not visible')
-    except Exception as e:
-        pass
-
-    return None
-
-
 from pprint import pprint
 def test(context:Context, args:list):
     id = 'c90fffcf98b31cd3'
diff --git a/abm/lib/common.py b/abm/lib/common.py
index f8e83bc..79bedfb 100644
--- a/abm/lib/common.py
+++ b/abm/lib/common.py
@@ -4,6 +4,8 @@ from ruamel.yaml import YAML
 import json
 
 import bioblend.galaxy
+from bioblend.galaxy import dataset_collections
+
 import lib
 
 PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml']
@@ -246,4 +248,42 @@ def get_keys(d: dict):
     for key in d.keys():
         result.append(key)
     result.sort()
-    return result
\ No newline at end of file
+    return result
+
+
+def _get_dataset_data(gi, name_or_id):
+    def make_result(data):
+        return {
+            'id': data['id'],
+            'size': data['file_size'],
+            'history': data['history_id']
+        }
+
+    try:
+        ds = gi.datasets.show_dataset(name_or_id)
+        return make_result(ds)
+    except Exception as e:
+        pass
+
+    try:
+        datasets = gi.datasets.get_datasets(name=name_or_id)  # , deleted=True, purged=True)
+        for ds in datasets:
+            # print_json(ds)
+            state = True
+            if 'state' in ds:
+                state = ds['state'] == 'ok'
+            if state and not ds['deleted'] and ds['visible']:
+                # The dict returned by get_datasets does not include the input
+                # file sizes so we need to make another call to show_dataset
+                return make_result(gi.datasets.show_dataset(ds['id']))
+    except Exception as e:
+        pass
+
+    return None
+
+
+def _make_dataset_element(name, value):
+    # print(f"Making dataset element for {name} = {value}({type(value)})")
+    return dataset_collections.HistoryDatasetElement(name=name, id=value)
+
+
diff --git a/abm/lib/dataset.py b/abm/lib/dataset.py
index fc51d8e..ba9a987 100644
--- a/abm/lib/dataset.py
+++ b/abm/lib/dataset.py
@@ -1,6 +1,7 @@
 import json
 
-from common import connect, Context
+from bioblend.galaxy import dataset_collections
+from common import connect, Context, print_json, _get_dataset_data, _make_dataset_element
 from pprint import pprint
 from pathlib import Path
 
@@ -104,6 +105,45 @@ def upload(context: Context, args: list):
         _import_from_url(gi, history, url)
 
 
+def collection(context: Context, args: list):
+    type = 'list:paired'
+    collection_name = 'collection'
+    elements = []
+    hid = None
+    gi = connect(context)
+    while len(args) > 0:
+        arg = args.pop(0)
+        if arg == '-t' or arg == '--type':
+            type = args.pop(0)
+        elif arg == '-n' or arg == '--name':
+            collection_name = args.pop(0)
+        elif '=' in arg:
+            name,value = arg.split('=')
+            dataset = _get_dataset_data(gi, value)
+            if dataset is None:
+                print(f"ERROR: dataset not found {value}")
+                return
+            # print_json(dataset)
+            if hid is None:
+                hid = dataset['history']
+            elif hid != dataset['history']:
+                print('ERROR: Datasets must be in the same history')
+            elements.append(_make_dataset_element(name, dataset['id']))
+
+    if len(elements) == 0:
+        print("ERROR: No dataset elements have been defined for the collection")
+        return
+    result = gi.histories.create_dataset_collection(
+        history_id=hid,
+        collection_description=dataset_collections.CollectionDescription(
+            name=collection_name,
+            type=type,
+            elements=elements
+        )
+    )
+    print(json.dumps(result, indent=4))
+
+
 def import_from_config(context: Context, args: list):
     gi = None
     key = None
diff --git a/abm/lib/menu.yml b/abm/lib/menu.yml
index 347595e..4c79b18 100644
--- a/abm/lib/menu.yml
+++ b/abm/lib/menu.yml
@@ -114,6 +114,10 @@
       handler: dataset.rename
       help: rename a dataset
       params: HISTORY_ID DATASET_ID NAME
+    - name: [collection, collect]
+      handler: dataset.collection
+      help: Create a dataset collection. Only list:paired is currently supported.
+      params: "[-n|--name NAME] [-t|--type TYPE] key1=dataset_id [key2=dataset_id...]"
 - name:
     - history
     - hist
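
Usage sketch: judging from the new menu.yml entry and the collection() handler above, the subcommand and the 'paired' workflow input spec would look roughly like the lines below. The "abm <cloud>" prefix, the collection name, the forward/reverse element keys, and every dataset or history value are illustrative placeholders, not values taken from the repository.

    # Hypothetical invocation of the new dataset.collection handler,
    # following the params string declared in menu.yml
    abm <cloud> dataset collection --name my-pairs forward=<dataset_id> reverse=<dataset_id>

    # Hypothetical entry in a benchmark run's input list; 'name', 'history' and
    # 'paired' are the keys the new branch in benchmark.run() reads, and each item
    # in 'paired' maps element keys to dataset names or IDs
    - name: fastq-pairs
      history: <history containing the input datasets>
      paired:
        - forward: <dataset name or id>
          reverse: <dataset name or id>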