Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate workflow test from invocation id #1209

Merged
merged 11 commits into from
Jan 5, 2022
6 changes: 3 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.6']
python-version: ['3.7']
tox-action:
- lint
- lint_docs
Expand All @@ -30,8 +30,8 @@ jobs:
#- unit-diagnostic-servetraining
#- unit-diagnostic-servecmd
#- unit-diagnostic-trainingwfcmd
- unit-nonredundant-noclientbuild-noshed-gx-2005
- unit-nonredundant-noclientbuild-noshed-gx-2009
- unit-nonredundant-noclientbuild-noshed-gx-2105
- unit-nonredundant-noclientbuild-noshed-gx-2109
- unit-nonredundant-noclientbuild-noshed-gx-dev
- unit-nonredundant-noclientbuild-noshed
- unit-diagnostic-serveclientcmd
Expand Down
23 changes: 20 additions & 3 deletions planemo/commands/cmd_workflow_job_init.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
"""Module describing the planemo ``workflow_job_init`` command."""
import os

import click
import yaml

from planemo import options
from planemo.cli import command_function
from planemo.galaxy.workflows import job_template, new_workflow_associated_path
from planemo.galaxy.workflows import (
get_workflow_from_invocation_id,
job_template,
new_workflow_associated_path
)
from planemo.io import can_write_to_path


@click.command('workflow_job_init')
@options.required_workflow_arg()
@options.force_option()
@options.workflow_output_artifact()
@options.galaxy_url_option()
@options.galaxy_user_key_option()
@options.from_invocation()
@options.profile_option()
@command_function
def cli(ctx, workflow_identifier, output=None, **kwds):
"""Initialize a Galaxy workflow job description for supplied workflow.
Expand All @@ -25,9 +35,16 @@ def cli(ctx, workflow_identifier, output=None, **kwds):
as well so this command may be renamed to to job_init at something along those
lines at some point.
"""
job = job_template(workflow_identifier)
if kwds["from_invocation"]:
if not os.path.isdir('test-data'):
ctx.log("Creating test-data directory.")
os.makedirs("test-data")
path_basename = get_workflow_from_invocation_id(workflow_identifier, kwds["galaxy_url"], kwds["galaxy_user_key"])

job = job_template(workflow_identifier, **kwds)

