diff --git a/x-pack/legacy/plugins/task_manager/plugin.test.ts b/x-pack/legacy/plugins/task_manager/plugin.test.ts index f8ca6bd7a9ab3..bb77dc2fe6e29 100644 --- a/x-pack/legacy/plugins/task_manager/plugin.test.ts +++ b/x-pack/legacy/plugins/task_manager/plugin.test.ts @@ -46,6 +46,7 @@ describe('Task Manager Plugin', () => { "registerTaskDefinitions": [Function], "remove": [Function], "schedule": [Function], + "scheduleIfNotExists": [Function], } `); }); diff --git a/x-pack/legacy/plugins/task_manager/plugin.ts b/x-pack/legacy/plugins/task_manager/plugin.ts index f8d95f4880c6e..61566b1de5181 100644 --- a/x-pack/legacy/plugins/task_manager/plugin.ts +++ b/x-pack/legacy/plugins/task_manager/plugin.ts @@ -11,6 +11,7 @@ export interface PluginSetupContract { fetch: TaskManager['fetch']; remove: TaskManager['remove']; schedule: TaskManager['schedule']; + scheduleIfNotExists: TaskManager['scheduleIfNotExists']; addMiddleware: TaskManager['addMiddleware']; registerTaskDefinitions: TaskManager['registerTaskDefinitions']; } @@ -59,6 +60,7 @@ export class Plugin { fetch: (...args) => taskManager.fetch(...args), remove: (...args) => taskManager.remove(...args), schedule: (...args) => taskManager.schedule(...args), + scheduleIfNotExists: (...args) => taskManager.scheduleIfNotExists(...args), addMiddleware: (...args) => taskManager.addMiddleware(...args), registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args), }; diff --git a/x-pack/legacy/plugins/task_manager/task_manager.test.ts b/x-pack/legacy/plugins/task_manager/task_manager.test.ts index 9ae2f5e1e027b..80e64a16b2975 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.test.ts @@ -121,6 +121,85 @@ describe('TaskManager', () => { expect(savedObjectsClient.create).toHaveBeenCalled(); }); + test('allows scheduling existing tasks that may have already been scheduled', async () => { + const client = new TaskManager(taskManagerOpts); + client.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + savedObjectsClient.create.mockRejectedValueOnce({ + statusCode: 409, + }); + + client.start(); + + const result = await client.scheduleIfNotExists({ + id: 'my-foo-id', + taskType: 'foo', + params: {}, + state: {}, + }); + + expect(result.id).toEqual('my-foo-id'); + }); + + test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => { + const client = new TaskManager(taskManagerOpts); + client.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + savedObjectsClient.create.mockRejectedValueOnce({ + statusCode: 500, + }); + + client.start(); + + return expect( + client.scheduleIfNotExists({ + id: 'my-foo-id', + taskType: 'foo', + params: {}, + state: {}, + }) + ).rejects.toMatchObject({ + statusCode: 500, + }); + }); + + test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => { + const client = new TaskManager(taskManagerOpts); + client.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + savedObjectsClient.create.mockRejectedValueOnce({ + statusCode: 409, + }); + + client.start(); + + return expect( + client.schedule({ + id: 'my-foo-id', + taskType: 'foo', + params: {}, + state: {}, + }) + ).rejects.toMatchObject({ + statusCode: 409, + }); + }); + test('allows and queues removing tasks before starting', async () => { const client = new TaskManager(taskManagerOpts); savedObjectsClient.delete.mockResolvedValueOnce({}); diff --git a/x-pack/legacy/plugins/task_manager/task_manager.ts b/x-pack/legacy/plugins/task_manager/task_manager.ts index 4ddb18c7cfe74..2f2becd202d74 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.ts @@ -14,6 +14,7 @@ import { TaskDefinition, TaskDictionary, ConcreteTaskInstance, + ExistingTaskInstance, RunContext, TaskInstance, } from './task'; @@ -29,6 +30,8 @@ import { } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; +const VERSION_CONFLICT_STATUS = 409; + export interface TaskManagerOpts { logger: Logger; config: any; @@ -219,6 +222,26 @@ export class TaskManager { return result; } + /** + * Schedules a task with an Id + * + * @param task - The task being scheduled. + * @returns {Promise} + */ + public async scheduleIfNotExists( + taskInstance: ExistingTaskInstance, + options?: any + ): Promise { + try { + return await this.schedule(taskInstance, options); + } catch (err) { + if (err.statusCode === VERSION_CONFLICT_STATUS) { + return taskInstance; + } + throw err; + } + } + /** * Fetches a paginatable list of scheduled tasks. * diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js index ad06fb15fd9ae..07d0c8391fce6 100644 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js @@ -32,21 +32,34 @@ export function initRoutes(server) { config: { validate: { payload: Joi.object({ - taskType: Joi.string().required(), - interval: Joi.string().optional(), - params: Joi.object().required(), - state: Joi.object().optional(), - id: Joi.string().optional(), + task: Joi.object({ + taskType: Joi.string().required(), + interval: Joi.string().optional(), + params: Joi.object().required(), + state: Joi.object().optional(), + id: Joi.string().optional() + }), + scheduleIfNotExists: Joi.boolean() + .default(false) + .optional(), }), }, }, async handler(request) { try { - const task = await taskManager.schedule({ - ...request.payload, + const { scheduleIfNotExists = false, task: taskFields } = request.payload; + const task = { + ...taskFields, scope: [scope], - }, { request }); - return task; + }; + + const taskResult = await ( + scheduleIfNotExists + ? taskManager.scheduleIfNotExists(task, { request }) + : taskManager.schedule(task, { request }) + ); + + return taskResult; } catch (err) { return err; } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 30d830cd6c919..ef21dd2786294 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -60,7 +60,15 @@ export default function ({ getService }) { function scheduleTask(task) { return supertest.post('/api/sample_tasks') .set('kbn-xsrf', 'xxx') - .send(task) + .send({ task }) + .expect(200) + .then((response) => response.body); + } + + function scheduleTaskIfNotExists(task) { + return supertest.post('/api/sample_tasks') + .set('kbn-xsrf', 'xxx') + .send({ task, scheduleIfNotExists: true }) .expect(200) .then((response) => response.body); } @@ -116,6 +124,24 @@ export default function ({ getService }) { expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager'); }); + it('should allow a task with a given ID to be scheduled multiple times', async () => { + const result = await scheduleTaskIfNotExists({ + id: 'test-task-to-reschedule-in-task-manager', + taskType: 'sampleTask', + params: { }, + }); + + expect(result.id).to.be('test-task-to-reschedule-in-task-manager'); + + const rescheduleResult = await scheduleTaskIfNotExists({ + id: 'test-task-to-reschedule-in-task-manager', + taskType: 'sampleTask', + params: { }, + }); + + expect(rescheduleResult.id).to.be('test-task-to-reschedule-in-task-manager'); + }); + it('should reschedule if task errors', async () => { const task = await scheduleTask({ taskType: 'sampleTask',