From 64af6f65fad30685b88c411df8b19573f7725e77 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Thu, 14 Nov 2019 13:44:00 -0500 Subject: [PATCH] [Task manager] Adds ensureScheduling api to allow safer rescheduling of existing tasks (#50232) Adds an ensureScheduling api to Task Manager which allow safer rescheduling of existing tasks by handling the case where a Task with a known ID is scheduled and clashes with an existing schedule of that same task. --- .../legacy/plugins/lens/server/usage/task.ts | 2 +- .../server/maps_telemetry/telemetry_task.js | 2 +- .../legacy/plugins/oss_telemetry/index.d.ts | 2 +- .../oss_telemetry/server/lib/tasks/index.ts | 2 +- .../plugins/oss_telemetry/test_utils/index.ts | 2 +- x-pack/legacy/plugins/task_manager/README.md | 11 +++ .../plugins/task_manager/plugin.test.ts | 1 + x-pack/legacy/plugins/task_manager/plugin.ts | 2 + x-pack/legacy/plugins/task_manager/task.ts | 19 +++++ .../plugins/task_manager/task_manager.mock.ts | 1 + .../plugins/task_manager/task_manager.test.ts | 79 +++++++++++++++++++ .../plugins/task_manager/task_manager.ts | 23 ++++++ .../plugins/task_manager/init_routes.js | 31 +++++--- .../task_manager/task_manager_integration.js | 28 ++++++- 14 files changed, 190 insertions(+), 15 deletions(-) diff --git a/x-pack/legacy/plugins/lens/server/usage/task.ts b/x-pack/legacy/plugins/lens/server/usage/task.ts index 90887ae5d0c11..64ffaec9917c3 100644 --- a/x-pack/legacy/plugins/lens/server/usage/task.ts +++ b/x-pack/legacy/plugins/lens/server/usage/task.ts @@ -86,7 +86,7 @@ function scheduleTasks(server: Server) { // function block. (async () => { try { - await taskManager.schedule({ + await taskManager.ensureScheduled({ id: TASK_ID, taskType: TELEMETRY_TASK_TYPE, state: { byDate: {}, suggestionsByDate: {}, saved: {}, runs: 0 }, diff --git a/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js b/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js index 3702bc8e29539..78b04543e72f2 100644 --- a/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js +++ b/x-pack/legacy/plugins/maps/server/maps_telemetry/telemetry_task.js @@ -29,7 +29,7 @@ export function scheduleTask(server) { // function block. (async () => { try { - await taskManager.schedule({ + await taskManager.ensureScheduled({ id: TASK_ID, taskType: TELEMETRY_TASK_TYPE, state: { stats: {}, runs: 0 }, diff --git a/x-pack/legacy/plugins/oss_telemetry/index.d.ts b/x-pack/legacy/plugins/oss_telemetry/index.d.ts index 9f735c676fe6d..012f987627369 100644 --- a/x-pack/legacy/plugins/oss_telemetry/index.d.ts +++ b/x-pack/legacy/plugins/oss_telemetry/index.d.ts @@ -46,7 +46,7 @@ export interface HapiServer { }; task_manager: { registerTaskDefinitions: (opts: any) => void; - schedule: (opts: any) => Promise; + ensureScheduled: (opts: any) => Promise; fetch: ( opts: any ) => Promise<{ diff --git a/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts b/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts index eaa8cc7405821..16e83a7938e60 100644 --- a/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts +++ b/x-pack/legacy/plugins/oss_telemetry/server/lib/tasks/index.ts @@ -42,7 +42,7 @@ export function scheduleTasks(server: HapiServer) { // function block. (async () => { try { - await taskManager.schedule({ + await taskManager.ensureScheduled({ id: `${PLUGIN_ID}-${VIS_TELEMETRY_TASK}`, taskType: VIS_TELEMETRY_TASK, state: { stats: {}, runs: 0 }, diff --git a/x-pack/legacy/plugins/oss_telemetry/test_utils/index.ts b/x-pack/legacy/plugins/oss_telemetry/test_utils/index.ts index 7168f598dca23..998a1d2beeab1 100644 --- a/x-pack/legacy/plugins/oss_telemetry/test_utils/index.ts +++ b/x-pack/legacy/plugins/oss_telemetry/test_utils/index.ts @@ -50,7 +50,7 @@ export const getMockKbnServer = ( xpack_main: {}, task_manager: { registerTaskDefinitions: (opts: any) => undefined, - schedule: (opts: any) => Promise.resolve(), + ensureScheduled: (opts: any) => Promise.resolve(), fetch: mockTaskFetch, }, }, diff --git a/x-pack/legacy/plugins/task_manager/README.md b/x-pack/legacy/plugins/task_manager/README.md index 63c92102af251..744643458e136 100644 --- a/x-pack/legacy/plugins/task_manager/README.md +++ b/x-pack/legacy/plugins/task_manager/README.md @@ -222,6 +222,9 @@ The data stored for a task instance looks something like this: The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected. +### schedule +Using `schedule` you can instruct TaskManger to schedule an instance of a TaskType at some point in the future. + ```js const taskManager = server.plugins.task_manager; // Schedules a task. All properties are as documented in the previous @@ -256,6 +259,14 @@ const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] } ``` +### ensureScheduling +When using the `schedule` api to schedule a Task you can provide a hard coded `id` on the Task. This tells TaskManager to use this `id` to identify the Task Instance rather than generate an `id` on its own. +The danger is that in such a situation, a Task with that same `id` might already have been scheduled at some earlier point, and this would result in an error. In some cases, this is the expected behavior, but often you only care about ensuring the task has been _scheduled_ and don't need it to be scheduled a fresh. + +To achieve this you should use the `ensureScheduling` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation. + +### more options + More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. ## Middleware diff --git a/x-pack/legacy/plugins/task_manager/plugin.test.ts b/x-pack/legacy/plugins/task_manager/plugin.test.ts index f8ca6bd7a9ab3..4f2effb5da3a8 100644 --- a/x-pack/legacy/plugins/task_manager/plugin.test.ts +++ b/x-pack/legacy/plugins/task_manager/plugin.test.ts @@ -42,6 +42,7 @@ describe('Task Manager Plugin', () => { expect(setupResult).toMatchInlineSnapshot(` Object { "addMiddleware": [Function], + "ensureScheduled": [Function], "fetch": [Function], "registerTaskDefinitions": [Function], "remove": [Function], diff --git a/x-pack/legacy/plugins/task_manager/plugin.ts b/x-pack/legacy/plugins/task_manager/plugin.ts index f8d95f4880c6e..3e1514bd5234f 100644 --- a/x-pack/legacy/plugins/task_manager/plugin.ts +++ b/x-pack/legacy/plugins/task_manager/plugin.ts @@ -11,6 +11,7 @@ export interface PluginSetupContract { fetch: TaskManager['fetch']; remove: TaskManager['remove']; schedule: TaskManager['schedule']; + ensureScheduled: TaskManager['ensureScheduled']; addMiddleware: TaskManager['addMiddleware']; registerTaskDefinitions: TaskManager['registerTaskDefinitions']; } @@ -59,6 +60,7 @@ export class Plugin { fetch: (...args) => taskManager.fetch(...args), remove: (...args) => taskManager.remove(...args), schedule: (...args) => taskManager.schedule(...args), + ensureScheduled: (...args) => taskManager.ensureScheduled(...args), addMiddleware: (...args) => taskManager.addMiddleware(...args), registerTaskDefinitions: (...args) => taskManager.registerTaskDefinitions(...args), }; diff --git a/x-pack/legacy/plugins/task_manager/task.ts b/x-pack/legacy/plugins/task_manager/task.ts index dd74acc2636e9..3eeb23685f377 100644 --- a/x-pack/legacy/plugins/task_manager/task.ts +++ b/x-pack/legacy/plugins/task_manager/task.ts @@ -10,6 +10,20 @@ import Joi from 'joi'; * Type definitions and validations for tasks. */ +/** + * Require + * @desc Create a Subtype of type T `T` such that the property under key `P` becomes required + * @example + * type TaskInstance = { + * id?: string; + * name: string; + * }; + * + * // This type is now defined as { id: string; name: string; } + * type TaskInstanceWithId = Require; + */ +type Require = Omit & Required>; + /** * A loosely typed definition of the elasticjs wrapper. It's beyond the scope * of this work to try to make a comprehensive type definition of this. @@ -216,6 +230,11 @@ export interface TaskInstance { ownerId?: string | null; } +/** + * A task instance that has an id. + */ +export type TaskInstanceWithId = Require; + /** * A task instance that has an id and is ready for storage. */ diff --git a/x-pack/legacy/plugins/task_manager/task_manager.mock.ts b/x-pack/legacy/plugins/task_manager/task_manager.mock.ts index 2737e83f0ba4a..515099a8bd479 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.mock.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.mock.ts @@ -10,6 +10,7 @@ const createTaskManagerMock = () => { const mocked: jest.Mocked = { registerTaskDefinitions: jest.fn(), addMiddleware: jest.fn(), + ensureScheduled: jest.fn(), schedule: jest.fn(), fetch: jest.fn(), remove: jest.fn(), diff --git a/x-pack/legacy/plugins/task_manager/task_manager.test.ts b/x-pack/legacy/plugins/task_manager/task_manager.test.ts index ca09716a38c48..d2b3cca111f5c 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.test.ts @@ -95,6 +95,85 @@ describe('TaskManager', () => { expect(savedObjectsClient.create).toHaveBeenCalled(); }); + test('allows scheduling existing tasks that may have already been scheduled', async () => { + const client = new TaskManager(taskManagerOpts); + client.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + savedObjectsClient.create.mockRejectedValueOnce({ + statusCode: 409, + }); + + client.start(); + + const result = await client.ensureScheduled({ + id: 'my-foo-id', + taskType: 'foo', + params: {}, + state: {}, + }); + + expect(result.id).toEqual('my-foo-id'); + }); + + test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => { + const client = new TaskManager(taskManagerOpts); + client.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + savedObjectsClient.create.mockRejectedValueOnce({ + statusCode: 500, + }); + + client.start(); + + return expect( + client.ensureScheduled({ + id: 'my-foo-id', + taskType: 'foo', + params: {}, + state: {}, + }) + ).rejects.toMatchObject({ + statusCode: 500, + }); + }); + + test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => { + const client = new TaskManager(taskManagerOpts); + client.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + savedObjectsClient.create.mockRejectedValueOnce({ + statusCode: 409, + }); + + client.start(); + + return expect( + client.schedule({ + id: 'my-foo-id', + taskType: 'foo', + params: {}, + state: {}, + }) + ).rejects.toMatchObject({ + statusCode: 409, + }); + }); + test('allows and queues removing tasks before starting', async () => { const client = new TaskManager(taskManagerOpts); savedObjectsClient.delete.mockResolvedValueOnce({}); diff --git a/x-pack/legacy/plugins/task_manager/task_manager.ts b/x-pack/legacy/plugins/task_manager/task_manager.ts index 8092c36f371f9..383fa39515672 100644 --- a/x-pack/legacy/plugins/task_manager/task_manager.ts +++ b/x-pack/legacy/plugins/task_manager/task_manager.ts @@ -16,6 +16,7 @@ import { TaskDictionary, ConcreteTaskInstance, RunContext, + TaskInstanceWithId, TaskInstance, } from './task'; import { TaskPoller } from './task_poller'; @@ -30,6 +31,8 @@ import { } from './task_store'; import { identifyEsError } from './lib/identify_es_error'; +const VERSION_CONFLICT_STATUS = 409; + export interface TaskManagerOpts { logger: Logger; config: any; @@ -216,6 +219,26 @@ export class TaskManager { return result; } + /** + * Schedules a task with an Id + * + * @param task - The task being scheduled. + * @returns {Promise} + */ + public async ensureScheduled( + taskInstance: TaskInstanceWithId, + options?: any + ): Promise { + try { + return await this.schedule(taskInstance, options); + } catch (err) { + if (err.statusCode === VERSION_CONFLICT_STATUS) { + return taskInstance; + } + throw err; + } + } + /** * Fetches a paginatable list of scheduled tasks. * diff --git a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js index ad06fb15fd9ae..a9dfabae6d609 100644 --- a/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js +++ b/x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js @@ -32,21 +32,34 @@ export function initRoutes(server) { config: { validate: { payload: Joi.object({ - taskType: Joi.string().required(), - interval: Joi.string().optional(), - params: Joi.object().required(), - state: Joi.object().optional(), - id: Joi.string().optional(), + task: Joi.object({ + taskType: Joi.string().required(), + interval: Joi.string().optional(), + params: Joi.object().required(), + state: Joi.object().optional(), + id: Joi.string().optional() + }), + ensureScheduled: Joi.boolean() + .default(false) + .optional(), }), }, }, async handler(request) { try { - const task = await taskManager.schedule({ - ...request.payload, + const { ensureScheduled = false, task: taskFields } = request.payload; + const task = { + ...taskFields, scope: [scope], - }, { request }); - return task; + }; + + const taskResult = await ( + ensureScheduled + ? taskManager.ensureScheduled(task, { request }) + : taskManager.schedule(task, { request }) + ); + + return taskResult; } catch (err) { return err; } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 250bdb11f6c3a..bdac03af5ae6c 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -60,7 +60,15 @@ export default function ({ getService }) { function scheduleTask(task) { return supertest.post('/api/sample_tasks') .set('kbn-xsrf', 'xxx') - .send(task) + .send({ task }) + .expect(200) + .then((response) => response.body); + } + + function scheduleTaskIfNotExists(task) { + return supertest.post('/api/sample_tasks') + .set('kbn-xsrf', 'xxx') + .send({ task, ensureScheduled: true }) .expect(200) .then((response) => response.body); } @@ -116,6 +124,24 @@ export default function ({ getService }) { expect(result.id).to.be('test-task-for-sample-task-plugin-to-test-task-manager'); }); + it('should allow a task with a given ID to be scheduled multiple times', async () => { + const result = await scheduleTaskIfNotExists({ + id: 'test-task-to-reschedule-in-task-manager', + taskType: 'sampleTask', + params: { }, + }); + + expect(result.id).to.be('test-task-to-reschedule-in-task-manager'); + + const rescheduleResult = await scheduleTaskIfNotExists({ + id: 'test-task-to-reschedule-in-task-manager', + taskType: 'sampleTask', + params: { }, + }); + + expect(rescheduleResult.id).to.be('test-task-to-reschedule-in-task-manager'); + }); + it('should reschedule if task errors', async () => { const task = await scheduleTask({ taskType: 'sampleTask',