Skip to content

Commit

Permalink
Merge pull request #278 from galaxyproject/documentation
Browse files Browse the repository at this point in the history
Add code documentation
  • Loading branch information
ksuderman authored Mar 29, 2024
2 parents abfe344 + f3011b2 commit 111fa50
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 28 deletions.
6 changes: 4 additions & 2 deletions abm/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

# from common import parse_profile

# Where the workflow invocation data returned by Galaxy will be saved.
INVOCATIONS_DIR = "invocations"
# Where workflow runtime metrics will be saved.
METRICS_DIR = "metrics"

# Global instance of a YAML parser so we can reuse it if needed.
parser = None


# Keys used in various dictionaries.
class Keys:
NAME = 'name'
RUNS = 'runs'
Expand Down
82 changes: 67 additions & 15 deletions abm/lib/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

def run_cli(context: Context, args: list):
"""
Runs a single workflow defined by *args[0]*
Command line handler to run a single benchmark.
:param args: a list that contains:
args[0] - the path to the benchmark configuration file
args[1] - the prefix to use when creating the new history in Galaxy
args[2] - the name of the experiment, if part of one. This is used to
generate output folder names.
:param context: a context object the defines how to connect to the Galaxy server.
:param args: parameters from the command line
:return: True if the workflows completed successfully. False otherwise.
"""
Expand All @@ -43,11 +40,15 @@ def run_cli(context: Context, args: list):


def run(context: Context, workflow_path, history_prefix: str, experiment: str):
# if len(args) > 1:
# history_prefix = args[1]
# if len(args) > 2:
# experiment = args[2].replace(' ', '_').lower()
"""
Does the actual work of running a benchmark.
:param context: a context object the defines how to connect to the Galaxy server.
:param workflow_path: path to the ABM workflow file. (benchmark really). NOTE this is NOT the Galaxy .ga file.
:param history_prefix: a prefix value used when generating new history names.
:param experiment: the name of the experiment (arbitrary string). Used to generate new history names.
:return: True if the workflow run completed successfully. False otherwise.
"""
if os.path.exists(INVOCATIONS_DIR):
if not os.path.isdir(INVOCATIONS_DIR):
print('ERROR: Can not save invocation status, directory name in use.')
Expand Down Expand Up @@ -76,7 +77,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
workflows = parse_workflow(workflow_path)
if not workflows:
print(f"Unable to load any workflow definitions from {workflow_path}")
return
return False

print(f"Found {len(workflows)} workflow definitions")
for workflow in workflows:
Expand Down Expand Up @@ -173,7 +174,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
histories = gi.histories.get_histories(name=spec['history'])
if len(histories) == 0:
print(f"ERROR: History {spec['history']} not foune")
return
return False
hid = histories[0]['id']
pairs = 0
paired_list = spec['paired']
Expand Down Expand Up @@ -273,6 +274,14 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):


def translate(context: Context, args: list):
"""
Translates the human readable names of datasets and workflows in to the Galaxy
ID that is unique to each server.
:param context: the context object used to connect to the Galaxy server
:param args: [0] the path to the benchmarking YAML file to translate
:return: Nothing. Prints the translated workflow file to stdout.
"""
if len(args) == 0:
print('ERROR: no workflow configuration specified')
return
Expand Down Expand Up @@ -315,6 +324,14 @@ def translate(context: Context, args: list):


def validate(context: Context, args: list):
"""
Checks to see if the workflow and all datasets defined in the benchmark can
be found on the server.
:param context: the context object used to connect to the Galaxy instance
:param args: [0] the benchmark YAML file to be validated.
:return:
"""
if len(args) == 0:
print('ERROR: no workflow configuration specified')
return
Expand Down Expand Up @@ -420,10 +437,10 @@ def validate(context: Context, args: list):


