Skip to content

Commit

Permalink
Merge pull request #187 from galaxyproject/dev
Browse files Browse the repository at this point in the history
Release 2.3.0
  • Loading branch information
ksuderman authored Apr 27, 2023
2 parents 19f901e + 8d183ad commit d749a40
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 40 deletions.
2 changes: 1 addition & 1 deletion abm/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.2.0
2.3.0
71 changes: 55 additions & 16 deletions abm/lib/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import argparse
from lib import Keys, INVOCATIONS_DIR, METRICS_DIR
from lib.common import connect, Context
from lib.common import connect, Context, print_json
from bioblend.galaxy import GalaxyInstance

log = logging.getLogger('abm')
Expand Down Expand Up @@ -122,19 +122,37 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
print(f'ERROR: Invalid input specification for {spec[Keys.NAME]}')
return False

dsname = spec[Keys.DATASET_ID]
input_names.append(dsname)
#inputs.append(dsname)
# dsid = find_dataset_id(gi, dsname)
dsdata = _get_dataset_data(gi, dsname)
if dsdata is None:
raise Exception(f"ERROR: unable to resolve {dsname} to a dataset.")
dsid = dsdata['id']
dssize = dsdata['size']
input_data_size.append(dssize)
print(f"Input dataset ID: {dsname} [{dsid}] {dssize}")
inputs[input[0]] = {'id': dsid, 'src': 'hda', 'size': dssize}

if 'value' in spec:
inputs[input[0]] = spec['value']
print(f"Input data value: {spec['value']}")
elif 'collection' in spec:
dsname = spec['collection']
input_names.append(dsname)
#inputs.append(dsname)
# dsid = find_dataset_id(gi, dsname)
dsdata = _get_dataset_data(gi, dsname)
if dsdata is None:
raise Exception(f"ERROR: unable to resolve {dsname} to a dataset.")
dsid = dsdata['id']
dssize = dsdata['size']
input_data_size.append(dssize)
print(f"Input dataset ID: {dsname} [{dsid}] {dssize}")
inputs[input[0]] = {'id': dsid, 'src': 'hdca', 'size': dssize}
elif Keys.DATASET_ID in spec:
dsname = spec[Keys.DATASET_ID]
input_names.append(dsname)
#inputs.append(dsname)
# dsid = find_dataset_id(gi, dsname)
dsdata = _get_dataset_data(gi, dsname)
if dsdata is None:
raise Exception(f"ERROR: unable to resolve {dsname} to a dataset.")
dsid = dsdata['id']
dssize = dsdata['size']
input_data_size.append(dssize)
print(f"Input dataset ID: {dsname} [{dsid}] {dssize}")
inputs[input[0]] = {'id': dsid, 'src': 'hda', 'size': dssize}
else:
raise Exception(f'Invalid input value')
print(f"Running workflow {wfid}")
new_history_name = output_history_name
if history_prefix is not None:
Expand Down Expand Up @@ -415,12 +433,33 @@ def make_result(data):
try:
datasets = gi.datasets.get_datasets(name=name_or_id) # , deleted=True, purged=True)
for ds in datasets:
if ds['state'] == 'ok' and not ds['deleted'] and ds['visible']:
print_json(ds)
state = True
if 'state' in ds:
state = ds['state'] == 'ok'
if state and not ds['deleted'] and ds['visible']:
# The dict returned by get_datasets does not include the input
# file sizes so we need to make another call to show_datasets
return make_result(gi.datasets.show_dataset(ds['id']))
# if ds['state'] == 'ok':
# print('state is ok')
# if ds['deleted']:
# print('dataset deleted')
# else:
# print('dataset not deleted')
# if ds['visible']:
# print('dataset visible')
# else:
# print('dataset not visible')
except Exception as e:
print(e)
pass

return None


from pprint import pprint
def test(context: Context, args: list):
    """Experimental: print the input definitions of a workflow.

    Accepts an optional workflow ID as ``args[0]`` and an optional input
    label as ``args[1]``; falls back to the original hard-coded test
    values when they are not provided (backward compatible).
    """
    # Do not shadow the builtin ``id``; allow overriding the test values.
    workflow_id = args[0] if len(args) > 0 else 'c90fffcf98b31cd3'
    input_label = args[1] if len(args) > 1 else 'PE fastq input'
    gi = connect(context)
    inputs = gi.workflows.get_workflow_inputs(workflow_id, input_label)
    pprint(inputs)
4 changes: 2 additions & 2 deletions abm/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def __init__(self, *args):



