Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add code documentation #278

Merged
merged 3 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions abm/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

# from common import parse_profile

# Where the workflow invocation data returned by Galaxy will be saved.
INVOCATIONS_DIR = "invocations"
# Where workflow runtime metrics will be saved.
METRICS_DIR = "metrics"

# Global instance of a YAML parser so we can reuse it if needed.
parser = None


# Keys used in various dictionaries.
class Keys:
NAME = 'name'
RUNS = 'runs'
Expand Down
82 changes: 67 additions & 15 deletions abm/lib/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

def run_cli(context: Context, args: list):
"""
Runs a single workflow defined by *args[0]*
Command line handler to run a single benchmark.

:param args: a list that contains:
args[0] - the path to the benchmark configuration file
args[1] - the prefix to use when creating the new history in Galaxy
args[2] - the name of the experiment, if part of one. This is used to
generate output folder names.
    :param context: a context object that defines how to connect to the Galaxy server.
    :param args: parameters from the command line

    :return: True if the workflows completed successfully. False otherwise.
"""
Expand All @@ -43,11 +40,15 @@ def run_cli(context: Context, args: list):


def run(context: Context, workflow_path, history_prefix: str, experiment: str):
# if len(args) > 1:
# history_prefix = args[1]
# if len(args) > 2:
# experiment = args[2].replace(' ', '_').lower()
"""
Does the actual work of running a benchmark.

    :param context: a context object that defines how to connect to the Galaxy server.
:param workflow_path: path to the ABM workflow file. (benchmark really). NOTE this is NOT the Galaxy .ga file.
:param history_prefix: a prefix value used when generating new history names.
:param experiment: the name of the experiment (arbitrary string). Used to generate new history names.
:return: True if the workflow run completed successfully. False otherwise.
"""
if os.path.exists(INVOCATIONS_DIR):
if not os.path.isdir(INVOCATIONS_DIR):
print('ERROR: Can not save invocation status, directory name in use.')
Expand Down Expand Up @@ -76,7 +77,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
workflows = parse_workflow(workflow_path)
if not workflows:
print(f"Unable to load any workflow definitions from {workflow_path}")
return
return False

print(f"Found {len(workflows)} workflow definitions")
for workflow in workflows:
Expand Down Expand Up @@ -173,7 +174,7 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):
histories = gi.histories.get_histories(name=spec['history'])
if len(histories) == 0:
print(f"ERROR: History {spec['history']} not foune")
return
return False
hid = histories[0]['id']
pairs = 0
paired_list = spec['paired']
Expand Down Expand Up @@ -273,6 +274,14 @@ def run(context: Context, workflow_path, history_prefix: str, experiment: str):


def translate(context: Context, args: list):
"""
    Translates the human-readable names of datasets and workflows into the Galaxy
    ID that is unique to each server.

    :param context: the context object used to connect to the Galaxy server
:param args: [0] the path to the benchmarking YAML file to translate
:return: Nothing. Prints the translated workflow file to stdout.
"""
if len(args) == 0:
print('ERROR: no workflow configuration specified')
return
Expand Down Expand Up @@ -315,6 +324,14 @@ def translate(context: Context, args: list):


def validate(context: Context, args: list):
"""
Checks to see if the workflow and all datasets defined in the benchmark can
be found on the server.

:param context: the context object used to connect to the Galaxy instance
:param args: [0] the benchmark YAML file to be validated.
:return:
"""
if len(args) == 0:
print('ERROR: no workflow configuration specified')
return
Expand Down Expand Up @@ -420,10 +437,10 @@ def validate(context: Context, args: list):


def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict):
"""Blocks until all jobs defined in the *invocations* to complete.
"""Blocks until all jobs defined in *invocations* are complete (in a terminal state).

