Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Re-queueing a task now erases its failed_by_workers list
Browse files Browse the repository at this point in the history
The 'failed_by_workers' list is also erased when the entire job is
re-queued.

Erasing the list ensures that workers that previously failed the task
can try again. This is essential when the task was failing due to
external conditions, for example failure due to a bug in Flamenco Worker
that's since been resolved.
  • Loading branch information
sybrenstuvel committed Apr 25, 2019
1 parent 982bb44 commit 4ca2b11
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ Flamenco Server Changelog
## Version 2.3 (in development)

- Support for Flamenco Manager settings version 2; this is introduced in Flamenco Manager 2.5.
- Re-queueing a task erases the `failed_by_workers` list, so that workers that previously failed
the task can try again. This is essential when the task was failing due to external conditions,
for example failure due to a crash that was fixed by upgrading Blender.


## Version 2.2 (released 2019-03-25)
Expand Down
29 changes: 18 additions & 11 deletions flamenco/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,28 +314,34 @@ def db(self, collection_name):

def update_status(self, collection_name, document_id, new_status, *,
extra_updates: typing.Optional[dict] = None,
extra_unset: typing.Optional[typing.Set[str]] = None,
now: datetime.datetime = None):
"""Updates a document's status, avoiding Eve.
Doesn't use Eve patch_internal to avoid Eve's authorisation. For
example, Eve doesn't know certain PATCH operations are allowed by
Flamenco managers.
:param extra_updates: dictionary of extra updates for the document.
:param extra_updates: dictionary of extra updates to set on the document(s).
:param extra_unset: set of fields to unset.
:param now: the _updated field is set to this timestamp; use this to set multiple
objects to the same _updated field.
:rtype: pymongo.results.UpdateResult
"""

return self.update_status_q(collection_name, {'_id': document_id}, new_status,
extra_updates=extra_updates, now=now)
extra_updates=extra_updates,
extra_unset=extra_unset,
now=now)