def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict):
"""Blocks until all jobs defined in the *invocations* to complete.
"""Blocks until all jobs defined in *invocations* are complete (in a terminal state).
:param gi: The *GalaxyInstance** running the jobs
:param invocations:
:param invocations: a dictionary containing information about the jobs invoked
:return:
"""
wfid = invocations['workflow_id']
Expand Down Expand Up @@ -493,6 +510,11 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict):


def parse_workflow(workflow_path: str):
"""
Loads the benchmark YAML file.
:param workflow_path: the path to the file to be loaded.
:return: a dictionary containing the benchmark.
"""
if not os.path.exists(workflow_path):
print(f'ERROR: could not find workflow file {workflow_path}')
return None
Expand All @@ -511,6 +533,14 @@ def parse_workflow(workflow_path: str):


def find_workflow_id(gi, name_or_id):
"""
Resolves the human-readable name for a workflow into the unique ID on the
Galaxy instance.
:param gi: the connection object to the Galaxy instance
:param name_or_id: the name of the workflow
:return: The Galaxy workflow ID or None if the workflow could not be located
"""
try:
wf = gi.workflows.show_workflow(name_or_id)
return wf['id']
Expand All @@ -527,7 +557,14 @@ def find_workflow_id(gi, name_or_id):


def find_dataset_id(gi, name_or_id):
# print(f"Finding dataset {name_or_id}")
"""
Resolves the human-readable name of a dataset into the unique ID on the
Galaxy instance
:param gi: the connection object to the Galaxy instance
:param name_or_id: the name of the dataset.
:return: the Galaxy dataset ID or None if the dataset could not be located.
"""
try:
ds = gi.datasets.show_dataset(name_or_id)
return ds['id']
Expand All @@ -552,6 +589,14 @@ def find_dataset_id(gi, name_or_id):


def find_collection_id(gi, name):
"""
Resolves a human-readable collection name into the unique Galaxy ID.
:param gi: the connection object to the Galaxy instance
:param name: the name of the collection to resolve
:return: The unique Galaxy ID of the collection or None if the collection
can not be located.
"""
kwargs = {'limit': 10000, 'offset': 0}
datasets = gi.datasets.get_datasets(**kwargs)
if len(datasets) == 0:
Expand All @@ -573,6 +618,13 @@ def find_collection_id(gi, name):


def test(context: Context, args: list):
"""
Allows running testing code from the command line.
:param context: a connection object to a Galaxy instance
:param args: varies
:return: varies, typically None.
"""
id = 'c90fffcf98b31cd3'
gi = connect(context)
inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input')
Expand Down
2 changes: 2 additions & 0 deletions abm/lib/cloudlaunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from cloudlaunch_cli.main import create_api_client
from common import Context

# DEPRECATED - Cloudlaunch is no longer used to manage Galaxy clusters.

BOLD = '\033[1m'
CLEAR = '\033[0m'

Expand Down
68 changes: 60 additions & 8 deletions abm/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from bioblend.galaxy import dataset_collections
from ruamel.yaml import YAML

# Where we will look for our configuration file.
PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml']

# Deprecated. Do not use.
datasets = {
"dna": [
"ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR013/ERR013101/ERR013101_1.fastq.gz",
Expand All @@ -25,6 +27,14 @@


def try_for(f, limit=3):
"""
Tries to invoke the function f. If the function f fails it will be retried
*limit* number of times.
:param f: the function to invoke
:param limit: how many times the function will be retried
:return: the result of calling f()
"""
count = 0
running = True
result = None
Expand All @@ -40,6 +50,13 @@ def try_for(f, limit=3):


class Context:
"""
The context object that contains information to connect to a Galaxy instance.
GALAXY_SERVER: the URL of the Galaxy server to connect to
API_KEY : a user's API key to make API calls on the Galaxy instance
KUBECONFIG: : the kubeconfig file needed to make changes via Helm
"""
def __init__(self, *args):
if len(args) == 1:
arg = args[0]
Expand Down Expand Up @@ -90,7 +107,12 @@ def connect(context: Context):


def _set_active_profile(profile_name: str):
    """
    Set the module-level connection globals from the named profile.

    NOTE(review): marked "Unused" upstream; kept for backward compatibility.
    :param profile_name: the name of the profile to activate.
    :return: True if the profile defined a Galaxy server URL, False otherwise.
    """
    lib.GALAXY_SERVER, lib.API_KEY, lib.KUBECONFIG = parse_profile(profile_name)
    # `is not None` is the idiomatic (and PEP 8 mandated) None comparison;
    # `!= None` can misbehave with objects overriding __eq__.
    return lib.GALAXY_SERVER is not None

Expand All @@ -100,6 +122,11 @@ def get_context(profile_name: str):


def get_yaml_parser():
    """
    Return the shared YAML parser, creating it on first use.
    :return: the module-wide ruamel YAML parser instance.
    """
    parser = lib.parser
    if parser is None:
        parser = YAML()
        lib.parser = parser
    return parser
Expand All @@ -124,6 +151,12 @@ def load_profiles():


def save_profiles(profiles: dict):
"""
Write the ABM configuration file.
:param profiles: the configuration to be saved.
:return: None
"""
yaml = get_yaml_parser()
for profile_path in PROFILE_SEARCH_PATH:
path = os.path.expanduser(profile_path)
Expand Down Expand Up @@ -161,22 +194,28 @@ def parse_profile(profile_name: str):


def run(command, env: dict = None):
    """
    Run a command on the local machine; used to invoke the helm and
    kubectl executables.

    The command string is split on whitespace, so arguments containing
    spaces are not supported.
    :param command: the command line to execute.
    :param env: environment variables for the command; defaults to the
        current process environment (os.environ).
    :return: the command's stdout, decoded as UTF-8 and stripped.
    :raises RuntimeError: with the command's stderr if it exits non-zero.
    """
    if env is None:
        env = os.environ
    result = subprocess.run(command.split(), capture_output=True, env=env)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode('utf-8').strip())
    return result.stdout.decode('utf-8').strip()


def get_env(context: Context):
"""
Creates a copy of the environment variables as returned by os.environ.
:param context: Ignored
:return: a dictionary of the environment variables
"""
copy = os.environ.copy()
for key, value in context.__dict__.items():
if value is not None:
Expand All @@ -185,6 +224,13 @@ def get_env(context: Context):


def find_executable(name):
    """
    Find the full path to an executable on the local machine's PATH.

    Uses the portable stdlib shutil.which instead of shelling out to the
    `which` command; this also honours the documented contract of returning
    an empty string when the executable is not found (shelling out via
    run() raised RuntimeError in that case).
    :param name: the name of a command line executable or script.
    :return: the full path to the executable, or an empty string if the
        executable is not found.
    """
    # Local import so the module's top-level import block is unchanged.
    import shutil
    return shutil.which(name) or ''


Expand All @@ -208,6 +254,7 @@ def find_executable(name):
# "swaptotal",
# "uname"

# Columns to be defined when generating CSV files.
table_header = [
"id",
"history_id",
Expand Down Expand Up @@ -237,6 +284,11 @@ def find_executable(name):
]

def print_table_header():
    """
    Print the CSV column header row to stdout.
    :return: None. The header is printed to stdout.
    """
    header_row = ','.join(table_header)
    print(header_row)


Expand Down
6 changes: 3 additions & 3 deletions abm/lib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

def do_list(context: Context, argv: list):
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--state', help='list jobs in this state')
parser.add_argument('--history', help='show jobs in the given history')
parser.add_argument('-t', '--tool', help='only show jobs generate by this tool')
parser.add_argument('-s', '--state', help='list datasets in this state')
parser.add_argument('--history', help='show datasets in the given history')
parser.add_argument('-t', '--tool', help='only show datasets generate by this tool')
args = parser.parse_args(argv)
kwargs = {'limit': 10000, 'offset': 0, 'deleted': False}
gi = connect(context)
Expand Down

0 comments on commit 111fa50

Please sign in to comment.