Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Mark task as failed if maxAttempts has been met. #80681

Merged
merged 14 commits into from
Oct 27, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,25 @@ export const updateFields = (fieldUpdates: {
lang: 'painless',
params: fieldUpdates,
});

export const updateFieldsAndMarkAsFailed = (
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
fieldUpdates: {
[field: string]: string | number | Date;
},
taskMaxAttempts: { [field: string]: number }
): ScriptClause => ({
source: `
if (params.taskMaxAttempts[ctx._source.task.taskType] == null || (params.taskMaxAttempts[ctx._source.task.taskType] != null && ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType])) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
fieldUpdates,
taskMaxAttempts,
},
});
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export interface ScriptClause {
source: string;
lang: string;
params: {
[field: string]: string | number | Date;
[field: string]: string | number | Date | { [field: string]: string | number | Date };
};
}

Expand Down
30 changes: 11 additions & 19 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,15 @@ import {
shouldBeOneOf,
mustBeAllOf,
filterDownBy,
ExistsFilter,
TermFilter,
RangeFilter,
asPinnedQuery,
matchesClauses,
} from './queries/query_clauses';

import {
updateFields,
updateFieldsAndMarkAsFailed,
IdleTaskWithExpiredRunAt,
InactiveTasks,
RunningOrClaimingTaskWithExpiredRetryAt,
TaskWithSchedule,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
tasksClaimedByOwner,
} from './queries/mark_available_tasks_as_claimed';
Expand Down Expand Up @@ -261,14 +256,7 @@ export class TaskStore {
const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has a schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Object.entries(this.definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || this.maxAttempts)
)
)
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
);

const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager');
Expand All @@ -282,11 +270,15 @@ export class TaskStore {
),
filterDownBy(InactiveTasks)
),
update: updateFields({
ownerId: this.taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
update: updateFieldsAndMarkAsFailed(
{
ownerId: this.taskManagerId,
retryAt: claimOwnershipUntil,
},
Object.entries(this.definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts };
}, {})
),
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
sort: [
// sort by score first, so the "pinned" Tasks are first
'_score',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@ export class SampleTaskManagerFixturePlugin
// fail after the first failed run
maxAttempts: 1,
},
sampleTaskTimingOut: {
type: 'sampleTaskTimingOut',
title: 'Sample Task that Times Out',
description: 'A sample task that times out each run.',
maxAttempts: 3,
timeout: '1s',
createTaskRunner: () => ({
async run() {
return await new Promise((resolve) => {});
},
}),
},
});

taskManager.addMiddleware({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,5 +607,20 @@ export default function ({ getService }) {
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
});
});

it('should mark task as failed if task is still running but maxAttempts has been reached', async () => {
const task = await scheduleTask({
taskType: 'sampleTaskTimingOut',
schedule: { interval: '1s' },
params: {},
});

await retry.try(async () => {
const [scheduledTask] = (await currentTasks()).docs;
expect(scheduledTask.id).to.eql(task.id);
expect(scheduledTask.status).to.eql('failed');
expect(scheduledTask.attempts).to.eql(3);
});
});
});
}