Skip to content

Commit

Permalink
Updating query to only fail one time tasks that have exceeded max att…
Browse files Browse the repository at this point in the history
…empts
  • Loading branch information
ymao1 committed Oct 20, 2020
1 parent 6445a72 commit 877319f
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@
*/

import _ from 'lodash';
import {
asUpdateByQuery,
shouldBeOneOf,
mustBeAllOf,
ExistsFilter,
TermFilter,
RangeFilter,
} from './query_clauses';
import { asUpdateByQuery, shouldBeOneOf, mustBeAllOf } from './query_clauses';

import {
updateFields,
updateFieldsAndMarkAsFailed,
IdleTaskWithExpiredRunAt,
RunningOrClaimingTaskWithExpiredRetryAt,
TaskWithSchedule,
taskWithLessThanMaxAttempts,
SortByRunAtAndRetryAt,
} from './mark_available_tasks_as_claimed';

Expand All @@ -43,26 +34,24 @@ describe('mark_available_tasks_as_claimed', () => {
const defaultMaxAttempts = 1;
const taskManagerId = '3478fg6-82374f6-83467gf5-384g6f';
const claimOwnershipUntil = '2019-02-12T21:01:22.479Z';
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};

expect(
asUpdateByQuery({
query: mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt),
// Either task has an schedule or the attempts < the maximum configured
shouldBeOneOf<ExistsFilter | TermFilter | RangeFilter>(
TaskWithSchedule,
...Array.from(definitions).map(([type, { maxAttempts }]) =>
taskWithLessThanMaxAttempts(type, maxAttempts || defaultMaxAttempts)
)
)
shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt)
),
update: updateFieldsAndMarkAsFailed(
fieldUpdates,
Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts };
}, {})
),
update: updateFields({
ownerId: taskManagerId,
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort: SortByRunAtAndRetryAt,
})
).toEqual({
Expand Down Expand Up @@ -100,42 +89,6 @@ describe('mark_available_tasks_as_claimed', () => {
],
},
},
// Either task has an recurring schedule or the attempts < the maximum configured
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'sampleTask' } },
{
range: {
'task.attempts': {
lt: 5,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'otherTask' } },
{
range: {
'task.attempts': {
lt: 1,
},
},
},
],
},
},
],
},
},
],
},
},
Expand All @@ -158,12 +111,25 @@ if (doc['task.runAt'].size()!=0) {
},
seq_no_primary_term: true,
script: {
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
fieldUpdates: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
},
taskMaxAttempts: {
sampleTask: 5,
otherTask: 1,
},
},
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export const updateFieldsAndMarkAsFailed = (
taskMaxAttempts: { [field: string]: number }
): ScriptClause => ({
source: `
if (params.taskMaxAttempts[ctx._source.task.taskType] == null || (params.taskMaxAttempts[ctx._source.task.taskType] != null && ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType])) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
Expand Down
93 changes: 19 additions & 74 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,41 +415,6 @@ describe('TaskStore', () => {
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
Expand Down Expand Up @@ -557,41 +522,6 @@ describe('TaskStore', () => {
],
},
},
{
bool: {
should: [
{ exists: { field: 'task.schedule' } },
{
bool: {
must: [
{ term: { 'task.taskType': 'foo' } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
},
{
bool: {
must: [
{ term: { 'task.taskType': 'bar' } },
{
range: {
'task.attempts': {
lt: customMaxAttempts,
},
},
},
],
},
},
],
},
},
],
},
},
Expand Down Expand Up @@ -646,6 +576,10 @@ if (doc['task.runAt'].size()!=0) {
test('it claims tasks by setting their ownerId, status and retryAt', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const fieldUpdates = {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
};
const {
args: {
updateByQuery: { body: { script } = {} },
Expand All @@ -660,12 +594,23 @@ if (doc['task.runAt'].size()!=0) {
},
});
expect(script).toMatchObject({
source: `ctx._source.task.ownerId=params.ownerId; ctx._source.task.status=params.status; ctx._source.task.retryAt=params.retryAt;`,
source: `
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
`,
lang: 'painless',
params: {
ownerId: taskManagerId,
retryAt: claimOwnershipUntil,
status: 'claiming',
fieldUpdates,
taskMaxAttempts: {
dernstraight: 2,
report: 2,
yawn: 2,
},
},
});
});
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ export class TaskStore {
size: OwnershipClaimingOpts['size']
): Promise<number> {
const taskMaxAttempts = [...this.definitions].reduce((accumulator, [type, { maxAttempts }]) => {
return { ...accumulator, [type]: maxAttempts };
return { ...accumulator, [type]: maxAttempts || this.maxAttempts };
}, {});
const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import _ from 'lodash';
import { Plugin, CoreSetup, CoreStart } from 'src/core/server';
import { EventEmitter } from 'events';
import { Subject } from 'rxjs';
Expand Down Expand Up @@ -103,9 +104,8 @@ export class SampleTaskManagerFixturePlugin
// fail after the first failed run
maxAttempts: 1,
},
sampleTaskTimingOut: {
type: 'sampleTaskTimingOut',
title: 'Sample Task that Times Out',
sampleRecurringTaskTimingOut: {
title: 'Sample Recurring Task that Times Out',
description: 'A sample task that times out each run.',
maxAttempts: 3,
timeout: '1s',
Expand All @@ -115,6 +115,18 @@ export class SampleTaskManagerFixturePlugin
},
}),
},
sampleOneTimeTaskTimingOut: {
title: 'Sample One-Time Task that Times Out',
description: 'A sample task that times out each run.',
maxAttempts: 3,
timeout: '1s',
getRetry: (attempts: number, error: object) => new Date(Date.now() + _.random(2, 5) * 1000),
createTaskRunner: () => ({
async run() {
return await new Promise((resolve) => {});
},
}),
},
});

taskManager.addMiddleware({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,9 @@ export default function ({ getService }) {
});
});

it('should mark task as failed if task is still running but maxAttempts has been reached', async () => {
it('should mark non-recurring task as failed if task is still running but maxAttempts has been reached', async () => {
const task = await scheduleTask({
taskType: 'sampleTaskTimingOut',
schedule: { interval: '1s' },
taskType: 'sampleRecurringTaskTimingOut',
params: {},
});

Expand All @@ -622,5 +621,20 @@ export default function ({ getService }) {
expect(scheduledTask.attempts).to.eql(3);
});
});

it('should continue claiming recurring task even if maxAttempts has been reached', async () => {
const task = await scheduleTask({
taskType: 'sampleTaskTimingOut',
schedule: { interval: '1s' },
params: {},
});

await retry.try(async () => {
const [scheduledTask] = (await currentTasks()).docs;
expect(scheduledTask.id).to.eql(task.id);
expect(scheduledTask.status).to.eql('claiming');
expect(scheduledTask.attempts).to.eql(4);
});
});
});
}

0 comments on commit 877319f

Please sign in to comment.