Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable broadcast from UI Edit-Runtime form #5232

Merged
merged 3 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from cylc.flow.id import Tokens
from cylc.flow.network.schema import (
DEF_TYPES,
RUNTIME_FIELD_TO_CFG_MAP,
NodesEdges,
PROXY_NODES,
SUB_RESOLVERS,
Expand Down Expand Up @@ -697,14 +698,22 @@ async def _mutation_mapper(
return None

def broadcast(
self,
mode,
cycle_points=None,
namespaces=None,
settings=None,
cutoff=None
self,
mode: str,
cycle_points: Optional[List[str]] = None,
namespaces: Optional[List[str]] = None,
settings: Optional[List[Dict[str, str]]] = None,
cutoff: Any = None
):
"""Put or clear broadcasts."""
if settings is not None:
# Convert schema field names to workflow config setting names if
# applicable:
for i, dict_ in enumerate(settings):
settings[i] = {
RUNTIME_FIELD_TO_CFG_MAP.get(key, key): value
for key, value in dict_.items()
}
if mode == 'put_broadcast':
return self.schd.task_events_mgr.broadcast_mgr.put_broadcast(
cycle_points, namespaces, settings)
Expand Down
123 changes: 73 additions & 50 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,16 +703,23 @@ class Meta:
pruned = Boolean()


class RuntimeSetting(ObjectType):
"""Key = val setting, e.g. for [runtime][X][environment]"""
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
key = String(default_value=None)
value = String(default_value=None)


class Runtime(ObjectType):
class Meta:
description = """
Subset of runtime fields.

Existing on 3 different node types:
- Task/family definition (from the flow.cylc file)
- Task/family cycle instance (includes any broadcasts on top of the definition)
- Job (a record of what was run for a particular submit)
"""
description = sstrip("""
Subset of runtime fields.

Existing on 3 different node types:
- Task/family definition (from the flow.cylc file)
- Task/family cycle instance (includes any broadcasts on top
of the definition)
- Job (a record of what was run for a particular submit)
""")
platform = String(default_value=None)
script = String(default_value=None)
init_script = String(default_value=None)
Expand All @@ -727,9 +734,25 @@ class Meta:
execution_time_limit = String(default_value=None)
submission_polling_intervals = String(default_value=None)
submission_retry_delays = String(default_value=None)
directives = GenericScalar(resolver=resolve_json_dump)
environment = GenericScalar(resolver=resolve_json_dump)
outputs = GenericScalar(resolver=resolve_json_dump)
directives = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
environment = graphene.List(RuntimeSetting, resolver=resolve_json_dump)
outputs = graphene.List(RuntimeSetting, resolver=resolve_json_dump)


RUNTIME_FIELD_TO_CFG_MAP = {
**{
k: k.replace('_', ' ') for k in Runtime.__dict__
if not k.startswith('_')
},
'init_script': 'init-script',
'env_script': 'env-script',
'err_script': 'err-script',
'exit_script': 'exit-script',
'pre_script': 'pre-script',
'post_script': 'post-script',
'work_sub_dir': 'work sub-directory',
}
"""Map GQL Runtime fields' names to workflow config setting names."""


class Job(ObjectType):
Expand Down Expand Up @@ -1226,7 +1249,7 @@ async def mutator(
root: Optional[Any],
info: 'ResolveInfo',
*,
command: str,
command: Optional[str] = None,
workflows: Optional[List[str]] = None,
exworkflows: Optional[List[str]] = None,
**kwargs: Any
Expand All @@ -1241,11 +1264,13 @@ async def mutator(
root: Parent field (if any) value object.
info: GraphQL execution info.
command: Mutation command name (name of method of
cylc.flow.network.resolvers.Resolvers or
Scheduler command_<name> method).
cylc.flow.network.resolvers.Resolvers or Scheduler command_<name>
method). If None, uses mutation class name converted to snake_case.
workflows: List of workflow IDs.
exworkflows: List of workflow IDs.
"""
if command is None:
command = to_snake_case(info.field_name)
if workflows is None:
workflows = []
if exworkflows is None:
Expand Down Expand Up @@ -1465,7 +1490,7 @@ class Meta:
*not* clear all-cycle or all-namespace broadcasts.

''')
resolver = partial(mutator, command='broadcast')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand Down Expand Up @@ -1524,7 +1549,7 @@ class Meta:
Set workflow hold after cycle point. All tasks after this point
will be held.
''')
resolver = partial(mutator, command='set_hold_point')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand All @@ -1543,7 +1568,7 @@ class Meta:

This suspends submission of tasks.
''')
resolver = partial(mutator, command='pause')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand Down Expand Up @@ -1588,7 +1613,7 @@ class Meta:

Held tasks do not submit their jobs even if ready to run.
''')
resolver = partial(mutator, command='release_hold_point')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand All @@ -1603,7 +1628,7 @@ class Meta:

See also the opposite command `pause`.
''')
resolver = partial(mutator, command='resume')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand Down Expand Up @@ -1646,7 +1671,7 @@ class Meta:
for example, if you choose `WARNING`, only warnings and critical
messages will be logged.
''')
resolver = partial(mutator, command='set_verbosity')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand All @@ -1662,7 +1687,7 @@ class Meta:
of the data-store graph window.

''')
resolver = partial(mutator, command='set_graph_window_extent')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand All @@ -1687,7 +1712,7 @@ class Meta:
be executed prior to shutdown, unless
the stop mode is `{WorkflowStopMode.Now.name}`.
''')
resolver = partial(mutator, command='stop')
resolver = mutator

class Arguments:
workflows = graphene.List(WorkflowID, required=True)
Expand Down Expand Up @@ -1775,15 +1800,35 @@ class FlowMutationArguments:
This should be a list of flow numbers OR a single-item list
containing one of the following three strings:

Alternatively this may be a single-item list containing one of
the following values:

* {FLOW_ALL} - Triggered tasks belong to all active flows
(default).
* {FLOW_NEW} - Triggered tasks are assigned to a new flow.
* {FLOW_NONE} - Triggered tasks do not belong to any flow.
''')
)
flow_wait = Boolean(
default_value=False,
description=sstrip('''
Should the workflow "wait" or "continue on" from this task?

If `false` the scheduler will spawn and run downstream tasks
as normal (default).

If `true` the scheduler will not spawn the downstream tasks
unless it has been caught by the same flow at a later time.

For example you might set this to True to trigger a task
ahead of a flow, where you don't want the scheduled to
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved
"continue on" from this task until the flow has caught up
with it.
''')
)
flow_descr = String(
description=sstrip('''
If starting a new flow, this field can be used to provide the
new flow with a description for later reference.
''')
)


class Hold(Mutation, TaskMutation):
Expand All @@ -1793,7 +1838,7 @@ class Meta:

Held tasks do not submit their jobs even if ready to run.
''')
resolver = partial(mutator, command='hold')
resolver = mutator


class Release(Mutation, TaskMutation):
Expand All @@ -1803,7 +1848,7 @@ class Meta:

See also the opposite command `hold`.
''')
resolver = partial(mutator, command='release')
resolver = mutator


class Kill(Mutation, TaskMutation):
Expand Down Expand Up @@ -1877,29 +1922,7 @@ class Meta:
resolver = partial(mutator, command='force_trigger_tasks')

class Arguments(TaskMutation.Arguments, FlowMutationArguments):
flow_wait = Boolean(
default_value=False,
description=sstrip('''
Should the workflow "wait" or "continue on" from this task?

If `false` the scheduler will spawn and run downstream tasks
as normal (default).

If `true` the scheduler will not spawn the downstream tasks
unless it has been caught by the same flow at a later time.

For example you might set this to True to trigger a task
ahead of a flow, where you don't want the scheduled to
"continue on" from this task until the flow has caught up
with it.
''')
)
flow_descr = String(
description=sstrip('''
If starting a new flow, this field can be used to provide the
new flow with a description for later reference.
''')
)
...


def _mut_field(cls):
Expand Down
19 changes: 18 additions & 1 deletion tests/unit/network/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@

import pytest

from cylc.flow.network.schema import sort_elements, SortArgs
from cylc.flow.cfgspec.workflow import SPEC as WORKFLOW_SPEC
from cylc.flow.network.schema import (
RUNTIME_FIELD_TO_CFG_MAP,
Runtime,
sort_elements,
SortArgs,
)


@dataclass
Expand Down Expand Up @@ -82,3 +88,14 @@ def test_sort_args(elements, sort_args, expected_result):
else:
sort_elements(elements, args)
assert elements == expected_result


@pytest.mark.parametrize(
'field_name', RUNTIME_FIELD_TO_CFG_MAP.keys()
)
def test_runtime_field_to_cfg_map(field_name: str):
"""Ensure the Runtime type's fields can be mapped back to the workflow
config."""
cfg_name = RUNTIME_FIELD_TO_CFG_MAP[field_name]
assert field_name in Runtime.__dict__
assert WORKFLOW_SPEC.get('runtime', '__MANY__', cfg_name)