Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

global: allow operational options in reana.yaml #400

Merged
merged 5 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reana_client/cli/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
filter_data, parse_parameters)
from reana_client.config import ERROR_MESSAGES, JSON, URL
from reana_client.errors import FileDeletionError, FileUploadError
from reana_client.utils import (get_reana_yaml_file_path, load_reana_spec,
from reana_client.utils import (get_reana_yaml_file_path,
workflow_uuid_or_name)
from reana_commons.utils import click_table_printer

Expand Down
15 changes: 15 additions & 0 deletions reana_client/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ def validate_workflow_name(ctx, _, workflow_name):
return workflow_name


def key_value_to_dict(ctx, param, value):
"""Convert tuple params to dictionary. e.g `(foo=bar)` to `{'foo': 'bar'}`.

:param options: A tuple with CLI operational options.
:returns: A dictionary representation of the given options.
"""
try:
return dict(op.split('=') for op in value)
except ValueError as err:
click.secho('==> ERROR: Input parameter "{0}" is not valid. '
'It must follow format "param=value".'
.format(' '.join(value)), err=True, fg='red'),
sys.exit(1)


class NotRequiredIf(click.Option):
"""Allow only one of two arguments to be missing."""

Expand Down
70 changes: 37 additions & 33 deletions reana_client/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
from reana_client.cli.utils import (add_access_token_options,
add_workflow_option, check_connection,
filter_data, format_session_uri,
parse_parameters, validate_workflow_name)
key_value_to_dict, parse_parameters,
validate_workflow_name)
from reana_client.config import ERROR_MESSAGES, TIMECHECK
from reana_client.utils import (get_reana_yaml_file_path,
get_workflow_name_and_run_number,
get_workflow_status_change_msg, is_uuid_v4,
load_reana_spec,
validate_cwl_operational_options,
validate_input_parameters,
validate_serial_operational_options,
workflow_uuid_or_name)
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_commons.errors import REANAValidationError
from reana_commons.operational_options import validate_operational_options
from reana_commons.utils import click_table_printer


