Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Mark task as failed if maxAttempts has been met. #80681

Merged
merged 14 commits into from
Oct 27, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@
*/

import _ from 'lodash';
import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsFilter,
TermFilter,
RangeFilter,
} from './query_clauses';
import { asUpdateByQuery, shouldBeOneOf, mustBeAllOf } from './query_clauses';

import {
updateFields,
updateFieldsAndMarkAsFailed,
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
TaskWithSchedule,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
} from './mark_available_tasks_as_claimed';

Expand All @@ -40,29 +31,29 @@ describe('mark_available_tasks_as_claimed', () => {
createTaskRunner: () => ({ run: () => Promise.resolve() }),
},
});
const claimTasksById = undefined;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it could be just array of strings like:
const claimTasksById: string[] = [];

const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};

expect(
asUpdateByQuery({
query: mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Array.from(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)
)
)
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
),
update: updateFieldsAndMarkAsFailed(
fieldUpdates,
claimTasksById || [],
Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts };
}, {})
),
update: updateFields({
ownerId: taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort: SortByRunAtAndRetryAt,
})
).toEqual({
Expand Down Expand Up @@ -100,42 +91,6 @@ describe('mark_available_tasks_as_claimed', () => {
],
},
},
// Either task has an recurring schedule or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'sampleTask' } },
{
range: {
'task.attempts': {
lt: 5,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'otherTask' } },
{
range: {
'task.attempts': {
lt: 1,
},
},
},
],
},
},
],
},
},
],
},
},
Expand All @@ -158,12 +113,26 @@ if (doc['task.runAt'].size()!=0) {
},
seq_no_primary_term: true,
script: {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
fieldUpdates: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById: [],
taskMaxAttempts: {
sampleTask: 5,
otherTask: 1,
},
},
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,26 @@ if (doc['task.runAt'].size()!=0) {
},
};

export const updateFields = (fieldUpdates: {
[field: string]: string | number | Date;
}): ScriptClause => ({
source: Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.${field};`)
.join(' '),
export const updateFieldsAndMarkAsFailed = (
ymao1 marked this conversation as resolved.
Show resolved Hide resolved
fieldUpdates: {
[field: string]: string | number | Date;
},
claimTasksById: string[],
taskMaxAttempts: { [field: string]: number }
): ScriptClause => ({
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: fieldUpdates,
params: {
fieldUpdates,
claimTasksById,
taskMaxAttempts,
},
});
7 changes: 6 additions & 1 deletion x-pack/plugins/task_manager/server/queries/query_clauses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ export interface ScriptClause {
source: string;
lang: string;
params: {
[field: string]: string | number | Date;
[field: string]:
| string
| number
| Date
| string[]
| { [field: string]: string | number | Date };
};
}

Expand Down
126 changes: 51 additions & 75 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,41 +415,6 @@ describe('TaskStore', () => {
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
Expand Down Expand Up @@ -501,6 +466,11 @@ if (doc['task.runAt'].size()!=0) {
const maxAttempts = _.random(2, 43);
const customMaxAttempts = _.random(44, 100);
const definitions = new TaskTypeDictionary(mockLogger());
const taskManagerId = uuid.v1();
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: new Date(Date.now()),
};
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
Expand All @@ -514,10 +484,11 @@ if (doc['task.runAt'].size()!=0) {
});
const {
args: {
updateByQuery: { body: { query, sort } = {} },
updateByQuery: { body: { query, script, sort } = {} },
},
} = await testClaimAvailableTasks({
opts: {
taskManagerId,
maxAttempts,
definitions,
},
Expand Down Expand Up @@ -576,41 +547,6 @@ if (doc['task.runAt'].size()!=0) {
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
Expand Down Expand Up @@ -640,6 +576,30 @@ if (doc['task.runAt'].size()!=0) {
},
});

expect(script).toMatchObject({
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
fieldUpdates,
claimTasksById: [
'task:33c6977a-ed6d-43bd-98d9-3f827f7b7cd8',
'task:a208b22c-14ec-4fb4-995f-d2ff7a3b03b8',
],
taskMaxAttempts: {
bar: customMaxAttempts,
foo: maxAttempts,
},
},
});

expect(sort).toMatchObject([
'_score',
{
Expand All @@ -665,6 +625,10 @@ if (doc['task.runAt'].size()!=0) {
test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};
const {
args: {
updateByQuery: { body: { script } = {} },
Expand All @@ -679,12 +643,24 @@ if (doc['task.runAt'].size()!=0) {
},
});
expect(script).toMatchObject({
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType] || params.claimTasksById.contains(ctx._id)) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
fieldUpdates,
claimTasksById: [],
taskMaxAttempts: {
dernstraight: 2,
report: 2,
yawn: 2,
},
},
});
});
Expand Down
Loading