Skip to content

Commit

Permalink
Fix bug where bad platforms found at job init are not reset.
Browse files Browse the repository at this point in the history
added test which includes break

return list of all hosts consumed in a platform group.
  • Loading branch information
wxtim committed May 23, 2024
1 parent 2ae50b2 commit 13034d6
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 8 deletions.
5 changes: 4 additions & 1 deletion cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Callable,
Dict,
Iterable,
List,
NoReturn,
Optional,
Tuple,
Expand Down Expand Up @@ -449,10 +450,12 @@ class NoPlatformsError(PlatformLookupError):
place: Where the attempt to get the platform failed.
"""
def __init__(
self, identity: str, set_type: str = 'group', place: str = ''
self, identity: str, set_type: str = 'group', place: str = '',
hosts_consumed: Optional[List[str]] = None
):
self.identity = identity
self.type = set_type
self.hosts_consumed = set(hosts_consumed) if hosts_consumed else set()
if place:
self.place = f' during {place}.'
else:
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,12 @@ def get_platform_from_group(

# Return False if there are no platforms available to be selected.
if not platform_names:
raise NoPlatformsError(group_name)
hosts_consumed = [
host
for platform in group['platforms']
for host in platform_from_name(platform)['hosts']]
raise NoPlatformsError(
group_name, hosts_consumed=hosts_consumed)

# Get the selection method
method = group['selection']['method']
Expand Down
33 changes: 27 additions & 6 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
"""
prepared_tasks = []
bad_tasks = []
out_of_hosts_tasks: list = []
for itask in itasks:
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
Expand All @@ -239,11 +240,15 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
if prep_task:
if prep_task is True:
# This is a task whose platform has run out of hosts
# it's neither bad or good.
out_of_hosts_tasks.append(itask)
elif prep_task:
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
return [prepared_tasks, bad_tasks]
return [prepared_tasks, bad_tasks, out_of_hosts_tasks]

def submit_task_jobs(self, workflow, itasks, curve_auth,
client_pub_key_dir, is_simulation=False):
Expand All @@ -265,13 +270,17 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
# Prepare tasks for job submission
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
workflow, itasks)
(
prepared_tasks,
bad_tasks,
out_of_hosts_tasks
) = self.prep_submit_task_jobs(workflow, itasks)
# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()

if not prepared_tasks:
if not prepared_tasks and not out_of_hosts_tasks:
return bad_tasks

auth_itasks = {} # {platform: [itask, ...], ...}

for itask in prepared_tasks:
Expand All @@ -281,6 +290,9 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Submit task jobs for each platform
done_tasks = bad_tasks

# Out of host tasks can be considered done for now:
[done_tasks.append(itask) for itask in out_of_hosts_tasks]

for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
# found to be unreachable.
Expand Down Expand Up @@ -1087,7 +1099,8 @@ def _prep_submit_task_job(
Returns:
* itask - preparation complete.
* None - preparation in progress.
* False - perparation failed.
* False - preparation failed.
* True - preparation failed because no platforms were found.
"""
if itask.local_job_file_path:
Expand Down Expand Up @@ -1181,6 +1194,14 @@ def _prep_submit_task_job(
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
if isinstance(exc, NoPlatformsError):
# Todo = need to clear all hosts from all platforms
# in group.
self.bad_hosts -= exc.hosts_consumed
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
return True
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
Expand Down
60 changes: 60 additions & 0 deletions tests/integration/test_platforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Integration testing for platforms functionality.
"""


async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
global_conf = '''
[platforms]
[[myplatform]]
hosts = broken
[[anotherbad]]
hosts = broken2
[platform groups]
[[mygroup]]
platforms = myplatform, anotherbad'''
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf)

wid = flow({
"scheduling": {
"graph": {
"R1": "foo"
}
},
"runtime": {
"root": {},
"print-config": {
"script": "cylc config"
},
"foo": {
"script": "sleep 10",
"platform": "mygroup",
"submission retry delays": '3*PT5S'
}
}
})
validate(wid)
schd = scheduler(wid, paused_start=False, run_mode='live')
async with run(schd) as log:
itask = schd.pool.get_tasks()[0]

# Avoid breaking on trying to create log file path:
schd.task_job_mgr._create_job_log_path = lambda *_: None
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
assert res is True
assert not schd.task_job_mgr.bad_hosts

0 comments on commit 13034d6

Please sign in to comment.