From 7a06a524ff5bc96236caeaf09d93bc6b78c7c146 Mon Sep 17 00:00:00 2001 From: David Sutherland Date: Fri, 20 May 2022 15:24:39 +1200 Subject: [PATCH 1/2] Fix data-store DB job load --- cylc/flow/data_store_mgr.py | 35 +++++++++++++-- cylc/flow/rundb.py | 23 ++++------ tests/functional/restart/03-retrying.t | 57 ++++++++++++++++++++++-- tests/integration/test_data_store_mgr.py | 5 ++- 4 files changed, 95 insertions(+), 25 deletions(-) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 8c677ff8461..cd9ad940a0a 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -77,6 +77,7 @@ from cylc.flow.task_proxy import TaskProxy from cylc.flow.task_state import ( TASK_STATUS_WAITING, + TASK_STATUS_PREPARING, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_RUNNING, @@ -149,6 +150,7 @@ DELTA_FIELDS = {DELTA_ADDED, DELTA_UPDATED, DELTA_PRUNED} JOB_STATUSES_ALL = [ + TASK_STATUS_PREPARING, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_FAILED, TASK_STATUS_RUNNING, @@ -1191,10 +1193,35 @@ def insert_db_job(self, row_idx, row): """Load job element from DB post restart.""" if row_idx == 0: LOG.info("LOADING job data") - (point_string, name, status, submit_num, time_submit, time_run, - time_run_exit, job_runner_name, job_id, platform_name) = row - if status not in JOB_STATUS_SET: - return + ( + point_string, + name, + submit_num, + time_submit, + submit_status, + time_run, + time_run_exit, + run_status, + job_runner_name, + job_id, + platform_name + ) = row + + if run_status is not None: + if run_status == 0: + status = TASK_STATUS_SUCCEEDED + else: + status = TASK_STATUS_FAILED + elif time_run is not None: + status = TASK_STATUS_RUNNING + elif submit_status is not None: + if submit_status == 0: + status = TASK_STATUS_SUBMITTED + else: + status = TASK_STATUS_SUBMIT_FAILED + else: + status = TASK_STATUS_PREPARING + tp_id, tproxy = self.store_node_fetcher(name, point_string) if not tproxy: return diff --git a/cylc/flow/rundb.py b/cylc/flow/rundb.py index 2b4b3db2774..c295bb9c6fa 100644 --- a/cylc/flow/rundb.py +++ b/cylc/flow/rundb.py @@ -639,19 +639,18 @@ def select_task_job(self, cycle, name, submit_num=None): def select_jobs_for_restart(self, callback): """Select from task_pool+task_states+task_jobs for restart. - Invoke callback(row_idx, row) on each row, where each row contains: - [cycle, name, status, submit_num, time_submit, time_run, - time_run_exit, job_runner_name, job_id, platform_name] + Invoke callback(row_idx, row) on each row of the result. """ form_stmt = r""" SELECT %(task_pool)s.cycle, %(task_pool)s.name, - %(task_pool)s.status, - %(task_states)s.submit_num, + %(task_jobs)s.submit_num, %(task_jobs)s.time_submit, + %(task_jobs)s.submit_status, %(task_jobs)s.time_run, %(task_jobs)s.time_run_exit, + %(task_jobs)s.run_status, %(task_jobs)s.job_runner_name, %(task_jobs)s.job_id, %(task_jobs)s.platform_name @@ -661,15 +660,9 @@ def select_jobs_for_restart(self, callback): %(task_pool)s ON %(task_jobs)s.cycle == %(task_pool)s.cycle AND %(task_jobs)s.name == %(task_pool)s.name - JOIN - %(task_states)s - ON %(task_jobs)s.cycle == %(task_states)s.cycle AND - %(task_jobs)s.name == %(task_states)s.name AND - %(task_jobs)s.submit_num == %(task_states)s.submit_num """ form_data = { "task_pool": self.TABLE_TASK_POOL, - "task_states": self.TABLE_TASK_STATES, "task_jobs": self.TABLE_TASK_JOBS, } stmt = form_stmt % form_data @@ -970,11 +963,12 @@ def select_jobs_for_datastore( SELECT %(task_states)s.cycle, %(task_states)s.name, - %(task_states)s.status, - %(task_states)s.submit_num, + %(task_jobs)s.submit_num, %(task_jobs)s.time_submit, + %(task_jobs)s.submit_status, %(task_jobs)s.time_run, %(task_jobs)s.time_run_exit, + %(task_jobs)s.run_status, %(task_jobs)s.job_runner_name, %(task_jobs)s.job_id, %(task_jobs)s.platform_name @@ -983,8 +977,7 @@ def select_jobs_for_datastore( JOIN %(task_states)s ON %(task_jobs)s.cycle == %(task_states)s.cycle AND - %(task_jobs)s.name == %(task_states)s.name AND - %(task_jobs)s.submit_num == %(task_states)s.submit_num + %(task_jobs)s.name == %(task_states)s.name WHERE %(task_states)s.cycle || '/' || %(task_states)s.name IN ( %(task_ids)s diff --git a/tests/functional/restart/03-retrying.t b/tests/functional/restart/03-retrying.t index dec8aed7794..16944ed8a5f 100755 --- a/tests/functional/restart/03-retrying.t +++ b/tests/functional/restart/03-retrying.t @@ -17,7 +17,7 @@ #------------------------------------------------------------------------------- # Test restarting with a task waiting to retry (was retrying state). . "$(dirname "$0")/test_header" -set_test_number 5 +set_test_number 8 init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' [scheduler] [[events]] @@ -32,14 +32,17 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' [[t1]] script = """ cylc__job__wait_cylc_message_started - if ((CYLC_TASK_TRY_NUMBER == 1)); then + if ((CYLC_TASK_TRY_NUMBER < 3)); then + exit 1 + elif ((CYLC_TASK_TRY_NUMBER == 3)); then cylc stop "${CYLC_WORKFLOW_ID}" exit 1 fi """ [[[job]]] - execution retry delays = PT0S + execution retry delays = 3*PT0S __FLOW_CONFIG__ + #------------------------------------------------------------------------------- run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" workflow_run_ok "${TEST_NAME_BASE}-run" \ @@ -48,8 +51,54 @@ sqlite3 "${WORKFLOW_RUN_DIR}/log/db" 'SELECT cycle, name, status FROM task_pool' cmp_ok 'sqlite3.out' <<'__DB_DUMP__' 1|t1|waiting __DB_DUMP__ -workflow_run_ok "${TEST_NAME_BASE}-restart" \ + + +workflow_run_ok "${TEST_NAME_BASE}-restart-pause" \ + cylc play --debug --pause "${WORKFLOW_NAME}" + +# query jobs +TEST_NAME="${TEST_NAME_BASE}-jobs-query" + +read -r -d '' jobsQuery <<_args_ +{ + "request_string": " +query { + jobs (sort: {keys: [\"submitNum\"]}) { + state + submitNum + } +}", + "variables": null +} +_args_ + +run_graphql_ok "${TEST_NAME}" "${WORKFLOW_NAME}" "${jobsQuery}" + +cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-jobs-query.stdout" << __HERE__ +{ + "jobs": [ + { + "state": "failed", + "submitNum": 1 + }, + { + "state": "failed", + "submitNum": 2 + }, + { + "state": "failed", + "submitNum": 3 + } + ] +} +__HERE__ + +# stop workflow +cylc stop --max-polls=10 --interval=2 "${WORKFLOW_NAME}" + +workflow_run_ok "${TEST_NAME_BASE}-restart-resume" \ cylc play --debug --no-detach "${WORKFLOW_NAME}" + sqlite3 "${WORKFLOW_RUN_DIR}/log/db" 'SELECT * FROM task_pool' >'sqlite3.out' cmp_ok 'sqlite3.out' Date: Fri, 20 May 2022 16:13:03 +1200 Subject: [PATCH 2/2] Update changelog --- CHANGES.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 6b87f639ff5..fbf0ce00c96 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -46,6 +46,9 @@ in `global.cylc[install]source dirs`. ### Fixes +[#4891](https://github.com/cylc/cylc-flow/pull/4891) - Fix bug that could cause +past jobs to be omitted in the UI. + [#4860](https://github.com/cylc/cylc-flow/pull/4860) - Workflow config parsing will fail if [owner setting](https://cylc.github.io/cylc-doc/latest/html/reference/config/workflow.html#flow.cylc[runtime][%3Cnamespace%3E][remote]owner) @@ -94,6 +97,8 @@ option for `cylc install` (the functionality has been merged into the workflow source argument), and rename the `--flow-name` option to `--workflow-name`. +### Fixes + [#4873](https://github.com/cylc/cylc-flow/pull/4873) - `cylc show`: don't show prerequisites of past tasks recalled from the DB as unsatisfied.