Skip to content

Commit

Permalink
feat(task-manager): added scheduleIfNotExists api
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Nov 11, 2019
1 parent cbad2be commit 090e094
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 10 deletions.
1 change: 1 addition & 0 deletions x-pack/legacy/plugins/task_manager/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ describe('Task Manager Plugin', () => {
"registerTaskDefinitions": [Function],
"remove": [Function],
"schedule": [Function],
"scheduleIfNotExists": [Function],
}
`);
});
Expand Down
2 changes: 2 additions & 0 deletions x-pack/legacy/plugins/task_manager/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface PluginSetupContract {
fetch: TaskManager['fetch'];
remove: TaskManager['remove'];
schedule: TaskManager['schedule'];
scheduleIfNotExists: TaskManager['scheduleIfNotExists'];
addMiddleware: TaskManager['addMiddleware'];
registerTaskDefinitions: TaskManager['registerTaskDefinitions'];
}
Expand Down Expand Up @@ -59,6 +60,7 @@ export class Plugin {
fetch: (...args) => taskManager.fetch(...args),
remove: (...args) => taskManager.remove(...args),
schedule: (...args) => taskManager.schedule(...args),
scheduleIfNotExists: (...args) => taskManager.scheduleIfNotExists(...args),
addMiddleware: (...args) => taskManager.addMiddleware(...args),
registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args),
};
Expand Down
79 changes: 79 additions & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,85 @@ describe('TaskManager', () => {
expect(savedObjectsClient.create).toHaveBeenCalled();
});

test('allows scheduling existing tasks that may have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});

client.start();

const result = await client.scheduleIfNotExists({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
});

expect(result.id).toEqual('my-foo-id');
});

test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 500,
});

client.start();

return expect(
client.scheduleIfNotExists({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 500,
});
});

test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const client = new TaskManager(taskManagerOpts);
client.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
savedObjectsClient.create.mockRejectedValueOnce({
statusCode: 409,
});

client.start();

return expect(
client.schedule({
id: 'my-foo-id',
taskType: 'foo',
params: {},
state: {},
})
).rejects.toMatchObject({
statusCode: 409,
});
});

test('allows and queues removing tasks before starting', async () => {
const client = new TaskManager(taskManagerOpts);
savedObjectsClient.delete.mockResolvedValueOnce({});
Expand Down
23 changes: 23 additions & 0 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
TaskDefinition,
TaskDictionary,
ConcreteTaskInstance,
ExistingTaskInstance,
RunContext,
TaskInstance,
} from './task';
Expand All @@ -29,6 +30,8 @@ import {
} from './task_store';
import { identifyEsError } from './lib/identify_es_error';

const VERSION_CONFLICT_STATUS = 409;

export interface TaskManagerOpts {
logger: Logger;
config: any;
Expand Down Expand Up @@ -219,6 +222,26 @@ export class TaskManager {
return result;
}

/**
* Schedules a task with an Id
*
* @param task - The task being scheduled.
* @returns {Promise<ConcreteTaskInstance>}
*/
public async scheduleIfNotExists(
taskInstance: ExistingTaskInstance,
options?: any
): Promise<ExistingTaskInstance> {
try {
return await this.schedule(taskInstance, options);
} catch (err) {
if (err.statusCode === VERSION_CONFLICT_STATUS) {
return taskInstance;
}
throw err;
}
}

/**
* Fetches a paginatable list of scheduled tasks.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,34 @@ export function initRoutes(server) {
config: {
validate: {
payload: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional(),
task: Joi.object({
taskType: Joi.string().required(),
interval: Joi.string().optional(),
params: Joi.object().required(),
state: Joi.object().optional(),
id: Joi.string().optional()
}),
scheduleIfNotExists: Joi.boolean()
.default(false)
.optional(),
}),
},
},
async handler(request) {
try {
const task = await taskManager.schedule({
...request.payload,
const { scheduleIfNotExists = false, task: taskFields } = request.payload;
const task = {
...taskFields,
scope: [scope],
}, { request });
return task;
};

const taskResult = await (
scheduleIfNotExists
? taskManager.scheduleIfNotExists(task, { request })
: taskManager.schedule(task, { request })
);

return taskResult;
} catch (err) {
return err;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ export default function ({ getService }) {
function scheduleTask(task) {
return supertest.post('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.send(task)
.send({ task })
.expect(200)
.then((response) => response.body);
}

function scheduleTaskIfNotExists(task) {
return supertest.post('/api/sample_tasks')
.set('kbn-xsrf', 'xxx')
.send({ task, scheduleIfNotExists: true })
.expect(200)
.then((response) => response.body);
}
Expand Down Expand Up @@ -116,6 +124,24 @@ export default function ({ getService }) {
expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager');
});

it('should allow a task with a given ID to be scheduled multiple times', async () => {
const result = await scheduleTaskIfNotExists({
id: 'test-task-to-reschedule-in-task-manager',
taskType: 'sampleTask',
params: { },
});

expect(result.id).to.be('test-task-to-reschedule-in-task-manager');

const rescheduleResult = await scheduleTaskIfNotExists({
id: 'test-task-to-reschedule-in-task-manager',
taskType: 'sampleTask',
params: { },
});

expect(rescheduleResult.id).to.be('test-task-to-reschedule-in-task-manager');
});

it('should reschedule if task errors', async () => {
const task = await scheduleTask({
taskType: 'sampleTask',
Expand Down

0 comments on commit 090e094

Please sign in to comment.