if output is None:
output = new_workflow_associated_path(workflow_identifier, suffix="job")
output = new_workflow_associated_path(path_basename if kwds["from_invocation"] else workflow_identifier, suffix="job")
if not can_write_to_path(output, **kwds):
ctx.exit(1)
with open(output, "w") as f_job:
Expand Down
23 changes: 17 additions & 6 deletions planemo/commands/cmd_workflow_test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from planemo import options
from planemo.cli import command_function
from planemo.galaxy.workflows import (
get_workflow_from_invocation_id,
job_template,
new_workflow_associated_path,
output_stubs_for_workflow,
Expand All @@ -19,6 +20,10 @@
@options.force_option()
@options.workflow_output_artifact()
@options.split_job_and_test()
@options.galaxy_url_option()
@options.galaxy_user_key_option()
@options.from_invocation()
@options.profile_option()
@command_function
def cli(ctx, workflow_identifier, output=None, split_test=False, **kwds):
"""Initialize a Galaxy workflow test description for supplied workflow.
Expand All @@ -27,21 +32,27 @@ def cli(ctx, workflow_identifier, output=None, split_test=False, **kwds):
to ensure inputs and outputs comply with best practices that make workflow
testing easier.
"""
path_basename = os.path.basename(workflow_identifier)
job = job_template(workflow_identifier)
if kwds["from_invocation"]:
if not os.path.isdir('test-data'):
ctx.log("Creating test-data directory.")
os.makedirs("test-data")
path_basename = get_workflow_from_invocation_id(workflow_identifier, kwds["galaxy_url"], kwds["galaxy_user_key"])
else:
path_basename = os.path.basename(workflow_identifier)
job = job_template(workflow_identifier, **kwds)
if output is None:
output = new_workflow_associated_path(workflow_identifier)
job_output = new_workflow_associated_path(workflow_identifier, suffix="job1")
output = new_workflow_associated_path(path_basename if kwds["from_invocation"] else workflow_identifier)
job_output = new_workflow_associated_path(path_basename if kwds["from_invocation"] else workflow_identifier, suffix="job1")
if not can_write_to_path(output, **kwds):
ctx.exit(1)

test_description = [{
'doc': 'Test outline for %s' % path_basename,
'job': job,
'outputs': output_stubs_for_workflow(workflow_identifier),
'outputs': output_stubs_for_workflow(workflow_identifier, **kwds),
}]
if split_test:
job_output = new_workflow_associated_path(workflow_identifier, suffix="job1")
job_output = new_workflow_associated_path(path_basename if kwds["from_invocation"] else workflow_identifier, suffix="job1")
if not can_write_to_path(job_output, **kwds):
ctx.exit(1)

Expand Down
90 changes: 88 additions & 2 deletions planemo/galaxy/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from gxformat2.interface import ImporterGalaxyInterface
from gxformat2.normalize import inputs_normalized, outputs_normalized

from planemo.galaxy.api import gi
from planemo.io import warn

FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
Expand Down Expand Up @@ -201,23 +202,28 @@ def output_labels(workflow_path):
return [o["id"] for o in outputs]


def output_stubs_for_workflow(workflow_path):
def output_stubs_for_workflow(workflow_path, **kwds):
"""
Return output labels and class.
"""
if kwds.get("from_invocation"):
return _job_outputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"])
outputs = {}
for label in output_labels(workflow_path):
if not label.startswith('_anonymous_'):
outputs[label] = {'class': ''}
return outputs


def job_template(workflow_path):
def job_template(workflow_path, **kwds):
"""Return a job template for specified workflow.

A dictionary describing non-optional inputs that must be specified to
run the workflow.
"""
if kwds.get("from_invocation"):
return _job_inputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"])

template = {}
for required_input_step in required_input_steps(workflow_path):
i_label = input_label(required_input_step)
Expand Down Expand Up @@ -280,6 +286,86 @@ def rewrite_job_file(input_file, output_file, job):
yaml.dump(job_contents, f)


def get_workflow_from_invocation_id(invocation_id, galaxy_url, galaxy_api_key):
user_gi = gi(url=galaxy_url, key=galaxy_api_key)
workflow_id = user_gi.invocations.show_invocation(invocation_id)['workflow_id']
workflow = user_gi.workflows._get(workflow_id, params={'instance': 'true'})
workflow_name = '-'.join(workflow["name"].split())
user_gi.workflows.export_workflow_to_local_path(use_default_filename=False, file_local_path=f'./{workflow_name}.ga', workflow_id=workflow["id"])

return workflow_name


def _job_inputs_template_from_invocation(invocation_id, galaxy_url, galaxy_api_key):
def _template_from_collection(user_gi, collection_id):
collection = user_gi.dataset_collections.show_dataset_collection(collection_id)
template = {
"class": "Collection",
"collection_type": collection["collection_type"],
"elements": []
}
for element in collection["elements"]:
if element["element_type"] == "hdca":
template['elements'].append(_template_from_collection(element["object"]["id"]))
elif element["element_type"] == "hda":
user_gi.datasets.download_dataset(element["object"]["id"], use_default_filename=False,
file_path=f"test-data/{input_step['label']}_{element['element_identifier']}.{ext}")
template['elements'].append(
{
"class": "File",
"identifier": element['element_identifier'],
"path": f"test-data/{input_step['label']}_{element['element_identifier']}.{ext}",
}
)
return template

user_gi = gi(url=galaxy_url, key=galaxy_api_key)
invocation = user_gi.invocations.show_invocation(invocation_id)
template = {}
for input_step in invocation['inputs'].values():
if input_step["src"] == "hda":
ext = user_gi.datasets.show_dataset(input_step["id"])["extension"]
user_gi.datasets.download_dataset(input_step["id"], use_default_filename=False, file_path=f"test-data/{input_step['label']}.{ext}")
template[input_step['label']] = {
"class": "File",
"path": f"test-data/{input_step['label']}.{ext}",
"filetype": ext
}
elif input_step["src"] == "hdca":
template[input_step['label']] = _template_from_collection(user_gi, input_step["id"])
for param, param_step in invocation['input_step_parameters'].items():
template[param] = param_step["parameter_value"]

return template


def _job_outputs_template_from_invocation(invocation_id, galaxy_url, galaxy_api_key):
user_gi = gi(url=galaxy_url, key=galaxy_api_key)
invocation = user_gi.invocations.show_invocation(invocation_id)
outputs = {}
for label, output in invocation["outputs"].items():
ext = user_gi.datasets.show_dataset(output["id"])["extension"]
user_gi.datasets.download_dataset(output["id"], use_default_filename=False, file_path=f"test-data/{label}.{ext}")
outputs[label] = {
'file': f"test-data/{label}.{ext}"
}
for label, output in invocation["output_collections"].items():
collection = user_gi.dataset_collections.show_dataset_collection(output['id'])
if ':' not in collection["collection_type"]:
user_gi.datasets.download_dataset(collection["elements"][0]["object"]["id"], use_default_filename=False,
file_path=f"test-data/{label}.{collection['elements'][0].get('extension', 'txt')}")
outputs[label] = {
'element_tests': { # only check the first element
collection["elements"][0]["element_identifier"]: f"test-data/{label}.{collection['elements'][0]['extension']}"
}
}
else:
outputs[label] = {
'element_tests': 'nested_collection_todo'
}
return outputs


__all__ = (
"import_workflow",
"describe_outputs",
Expand Down
10 changes: 10 additions & 0 deletions planemo/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,16 @@ def split_job_and_test():
return click.option("--split_test/--no_split_test", default=False, help="Write workflow job and test definitions to separate files.")


def from_invocation():
return planemo_option(
"--from_invocation/--not_from_invocation",
simonbray marked this conversation as resolved.
Show resolved Hide resolved
is_flag=True,
default=False,
help="Build a workflow test or job description from an invocation ID run on an external Galaxy."
"A Galaxy URL and API key must also be specified."
)


def required_job_arg():
"""Decorate click method as requiring the path to a single tool.
"""
Expand Down
6 changes: 5 additions & 1 deletion planemo/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ def create_command(virtualenv_path, galaxy_python_version=None):
python = os.path.abspath(python)
else:
python = sys.executable or 'python'
command = [python, '-m', 'venv', virtualenv_path]
virtualenv_on_path = which('virtualenv')
if virtualenv_on_path:
command = [virtualenv_on_path, virtualenv_path, '-p', python]
else:
command = [python, '-m', 'venv', virtualenv_path]
return " ".join(command)
12 changes: 11 additions & 1 deletion tests/test_external_galaxy_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def test_plain_init(self):
rerun_cmd = ["rerun", "--invocation", "invocation_id", "--profile", "test_ext_profile"]
upload_data_cmd = ["upload_data", "test_wf_alias", os.path.join(TEST_DATA_DIR, "wf2-job.yml"), "new-job.yml",
"--profile", "test_ext_profile"]
workflow_test_init_cmd = ["workflow_test_init", "invocation_id", "--from_invocation", "--profile", "test_ext_profile"]
test_workflow_test_init_cmd = ["test", "TestWorkflow1.ga", "--profile", "test_ext_profile"]

# test alias and profile creation
result = self._check_exit_code(profile_list_cmd)
Expand Down Expand Up @@ -69,10 +71,18 @@ def test_plain_init(self):
assert '1 jobs ok' in result.output or '"ok": 1' in result.output # so it passes regardless if tabulate is installed or not

# test rerun
rerun_cmd[2] = config.user_gi.workflows.get_invocations(wfid)[0]['id']
invocation_id = config.user_gi.workflows.get_invocations(wfid)[0]['id']
rerun_cmd[2] = invocation_id
result = self._check_exit_code(rerun_cmd)
assert 'No jobs matching the specified invocation' in result.output

# test generating test case from invocation_id
workflow_test_init_cmd[1] = invocation_id
self._check_exit_code(workflow_test_init_cmd)
assert os.path.exists('TestWorkflow1.ga')
assert os.path.exists('TestWorkflow1-tests.yml')
self._check_exit_code(test_workflow_test_init_cmd)

# test alias and profile deletion
result = self._check_exit_code(alias_delete_cmd)
assert 'Alias test_wf_alias was successfully deleted from profile test_ext_profile' in result.output
Expand Down
6 changes: 3 additions & 3 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ setenv =
quick: PLANEMO_SKIP_GALAXY_TESTS=1
master: PLANEMO_TEST_GALAXY_BRANCH=master
dev: PLANEMO_TEST_GALAXY_BRANCH=dev
2009: PLANEMO_TEST_GALAXY_BRANCH=release_20.09
2005: PLANEMO_TEST_GALAXY_BRANCH=release_20.05
2001: PLANEMO_TEST_GALAXY_BRANCH=release_20.01
2109: PLANEMO_TEST_GALAXY_BRANCH=release_21.09
2105: PLANEMO_TEST_GALAXY_BRANCH=release_21.05
2101: PLANEMO_TEST_GALAXY_BRANCH=release_21.01
skip_install =
doc_test,lint,lint_docs,lint_docstrings,mypy,gxwf_test_test: True
whitelist_externals =
Expand Down