:param gi: The *GalaxyInstance** running the jobs
:param invocations:
:param invocations: a dictionary containing information about the jobs invoked
:return:
"""
wfid = invocations['workflow_id']
Expand Down Expand Up @@ -493,6 +510,11 @@ def wait_for_jobs(context, gi: GalaxyInstance, invocations: dict):


def parse_workflow(workflow_path: str):
"""
Loads the benchmark YAML file.
:param workflow_path: the path to the file to be loaded.
:return: a dictionary containing the benchmark.
"""
if not os.path.exists(workflow_path):
print(f'ERROR: could not find workflow file {workflow_path}')
return None
Expand All @@ -511,6 +533,14 @@ def parse_workflow(workflow_path: str):


def find_workflow_id(gi, name_or_id):
"""
Resolves the human-readable name for a workflow into the unique ID on the
Galaxy instance.

:param gi: the connection object to the Galaxy instance
:param name_or_id: the name of the workflow
:return: The Galaxy workflow ID or None if the workflow could not be located
"""
try:
wf = gi.workflows.show_workflow(name_or_id)
return wf['id']
Expand All @@ -527,7 +557,14 @@ def find_workflow_id(gi, name_or_id):


def find_dataset_id(gi, name_or_id):
# print(f"Finding dataset {name_or_id}")
"""
    Resolves the human-readable name of a dataset into the unique ID on the
    Galaxy instance

:param gi: the connection object to the Galaxy instance
:param name_or_id: the name of the dataset.
:return: the Galaxy dataset ID or None if the dataset could not be located.
"""
try:
ds = gi.datasets.show_dataset(name_or_id)
return ds['id']
Expand All @@ -552,6 +589,14 @@ def find_dataset_id(gi, name_or_id):


def find_collection_id(gi, name):
"""
Resolves a human-readable collection name into the unique Galaxy ID.

:param gi: the connection object to the Galaxy instance
:param name: the name of the collection to resolve
:return: The unique Galaxy ID of the collection or None if the collection
can not be located.
"""
kwargs = {'limit': 10000, 'offset': 0}
datasets = gi.datasets.get_datasets(**kwargs)
if len(datasets) == 0:
Expand All @@ -573,6 +618,13 @@ def find_collection_id(gi, name):


def test(context: Context, args: list):
"""
Allows running testing code from the command line.

:param context: a connection object to a Galaxy instance
:param args: varies
:return: varies, typically None.
"""
id = 'c90fffcf98b31cd3'
gi = connect(context)
inputs = gi.workflows.get_workflow_inputs(id, 'PE fastq input')
Expand Down
2 changes: 2 additions & 0 deletions abm/lib/cloudlaunch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from cloudlaunch_cli.main import create_api_client
from common import Context

# DEPRECATED - Cloudlaunch is no longer used to manage Galaxy clusters.

BOLD = '\033[1m'
CLEAR = '\033[0m'

Expand Down
68 changes: 60 additions & 8 deletions abm/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from bioblend.galaxy import dataset_collections
from ruamel.yaml import YAML

# Where we will look for our configuration file.
PROFILE_SEARCH_PATH = ['~/.abm/profile.yml', '.abm-profile.yml']

# Deprecated. Do not use.
datasets = {
"dna": [
"ftp://ftp.sra.ebi.ac.uk/vol1/fastq/ERR013/ERR013101/ERR013101_1.fastq.gz",
Expand All @@ -25,6 +27,14 @@


def try_for(f, limit=3):
"""
Tries to invoke the function f. If the function f fails it will be retried
*limit* number of times.

:param f: the function to invoke
:param limit: how many times the function will be retried
:return: the result of calling f()
"""
count = 0
running = True
result = None
Expand All @@ -40,6 +50,13 @@ def try_for(f, limit=3):


class Context:
"""
The context object that contains information to connect to a Galaxy instance.