Expand Down Expand Up @@ -280,13 +281,15 @@ def workflow_create(ctx, file, name,
@click.option(
'-p', '--parameter', 'parameters',
multiple=True,
callback=key_value_to_dict,
help='Additional input parameters to override '
'original ones from reana.yaml. '
'E.g. -p myparam1=myval1 -p myparam2=myval2.',
)
@click.option(
'-o', '--option', 'options',
multiple=True,
callback=key_value_to_dict,
help='Additional operational options for the workflow execution. '
'E.g. CACHE=off. (workflow engine - serial) '
'E.g. --debug (workflow engine - cwl)',
Expand Down Expand Up @@ -320,29 +323,28 @@ def workflow_start(ctx, workflow, access_token,
for p in ctx.params:
logging.debug('{param}: {value}'.format(param=p, value=ctx.params[p]))

parsed_parameters = {'input_parameters':
dict(p.split('=') for p in parameters)}
parsed_parameters['operational_options'] = ' '.join(options).split()
parsed_parameters = {'input_parameters': parameters,
'operational_options': options}
if workflow:
if parameters or options:
try:
response = get_workflow_parameters(workflow, access_token)
if response['type'] == 'cwl':
validate_cwl_operational_options(
parsed_parameters['operational_options'])
if response['type'] == 'serial':
parsed_parameters['operational_options'] = \
validate_serial_operational_options(
parsed_parameters['operational_options'])
workflow_type = response['type']
original_parameters = response['parameters']
validate_operational_options(
workflow_type,
parsed_parameters['operational_options'])

parsed_parameters['input_parameters'] = \
validate_input_parameters(
parsed_parameters['input_parameters'],
response['parameters'])
original_parameters)
except REANAValidationError as e:
click.secho(e.message, err=True, fg='red')
sys.exit(1)
except Exception as e:
click.echo(
click.style('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e))),
err=True)
click.secho('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e)), err=True)
try:
logging.info('Connecting to {0}'.format(get_api_url()))
response = start_workflow(workflow,
Expand Down Expand Up @@ -392,13 +394,15 @@ def workflow_start(ctx, workflow, access_token,
@click.option(
'-p', '--parameter', 'parameters',
multiple=True,
callback=key_value_to_dict,
help='Additional input parameters to override '
'original ones from reana.yaml. '
'E.g. -p myparam1=myval1 -p myparam2=myval2.',
)
@click.option(
'-o', '--option', 'options',
multiple=True,
callback=key_value_to_dict,
help='Additional operational options for the workflow execution. '
'E.g. CACHE=off. (workflow engine - serial) '
'E.g. --debug (workflow engine - cwl)',
Expand Down Expand Up @@ -439,10 +443,8 @@ def workflow_restart(ctx, workflow, access_token,
for p in ctx.params:
logging.debug('{param}: {value}'.format(param=p, value=ctx.params[p]))

parsed_parameters = {'input_parameters':
dict(p.split('=') for p in parameters)}
parsed_parameters['operational_options'] = ' '.join(options).split()
parsed_parameters['restart'] = True
parsed_parameters = {'input_parameters': parameters,
'operational_options': options, 'restart': True}
if file:
parsed_parameters['reana_specification'] = \
load_reana_spec(click.format_filename(file))
Expand All @@ -460,22 +462,22 @@ def workflow_restart(ctx, workflow, access_token,
response = get_workflow_parameters(workflow, access_token)
workflow_type = response['type']
original_parameters = response['parameters']
if workflow_type == 'cwl':
validate_cwl_operational_options(

parsed_parameters['operational_options'] = \
validate_operational_options(
workflow_type,
parsed_parameters['operational_options'])
if workflow_type == 'serial':
parsed_parameters['operational_options'] = \
validate_serial_operational_options(
parsed_parameters['operational_options'])
parsed_parameters['input_parameters'] = \
validate_input_parameters(
parsed_parameters['input_parameters'],
original_parameters)

except REANAValidationError as e:
click.secho(e.message, err=True, fg='red')
sys.exit(1)
except Exception as e:
click.echo(
click.style('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e))),
err=True)
click.secho('Could not apply given input parameters: '
'{0} \n{1}'.format(parameters, str(e)), err=True)
try:
logging.info('Connecting to {0}'.format(get_api_url()))
response = start_workflow(workflow,
Expand Down Expand Up @@ -811,14 +813,16 @@ def workflow_stop(ctx, workflow, force_stop, access_token): # noqa: D301
@click.option(
'-p', '--parameter', 'parameters',
multiple=True,
callback=key_value_to_dict,
help='Additional input parameters to override '
'original ones from reana.yaml. '
'E.g. -p myparam1=myval1 -p myparam2=myval2.',
)
@click.option(
'-o', '--option', 'options',
multiple=True,
help='Additional operatioal options for the workflow execution. '
callback=key_value_to_dict,
help='Additional operational options for the workflow execution. '
'E.g. CACHE=off.',
)
@click.option(
Expand Down
6 changes: 6 additions & 0 deletions reana_client/schemas/reana_analysis_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@
"title": "Analysis parameters.",
"description": "Key value data structure which represents the analysis parameters.",
"optional": true
},
"options": {
"id": "/properties/workflow/properties/options",
"type": "object",
"title": "Workflow operational options.",
"description": "Extra operational options accepted by workflow engines."
}
}
},
Expand Down
117 changes: 63 additions & 54 deletions reana_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import yadageschemas
import yaml
from jsonschema import ValidationError, validate
from reana_commons.errors import REANAValidationError
from reana_commons.operational_options import validate_operational_options
from reana_commons.serial import serial_load
from reana_commons.utils import get_workflow_status_change_verb

Expand All @@ -38,7 +40,7 @@ def workflow_uuid_or_name(ctx, param, value):
return value


def yadage_load(workflow_file, toplevel='.'):
def yadage_load(workflow_file, toplevel='.', **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who is consuming these **kwargs? We already unpack toplevel which is the only one we are interested in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we set other inputs.options such as initdir, its value will get here and this function would fail. With **kwargs we can filter out these options that are not needed on validation time. I agree it's a bit hacky but it's cleanest and safest way I could come up with. If you have any other idea please let me know 🙂

"""Validate and return yadage workflow specification.

:param workflow_file: A specification file compliant with
Expand All @@ -65,37 +67,28 @@ def yadage_load(workflow_file, toplevel='.'):
}

try:
return yadageschemas.load(spec=workflow_file, specopts=specopts,
validopts=validopts, validate=True)
yadageschemas.load(spec=workflow_file, specopts=specopts,
validopts=validopts, validate=True)

except ValidationError as e:
e.message = str(e)
raise e


def cwl_load(workflow_file):
def cwl_load(workflow_file, **kwargs):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. For CWL workflows, when passing TARGET: foo for example, it will end up here.

"""Validate and return cwl workflow specification.

:param workflow_file: A specification file compliant with
`cwl` workflow specification.
:returns: A dictionary which represents the valid `cwl` workflow.
"""
result = \
subprocess.run(
['cwltool', '--pack', '--quiet', workflow_file],
stdout=subprocess.PIPE)
value = result.stdout.decode('utf-8')
subprocess.check_output(
['cwltool', '--pack', '--quiet', workflow_file])
value = result.decode('utf-8')
return json.loads(value)


workflow_load = {
'yadage': yadage_load,
'cwl': cwl_load,
'serial': serial_load,
}
"""Dictionary to extend with new workflow specification loaders."""


def load_workflow_spec(workflow_type, workflow_file, **kwargs):
"""Validate and return machine readable workflow specifications.

Expand All @@ -104,16 +97,44 @@ def load_workflow_spec(workflow_type, workflow_file, **kwargs):
specification.
:returns: A dictionary which represents the valid workflow specification.
"""
workflow_load = {
'yadage': yadage_load,
'cwl': cwl_load,
'serial': serial_load,
}
"""Dictionary to extend with new workflow specification loaders."""

return workflow_load[workflow_type](workflow_file, **kwargs)


def load_reana_spec(filepath, skip_validation=False):
"""Load and validate reana specification file.

:raises IOError: Error while reading REANA spec file from given filepath`.
:raises IOError: Error while reading REANA spec file from given `filepath`.
:raises ValidationError: Given REANA spec file does not validate against
REANA specification.
"""
def _prepare_kwargs(reana_yaml):
kwargs = {}
workflow_type = reana_yaml['workflow']['type']
if workflow_type == 'serial':
kwargs['specification'] = reana_yaml['workflow'].\
get('specification')
kwargs['parameters'] = \
reana_yaml.get('inputs', {}).get('parameters', {})
kwargs['original'] = True

if 'options' in reana_yaml['inputs']:
try:
reana_yaml['inputs']['options'] = validate_operational_options(
workflow_type,
reana_yaml['inputs']['options'])
except REANAValidationError as e:
click.secho(e.message, err=True, fg='red')
sys.exit(1)
kwargs.update(reana_yaml['inputs']['options'])
return kwargs

try:
with open(filepath) as f:
reana_yaml = yaml.load(f.read(), Loader=yaml.FullLoader)
Expand All @@ -123,21 +144,14 @@ def load_reana_spec(filepath, skip_validation=False):
.format(filepath=filepath))
_validate_reana_yaml(reana_yaml)

kwargs = {}
if reana_yaml['workflow']['type'] == 'serial':
kwargs['specification'] = reana_yaml['workflow'].\
get('specification')
kwargs['parameters'] = \
reana_yaml.get('inputs', {}).get('parameters', {})
kwargs['original'] = True

workflow_type = reana_yaml['workflow']['type']
reana_yaml['workflow']['specification'] = load_workflow_spec(
reana_yaml['workflow']['type'],
workflow_type,
reana_yaml['workflow'].get('file'),
**kwargs
**_prepare_kwargs(reana_yaml)
)

if reana_yaml['workflow']['type'] == 'cwl' and \
if workflow_type == 'cwl' and \
'inputs' in reana_yaml:
with open(reana_yaml['inputs']['parameters']['input']) as f:
reana_yaml['inputs']['parameters'] = \
Expand Down Expand Up @@ -224,32 +238,27 @@ def get_workflow_name_and_run_number(workflow_name):
return workflow_name, ''


def validate_cwl_operational_options(operational_options):
"""Validate cwl operational options."""
forbidden_args = ['--debug', '--tmpdir-prefix', '--tmp-outdir-prefix'
'--default-container', '--outdir']
for option in operational_options:
if option in forbidden_args:
click.echo('Operational option {0} are not allowed. \n'
.format(operational_options), err=True)
sys.exit(1)
cmd = 'cwltool --version {0}'.format(' '.join(operational_options))
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as err:
click.echo('Operational options {0} are not valid. \n'
'{1}'.format(operational_options, err), err=True)
sys.exit(1)


def validate_serial_operational_options(operational_options):
"""Return validated serial operational options."""
try:
return dict(p.split('=') for p in operational_options)
except Exception as err:
click.echo('Operational options {0} are not valid. \n'
'{1}'.format(operational_options, err), err=True)
sys.exit(1)
def get_workflow_root():
"""Return the current workflow root directory."""
reana_yaml = get_reana_yaml_file_path()
workflow_root = os.getcwd()
while True:
file_list = os.listdir(workflow_root)
parent_dir = os.path.dirname(workflow_root)
if reana_yaml in file_list:
break
else:
if workflow_root == parent_dir:
click.echo(click.style(
'Not a workflow directory (or any of the parent'
' directories).\nPlease upload from inside'
' the directory containing the reana.yaml '
'file of your workflow.', fg='red'))
sys.exit(1)
else:
workflow_root = parent_dir
workflow_root += '/'
return workflow_root


def validate_input_parameters(live_parameters, original_parameters):
Expand Down
Loading