From d30ac43af521226cf8afe05a1841efbc64e2f059 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Mon, 20 May 2024 15:10:45 +0100
Subject: [PATCH 01/13] Fix bug where bad platforms found at job init are not
reset.
added test which includes break
return list of all hosts consumed in a platform group.
---
changes.d/fix.6109.md | 1 +
cylc/flow/exceptions.py | 5 ++-
cylc/flow/platforms.py | 7 +++-
cylc/flow/task_job_mgr.py | 33 +++++++++++++---
tests/integration/test_platforms.py | 60 +++++++++++++++++++++++++++++
5 files changed, 98 insertions(+), 8 deletions(-)
create mode 100644 changes.d/fix.6109.md
create mode 100644 tests/integration/test_platforms.py
diff --git a/changes.d/fix.6109.md b/changes.d/fix.6109.md
new file mode 100644
index 00000000000..95067bc980a
--- /dev/null
+++ b/changes.d/fix.6109.md
@@ -0,0 +1 @@
+Ensure that if all hosts on all platforms in a group are exhausted at job preparation time that they will be reset.
\ No newline at end of file
diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py
index 79a726d7bbe..ee8243240c7 100644
--- a/cylc/flow/exceptions.py
+++ b/cylc/flow/exceptions.py
@@ -21,6 +21,7 @@
Callable,
Dict,
Iterable,
+ List,
NoReturn,
Optional,
Tuple,
@@ -449,10 +450,12 @@ class NoPlatformsError(PlatformLookupError):
place: Where the attempt to get the platform failed.
"""
def __init__(
- self, identity: str, set_type: str = 'group', place: str = ''
+ self, identity: str, set_type: str = 'group', place: str = '',
+ hosts_consumed: Optional[List[str]] = None
):
self.identity = identity
self.type = set_type
+ self.hosts_consumed = set(hosts_consumed) if hosts_consumed else set()
if place:
self.place = f' during {place}.'
else:
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 72ccebffe52..723a10202e8 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -304,7 +304,12 @@ def get_platform_from_group(
# Return False if there are no platforms available to be selected.
if not platform_names:
- raise NoPlatformsError(group_name)
+ hosts_consumed = [
+ host
+ for platform in group['platforms']
+ for host in platform_from_name(platform)['hosts']]
+ raise NoPlatformsError(
+ group_name, hosts_consumed=hosts_consumed)
# Get the selection method
method = group['selection']['method']
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index e41e05dfd30..b833a4ca6d8 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -230,6 +230,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
"""
prepared_tasks = []
bad_tasks = []
+ out_of_hosts_tasks: list = []
for itask in itasks:
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
@@ -239,11 +240,15 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
- if prep_task:
+ if prep_task is True:
+ # This is a task whose platform has run out of hosts
+ # it's neither bad or good.
+ out_of_hosts_tasks.append(itask)
+ elif prep_task:
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
- return [prepared_tasks, bad_tasks]
+ return [prepared_tasks, bad_tasks, out_of_hosts_tasks]
def submit_task_jobs(self, workflow, itasks, curve_auth,
client_pub_key_dir, is_simulation=False):
@@ -265,13 +270,17 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
# Prepare tasks for job submission
- prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
- workflow, itasks)
+ (
+ prepared_tasks,
+ bad_tasks,
+ out_of_hosts_tasks
+ ) = self.prep_submit_task_jobs(workflow, itasks)
# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()
- if not prepared_tasks:
+ if not prepared_tasks and not out_of_hosts_tasks:
return bad_tasks
+
auth_itasks = {} # {platform: [itask, ...], ...}
for itask in prepared_tasks:
@@ -281,6 +290,9 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
# Submit task jobs for each platform
done_tasks = bad_tasks
+ # Out of host tasks can be considered done for now:
+ [done_tasks.append(itask) for itask in out_of_hosts_tasks]
+
for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
# found to be unreachable.
@@ -1087,7 +1099,8 @@ def _prep_submit_task_job(
Returns:
* itask - preparation complete.
* None - preparation in progress.
- * False - perparation failed.
+ * False - preparation failed.
+ * True - preparation failed because no platforms were found.
"""
if itask.local_job_file_path:
@@ -1181,6 +1194,14 @@ def _prep_submit_task_job(
itask.summary['platforms_used'][itask.submit_num] = ''
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
+ if isinstance(exc, NoPlatformsError):
+ # Todo = need to clear all hosts from all platforms
+ # in group.
+ self.bad_hosts -= exc.hosts_consumed
+ self._set_retry_timers(itask, rtconfig)
+ self._prep_submit_task_job_error(
+ workflow, itask, '(no platforms available)', exc)
+ return True
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py
new file mode 100644
index 00000000000..671b31adffd
--- /dev/null
+++ b/tests/integration/test_platforms.py
@@ -0,0 +1,60 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Integration testing for platforms functionality.
+"""
+
+
+async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
+ global_conf = '''
+ [platforms]
+ [[myplatform]]
+ hosts = broken
+ [[anotherbad]]
+ hosts = broken2
+ [platform groups]
+ [[mygroup]]
+ platforms = myplatform, anotherbad'''
+ mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf)
+
+ wid = flow({
+ "scheduling": {
+ "graph": {
+ "R1": "foo"
+ }
+ },
+ "runtime": {
+ "root": {},
+ "print-config": {
+ "script": "cylc config"
+ },
+ "foo": {
+ "script": "sleep 10",
+ "platform": "mygroup",
+ "submission retry delays": '3*PT5S'
+ }
+ }
+ })
+ validate(wid)
+ schd = scheduler(wid, paused_start=False, run_mode='live')
+ async with run(schd) as log:
+ itask = schd.pool.get_tasks()[0]
+
+ # Avoid breaking on trying to create log file path:
+ schd.task_job_mgr._create_job_log_path = lambda *_: None
+ schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
+ res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
+ assert res is True
+ assert not schd.task_job_mgr.bad_hosts
From 14ed16af0c8c7a2f84b64fe31db7a3e4e06e7ed4 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 28 May 2024 09:43:05 +0100
Subject: [PATCH 02/13] Apply suggestions from code review
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
cylc/flow/platforms.py | 2 +-
cylc/flow/task_job_mgr.py | 15 +++++------
tests/integration/test_platforms.py | 41 +++++++++++++----------------
3 files changed, 25 insertions(+), 33 deletions(-)
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 723a10202e8..706d3459009 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -302,7 +302,7 @@ def get_platform_from_group(
else:
platform_names = group['platforms']
- # Return False if there are no platforms available to be selected.
+ # If there are no platforms available to be selected:
if not platform_names:
hosts_consumed = [
host
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index b833a4ca6d8..2ef4c3650f9 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -36,7 +36,7 @@
)
from shutil import rmtree
from time import time
-from typing import TYPE_CHECKING, Any, Union, Optional
+from typing import TYPE_CHECKING, Any, List, Union, Optional
from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
@@ -230,7 +230,7 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
"""
prepared_tasks = []
bad_tasks = []
- out_of_hosts_tasks: list = []
+ out_of_hosts_tasks: List[TaskProxy] = []
for itask in itasks:
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
@@ -240,9 +240,8 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
- if prep_task is True:
+ if isinstance(prep_task, NoPlatformsError):
# This is a task whose platform has run out of hosts
- # it's neither bad or good.
out_of_hosts_tasks.append(itask)
elif prep_task:
prepared_tasks.append(itask)
@@ -288,10 +287,8 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
auth_itasks.setdefault(platform_name, [])
auth_itasks[platform_name].append(itask)
# Submit task jobs for each platform
- done_tasks = bad_tasks
-
- # Out of host tasks can be considered done for now:
- [done_tasks.append(itask) for itask in out_of_hosts_tasks]
+ # Non-prepared tasks can be considered done for now:
+ done_tasks = [*bad_tasks, *out_of_hosts_tasks]
for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
@@ -1201,7 +1198,7 @@ def _prep_submit_task_job(
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
- return True
+ return exc
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py
index 671b31adffd..d88d6ad1646 100644
--- a/tests/integration/test_platforms.py
+++ b/tests/integration/test_platforms.py
@@ -16,8 +16,20 @@
"""Integration testing for platforms functionality.
"""
+from contextlib import suppress
-async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
+from cylc.flow.exceptions import NoPlatformsError
+from cylc.flow.task_job_logs import get_task_job_activity_log
+
+
+async def test_prep_submit_task_tries_multiple_platforms(
+ flow, scheduler, run, mock_glbl_cfg, monkeypatch, tmp_path
+):
+ """Preparation tries multiple platforms within a group if the
+ task platform setting matches a group, and that after all platforms
+ have been tried that the hosts matching that platform group are
+ cleared.
+ """
global_conf = '''
[platforms]
[[myplatform]]
@@ -30,31 +42,14 @@ async def test_foo(flow, scheduler, run, mock_glbl_cfg, validate, monkeypatch):
mock_glbl_cfg('cylc.flow.platforms.glbl_cfg', global_conf)
wid = flow({
- "scheduling": {
- "graph": {
- "R1": "foo"
- }
- },
- "runtime": {
- "root": {},
- "print-config": {
- "script": "cylc config"
- },
- "foo": {
- "script": "sleep 10",
- "platform": "mygroup",
- "submission retry delays": '3*PT5S'
- }
- }
+ "scheduling": {"graph": {"R1": "foo"}},
+ "runtime": {"foo": {"platform": "mygroup"}}
})
- validate(wid)
schd = scheduler(wid, paused_start=False, run_mode='live')
- async with run(schd) as log:
+ async with run(schd):
itask = schd.pool.get_tasks()[0]
-
- # Avoid breaking on trying to create log file path:
- schd.task_job_mgr._create_job_log_path = lambda *_: None
+ itask.submit_num = 1
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
- assert res is True
+ assert isinstance(res, NoPlatformsError)
assert not schd.task_job_mgr.bad_hosts
From 44809e23bfedb6d45e0da2c747e93147b9f4630c Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 29 May 2024 12:55:48 +0100
Subject: [PATCH 03/13] Update cylc/flow/task_job_mgr.py
---
cylc/flow/task_job_mgr.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 2ef4c3650f9..c90a8a33bee 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -1097,7 +1097,8 @@ def _prep_submit_task_job(
* itask - preparation complete.
* None - preparation in progress.
* False - preparation failed.
- * True - preparation failed because no platforms were found.
+ * NoPlatformsError - preparation failed because no
+ platforms were found.
"""
if itask.local_job_file_path:
From b9b337fb3dd3807292b5ac01b834c499f1356abc Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 29 May 2024 13:18:43 +0100
Subject: [PATCH 04/13] Simplify change in response to Oliver's review.
---
cylc/flow/task_job_mgr.py | 31 ++++++++++-------------------
tests/integration/test_platforms.py | 2 +-
2 files changed, 12 insertions(+), 21 deletions(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index c90a8a33bee..3efa5e18ca8 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -36,7 +36,7 @@
)
from shutil import rmtree
from time import time
-from typing import TYPE_CHECKING, Any, List, Union, Optional
+from typing import TYPE_CHECKING, Any, Union, Optional
from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
@@ -230,7 +230,6 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
"""
prepared_tasks = []
bad_tasks = []
- out_of_hosts_tasks: List[TaskProxy] = []
for itask in itasks:
if not itask.state(TASK_STATUS_PREPARING):
# bump the submit_num *before* resetting the state so that the
@@ -240,14 +239,11 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
self.data_store_mgr.delta_task_state(itask)
prep_task = self._prep_submit_task_job(
workflow, itask, check_syntax=check_syntax)
- if isinstance(prep_task, NoPlatformsError):
- # This is a task whose platform has run out of hosts
- out_of_hosts_tasks.append(itask)
- elif prep_task:
+ if prep_task:
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
- return [prepared_tasks, bad_tasks, out_of_hosts_tasks]
+ return [prepared_tasks, bad_tasks]
def submit_task_jobs(self, workflow, itasks, curve_auth,
client_pub_key_dir, is_simulation=False):
@@ -269,15 +265,13 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
if is_simulation:
return self._simulation_submit_task_jobs(itasks, workflow)
# Prepare tasks for job submission
- (
- prepared_tasks,
- bad_tasks,
- out_of_hosts_tasks
- ) = self.prep_submit_task_jobs(workflow, itasks)
+ prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
+ workflow, itasks)
+
# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()
- if not prepared_tasks and not out_of_hosts_tasks:
+ if not prepared_tasks:
return bad_tasks
auth_itasks = {} # {platform: [itask, ...], ...}
@@ -288,7 +282,7 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
auth_itasks[platform_name].append(itask)
# Submit task jobs for each platform
# Non-prepared tasks can be considered done for now:
- done_tasks = [*bad_tasks, *out_of_hosts_tasks]
+ done_tasks = bad_tasks
for _, itasks in sorted(auth_itasks.items()):
# Find the first platform where >1 host has not been tried and
@@ -1097,9 +1091,6 @@ def _prep_submit_task_job(
* itask - preparation complete.
* None - preparation in progress.
* False - preparation failed.
- * NoPlatformsError - preparation failed because no
- platforms were found.
-
"""
if itask.local_job_file_path:
return itask
@@ -1193,13 +1184,13 @@ def _prep_submit_task_job(
# Retry delays, needed for the try_num
self._create_job_log_path(workflow, itask)
if isinstance(exc, NoPlatformsError):
- # Todo = need to clear all hosts from all platforms
- # in group.
+ # Clear all hosts from all platforms in group from
+ # bad_hosts:
self.bad_hosts -= exc.hosts_consumed
self._set_retry_timers(itask, rtconfig)
self._prep_submit_task_job_error(
workflow, itask, '(no platforms available)', exc)
- return exc
+ return False
self._prep_submit_task_job_error(
workflow, itask, '(platform not defined)', exc)
return False
diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py
index d88d6ad1646..8d16ac19cfe 100644
--- a/tests/integration/test_platforms.py
+++ b/tests/integration/test_platforms.py
@@ -51,5 +51,5 @@ async def test_prep_submit_task_tries_multiple_platforms(
itask.submit_num = 1
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
- assert isinstance(res, NoPlatformsError)
+ assert res is False
assert not schd.task_job_mgr.bad_hosts
From 03bb69773960bb1ce2682b6bb66324be3ae9c41c Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Fri, 31 May 2024 09:29:26 +0100
Subject: [PATCH 05/13] Update changes.d/fix.6109.md
Co-authored-by: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
---
changes.d/fix.6109.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/changes.d/fix.6109.md b/changes.d/fix.6109.md
index 95067bc980a..36f22c3d4fc 100644
--- a/changes.d/fix.6109.md
+++ b/changes.d/fix.6109.md
@@ -1 +1 @@
-Ensure that if all hosts on all platforms in a group are exhausted at job preparation time that they will be reset.
\ No newline at end of file
+Fixed bug affecting job submission where the list of bad hosts was not always reset correctly.
\ No newline at end of file
From c00e9b4d9455b711e35c21a53691979047649ab7 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 15:01:41 +0100
Subject: [PATCH 06/13] Apply suggestions from code review
Co-authored-by: Oliver Sanders
---
tests/integration/test_platforms.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py
index 8d16ac19cfe..444c61ce2b9 100644
--- a/tests/integration/test_platforms.py
+++ b/tests/integration/test_platforms.py
@@ -52,4 +52,5 @@ async def test_prep_submit_task_tries_multiple_platforms(
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
assert res is False
+ # ensure the bad hosts have been cleared
assert not schd.task_job_mgr.bad_hosts
From c8cc9c5cdf5205e5070cc2353e892d9d041e9162 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 15:02:25 +0100
Subject: [PATCH 07/13] Update cylc/flow/platforms.py
Co-authored-by: Oliver Sanders
---
cylc/flow/platforms.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 706d3459009..60cfeda82d6 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -304,12 +304,12 @@ def get_platform_from_group(
# If there are no platforms available to be selected:
if not platform_names:
- hosts_consumed = [
+ hosts_consumed = {
host
for platform in group['platforms']
- for host in platform_from_name(platform)['hosts']]
- raise NoPlatformsError(
- group_name, hosts_consumed=hosts_consumed)
+ for host in platform_from_name(platform)['hosts']
+ }
+ raise NoPlatformsError(group_name, hosts_consumed)
# Get the selection method
method = group['selection']['method']
From 37fd520c5d3386d45435414dd4285fb4fb439078 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 15:02:44 +0100
Subject: [PATCH 08/13] Apply suggestions from code review
Co-authored-by: Oliver Sanders
---
cylc/flow/task_job_mgr.py | 1 +
tests/integration/test_platforms.py | 17 +++++++----------
2 files changed, 8 insertions(+), 10 deletions(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 3efa5e18ca8..019de580ea6 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -1091,6 +1091,7 @@ def _prep_submit_task_job(
* itask - preparation complete.
* None - preparation in progress.
* False - preparation failed.
+
"""
if itask.local_job_file_path:
return itask
diff --git a/tests/integration/test_platforms.py b/tests/integration/test_platforms.py
index 444c61ce2b9..f8187227ceb 100644
--- a/tests/integration/test_platforms.py
+++ b/tests/integration/test_platforms.py
@@ -13,22 +13,18 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""Integration testing for platforms functionality.
-"""
-
-from contextlib import suppress
-
-from cylc.flow.exceptions import NoPlatformsError
-from cylc.flow.task_job_logs import get_task_job_activity_log
+"""Integration testing for platforms functionality."""
async def test_prep_submit_task_tries_multiple_platforms(
- flow, scheduler, run, mock_glbl_cfg, monkeypatch, tmp_path
+ flow, scheduler, start, mock_glbl_cfg
):
"""Preparation tries multiple platforms within a group if the
task platform setting matches a group, and that after all platforms
have been tried that the hosts matching that platform group are
cleared.
+
+ See https://github.com/cylc/cylc-flow/pull/6109
"""
global_conf = '''
[platforms]
@@ -45,10 +41,11 @@ async def test_prep_submit_task_tries_multiple_platforms(
"scheduling": {"graph": {"R1": "foo"}},
"runtime": {"foo": {"platform": "mygroup"}}
})
- schd = scheduler(wid, paused_start=False, run_mode='live')
- async with run(schd):
+ schd = scheduler(wid, run_mode='live')
+ async with start(schd):
itask = schd.pool.get_tasks()[0]
itask.submit_num = 1
+ # simulate failed attempts to contact the job hosts
schd.task_job_mgr.bad_hosts = {'broken', 'broken2'}
res = schd.task_job_mgr._prep_submit_task_job(schd.workflow, itask)
assert res is False
From 9a1401f639035bd6de789da6533ed037ef9768c3 Mon Sep 17 00:00:00 2001
From: WXTIM <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 15:20:02 +0100
Subject: [PATCH 09/13] Make hosts_consumed a required Set argument of NoPlatformsError (response to review)
---
cylc/flow/exceptions.py | 12 ++++++++----
cylc/flow/platforms.py | 6 +++---
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/cylc/flow/exceptions.py b/cylc/flow/exceptions.py
index ee8243240c7..1f9092c9a29 100644
--- a/cylc/flow/exceptions.py
+++ b/cylc/flow/exceptions.py
@@ -21,7 +21,7 @@
Callable,
Dict,
Iterable,
- List,
+ Set,
NoReturn,
Optional,
Tuple,
@@ -445,17 +445,21 @@ class NoPlatformsError(PlatformLookupError):
Args:
identity: The name of the platform group or install target
+ hosts_consumed: Hosts which have already been tried.
set_type: Whether the set of platforms is a platform group or an
install target
place: Where the attempt to get the platform failed.
"""
def __init__(
- self, identity: str, set_type: str = 'group', place: str = '',
- hosts_consumed: Optional[List[str]] = None
+ self,
+ identity: str,
+ hosts_consumed: Set[str],
+ set_type: str = 'group',
+ place: str = '',
):
self.identity = identity
self.type = set_type
- self.hosts_consumed = set(hosts_consumed) if hosts_consumed else set()
+ self.hosts_consumed = hosts_consumed
if place:
self.place = f' during {place}.'
else:
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 60cfeda82d6..67ceb7a52c6 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -307,9 +307,9 @@ def get_platform_from_group(
hosts_consumed = {
host
for platform in group['platforms']
- for host in platform_from_name(platform)['hosts']
- }
- raise NoPlatformsError(group_name, hosts_consumed)
+ for host in platform_from_name(platform)['hosts']}
+ raise NoPlatformsError(
+ group_name, hosts_consumed
# Get the selection method
method = group['selection']['method']
From 5549ae2589c334dd6b24d3948671c6b0e6ffcbae Mon Sep 17 00:00:00 2001
From: WXTIM <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 15:25:32 +0100
Subject: [PATCH 10/13] Add missing closing parenthesis to NoPlatformsError call
---
cylc/flow/platforms.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/platforms.py b/cylc/flow/platforms.py
index 67ceb7a52c6..d06c84ade92 100644
--- a/cylc/flow/platforms.py
+++ b/cylc/flow/platforms.py
@@ -309,7 +309,7 @@ def get_platform_from_group(
for platform in group['platforms']
for host in platform_from_name(platform)['hosts']}
raise NoPlatformsError(
- group_name, hosts_consumed
+ group_name, hosts_consumed)
# Get the selection method
method = group['selection']['method']
From 7820da81ba1c1d783e4ac14d29ccbde7d5c8adf4 Mon Sep 17 00:00:00 2001
From: WXTIM <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 15:38:11 +0100
Subject: [PATCH 11/13] mypy fix: pass hosts_consumed to NoPlatformsError in remote_tidy
---
cylc/flow/task_remote_mgr.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py
index 2797672617b..b610580051e 100644
--- a/cylc/flow/task_remote_mgr.py
+++ b/cylc/flow/task_remote_mgr.py
@@ -388,7 +388,7 @@ def remote_tidy(self) -> None:
else:
LOG.error(
NoPlatformsError(
- install_target, 'install target', 'remote tidy'))
+ install_target, Set(), 'install target', 'remote tidy'))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while queue and time() < timeout:
From 2bc3d3ae376e7695f208ce3568df703d449e7fdb Mon Sep 17 00:00:00 2001
From: WXTIM <26465611+wxtim@users.noreply.github.com>
Date: Tue, 11 Jun 2024 16:48:58 +0100
Subject: [PATCH 12/13] Wrap long NoPlatformsError call in remote_tidy across multiple lines
---
cylc/flow/task_remote_mgr.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py
index b610580051e..2ac9fd7c61d 100644
--- a/cylc/flow/task_remote_mgr.py
+++ b/cylc/flow/task_remote_mgr.py
@@ -388,7 +388,10 @@ def remote_tidy(self) -> None:
else:
LOG.error(
NoPlatformsError(
- install_target, Set(), 'install target', 'remote tidy'))
+ install_target,
+ Set(),
+ 'install target',
+ 'remote tidy'))
# Wait for commands to complete for a max of 10 seconds
timeout = time() + 10.0
while queue and time() < timeout:
From cd5ffcb0edddcfac85b91ef5bf82fb7bfe57aae9 Mon Sep 17 00:00:00 2001
From: Tim Pillinger <26465611+wxtim@users.noreply.github.com>
Date: Wed, 12 Jun 2024 10:40:52 +0100
Subject: [PATCH 13/13] Update cylc/flow/task_remote_mgr.py
Co-authored-by: Oliver Sanders
---
cylc/flow/task_remote_mgr.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/cylc/flow/task_remote_mgr.py b/cylc/flow/task_remote_mgr.py
index 2ac9fd7c61d..5d7c092336b 100644
--- a/cylc/flow/task_remote_mgr.py
+++ b/cylc/flow/task_remote_mgr.py
@@ -389,7 +389,7 @@ def remote_tidy(self) -> None:
LOG.error(
NoPlatformsError(
install_target,
- Set(),
+ set(),
'install target',
'remote tidy'))
# Wait for commands to complete for a max of 10 seconds