GALAXY_SERVER: the URL of the Galaxy server to connect to
API_KEY : a user's API key to make API calls on the Galaxy instance
KUBECONFIG: : the kubeconfig file needed to make changes via Helm
"""
def __init__(self, *args):
if len(args) == 1:
arg = args[0]
Expand Down Expand Up @@ -90,7 +107,12 @@ def connect(context: Context):


def _set_active_profile(profile_name: str):
# print(f"Parsing profile for {profile_name}")
"""
Unused.

:param profile_name:
:return:
"""
lib.GALAXY_SERVER, lib.API_KEY, lib.KUBECONFIG = parse_profile(profile_name)
return lib.GALAXY_SERVER != None

Expand All @@ -100,6 +122,11 @@ def get_context(profile_name: str):


def get_yaml_parser():
    """
    Return the shared YAML parser, constructing it on first use.

    :return: a YAML parser.
    """
    # Lazily build the module-level singleton exactly once and reuse it.
    parser = lib.parser
    if parser is None:
        parser = YAML()
        lib.parser = parser
    return parser
Expand All @@ -124,6 +151,12 @@ def load_profiles():


def save_profiles(profiles: dict):
"""
Write the ABM configuration file.

:param profiles: the configuration to be saved.
:return: None
"""
yaml = get_yaml_parser()
for profile_path in PROFILE_SEARCH_PATH:
path = os.path.expanduser(profile_path)
Expand Down Expand Up @@ -161,22 +194,28 @@ def parse_profile(profile_name: str):


def run(command, env: dict = None):
    """
    Runs a command on the local machine. Used to invoke the helm and kubectl
    executables.

    NOTE: the command line is tokenized with ``str.split()``, so arguments
    that contain spaces (quoted paths, etc.) are not supported.

    :param command: the command line to be invoked
    :param env: environment variables for the command. Defaults to the
        current process environment when not provided.
    :return: the command's stdout, decoded as UTF-8 and stripped of
        leading/trailing whitespace.
    :raises RuntimeError: if the command exits with a non-zero status; the
        exception message is the command's stderr output.
    """
    if env is None:
        env = os.environ
    result = subprocess.run(command.split(), capture_output=True, env=env)
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode('utf-8').strip())
    return result.stdout.decode('utf-8').strip()


def get_env(context: Context):
"""
Creates a copy of the environment variables as returned by os.environ.
:param context: Ignored
:return: a dictionary of the environment variables
"""
copy = os.environ.copy()
for key, value in context.__dict__.items():
if value is not None:
Expand All @@ -185,6 +224,13 @@ def get_env(context: Context):


def find_executable(name):
    """
    Uses the ``which`` command on the local machine to find the full path to
    an executable.

    :param name: the name of a command line executable or script.
    :return: the full path to the executable.
    :raises RuntimeError: if ``which`` exits with a non-zero status (e.g.
        when the executable cannot be found), since :func:`run` raises on
        any non-zero exit code.
    """
    return run(f"which {name}")


Expand All @@ -208,6 +254,7 @@ def find_executable(name):
# "swaptotal",
# "uname"

# Columns to be defined when generating CSV files.
table_header = [
"id",
"history_id",
Expand Down Expand Up @@ -237,6 +284,11 @@ def find_executable(name):
]

def print_table_header():
    """
    Prints the table header suitable for inclusion in CSV files.

    :return: None. The table header is printed to stdout.
    """
    # Emit the column names as a single comma-separated CSV line.
    header_line = ','.join(table_header)
    print(header_line)


Expand Down
6 changes: 3 additions & 3 deletions abm/lib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

def do_list(context: Context, argv: list):
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--state', help='list jobs in this state')
parser.add_argument('--history', help='show jobs in the given history')
parser.add_argument('-t', '--tool', help='only show jobs generate by this tool')
parser.add_argument('-s', '--state', help='list datasets in this state')
parser.add_argument('--history', help='show datasets in the given history')
parser.add_argument('-t', '--tool', help='only show datasets generate by this tool')
args = parser.parse_args(argv)
kwargs = {'limit': 10000, 'offset': 0, 'deleted': False}
gi = connect(context)
Expand Down