Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

platforms: fix unreachable hosts not reset on platform group failure #6109

Merged
merged 13 commits into from
Jun 12, 2024
1 change: 1 addition & 0 deletions changes.d/fix.6109.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that, if all hosts on all platforms in a group are exhausted at job preparation time, they are reset.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Callable,
Dict,
Iterable,
List,
NoReturn,
Optional,
Tuple,
Expand Down Expand Up @@ -449,10 +450,12 @@ class NoPlatformsError(PlatformLookupError):
place: Where the attempt to get the platform failed.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
"""
def __init__(
self, identity: str, set_type: str = 'group', place: str = ''
self, identity: str, set_type: str = 'group', place: str = '',
hosts_consumed: Optional[List[str]] = None
):
self.identity = identity
self.type = set_type
self.hosts_consumed = set(hosts_consumed) if hosts_consumed else set()
wxtim marked this conversation as resolved.
Show resolved Hide resolved
if place:
self.place = f' during {place}.'
else:
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,12 @@ def get_platform_from_group(

# Raise NoPlatformsError if there are no platforms available to be selected.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
if not platform_names:
raise NoPlatformsError(group_name)
hosts_consumed = [
host
for platform in group['platforms']
for host in platform_from_name(platform)['hosts']]
raise NoPlatformsError(
group_name, hosts_consumed=hosts_consumed)
wxtim marked this conversation as resolved.
Show resolved Hide resolved

# Get the selection method
method = group['selection']['method']
Expand Down
33 changes: 27 additions & 6 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
"""
prepared_tasks = []
bad_tasks = []
out_of_hosts_tasks: list = []
wxtim marked this conversation as resolved.
Show resolved Hide resolved
for itask in itasks:
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
Expand All @@ -239,11 +240,15 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
if prep_task is True:
# This is a task whose platform has run out of hosts:
# it is neither bad nor good.
out_of_hosts_tasks.append(itask)
elif prep_task:
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
return [prepared_tasks, bad_tasks]
return [prepared_tasks, bad_tasks, out_of_hosts_tasks]

def submit_task_jobs(self, workflow, itasks, curve_auth,
client_pub_key_dir, is_simulation=False):
Expand All @@ -265,13 +270,17 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
# Prepare tasks for job submission
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
workflow, itasks)
(
prepared_tasks,
bad_tasks,
out_of_hosts_tasks
) = self.prep_submit_task_jobs(workflow, itasks)
# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()

if not prepared_tasks:
if not prepared_tasks and not out_of_hosts_tasks:
return bad_tasks
MetRonnie marked this conversation as resolved.
Show resolved Hide resolved

auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
Expand All @@ -281,6 +290,9 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Submit task jobs for each platform
done_tasks = bad_tasks

# Out of host tasks can be considered done for now:
[done_tasks.append(itask) for itask in out_of_hosts_tasks]
wxtim marked this conversation as resolved.
Show resolved Hide resolved

for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
# found to be unreachable.
Expand Down Expand Up @@ -1087,7 +1099,8 @@ def _prep_submit_task_job(
Returns:
* itask - preparation complete.
* None - preparation in progress.
* False - perparation failed.
* False - preparation failed.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
* True - preparation failed because no platforms were found.
wxtim marked this conversation as resolved.
Show resolved Hide resolved

"""
if itask.local_job_file_path:
Expand Down Expand Up @@ -1181,6 +1194,14 @@ def _prep_submit_task_job(
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
if isinstance(exc, NoPlatformsError):
# Remove every host on every platform in the group from the
# bad hosts set.
self.bad_hosts -= exc.hosts_consumed
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
return True
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
Expand Down
60 changes: 60 additions & 0 deletions tests/integration/test_platforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Integration testing for platforms functionality.
"""


async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
wxtim marked this conversation as resolved.
Show resolved Hide resolved
global_conf = '''
[platforms]
[[myplatform]]
hosts = broken
[[anotherbad]]
hosts = broken2
[platform groups]
[[mygroup]]
platforms = myplatform, anotherbad'''
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf)

wid = flow({
"scheduling": {
"graph": {
"R1": "foo"
}
},
"runtime": {
"root": {},
"print-config": {
"script": "cylc config"
},
wxtim marked this conversation as resolved.
Show resolved Hide resolved
"foo": {
"script": "sleep 10",
"platform": "mygroup",
"submission retry delays": '3*PT5S'
}
}
})
validate(wid)
schd = scheduler(wid, paused_start=False, run_mode='live')
async with run(schd) as log:
wxtim marked this conversation as resolved.
Show resolved Hide resolved
itask = schd.pool.get_tasks()[0]

# Avoid breaking on trying to create log file path:
schd.task_job_mgr._create_job_log_path = lambda *_: None
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
wxtim marked this conversation as resolved.
Show resolved Hide resolved
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
assert res is True
assert not schd.task_job_mgr.bad_hosts
wxtim marked this conversation as resolved.
Show resolved Hide resolved
Loading