Paired Lists as inputs #212

Merged: 3 commits, Jun 4, 2023
101 changes: 53 additions & 48 deletions abm/lib/benchmark.py
@@ -5,8 +5,8 @@
 import logging
 import argparse
 from lib import Keys, INVOCATIONS_DIR, METRICS_DIR
-from lib.common import connect, Context, print_json
-from bioblend.galaxy import GalaxyInstance
+from lib.common import connect, Context, _get_dataset_data, _make_dataset_element, print_json
+from bioblend.galaxy import GalaxyInstance, dataset_collections
 
 log = logging.getLogger('abm')
 
@@ -56,7 +56,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
     if os.path.exists(METRICS_DIR):
         if not os.path.isdir(METRICS_DIR):
             print('ERROR: Can not save metrics, directory name in use.')
-            #sys.exit(1)
             return False
     else:
         os.mkdir(METRICS_DIR)
@@ -110,10 +109,18 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
     count = 0
     for run in workflow[Keys.RUNS]:
         count += 1
+
+        # Create a new history for this run
         if Keys.HISTORY_NAME in run:
             output_history_name = f"{history_base_name} {run[Keys.HISTORY_NAME]}"
         else:
             output_history_name = f"{history_base_name} run {count}"
+        new_history_name = output_history_name
+        if history_prefix is not None:
+            new_history_name = f"{history_prefix} {output_history_name}"
+        if experiment is not None:
+            new_history_name = f"{experiment} {new_history_name}"
+
         input_data_size = []
         if Keys.INPUTS in run and run[Keys.INPUTS] is not None:
             for spec in run[Keys.INPUTS]:
@@ -138,6 +145,49 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
                     input_data_size.append(dssize)
                     print(f"Input dataset ID: {dsname} [{dsid}] {dssize}")
                     inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize}
+                elif 'paired' in spec:
+                    name = spec['name']
+                    input_names.append(name)
+                    dsdata = _get_dataset_data(gi, name)
+                    if dsdata is not None:
+                        print(f"Found an existing dataset named {name}")
+                        print_json(dsdata)
+                        # Reuse the previously defined collection
+                        dsid = dsdata['id']
+                        dssize = dsdata['size']
+                        input_data_size.append(dssize)
+                        print(f"Input dataset ID: {name} [{dsid}] {dssize}")
+                        inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize}
+                    else:
+                        histories = gi.histories.get_histories(name=spec['history'])
+                        if len(histories) == 0:
+                            print(f"ERROR: History {spec['history']} not found")
+                            return
+                        hid = histories[0]['id']
+                        pairs = 0
+                        paired_list = spec['paired']
+                        for item in paired_list:
+                            elements = []
+                            size = 0
+                            for key in item.keys():
+                                #print(f"Getting dataset for {key} = {item[key]}")
+                                value = _get_dataset_data(gi, item[key])
+                                size += value['size']
+                                elements.append(_make_dataset_element(key, value['id']))
+                            description = dataset_collections.CollectionDescription(
+                                name=name,
+                                type='paired',
+                                elements=elements
+                            )
+                            pairs += 1
+                            # print(json.dumps(description.__dict__, indent=4))
+                            # pprint(description)
+                            collection = gi.histories.create_dataset_collection(
+                                history_id=hid,
+                                collection_description=description
+                            )
+                            print(f"Input dataset paired list: {collection['id']} {size}")
+                            inputs[input[0]] = {'id': collection['id'], 'src':'hdca', 'size':size}
                 elif Keys.DATASET_ID in spec:
                     dsname = spec[Keys.DATASET_ID]
                     input_names.append(dsname)
@@ -154,11 +204,6 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
                 else:
                     raise Exception(f'Invalid input value')
         print(f"Running workflow {wfid}")
-        new_history_name = output_history_name
-        if history_prefix is not None:
-            new_history_name = f"{history_prefix} {output_history_name}"
-        if experiment is not None:
-            new_history_name = f"{experiment} {new_history_name}"
         invocation = gi.workflows.invoke_workflow(wfid, inputs=inputs, history_name=new_history_name)
         id = invocation['id']
         invocations = gi.invocations.wait_for_invocation(id, 86400, 10, False)
@@ -417,46 +462,6 @@ def find_dataset_id(gi, name_or_id):
     return None
 
 
-def _get_dataset_data(gi, name_or_id):
-    def make_result(data):
-        return {
-            'id': data['id'],
-            'size': data['file_size']
-        }
-
-    try:
-        ds = gi.datasets.show_dataset(name_or_id)
-        return make_result(ds)
-    except Exception as e:
-        pass
-
-    try:
-        datasets = gi.datasets.get_datasets(name=name_or_id)  # , deleted=True, purged=True)
-        for ds in datasets:
-            print_json(ds)
-            state = True
-            if 'state' in ds:
-                state = ds['state'] == 'ok'
-            if state and not ds['deleted'] and ds['visible']:
-                # The dict returned by get_datasets does not include the input
-                # file sizes so we need to make another call to show_datasets
-                return make_result(gi.datasets.show_dataset(ds['id']))
-            # if ds['state'] == 'ok':
-            #     print('state is ok')
-            # if ds['deleted']:
-            #     print('dataset deleted')
-            # else:
-            #     print('dataset not deleted')
-            # if ds['visible']:
-            #     print('dataset visible')
-            # else:
-            #     print('dataset not visible')
-    except Exception as e:
-        pass
-
-    return None
-
 
 from pprint import pprint
 def test(context:Context, args:list):
     id = 'c90fffcf98b31cd3'
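For context, the new `elif 'paired' in spec:` branch consumes a run spec shaped roughly like the sketch below. The code only reads the `name`, `history`, and `paired` keys; the surrounding `runs`/`inputs` layout mirrors the loop above, while the `forward`/`reverse` element names and all dataset names are hypothetical examples:

```yaml
runs:
  - inputs:
      - name: Paired Reads             # workflow input label; also used as the collection name
        history: DNA sequence data     # history searched when the collection must be built
        paired:                        # one dict per pair; keys become element identifiers
          - forward: SRR000001_1.fastq
            reverse: SRR000001_2.fastq
          - forward: SRR000002_1.fastq
            reverse: SRR000002_2.fastq
```

Each dict in `paired` is turned into its own `paired` collection via `_make_dataset_element` and `CollectionDescription`, and `inputs[input[0]]` is pointed at the resulting `hdca`.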
42 changes: 41 additions & 1 deletion abm/lib/common.py
@@ -4,6 +4,8 @@
 from ruamel.yaml import YAML
 import json
 import bioblend.galaxy
+from bioblend.galaxy import dataset_collections
+
 import lib
 
 PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml']
@@ -246,4 +248,42 @@ def get_keys(d: dict):
     for key in d.keys():
         result.append(key)
     result.sort()
-    return result
+    return result
+
+
+def _get_dataset_data(gi, name_or_id):
+    def make_result(data):
+        return {
+            'id': data['id'],
+            'size': data['file_size'],
+            'history': data['history_id']
+        }
+
+    try:
+        ds = gi.datasets.show_dataset(name_or_id)
+        return make_result(ds)
+    except Exception as e:
+        pass
+
+    try:
+        datasets = gi.datasets.get_datasets(name=name_or_id)  # , deleted=True, purged=True)
+        for ds in datasets:
+            # print_json(ds)
+            state = True
+            if 'state' in ds:
+                state = ds['state'] == 'ok'
+            if state and not ds['deleted'] and ds['visible']:
+                # The dict returned by get_datasets does not include the input
+                # file sizes so we need to make another call to show_datasets
+                return make_result(gi.datasets.show_dataset(ds['id']))
+    except Exception as e:
+        pass
+
+    return None
+
+
+def _make_dataset_element(name, value):
+    # print(f"Making dataset element for {name} = {value}({type(value)})")
+    return dataset_collections.HistoryDatasetElement(name=name, id=value)
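Taken together, the two helpers moved into common.py support a flow like this minimal sketch, assuming a reachable Galaxy server and two datasets that already exist there (the URL, API key, and dataset names are placeholders, not values from this PR):

```python
from bioblend.galaxy import GalaxyInstance, dataset_collections
from lib.common import _get_dataset_data, _make_dataset_element

gi = GalaxyInstance('https://galaxy.example.org', key='YOUR_API_KEY')

# Resolve datasets by name; each result is {'id', 'size', 'history'} or None.
fwd = _get_dataset_data(gi, 'sample_R1.fastq')
rev = _get_dataset_data(gi, 'sample_R2.fastq')

# Build a paired collection from the two elements in the datasets' history.
description = dataset_collections.CollectionDescription(
    name='sample',
    type='paired',
    elements=[
        _make_dataset_element('forward', fwd['id']),
        _make_dataset_element('reverse', rev['id']),
    ],
)
collection = gi.histories.create_dataset_collection(
    history_id=fwd['history'],
    collection_description=description,
)
print(collection['id'])
```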
42 changes: 41 additions & 1 deletion abm/lib/dataset.py
@@ -1,6 +1,7 @@
 import json
 
-from common import connect, Context
+from bioblend.galaxy import dataset_collections
+from common import connect, Context, print_json, _get_dataset_data, _make_dataset_element
 from pprint import pprint
 from pathlib import Path
 
@@ -104,6 +105,45 @@ def upload(context: Context, args: list):
         _import_from_url(gi, history, url)
 
 
+def collection(context: Context, args: list):
+    type = 'list:paired'
+    collection_name = 'collection'
+    elements = []
+    hid = None
+    gi = connect(context)
+    while len(args) > 0:
+        arg = args.pop(0)
+        if arg == '-t' or arg == '--type':
+            type = args.pop(0)
+        elif arg == '-n' or arg == '--name':
+            collection_name = args.pop(0)
+        elif '=' in arg:
+            name,value = arg.split('=')
+            dataset = _get_dataset_data(gi, value)
+            if dataset is None:
+                print(f"ERROR: dataset not found {value}")
+                return
+            # print_json(dataset)
+            if hid is None:
+                hid = dataset['history']
+            elif hid != dataset['history']:
+                print('ERROR: Datasets must be in the same history')
+            elements.append(_make_dataset_element(name, dataset['id']))
+
+    if len(elements) == 0:
+        print("ERROR: No dataset elements have been defined for the collection")
+        return
+    result = gi.histories.create_dataset_collection(
+        history_id=hid,
+        collection_description=dataset_collections.CollectionDescription(
+            name=collection_name,
+            type=type,
+            elements=elements
+        )
+    )
+    print(json.dumps(result, indent=4))
+
+
 def import_from_config(context: Context, args: list):
     gi = None
     key = None
4 changes: 4 additions & 0 deletions abm/lib/menu.yml
@@ -114,6 +114,10 @@
     handler: dataset.rename
     help: rename a dataset
     params: HISTORY_ID DATASET_ID NAME
+  - name: [collection, collect]
+    handler: dataset.collection
+    help: Create a dataset collection. Only list:paired is currently supported.
+    params: "[-n|--name NAME] [-t|--type TYPE] key1=dataset_id [key2=dataset_id...]"
   - name:
     - history
     - hist
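With that menu entry wired to dataset.collection, an invocation would look something like the line below; the `<cloud>` profile argument follows abm's usual command shape, and the collection name and dataset IDs are hypothetical:

```
abm <cloud> dataset collection -n sample -t paired forward=4838ba20a6d86765 reverse=1cd8e2f6b131e891
```

The handler resolves each `key=dataset_id` argument with `_get_dataset_data`, checks that all datasets live in the same history, and prints the JSON returned by `create_dataset_collection`.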