def print_json(obj):
print(json.dumps(obj, indent=2))
def print_json(obj, indent=2):
    """Serialize *obj* as JSON and write it to stdout.

    :param obj: any JSON-serializable object
    :param indent: indentation width passed through to ``json.dumps``
    """
    text = json.dumps(obj, indent=indent)
    print(text)


def print_yaml(obj):
Expand Down
15 changes: 11 additions & 4 deletions abm/lib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import os
import yaml


def list(context: Context, args: list):
gi = connect(context)
kwargs = {
Expand All @@ -29,11 +28,10 @@ def list(context: Context, args: list):
print('No datasets found')
return
print(f'Found {len(datasets)} datasets')
print('ID\tHistory\tDeleted\tState\tName')
print('ID\tHistory\tType\tDeleted\tState\tName')
for dataset in datasets:
state = dataset['state'] if 'state' in dataset else 'unknown'
print(f"{dataset['id']}\t{dataset['history_id']}\t{dataset['deleted']}\t{state}\t{dataset['name']}")
#pprint(dataset)
print(f"{dataset['id']}\t{dataset['history_id']}\t{dataset['history_content_type']}\t{dataset['deleted']}\t{state}\t{dataset['name']}")


def clean(context: Context, args: list):
Expand Down Expand Up @@ -62,6 +60,15 @@ def show(context: Context, args: list):
print(json.dumps(result, indent=4))


def get(context: Context, args: list):
    """Look up datasets on the Galaxy server by name and print them as JSON.

    :param context: connection context for the Galaxy instance
    :param args: ``args[0]`` is the dataset name to look up
    """
    if not args:
        print("ERROR: no dataset ID provided")
        return
    gi = connect(context)
    # BUG FIX: bioblend's get_datasets() takes ``limit`` as its first
    # positional parameter, so the lookup value must be passed as the
    # ``name`` keyword (same pattern as _get_dataset_data in benchmark.py).
    result = gi.datasets.get_datasets(name=args[0])
    print(json.dumps(result, indent=4))


def delete(context: Context, args: list):
    """Placeholder command: dataset deletion is not implemented yet."""
    # gi = connect(context)
    print("dataset delete not implemented")
Expand Down
24 changes: 16 additions & 8 deletions abm/lib/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import helm
import benchmark
import logging
import traceback
from common import load_profiles, Context
from time import perf_counter
from datetime import timedelta
Expand Down Expand Up @@ -113,20 +114,24 @@ def summarize(context: Context, args: list):
if separator is not None:
print('ERROR: The output format is specified more than once')
return
print('tsv')
separator = '\t'
elif arg in ['-c', '--csv']:
if separator is not None:
print('ERROR: The output format is specified more than once')
return
separator = ','
print('csv')
elif arg in ['-m', '--model']:
if separator is not None:
print('ERROR: The output format is specified more than once')
return
print('making a model')
separator = ','
make_row = make_model_row
header_row = "job_id,tool_id,tool_version,state,memory.max_usage_in_bytes,cpuacct.usage,process_count,galaxy_slots,runtime_seconds,ref_data_size,input_data_size_1,input_data_size_2"
else:
print(f"Input dir {arg}")
input_dirs.append(arg)

if len(input_dirs) == 0:
Expand All @@ -142,32 +147,35 @@ def summarize(context: Context, args: list):
if not os.path.isfile(input_path) or not input_path.endswith('.json'):
continue
try:
print(f"Loading {input_path}")
with open(input_path, 'r') as f:
data = json.load(f)
if data['metrics']['tool_id'] == 'upload1':
if data['job_metrics']['tool_id'] == 'upload1':
print('Ignoring upload tool')
continue
row = make_row(data)
print(separator.join([ str(x) for x in row]))
except Exception as e:
# Silently fail to allow the remainder of the table to be generated.
# print(f"Unable to process {input_path}")
# print(e)
pass
print(f"Unable to process {input_path}")
print(e)
traceback.print_exc( )
#pass


accept_metrics = ['galaxy_slots', 'galaxy_memory_mb', 'runtime_seconds', 'cpuacct.usage','memory.limit_in_bytes', 'memory.max_usage_in_bytes'] #,'memory.soft_limit_in_bytes']

def make_table_row(data: dict):
    """Flatten one benchmark result record into a list of column values."""
    run_keys = ['run', 'cloud', 'job_conf', 'workflow_id', 'history_id', 'inputs']
    row = [str(data[k]) for k in run_keys]
    job = data['job_metrics']
    row.append(parse_toolid(job['tool_id']))
    row.append(job['state'])
    row.extend(_get_metrics(job['job_metrics']))
    return row


def make_model_row(data: dict):
metrics = data['metrics']
metrics = data['job_metrics']
row = []
row.append(metrics['id'])
tool_id = metrics['tool_id']
Expand Down
34 changes: 30 additions & 4 deletions abm/lib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

def list(context: Context, args: list):
state = ''
history_id = None
log.debug('Processing args')
log_state = False
while len(args) > 0:
Expand All @@ -18,14 +19,16 @@ def list(context: Context, args: list):
return
state = args.pop(0)
log_state = True

elif arg in ['-h', '--history']:
history_id = args.pop(0)
log.debug(f"Getting jobs from history {history_id}")
log.debug('Connecting to the Galaxy server')
gi = connect(context)
if log_state:
log.debug(f"Getting jobs with state {state}")
else:
log.debug("Getting job list")
job_list = gi.jobs.get_jobs(state=state)
job_list = gi.jobs.get_jobs(state=state, history_id=history_id)
log.debug(f"Iterating over job list with {len(job_list)} items")
for job in job_list:
print(f"{job['id']}\t{job['state']}\t{job['update_time']}\t{job['tool_id']}")
Expand All @@ -51,7 +54,30 @@ def metrics(context: Context, args: list):
print("ERROR: no job ID provided")
return
gi = connect(context)
metrics = gi.jobs.get_metrics(args[0])
if len(args) > 1:
arg = args.pop(0)
if arg in ['-h', '--history']:
history_id = args.pop(0)
log.debug(f"Getting metrics for jobs from history {history_id}")
job_list = gi.jobs.get_jobs(history_id=history_id)
metrics = []
for job in job_list:
metrics.append({
'job_id': job['id'],
'job_state': job['state'],
'tool_id': job['tool_id'],
'job_metrics': gi.jobs.get_metrics(job['id'])
})
else:
print(f"ERROR: Unrecognized argument {arg}")
else:
job = gi.jobs.show_job(args[0])
metrics = [{
'job_id': job['id'],
'job_state': job['state'],
'tool_id': job['tool_id'],
'job_metrics': gi.jobs.get_metrics(args[0])
}]
print(json.dumps(metrics, indent=4))
# metrics = {}
# for m in gi.jobs.get_metrics(args[0]):
Expand All @@ -78,4 +104,4 @@ def problems(context: Context, args: list):
print('ERROR: no job ID provided.')
return
gi = connect(context)
pprint(gi.jobs.get_common_problems(args[0]))
pprint(gi.jobs.get_common_problems(args[0]))
26 changes: 21 additions & 5 deletions abm/lib/menu.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
- name:
- name:
- benchmark
- bench
help: 'manage benchmarks'
Expand All @@ -15,6 +15,10 @@
handler: benchmark.validate
help: validate that workflow and dataset names can be translated into IDs
params: PATH
- name: [test]
handler: benchmark.test
help: experimental code
params: VARIES
- name:
- workflow
- wf
Expand Down Expand Up @@ -55,6 +59,14 @@
handler: workflow.rename
params: "ID 'new workflow name'"
help: "rename a workflow on the Galaxy server"
- name: ['invocation']
handler: workflow.invocation
params: "--workflow WORKFLOW_ID --invocation INVOCATION_ID"
help: show details about a specific workflow invocation
- name: ['inputs']
help: list inputs required by a workflow
params: WORKFLOW_ID
handler: workflow.inputs
- name: ['test']
handler: workflow.test
help: run some test code
Expand Down Expand Up @@ -86,6 +98,10 @@
handler: dataset.show
params: ID
help: show detailed information on a dataset
- name: ['get']
handler: dataset.get
params: NAME_OR_ID
help: show information for a given dataset name or ID
- name: [cleanup, clean, clear]
handler: dataset.clean
params: "[STATE [STATE...]]"
Expand Down Expand Up @@ -158,9 +174,9 @@
help: manage jobs on the server
menu:
- name: [ list, ls ]
help: list all jobs, or jobs in a particular state
help: list all jobs, or jobs in a particular state. Can filter by a history.
handler: job.list
params: "[-s|--state ok|running|error|waiting]"
params: "[-s|--state ok|running|error|waiting] [-h|--history_id historyID]"
- name: [ show ]
help: show detailed information about a job
handler: job.show
Expand All @@ -174,9 +190,9 @@
handler: job.cancel
params: ID
- name: [ metrics, stats ]
help: display runtime metrics for the job
help: display runtime metrics for the job, or a list of jobs contained in a history
handler: job.metrics
params: ID
params: "[ID | -h|--history historyID]"
- name: [users, user]
help: manage users on the Galaxy instance
menu:
Expand Down
Loading

0 comments on commit d749a40

Please sign in to comment.