Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data-store DB job load #4891

Merged
merged 2 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ in `global.cylc[install]source dirs`.

### Fixes

[#4891](https://github.com/cylc/cylc-flow/pull/4891) - Fix bug that could cause
past jobs to be omitted in the UI.

[#4860](https://github.com/cylc/cylc-flow/pull/4860) - Workflow config parsing
will fail if
[owner setting](https://cylc.github.io/cylc-doc/latest/html/reference/config/workflow.html#flow.cylc[runtime][%3Cnamespace%3E][remote]owner)
Expand Down Expand Up @@ -94,6 +97,8 @@ option for `cylc install` (the functionality has been merged into the
workflow source argument), and rename the `--flow-name` option to
`--workflow-name`.

### Fixes

[#4873](https://github.com/cylc/cylc-flow/pull/4873) - `cylc show`: don't
show prerequisites of past tasks recalled from the DB as unsatisfied.

Expand Down
35 changes: 31 additions & 4 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_state import (
TASK_STATUS_WAITING,
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING,
Expand Down Expand Up @@ -149,6 +150,7 @@
DELTA_FIELDS = {DELTA_ADDED, DELTA_UPDATED, DELTA_PRUNED}

JOB_STATUSES_ALL = [
TASK_STATUS_PREPARING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING,
Expand Down Expand Up @@ -1191,10 +1193,35 @@ def insert_db_job(self, row_idx, row):
"""Load job element from DB post restart."""
if row_idx == 0:
LOG.info("LOADING job data")
(point_string, name, status, submit_num, time_submit, time_run,
time_run_exit, job_runner_name, job_id, platform_name) = row
if status not in JOB_STATUS_SET:
return
(
point_string,
name,
submit_num,
time_submit,
submit_status,
time_run,
time_run_exit,
run_status,
job_runner_name,
job_id,
platform_name
) = row

if run_status is not None:
if run_status == 0:
status = TASK_STATUS_SUCCEEDED
else:
status = TASK_STATUS_FAILED
elif time_run is not None:
status = TASK_STATUS_RUNNING
elif submit_status is not None:
if submit_status == 0:
status = TASK_STATUS_SUBMITTED
else:
status = TASK_STATUS_SUBMIT_FAILED
else:
status = TASK_STATUS_PREPARING

tp_id, tproxy = self.store_node_fetcher(name, point_string)
if not tproxy:
return
Expand Down
23 changes: 8 additions & 15 deletions cylc/flow/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,19 +639,18 @@ def select_task_job(self, cycle, name, submit_num=None):
def select_jobs_for_restart(self, callback):
"""Select from task_pool+task_states+task_jobs for restart.

Invoke callback(row_idx, row) on each row, where each row contains:
[cycle, name, status, submit_num, time_submit, time_run,
time_run_exit, job_runner_name, job_id, platform_name]
Invoke callback(row_idx, row) on each row of the result.
"""
form_stmt = r"""
SELECT
%(task_pool)s.cycle,
%(task_pool)s.name,
%(task_pool)s.status,
%(task_states)s.submit_num,
%(task_jobs)s.submit_num,
%(task_jobs)s.time_submit,
%(task_jobs)s.submit_status,
%(task_jobs)s.time_run,
%(task_jobs)s.time_run_exit,
%(task_jobs)s.run_status,
%(task_jobs)s.job_runner_name,
dwsutherland marked this conversation as resolved.
Show resolved Hide resolved
%(task_jobs)s.job_id,
%(task_jobs)s.platform_name
Expand All @@ -661,15 +660,9 @@ def select_jobs_for_restart(self, callback):
%(task_pool)s
ON %(task_jobs)s.cycle == %(task_pool)s.cycle AND
%(task_jobs)s.name == %(task_pool)s.name
JOIN
%(task_states)s
ON %(task_jobs)s.cycle == %(task_states)s.cycle AND
%(task_jobs)s.name == %(task_states)s.name AND
%(task_jobs)s.submit_num == %(task_states)s.submit_num
"""
form_data = {
"task_pool": self.TABLE_TASK_POOL,
"task_states": self.TABLE_TASK_STATES,
"task_jobs": self.TABLE_TASK_JOBS,
}
stmt = form_stmt % form_data
Expand Down Expand Up @@ -970,11 +963,12 @@ def select_jobs_for_datastore(
SELECT
%(task_states)s.cycle,
%(task_states)s.name,
%(task_states)s.status,
%(task_states)s.submit_num,
%(task_jobs)s.submit_num,
%(task_jobs)s.time_submit,
%(task_jobs)s.submit_status,
%(task_jobs)s.time_run,
%(task_jobs)s.time_run_exit,
%(task_jobs)s.run_status,
%(task_jobs)s.job_runner_name,
%(task_jobs)s.job_id,
%(task_jobs)s.platform_name
Expand All @@ -983,8 +977,7 @@ def select_jobs_for_datastore(
JOIN
%(task_states)s
ON %(task_jobs)s.cycle == %(task_states)s.cycle AND
%(task_jobs)s.name == %(task_states)s.name AND
%(task_jobs)s.submit_num == %(task_states)s.submit_num
%(task_jobs)s.name == %(task_states)s.name
WHERE
%(task_states)s.cycle || '/' || %(task_states)s.name IN (
%(task_ids)s
Expand Down
57 changes: 53 additions & 4 deletions tests/functional/restart/03-retrying.t
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#-------------------------------------------------------------------------------
# Test restarting with a task waiting to retry (was retrying state).
. "$(dirname "$0")/test_header"
set_test_number 5
set_test_number 8
init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[scheduler]
[[events]]
Expand All @@ -32,14 +32,17 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__'
[[t1]]
script = """
cylc__job__wait_cylc_message_started
if ((CYLC_TASK_TRY_NUMBER == 1)); then
if ((CYLC_TASK_TRY_NUMBER < 3)); then
exit 1
elif ((CYLC_TASK_TRY_NUMBER == 3)); then
cylc stop "${CYLC_WORKFLOW_ID}"
exit 1
fi
"""
[[[job]]]
execution retry delays = PT0S
execution retry delays = 3*PT0S
__FLOW_CONFIG__

#-------------------------------------------------------------------------------
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
Expand All @@ -48,8 +51,54 @@ sqlite3 "${WORKFLOW_RUN_DIR}/log/db" 'SELECT cycle, name, status FROM task_pool'
cmp_ok 'sqlite3.out' <<'__DB_DUMP__'
1|t1|waiting
__DB_DUMP__
workflow_run_ok "${TEST_NAME_BASE}-restart" \


workflow_run_ok "${TEST_NAME_BASE}-restart-pause" \
cylc play --debug --pause "${WORKFLOW_NAME}"

# query jobs
TEST_NAME="${TEST_NAME_BASE}-jobs-query"

read -r -d '' jobsQuery <<_args_
{
"request_string": "
query {
jobs (sort: {keys: [\"submitNum\"]}) {
state
submitNum
}
}",
"variables": null
}
_args_

run_graphql_ok "${TEST_NAME}" "${WORKFLOW_NAME}" "${jobsQuery}"

cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-jobs-query.stdout" << __HERE__
{
"jobs": [
{
"state": "failed",
"submitNum": 1
},
{
"state": "failed",
"submitNum": 2
},
{
"state": "failed",
"submitNum": 3
}
]
}
__HERE__

# stop workflow
cylc stop --max-polls=10 --interval=2 "${WORKFLOW_NAME}"

workflow_run_ok "${TEST_NAME_BASE}-restart-resume" \
cylc play --debug --no-detach "${WORKFLOW_NAME}"

sqlite3 "${WORKFLOW_RUN_DIR}/log/db" 'SELECT * FROM task_pool' >'sqlite3.out'
cmp_ok 'sqlite3.out' </dev/null
#-------------------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ def job_db_row():
return [
'1',
'foo',
'running',
4,
'2020-04-03T13:40:18+13:00',
0,
'2020-04-03T13:40:20+13:00',
'2020-04-03T13:40:30+13:00',
None,
None,
'background',
'20542',
'localhost',
Expand Down