def update_status_q(self, collection_name, query, new_status, *,
extra_updates: typing.Optional[dict] = None,
extra_unset: typing.Optional[typing.Set[str]] = None,
now: datetime.datetime = None):
"""Updates the status for the queried objects.
:param extra_updates: dictionary of extra updates for the document(s).
:param extra_updates: dictionary of extra updates to set on the document(s).
:param extra_unset: set of fields to unset.
:param now: the _updated field is set to this timestamp; use this to set multiple
objects to the same _updated field.
:returns: the result of the collection.update_many() call
Expand All @@ -361,15 +367,16 @@ def update_status_q(self, collection_name, query, new_status, *,
now = datetime.datetime.now(tz=tz_util.utc)

collection = current_flamenco.db(collection_name)
result = collection.update_many(
query,
{'$set': {
**(extra_updates or {}),
'status': new_status,
'_updated': now,
'_etag': etag}}
)
update = {'$set': {
**(extra_updates or {}),
'status': new_status,
'_updated': now,
'_etag': etag,
}}
if extra_unset:
update['$unset'] = {field_name: True for field_name in extra_unset}

result = collection.update_many(query, update)
self._log.debug('Updated status of %i %s %s to %s',
result.modified_count, singular_name, query, new_status)

Expand Down
3 changes: 2 additions & 1 deletion flamenco/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ def _do_requeue(self, job_id, old_status, new_status) -> str:

# Update the tasks.
query['job'] = job_id
current_flamenco.update_status_q('tasks', query, 'queued')
current_flamenco.update_status_q('tasks', query, 'queued',
extra_unset={'failed_by_workers'})
return 'queued'

def _do_check_completion(self, job_id, new_status) -> str:
Expand Down
22 changes: 22 additions & 0 deletions flamenco/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,28 @@ def tasks_for_project(self, project_id):

return tasks

def api_set_task_status(self, task: dict, new_status: str):
    """Store a new status on the task and propagate the change to its job.

    When the task moves to 'queued' from any other status, its
    'failed_by_workers' field is unset in the same update, so that workers
    that failed the task earlier are no longer recorded against it.

    :param task: task document; the '_id', 'job' and 'status' keys are read.
    :param new_status: the status to store on the task.
    """

    is_requeue = new_status == 'queued' and task['status'] != 'queued'

    fields_to_unset = set()  # type: typing.Set[str]
    if is_requeue:
        # A re-queue happens for a reason, and that reason may well be a fix
        # for whatever made those workers fail; give them a clean slate.
        self._log.debug('Task %s was requeued, clearing out failed_by_workers', task['_id'])
        fields_to_unset.add('failed_by_workers')

    from flamenco import current_flamenco

    task_id = task['_id']
    current_flamenco.update_status(
        'tasks',
        task_id,
        new_status,
        extra_unset=fields_to_unset,
    )

    # The job status may depend on the statuses of all its tasks, so let the
    # job manager re-evaluate the job now that this task has changed.
    job_manager = current_flamenco.job_manager
    job_manager.update_job_after_task_status_change(task['job'], task_id, new_status)

def web_set_task_status(self, task_id, new_status):
"""Web-level call to updates the task status."""
from .sdk import Task
Expand Down
10 changes: 3 additions & 7 deletions flamenco/tasks/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def patch_set_task_status(self, task_id: bson.ObjectId, patch: dict):
from pillar.api.utils.authentication import current_user_id

tasks_coll = current_flamenco.db('tasks')
task = tasks_coll.find_one({'_id': task_id}, projection={'job': 1, 'manager': 1})
task = tasks_coll.find_one({'_id': task_id},
projection={'job': 1, 'manager': 1, 'status': 1})

if not current_flamenco.manager_manager.user_may_use(mngr_doc_id=task['manager']):
log.warning('patch_set_task_status(%s, %r): User %s is not allowed to use manager %s!',
Expand All @@ -36,15 +37,10 @@ def patch_set_task_status(self, task_id: bson.ObjectId, patch: dict):

new_status = patch['status']
try:
current_flamenco.update_status('tasks', task_id, new_status)
current_flamenco.task_manager.api_set_task_status(task, new_status)
except ValueError:
raise wz_exceptions.UnprocessableEntity('Invalid status')

# also inspect other tasks of the same job, and possibly update the job status as well.
current_flamenco.job_manager.update_job_after_task_status_change(task['job'],
task_id,
new_status)

@authorization.require_login()
def patch_requeue(self, task_id: bson.ObjectId, patch: dict):
"""Re-queue a task and its successors."""
Expand Down
15 changes: 15 additions & 0 deletions tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ def test_status_from_canceled_to_queued(self):
self.assert_task_status(8, 'soft-failed') # was: soft-failed

def test_status_from_canceled_to_requeued(self):
# Add a list of failed workers to the failed task.
with self.app.app_context():
tasks_coll = self.app.db('flamenco_tasks')
tasks_coll.update_one({'_id': self.task_ids[5]},
{'$set': {
'failed_by_workers': [
{'id': 'je moeder',
'identifier': 'op je hoofd'},
],
}})

# This should re-queue all non-completed tasks.
self.force_job_status('canceled')
self.set_job_status('requeued')
Expand All @@ -218,6 +229,10 @@ def test_status_from_canceled_to_requeued(self):

self.assert_job_status('queued')

# Check that the previously-failed task had its failed_by_workers list cleared.
task5 = tasks_coll.find_one({'_id': self.task_ids[5]})
self.assertNotIn('failed_by_workers', task5)

def test_status_from_canceled_job_but_completed_tasks_to_requeued(self):
# Force the job to be cancelled with all tasks at 'completed'.
# This is not a state that should be possible, but if it happens,
Expand Down
45 changes: 44 additions & 1 deletion tests/test_task_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from bson import ObjectId

from pillar.api.utils import remove_private_keys
from pillar.tests import common_test_data as ctd
from abstract_flamenco_test import AbstractFlamencoTest

Expand All @@ -16,7 +17,8 @@ def setUp(self, **kwargs):
self.mngr_id = self.mngr_doc['_id']
self.mngr_token = token['token']

uid = self.create_user(user_id=24 * 'f', roles={'flamenco-admin'})
uid = self.create_user(user_id=24 * 'f', roles={'flamenco-admin'},
groups=[self.mngr_doc['owner'], ctd.EXAMPLE_ADMIN_GROUP_ID])
self.create_valid_auth_token(uid, 'fladmin-token')

with self.app.test_request_context():
Expand Down Expand Up @@ -116,6 +118,47 @@ def test_job_status_change_due_to_task_patch(self):
)
self.assert_job_status('queued')

def test_requeue_clears_failed_by(self):
    """Re-queueing a task must remove its 'failed_by_workers' list."""

    admin_token = 'fladmin-token'

    self.assert_job_status('queued')

    # Fetch the tasks via the manager's depsgraph endpoint; the test job is
    # expected to consist of 4 tasks. Fetching them should set the job
    # status to active.
    depsgraph_resp = self.get('/api/flamenco/managers/%s/depsgraph' % self.mngr_id,
                              auth_token=self.mngr_token)
    scheduled_tasks = depsgraph_resp.json['depsgraph']
    self.assertEqual(4, len(scheduled_tasks))

    # Pretend that a couple of workers already failed the first task.
    failed_task = {
        **scheduled_tasks[0],
        "failed_by_workers": [
            {"id": "5cc05ee49fbac13c12dee430",
             "identifier": "40.68.245.202 (82f813d38f594bcc86913bfb15db1f07000001)"},
            {"id": "5cc06df19fbac16ca117c40d",
             "identifier": "40.68.245.202 (82f813d38f594bcc86913bfb15db1f07000003)"},
        ]
    }
    url = f'/api/flamenco/tasks/{failed_task["_id"]}'
    self.put(
        url,
        etag=failed_task['_etag'],
        json=remove_private_keys(failed_task),
        auth_token=admin_token,
        expected_status=200,
    )

    # Setting the status to 'queued' is what should wipe the list.
    self.patch(
        url,
        json={'op': 'set-task-status', 'status': 'queued'},
        auth_token=admin_token,
        expected_status=204,
    )

    # Read the task back and verify the field is gone entirely.
    task_after_requeue = self.get(url, auth_token=admin_token)
    self.assertNotIn('failed_by_workers', task_after_requeue.json)

def test_requeue_task_of_completed_job(self):
"""Re-queueing a single task of a completed job should keep the other tasks completed."""

Expand Down

0 comments on commit 4ca2b11

Please sign in to comment.