Skip to content

Commit

Permalink
add Cylc VRO
Browse files Browse the repository at this point in the history
  • Loading branch information
wxtim committed Dec 14, 2022
1 parent b224a6a commit 86acb05
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 24 deletions.
29 changes: 29 additions & 0 deletions cylc/flow/pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,32 @@ def get_workflow_name_from_id(workflow_id: str) -> str:
name_path = id_path

return str(name_path.relative_to(cylc_run_dir))


def get_source_conf_from_id(workflow_id):
"""Give a workflow id, get the flow.cylc file of the source.
Additionally Check
1. Flow.cylc or suite.rc exists at source.
"""
# Avoid circular import:
from cylc.flow.workflow_files import WorkflowFiles
# Get path of source:
run_dir = Path(get_workflow_run_dir(workflow_id))
relative_path = run_dir.relative_to(get_cylc_run_dir())
src = (
Path(get_cylc_run_dir())
/ relative_path.parts[0]
/ WorkflowFiles.Install.DIRNAME
/ WorkflowFiles.Install.SOURCE
)

# Test whether there is a config file at source:
if (src / WorkflowFiles.FLOW_FILE).exists():
return src.resolve() / WorkflowFiles.FLOW_FILE
elif (src / WorkflowFiles.SUITE_RC).exists():
return src.resolve() / WorkflowFiles.SUITE_RC
else:
raise WorkflowFilesError(
f'Source file not present at: {src.resolve()}'
)
2 changes: 1 addition & 1 deletion cylc/flow/scripts/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async def _main(
workflow_id,
flow_file,
options,
get_template_vars(options)
get_template_vars(options, flow_file)
)

config.pcfg.idump(
Expand Down
3 changes: 2 additions & 1 deletion cylc/flow/scripts/cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ def get_version(long=False):
'shutdown': 'stop',
'task-message': 'message',
'unhold': 'release',
'validate-install-play': 'vip'
'validate-install-play': 'vip',
'validate-reinstall-reload': 'vro',
}


Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def _get_inheritance_nodes_and_edges(

def get_config(workflow_id: str, opts: 'Values', flow_file) -> WorkflowConfig:
"""Return a WorkflowConfig object for the provided reg / path."""
template_vars = get_template_vars(opts)
template_vars = get_template_vars(opts, flow_file)
return WorkflowConfig(
workflow_id, flow_file, opts, template_vars=template_vars
)
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def _main(parser: COP, options: 'Values', workflow_id: str) -> None:
src=True,
constraint='workflows',
)
template_vars = get_template_vars(options)
template_vars = get_template_vars(options, flow_file)

