Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add runtime fields/deltas to def, proxy, job data elements #5138

Merged
merged 13 commits into from
Nov 18, 2022
Merged
43 changes: 40 additions & 3 deletions cylc/flow/broadcast_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""Manage broadcast (and external trigger broadcast)."""

import re
from copy import deepcopy
from threading import RLock

from cylc.flow import LOG
Expand All @@ -25,9 +26,12 @@
get_broadcast_change_report,
get_broadcast_bad_options_report,
)
from cylc.flow.cfgspec.workflow import SPEC
from cylc.flow.id import Tokens
from cylc.flow.cycling.loader import get_point, standardise_point_string
from cylc.flow.exceptions import PointParsingError
from cylc.flow.parsec.util import listjoin
from cylc.flow.parsec.validate import BroadcastConfigValidator

ALL_CYCLE_POINTS_STRS = ["*", "all-cycle-points", "all-cycles"]

Expand Down Expand Up @@ -112,6 +116,10 @@ def clear_broadcast(
elif (not cancel_keys_list or
keys + [key] in cancel_keys_list):
stuff[key] = None
if isinstance(value, list):
value = listjoin(value)
else:
value = str(value)
setting = {key: value}
for rkey in reversed(keys):
setting = {rkey: setting}
Expand Down Expand Up @@ -199,6 +207,25 @@ def load_db_broadcast_states(self, row_idx, row):
"key": key,
"value": value})

# BACK COMPAT: post_load_db_coerce
# The DB at 8.0.x stores Interval values as neither ISO8601 duration
# string or DurationFloat. This has been fixed at 8.1.0.
# url:
# https://github.com/cylc/cylc-flow/pull/5138
# from:
# 8.0.x
# to:
# 8.1.x
# remove at:
# 8.x
def post_load_db_coerce(self):
"""Coerce DB loaded values to config objects, i.e. DurationFloat."""
for namespaces in self.broadcasts.values():
for settings in namespaces.values():
BroadcastConfigValidator().validate(
settings, SPEC['runtime']['__MANY__']
)

def _match_ext_trigger(self, itask):
"""Match external triggers for a waiting task proxy."""
if not self.ext_triggers or not itask.state.external_triggers:
Expand Down Expand Up @@ -261,11 +288,21 @@ def put_broadcast(
elif not bad_point:
if namespace not in self.broadcasts[point_string]:
self.broadcasts[point_string][namespace] = {}
# Keep saved/reported setting as workflow
# config format.
modified_settings.append(
(point_string, namespace, deepcopy(setting))
)
# Coerce setting to cylc runtime object,
# i.e. str to DurationFloat.
BroadcastConfigValidator().validate(
setting,
SPEC['runtime']['__MANY__']
)
addict(
self.broadcasts[point_string][namespace],
setting)
modified_settings.append(
(point_string, namespace, setting))
setting
)

# Log the broadcast
self.workflow_db_mgr.put_broadcast(modified_settings)
Expand Down
40 changes: 28 additions & 12 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,29 @@ message PbWorkflow {
optional int32 is_runahead_total = 38;
}

// Selected runtime fields
message PbRuntime {
optional string platform = 1;
optional string script = 2;
optional string init_script = 3;
optional string env_script = 4;
optional string err_script = 5;
optional string exit_script = 6;
optional string pre_script = 7;
optional string post_script = 8;
optional string work_sub_dir = 9;
optional string execution_polling_intervals = 10;
optional string execution_retry_delays = 11;
optional string execution_time_limit = 12;
optional string submission_polling_intervals = 13;
optional string submission_retry_delays = 14;
optional string directives = 15;
optional string environment = 16;
optional string outputs = 17;
}


// Nodes
message PbJob {
optional string stamp = 1;
optional string id = 2;
Expand All @@ -117,25 +140,14 @@ message PbJob {
optional string finished_time = 8;
optional string job_id = 9;
optional string job_runner_name = 10;
optional string env_script = 11;
optional string err_script = 12;
optional string exit_script = 13;
optional float execution_time_limit = 14;
optional string platform = 15;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left this in, so current API doesn't break.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to keep this as the job platform is chosen based on the runtime configuration (e.g. platform could be set to a platform-group) causing these two values to differ.

optional string init_script = 16;
optional string job_log_dir = 17;
optional string post_script = 19;
optional string pre_script = 20;
optional string script = 21;
optional string shell = 22;
optional string work_sub_dir = 23;
optional string environment = 25;
optional string directives = 26;
optional string param_var = 28;
repeated string extra_logs = 29;
optional string name = 30; /* filter item */
optional string cycle_point = 31; /* filter item */
repeated string messages = 32;
optional PbRuntime runtime = 33;
}

message PbTask {
Expand All @@ -149,6 +161,7 @@ message PbTask {
repeated string namespace = 8;
repeated string parents = 9;
optional string first_parent = 10;
optional PbRuntime runtime = 11;
}

message PbPollTask {
Expand Down Expand Up @@ -219,6 +232,7 @@ message PbTaskProxy {
optional bool is_queued = 25;
optional bool is_runahead = 26;
optional bool flow_wait = 27;
optional PbRuntime runtime = 28;
}

message PbFamily {
Expand All @@ -232,6 +246,7 @@ message PbFamily {
repeated string child_tasks = 8;
repeated string child_families = 9;
optional string first_parent = 10;
optional PbRuntime runtime = 11;
}

message PbFamilyProxy {
Expand All @@ -254,6 +269,7 @@ message PbFamilyProxy {
optional int32 is_queued_total = 18;
optional bool is_runahead = 19;
optional int32 is_runahead_total = 20;
optional PbRuntime runtime = 21;
}

message PbEdge {
Expand Down
104 changes: 53 additions & 51 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

Loading