if options.all_tasks and options.all_namespaces:
parser.error("Choose either -a or -n")
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/psutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
the `psutil` on remote platforms.
Exits:
0 - If successfull.
0 - If successful.
2 - For errors in extracting results from psutil
1 - For all other errors.
"""
Expand Down
23 changes: 15 additions & 8 deletions cylc/flow/scripts/reinstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
from cylc.flow.id_cli import parse_id
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
OptionSettings,
Options,
WORKFLOW_ID_ARG_DOC,
)
Expand All @@ -100,6 +101,18 @@
_input = input # to enable testing


REINSTALL_CYLC_ROSE_OPTIONS = [
OptionSettings(
['--clear-rose-install-options'],
help="Clear options previously set by cylc-rose.",
action='store_true',
default=False,
dest="clear_rose_install_opts",
sources={'reinstall'}
)
]


def get_option_parser() -> COP:
parser = COP(
__doc__, comms=True, argdoc=[WORKFLOW_ID_ARG_DOC]
Expand All @@ -112,14 +125,8 @@ def get_option_parser() -> COP:
except ImportError:
pass
else:
parser.add_option(
"--clear-rose-install-options",
help="Clear options previously set by cylc-rose.",
action='store_true',
default=False,
dest="clear_rose_install_opts"
)

for option in REINSTALL_CYLC_ROSE_OPTIONS:
parser.add_option(*option.args, **option.kwargs)
return parser


Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/scripts/reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ async def run(options: 'Values', workflow_id: str) -> None:

@cli_function(get_option_parser)
def main(parser: COP, options: 'Values', *ids) -> None:
reload_cli(options, *ids)


def reload_cli(options: 'Values', *ids) -> None:
call_multi(
partial(run, options),
*ids,
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async def wrapped_main(
workflow_id,
flow_file,
options,
get_template_vars(options),
get_template_vars(options, flow_file),
output_fname=options.output,
mem_log_func=profiler.log_memory
)
Expand Down
9 changes: 3 additions & 6 deletions cylc/flow/scripts/validate_install_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from cylc.flow.scripts.validate import (
VALIDATE_OPTIONS,
_main as validate_main
_main as cylc_validate
)
from cylc.flow.scripts.install import (
INSTALL_OPTIONS, install_cli as cylc_install, get_source_location
Expand Down Expand Up @@ -75,10 +75,7 @@ def get_option_parser() -> COP:
]
)
for option in VIP_OPTIONS:
# Make a special exception for option against_source which makes
# no sense in a VIP context.
if option.kwargs.get('dest') != 'against_source':
parser.add_option(*option.args, **option.kwargs)
parser.add_option(*option.args, **option.kwargs)
return parser


Expand All @@ -92,7 +89,7 @@ def main(parser: COP, options: 'Values', workflow_id: Optional[str] = None):
source = get_source_location(workflow_id)

log_subcommand('validate', source)
validate_main(parser, options, str(source))
cylc_validate(parser, options, str(source))

log_subcommand('install', source)
workflow_id = cylc_install(options, workflow_id)
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/scripts/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ def main(parser: COP, options: 'Values', workflow_id: str) -> None:

async def _main(parser: COP, options: 'Values', workflow_id: str) -> None:
workflow_id, _, flow_file = await parse_id_async(

workflow_id,
src=True,
constraint='workflows',
Expand Down
13 changes: 11 additions & 2 deletions cylc/flow/templatevars.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,18 @@ def eval_var(var):
) from None


def load_template_vars(template_vars=None, template_vars_file=None):
def load_template_vars(
template_vars=None, template_vars_file=None, flow_file=None
):
"""Load template variables from key=value strings."""
res = {}
if flow_file is not None:
srcdir = str(Path(flow_file).parent)
db_tvars = OldTemplateVars(srcdir).template_vars
if db_tvars:
for key, val in db_tvars.items():
res[key] = val

if template_vars_file:
with open(template_vars_file, 'r') as handle:
for line in handle:
Expand All @@ -101,7 +110,7 @@ def load_template_vars(template_vars=None, template_vars_file=None):
return res


def get_template_vars(options: Values) -> Dict[str, Any]:
def get_template_vars(options: Values, flow_file) -> Dict[str, Any]:
"""Convienence wrapper for ``load_template_vars``.
Args:
Expand Down
10 changes: 9 additions & 1 deletion cylc/flow/workflow_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,9 @@ def _is_process_running(
return cli_format(process['cmdline']) == command


def detect_old_contact_file(reg: str, contact_data=None) -> None:
def detect_old_contact_file(
reg: str, contact_data=None, quiet=False
) -> None:
"""Check if the workflow process is still running.
As a side-effect this should detect and rectify the situation
Expand All @@ -479,6 +481,10 @@ def detect_old_contact_file(reg: str, contact_data=None) -> None:
Args:
reg: workflow name
contact_date:
quiet: Controls whether to return already running message -
this is not required if Cylc VRO is using this function to
decide whether to resume or reload.
Raises:
CylcError:
Expand Down Expand Up @@ -514,6 +520,8 @@ def detect_old_contact_file(reg: str, contact_data=None) -> None:
fname = get_contact_file_path(reg)
if process_is_running:
# ... the process is running, raise an exception
if quiet:
raise ServiceFileError()
raise ServiceFileError(
CONTACT_FILE_EXISTS_MSG % {
"host": old_host,
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ cylc.command =
validate = cylc.flow.scripts.validate:main
view = cylc.flow.scripts.view:main
vip = cylc.flow.scripts.validate_install_play:main
vro = cylc.flow.scripts.validate_reinstall_reload:main
# async functions to run within the scheduler main loop
cylc.main_loop =
health_check = cylc.flow.main_loop.health_check
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ def flow(run_dir, test_dir):
yield partial(_make_flow, run_dir, test_dir)


@pytest.fixture
def flow_src(tmp_path):
"""A function for creating function-level flows."""
yield partial(_make_src_flow, tmp_path)


@pytest.fixture(scope='module')
def mod_scheduler():
"""Return a Scheduler object for a flow.
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/test_pathutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
get_next_rundir_number,
get_remote_workflow_run_dir,
get_remote_workflow_run_job_dir,
get_source_conf_from_id,
get_workflow_run_dir,
get_workflow_run_job_dir,
get_workflow_run_scheduler_log_dir,
Expand Down Expand Up @@ -576,3 +577,43 @@ def test_get_workflow_name_from_id(

result = get_workflow_name_from_id(id_)
assert result == name


@pytest.fixture
def _setup_get_source_conf_from_id(tmp_path, monkeypatch):
run = tmp_path / 'cylc-run/run'
src = tmp_path / 'cylc-src/src'
src.mkdir(parents=True)
(run / '_cylc-install').mkdir(parents=True)
(run / '_cylc-install/source').symlink_to(src)
monkeypatch.setattr(
'cylc.flow.pathutil.get_workflow_run_dir',
lambda workflow_id: tmp_path / 'cylc-run/run'
)
monkeypatch.setattr(
'cylc.flow.pathutil.get_cylc_run_dir',
lambda: tmp_path / 'cylc-run'
)
yield tmp_path


@pytest.mark.parametrize(
'conf_file',
(
param('flow.cylc', id='flow.cylc'),
param('suite.rc', id='flow.cylc'),
param(None, id='no file'),
)
)
def test_get_source_conf_from_id(
_setup_get_source_conf_from_id, conf_file
):
"""It locates a flow.cylc, suite.rc from a run dir, or fails nicely.
"""
if conf_file:
expect = _setup_get_source_conf_from_id / f'cylc-src/src/{conf_file}'
expect.touch()
assert get_source_conf_from_id('run') == expect
else:
with pytest.raises(WorkflowFilesError):
get_source_conf_from_id('run')

0 comments on commit 86acb05

Please sign in to comment.